Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
statestore-subscriber.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <sstream>
18 #include <utility>
19 
20 #include <boost/algorithm/string/join.hpp>
21 #include <boost/foreach.hpp>
22 #include <boost/date_time/posix_time/posix_time.hpp>
23 #include <boost/thread/shared_mutex.hpp>
24 #include <gutil/strings/substitute.h>
25 
26 #include "common/logging.h"
27 #include "common/status.h"
29 #include "gen-cpp/StatestoreService_types.h"
30 #include "rpc/rpc-trace.h"
31 #include "rpc/thrift-util.h"
32 #include "util/time.h"
33 #include "util/debug-util.h"
34 
35 #include "common/names.h"
36 
37 using boost::posix_time::seconds;
38 using namespace apache::thrift;
39 using namespace strings;
40 
// Command-line flags for the statestore subscriber. Each DEFINE_int32 gives the flag
// name, its default value and its help text.

// Read by the StatestoreSubscriber constructor when it builds the
// TimeoutFailureDetector (passed once as-is and once halved).
41 DEFINE_int32(statestore_subscriber_timeout_seconds, 30, "The amount of time (in seconds)"
42  " that may elapse before the connection with the statestore is considered lost.");
// Read by the StatestoreSubscriber constructor and passed to StatestoreClientCache.
43 DEFINE_int32(statestore_subscriber_cnxn_attempts, 10, "The number of times to retry an "
44  "RPC connection to the statestore. A setting of 0 means retry indefinitely");
// Read by the StatestoreSubscriber constructor and passed to StatestoreClientCache.
45 DEFINE_int32(statestore_subscriber_cnxn_retry_interval_ms, 3000, "The interval, in ms, "
46  "to wait between attempts to make an RPC connection to the statestore.");
47 
48 namespace impala {
49 
50 // Key identifying the statestore in the failure detector (see UpdateHeartbeat() calls).
51 const string STATESTORE_ID = "STATESTORE";
52 
53 // Template for per-topic processing-time metric names; $0 is replaced by the topic ID.
54 const string CALLBACK_METRIC_PATTERN = "statestore-subscriber.topic-$0.processing-time-s";
55 
56 // Duration, in ms, to sleep between attempts to reconnect to the
57 // statestore after a failure.
58 const int32_t SLEEP_INTERVAL_MS = 5000;
59 
61 
62 // Proxy class for the subscriber heartbeat thrift API, which
63 // translates RPCs into method calls on the local subscriber object.
65  public:
67  : subscriber_(subscriber) { DCHECK(subscriber != NULL); }
68  virtual void UpdateState(TUpdateStateResponse& response,
69  const TUpdateStateRequest& params) {
70  TUniqueId registration_id;
71  if (params.__isset.registration_id) {
72  registration_id = params.registration_id;
73  }
74 
75  subscriber_->UpdateState(params.topic_deltas, registration_id,
76  &response.topic_updates, &response.skipped).ToThrift(&response.status);
77  // Make sure Thrift thinks the field is set.
78  response.__set_skipped(response.skipped);
79  }
80 
81  virtual void Heartbeat(THeartbeatResponse& response, const THeartbeatRequest& request) {
82  subscriber_->Heartbeat(request.registration_id);
83  }
84 
85  private:
87 };
88 
// Constructs a subscriber that is not yet registered (is_registered_ starts false).
//   subscriber_id:      stored in subscriber_id_.
//   heartbeat_address:  stored in heartbeat_address_.
//   statestore_address: stored in statestore_address_.
//   metrics:            parent metric group; a child group named
//                       "statestore-subscriber" is obtained from it and kept in metrics_.
// Also creates the Thrift interface proxy, the failure detector and the statestore
// client cache, and sets up the subscriber's metrics.
89 StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
90  const TNetworkAddress& heartbeat_address, const TNetworkAddress& statestore_address,
91  MetricGroup* metrics)
92  : subscriber_id_(subscriber_id), heartbeat_address_(heartbeat_address),
93  statestore_address_(statestore_address),
94  thrift_iface_(new StatestoreSubscriberThriftIf(this)),
// The failure detector gets the timeout flag and half of it (integer division).
// NOTE(review): the meaning of the two arguments is defined by TimeoutFailureDetector,
// which is not visible here -- confirm against its declaration.
95  failure_detector_(new TimeoutFailureDetector(
96  seconds(FLAGS_statestore_subscriber_timeout_seconds),
97  seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
98  is_registered_(false),
99  client_cache_(new StatestoreClientCache(FLAGS_statestore_subscriber_cnxn_attempts,
100  FLAGS_statestore_subscriber_cnxn_retry_interval_ms)),
101  metrics_(metrics->GetChildGroup("statestore-subscriber")) {
// Metrics set up on the "statestore-subscriber" child group.
103  metrics_->AddProperty("statestore-subscriber.connected", false);
105  "statestore-subscriber.last-recovery-duration", 0.0);
107  "statestore-subscriber.last-recovery-time", "N/A");
109  new StatsMetric<double>("statestore-subscriber.topic-update-interval-time",
110  TUnit::TIME_S));
112  new StatsMetric<double>("statestore-subscriber.topic-update-duration",
113  TUnit::TIME_S));
115  new StatsMetric<double>("statestore-subscriber.heartbeat-interval-time",
116  TUnit::TIME_S));
117 
// Unlike the metrics above, the registration-id property and the client cache metrics
// are set up through the parent 'metrics' group rather than the metrics_ child group.
118  registration_id_metric_ = metrics->AddProperty<string>(
119  "statestore-subscriber.registration-id", "N/A",
120  "The most recent registration ID for this subscriber with the statestore. Set to "
121  "'N/A' if no registration has been completed");
122 
123  client_cache_->InitMetrics(metrics, "statestore-subscriber.statestore");
124 }
125 
127  bool is_transient, const UpdateCallback& callback) {
128  lock_guard<mutex> l(lock_);
129  if (is_registered_) return Status("Subscriber already started, can't add new topic");
130  Callbacks* cb = &(update_callbacks_[topic_id]);
131  cb->callbacks.push_back(callback);
132  if (cb->processing_time_metric == NULL) {
133  const string& metric_name = Substitute(CALLBACK_METRIC_PATTERN, topic_id);
135  new StatsMetric<double>(metric_name, TUnit::TIME_S));
136  }
137  topic_registrations_[topic_id] = is_transient;
138  return Status::OK;
139 }
140 
142  Status client_status;
143  StatestoreConnection client(client_cache_.get(), statestore_address_, &client_status);
144  RETURN_IF_ERROR(client_status);
145 
146  TRegisterSubscriberRequest request;
147  request.topic_registrations.reserve(update_callbacks_.size());
148  BOOST_FOREACH(const UpdateCallbacks::value_type& topic, update_callbacks_) {
149  TTopicRegistration thrift_topic;
150  thrift_topic.topic_name = topic.first;
151  thrift_topic.is_transient = topic_registrations_[topic.first];
152  request.topic_registrations.push_back(thrift_topic);
153  }
154 
155  request.subscriber_location = heartbeat_address_;
156  request.subscriber_id = subscriber_id_;
157  TRegisterSubscriberResponse response;
159  client.DoRpc(&StatestoreServiceClient::RegisterSubscriber, request, &response));
160  Status status = Status(response.status);
161  if (status.ok()) connected_to_statestore_metric_->set_value(true);
162  if (response.__isset.registration_id) {
163  lock_guard<mutex> l(registration_id_lock_);
164  registration_id_ = response.registration_id;
165  const string& registration_string = PrintId(registration_id_);
166  registration_id_metric_->set_value(registration_string);
167  VLOG(1) << "Subscriber registration ID: " << registration_string;
168  } else {
169  VLOG(1) << "No subscriber registration ID received from statestore";
170  }
173  return status;
174 }
175 
177  Status status;
178  {
179  // Take the lock to ensure that, if a topic-update is received during registration
180  // (perhaps because Register() has succeeded, but we haven't finished setting up state
181  // on the client side), UpdateState() will reject the message.
182  lock_guard<mutex> l(lock_);
183  LOG(INFO) << "Starting statestore subscriber";
184 
185  // Backend must be started before registration
186  shared_ptr<TProcessor> processor(new StatestoreSubscriberProcessor(thrift_iface_));
187  shared_ptr<TProcessorEventHandler> event_handler(
188  new RpcEventHandler("statestore-subscriber", metrics_));
189  processor->setEventHandler(event_handler);
190 
191  heartbeat_server_.reset(new ThriftServer("StatestoreSubscriber", processor,
192  heartbeat_address_.port, NULL, NULL, 5));
194  LOG(INFO) << "Registering with statestore";
195  status = Register();
196  if (status.ok()) {
197  is_registered_ = true;
198  LOG(INFO) << "statestore registration successful";
199  } else {
200  LOG(INFO) << "statestore registration unsuccessful: " << status.GetDetail();
201  }
202  }
203 
204  // Registration is finished at this point, so it's fine to release the lock.
205  recovery_mode_thread_.reset(new Thread("statestore-subscriber", "recovery-mode-thread",
207 
208  return status;
209 }
210 
212  failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
213 
214  // Every few seconds, wake up and check if the failure detector has determined
215  // that the statestore has failed from our perspective. If so, enter recovery
216  // mode and try to reconnect, followed by reregistering all subscriptions.
217  while (true) {
219  // When entering recovery mode, the class-wide lock_ is taken to
220  // ensure mutual exclusion with any operations in flight.
221  lock_guard<mutex> l(lock_);
222  MonotonicStopWatch recovery_timer;
223  recovery_timer.Start();
224  connected_to_statestore_metric_->set_value(false);
225  LOG(INFO) << subscriber_id_
226  << ": Connection with statestore lost, entering recovery mode";
227  uint32_t attempt_count = 1;
228  while (true) {
229  LOG(INFO) << "Trying to re-register with statestore, attempt: "
230  << attempt_count++;
231  Status status = Register();
232  if (status.ok()) {
233  // Make sure to update failure detector so that we don't immediately fail on the
234  // next loop while we're waiting for heartbeat messages to resume.
235  failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
236  LOG(INFO) << "Reconnected to statestore. Exiting recovery mode";
237 
238  // Break out of enclosing while (true) to top of outer-scope loop.
239  break;
240  } else {
241  // Don't exit recovery mode, continue
242  LOG(WARNING) << "Failed to re-register with statestore: "
243  << status.GetDetail();
245  }
247  recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
248  }
249  // When we're successful in re-registering, we don't do anything
250  // to re-send our updates to the statestore. It is the
251  // responsibility of individual clients to post missing updates
252  // back to the statestore. This saves a lot of complexity where
253  // we would otherwise have to cache updates here.
255  recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
257  }
258 
260  }
261 }
262 
// Checks the registration ID carried by an incoming statestore message against the
// ID stored in registration_id_.
// Returns Status::OK when registration_id_ is still unset (equal to a
// default-constructed TUniqueId) or when it equals 'registration_id'. Otherwise
// returns an error Status whose message contains both IDs.
263 Status StatestoreSubscriber::CheckRegistrationId(const TUniqueId& registration_id) {
// Inner scope: registration_id_lock_ is held only while registration_id_ is read.
264  {
265  lock_guard<mutex> r(registration_id_lock_);
266  // If this subscriber has just started, the registration_id_ may not have been set
267  // despite the statestore starting to send updates. The 'unset' TUniqueId is 0:0, so
268  // we can differentiate between a) an early message from an eager statestore, and b)
269  // a message that's targeted to a previous registration.
270  if (registration_id_ != TUniqueId() && registration_id != registration_id_) {
271  return Status(Substitute("Unexpected registration ID: $0, was expecting $1",
272  PrintId(registration_id), PrintId(registration_id_)));
273  }
274  }
275 
276  return Status::OK;
277 }
278 
// Called when the statestore sends a heartbeat message.
// If 'registration_id' passes CheckRegistrationId(), the failure detector is updated
// for STATESTORE_ID. Otherwise the mismatch detail is logged via VLOG_RPC and the
// failure detector is left untouched.
279 void StatestoreSubscriber::Heartbeat(const TUniqueId& registration_id) {
280  const Status& status = CheckRegistrationId(registration_id);
281  if (status.ok()) {
// Reset() restarts the interval timer and returns the time elapsed so far; the
// division by 1e9 presumably converts nanoseconds to seconds -- TODO confirm the unit.
283  heartbeat_interval_timer_.Reset() / (1000.0 * 1000.0 * 1000.0));
284  failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
285  } else {
286  VLOG_RPC << "Heartbeat: " << status.GetDetail();
287  }
288 }
289 
291  const TUniqueId& registration_id, vector<TTopicDelta>* subscriber_topic_updates,
292  bool* skipped) {
293  // We don't want to block here because this is an RPC, and delaying the return causes
294  // the statestore to delay sending further messages. The only time that lock_ might be
295  // taken concurrently is if:
296  //
297  // a) another update is still being processed (i.e. is still in UpdateState()). This
298  // could happen only when the subscriber has re-registered, and the statestore is still
299  // sending an update for the previous registration. In this case, return OK but set
300  // *skipped = true to tell the statestore to retry this update in the future.
301  //
302  // b) the subscriber is recovering, and has the lock held during
303  // RecoveryModeChecker(). Similarly, we set *skipped = true.
304  // TODO: Consider returning an error in this case so that the statestore will eventually
305  // stop sending updates even if re-registration fails.
306  try_mutex::scoped_try_lock l(lock_);
307  if (l) {
308  *skipped = false;
309  RETURN_IF_ERROR(CheckRegistrationId(registration_id));
310 
311  // Only record updates received when not in recovery mode
313  topic_update_interval_timer_.Reset() / (1000.0 * 1000.0 * 1000.0));
315  sw.Start();
316 
317  // Check the version ranges of all delta updates to ensure they can be applied
318  // to this subscriber. If any invalid ranges are found, request new update(s) with
319  // version ranges applicable to this subscriber.
320  bool found_unexpected_delta = false;
321  BOOST_FOREACH(const TopicDeltaMap::value_type& delta, incoming_topic_deltas) {
322  TopicVersionMap::const_iterator itr = current_topic_versions_.find(delta.first);
323  if (itr != current_topic_versions_.end()) {
324  if (delta.second.is_delta && delta.second.from_version != itr->second) {
325  LOG(ERROR) << "Unexpected delta update to topic '" << delta.first << "' of "
326  << "version range (" << delta.second.from_version << ":"
327  << delta.second.to_version << "]. Expected delta start version: "
328  << itr->second;
329 
330  subscriber_topic_updates->push_back(TTopicDelta());
331  TTopicDelta& update = subscriber_topic_updates->back();
332  update.topic_name = delta.second.topic_name;
333  update.__set_from_version(itr->second);
334  found_unexpected_delta = true;
335  } else {
336  // Update the current topic version
337  current_topic_versions_[delta.first] = delta.second.to_version;
338  }
339  }
340  }
341 
342  // Skip calling the callbacks when an unexpected delta update is found.
343  if (!found_unexpected_delta) {
344  BOOST_FOREACH(const UpdateCallbacks::value_type& callbacks, update_callbacks_) {
346  sw.Start();
347  BOOST_FOREACH(const UpdateCallback& callback, callbacks.second.callbacks) {
348  // TODO: Consider filtering the topics to only send registered topics to
349  // callbacks
350  callback(incoming_topic_deltas, subscriber_topic_updates);
351  }
352  callbacks.second.processing_time_metric->Update(
353  sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
354  }
355  }
356  sw.Stop();
357  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
358  } else {
359  *skipped = true;
360  }
361  return Status::OK;
362 }
363 
364 
365 }
MetricGroup * metrics_
MetricGroup instance that all metrics are registered in. Not owned by this class. ...
boost::shared_ptr< StatestoreSubscriberIf > thrift_iface_
const std::string GetDetail() const
Definition: status.cc:184
void Heartbeat(const TUniqueId &registration_id)
Called when the statestore sends a heartbeat message. Updates the failure detector.
std::vector< UpdateCallback > callbacks
List of callbacks to invoke for this topic.
TNetworkAddress statestore_address_
Address of the statestore.
const string CALLBACK_METRIC_PATTERN
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
ClientConnection< StatestoreServiceClient > StatestoreConnection
boost::scoped_ptr< Thread > recovery_mode_thread_
Thread in which RecoveryModeChecker runs.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
M * RegisterMetric(M *metric)
Definition: metrics.h:211
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
Status CheckRegistrationId(const TUniqueId &registration_id)
virtual void UpdateState(TUpdateStateResponse &response, const TUpdateStateRequest &params)
boost::scoped_ptr< impala::TimeoutFailureDetector > failure_detector_
Failure detector that tracks heartbeat messages from the statestore.
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
const int32_t SLEEP_INTERVAL_MS
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
Definition: time.cc:21
DoubleGauge * last_recovery_duration_metric_
Amount of time last spent in recovery mode.
uint64_t Reset()
Restarts the timer. Returns the elapsed time until this point.
Definition: stopwatch.h:96
TNetworkAddress heartbeat_address_
Address that the heartbeat service should be started on.
StringProperty * registration_id_metric_
Current registration ID, in string form.
boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
StringProperty * last_recovery_time_metric_
When the last recovery happened.
std::string TopicId
A TopicId uniquely identifies a single topic.
Definition: statestore.h:86
SimpleMetric< T, TMetricKind::PROPERTY > * AddProperty(const std::string &key, const T &value, const std::string &description="")
Definition: metrics.h:231
MonotonicStopWatch topic_update_interval_timer_
Tracks the time between topic-update messages.
boost::scoped_ptr< StatestoreClientCache > client_cache_
statestore client cache - only one client is ever used.
DEFINE_int32(statestore_subscriber_timeout_seconds, 30,"The amount of time (in seconds)"" that may elapse before the connection with the statestore is considered lost.")
std::string DebugString(const T &val)
Definition: udf-debug.h:27
std::map< Statestore::TopicId, bool > topic_registrations_
void Update(const T &value)
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
const std::string subscriber_id_
Unique, but opaque, identifier for this subscriber.
Status UpdateState(const TopicDeltaMap &incoming_topic_deltas, const TUniqueId &registration_id, std::vector< TTopicDelta > *subscriber_topic_updates, bool *skipped)
uint64_t ElapsedTime() const
Returns time in nanosecond.
Definition: stopwatch.h:105
virtual void Heartbeat(THeartbeatResponse &response, const THeartbeatRequest &request)
StatsMetric< double > * topic_update_duration_metric_
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
Definition: metrics.h:223
static const Status OK
Definition: status.h:87
#define VLOG_RPC
Definition: logging.h:56
StatsMetric< double > * topic_update_interval_metric_
Accumulated statistics on the frequency of topic-update messages.
BooleanProperty * connected_to_statestore_metric_
Metric to indicate if we are successfully registered with the statestore.
bool ok() const
Definition: status.h:172
StatsMetric< double > * heartbeat_interval_metric_
Accumulated statistics on the frequency of heartbeat messages.
StatestoreSubscriberThriftIf(StatestoreSubscriber *subscriber)
const string STATESTORE_ID
Status Start()
Returns OK unless some error occurred, like a failure to connect.
Status AddTopic(const Statestore::TopicId &topic_id, bool is_transient, const UpdateCallback &callback)
boost::shared_ptr< ThriftServer > heartbeat_server_
Container for the heartbeat server.
MonotonicStopWatch heartbeat_interval_timer_
Tracks the time between heartbeat messages.
static TimestampValue LocalTime()