Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
statestore.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "statestore/statestore.h"
16 
17 #include <boost/foreach.hpp>
18 #include <boost/thread.hpp>
19 #include <thrift/Thrift.h>
20 #include <gutil/strings/substitute.h>
21 
22 #include "common/status.h"
23 #include "gen-cpp/StatestoreService_types.h"
25 #include "rpc/thrift-util.h"
26 #include "util/debug-util.h"
27 #include "util/time.h"
28 #include "util/uid-util.h"
29 #include "util/webserver.h"
30 
31 #include "common/names.h"
32 
33 using namespace apache::thrift;
34 using namespace impala;
35 using namespace rapidjson;
36 using namespace strings;
37 
38 DEFINE_int32(statestore_max_missed_heartbeats, 10, "Maximum number of consecutive "
39  "heartbeat messages an impalad can miss before being declared failed by the "
40  "statestore.");
41 
42 DEFINE_int32(statestore_num_update_threads, 10, "(Advanced) Number of threads used to "
43  " send topic updates in parallel to all registered subscribers.");
44 DEFINE_int32(statestore_update_frequency_ms, 2000, "(Advanced) Frequency (in ms) with"
45  " which the statestore sends topic updates to subscribers.");
46 
47 DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads used to "
48  " send heartbeats in parallel to all registered subscribers.");
49 DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with"
50  " which the statestore sends heartbeat heartbeats to subscribers.");
51 
52 DEFINE_int32(state_store_port, 24000, "port where StatestoreService is running");
53 
54 DEFINE_int32(statestore_heartbeat_tcp_timeout_seconds, 3, "(Advanced) The time after "
55  "which a heartbeat RPC to a subscriber will timeout. This setting protects against "
56  "badly hung machines that are not able to respond to the heartbeat RPC in short "
57  "order");
58 
59 // If this value is set too low, it's possible that UpdateState() might timeout during a
60 // working invocation, and only a restart of the statestore with a change in value would
61 // allow progress to be made. If set too high, a hung subscriber will waste an update
62 // thread for much longer than it needs to. We choose 5 minutes as a safe default because
63 // large catalogs can take a very long time to process, but rarely more than a minute. The
64 // loss of a single thread for five minutes should usually be invisible to the user; if
65 // there is a correlated set of machine hangs that exhausts most threads the cluster can
66 // already be said to be in a bad state. Note that the heartbeat mechanism will still
67 // evict those subscribers, so many queries will continue to operate.
68 DEFINE_int32(statestore_update_tcp_timeout_seconds, 300, "(Advanced) The time after "
69  "which an update RPC to a subscriber will timeout. This setting protects against "
70  "badly hung machines that are not able to respond to the update RPC in short "
71  "order.");
72 
// Keys under which the statestore publishes its metrics.
// TODO: Replace 'backend' with 'subscriber' when we can coordinate a change with CM
const string STATESTORE_LIVE_SUBSCRIBERS("statestore.live-backends");
const string STATESTORE_LIVE_SUBSCRIBERS_LIST("statestore.live-backends.list");

// Aggregate sizes across all topics.
const string STATESTORE_TOTAL_KEY_SIZE_BYTES("statestore.total-key-size-bytes");
const string STATESTORE_TOTAL_VALUE_SIZE_BYTES("statestore.total-value-size-bytes");
const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES("statestore.total-topic-size-bytes");

// Per-RPC duration statistics.
const string STATESTORE_UPDATE_DURATION("statestore.topic-update-durations");
const string STATESTORE_HEARTBEAT_DURATION("statestore.heartbeat-durations");
82 
84 
85 // Initial version for each Topic registered by a Subscriber. Generally, the Topic will
86 // have a Version that is the MAX() of all entries in the Topic, but this initial
87 // value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
88 // between the case where a Topic is empty and the case where the Topic only contains
89 // an item with the initial version.
90 const Statestore::TopicEntry::Version Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0L;
91 
92 // Used to control the maximum size of the pending topic-update queue, in which there is
93 // at most one entry per subscriber.
94 const int32_t STATESTORE_MAX_SUBSCRIBERS = 10000;
95 
96 // Updates or heartbeats that miss their deadline by this much are logged.
97 const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
98 
100 
102  public:
104  : statestore_(statestore) {
105  DCHECK(statestore_ != NULL);
106  }
107 
108  virtual void RegisterSubscriber(TRegisterSubscriberResponse& response,
109  const TRegisterSubscriberRequest& params) {
110  TUniqueId registration_id;
111  Status status = statestore_->RegisterSubscriber(params.subscriber_id,
112  params.subscriber_location, params.topic_registrations, &registration_id);
113  status.ToThrift(&response.status);
114  response.__set_registration_id(registration_id);
115  }
116  private:
118 };
119 
120 void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes,
121  TopicEntry::Version version) {
122  DCHECK(bytes == Statestore::TopicEntry::NULL_VALUE || bytes.size() > 0);
123  value_ = bytes;
124  version_ = version;
125 }
126 
127 Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
128  const Statestore::TopicEntry::Value& bytes) {
129  TopicEntryMap::iterator entry_it = entries_.find(key);
130  int64_t key_size_delta = 0L;
131  int64_t value_size_delta = 0L;
132  if (entry_it == entries_.end()) {
133  entry_it = entries_.insert(make_pair(key, TopicEntry())).first;
134  key_size_delta += key.size();
135  } else {
136  // Delete the old item from the version history. There is no need to search the
137  // version_history because there should only be at most a single item in the history
138  // at any given time
139  topic_update_log_.erase(entry_it->second.version());
140  value_size_delta -= entry_it->second.value().size();
141  }
142  value_size_delta += bytes.size();
143 
144  entry_it->second.SetValue(bytes, ++last_version_);
145  topic_update_log_.insert(make_pair(entry_it->second.version(), key));
146 
147  total_key_size_bytes_ += key_size_delta;
148  total_value_size_bytes_ += value_size_delta;
149  DCHECK_GE(total_key_size_bytes_, 0L);
150  DCHECK_GE(total_value_size_bytes_, 0L);
151  key_size_metric_->Increment(key_size_delta);
152  value_size_metric_->Increment(value_size_delta);
153  topic_size_metric_->Increment(key_size_delta + value_size_delta);
154 
155  return entry_it->second.version();
156 }
157 
158 void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
159  const Statestore::TopicEntryKey& key) {
160  TopicEntryMap::iterator entry_it = entries_.find(key);
161  if (entry_it != entries_.end() && entry_it->second.version() == version) {
162  // Add a new entry with the the version history for this deletion and remove the old
163  // entry
164  topic_update_log_.erase(version);
165  topic_update_log_.insert(make_pair(++last_version_, key));
166  total_value_size_bytes_ -= entry_it->second.value().size();
167  DCHECK_GE(total_value_size_bytes_, 0L);
168 
169  value_size_metric_->Increment(entry_it->second.value().size());
170  topic_size_metric_->Increment(entry_it->second.value().size());
171  entry_it->second.SetValue(Statestore::TopicEntry::NULL_VALUE, last_version_);
172  }
173 }
174 
175 Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
176  const TUniqueId& registration_id, const TNetworkAddress& network_address,
177  const vector<TTopicRegistration>& subscribed_topics)
178  : subscriber_id_(subscriber_id),
179  registration_id_(registration_id),
180  network_address_(network_address) {
181  BOOST_FOREACH(const TTopicRegistration& topic, subscribed_topics) {
182  TopicState topic_state;
183  topic_state.is_transient = topic.is_transient;
184  topic_state.last_version = TOPIC_INITIAL_VERSION;
185  subscribed_topics_[topic.topic_name] = topic_state;
186  }
187 }
188 
190  const TopicEntryKey& topic_key, TopicEntry::Version version) {
191  // Only record the update if the topic is transient
192  const Topics::const_iterator topic_it = subscribed_topics_.find(topic_id);
193  DCHECK(topic_it != subscribed_topics_.end());
194  if (topic_it->second.is_transient == true) {
195  transient_entries_[make_pair(topic_id, topic_key)] = version;
196  }
197 }
198 
200  const TopicId& topic_id) const {
201  Topics::const_iterator itr = subscribed_topics_.find(topic_id);
202  return itr == subscribed_topics_.end() ?
203  TOPIC_INITIAL_VERSION : itr->second.last_version;
204 }
205 
207  TopicEntry::Version version) {
208  subscribed_topics_[topic_id].last_version = version;
209 }
210 
212  : exit_flag_(false),
213  subscriber_topic_update_threadpool_("statestore-update",
214  "subscriber-update-worker",
215  FLAGS_statestore_num_update_threads,
217  bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, false, _1, _2)),
218  subscriber_heartbeat_threadpool_("statestore-heartbeat",
219  "subscriber-heartbeat-worker",
220  FLAGS_statestore_num_heartbeat_threads,
222  bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, _2)),
223  update_state_client_cache_(new ClientCache<StatestoreSubscriberClient>(1, 0,
224  FLAGS_statestore_update_tcp_timeout_seconds * 1000,
225  FLAGS_statestore_update_tcp_timeout_seconds * 1000)),
226  heartbeat_client_cache_(new ClientCache<StatestoreSubscriberClient>(1, 0,
227  FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000,
228  FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000)),
231  FLAGS_statestore_max_missed_heartbeats,
232  FLAGS_statestore_max_missed_heartbeats / 2)) {
233 
234  DCHECK(metrics != NULL);
239  set<string>()));
243 
245  new StatsMetric<double>(STATESTORE_UPDATE_DURATION, TUnit::TIME_S));
248 
249  update_state_client_cache_->InitMetrics(metrics, "subscriber-update-state");
250  heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
251 }
252 
254  Webserver::UrlCallback topics_callback =
255  bind<void>(mem_fn(&Statestore::TopicsHandler), this, _1, _2);
256  webserver->RegisterUrlCallback("/topics", "statestore_topics.tmpl",
257  topics_callback);
258 
259  Webserver::UrlCallback subscribers_callback =
260  bind<void>(&Statestore::SubscribersHandler, this, _1, _2);
261  webserver->RegisterUrlCallback("/subscribers", "statestore_subscribers.tmpl",
262  subscribers_callback);
263 }
264 
266  Document* document) {
267  lock_guard<mutex> l(subscribers_lock_);
268  lock_guard<mutex> t(topic_lock_);
269 
270  Value topics(kArrayType);
271 
272  BOOST_FOREACH(const TopicMap::value_type& topic, topics_) {
273  Value topic_json(kObjectType);
274 
275  Value topic_id(topic.second.id().c_str(), document->GetAllocator());
276  topic_json.AddMember("topic_id", topic_id, document->GetAllocator());
277  topic_json.AddMember("num_entries", topic.second.entries().size(),
278  document->GetAllocator());
279  topic_json.AddMember("version", topic.second.last_version(), document->GetAllocator());
280 
281  SubscriberId oldest_subscriber_id;
282  TopicEntry::Version oldest_subscriber_version =
283  GetMinSubscriberTopicVersion(topic.first, &oldest_subscriber_id);
284 
285  topic_json.AddMember("oldest_version", oldest_subscriber_version,
286  document->GetAllocator());
287  Value oldest_id(oldest_subscriber_id.c_str(), document->GetAllocator());
288  topic_json.AddMember("oldest_id", oldest_id, document->GetAllocator());
289 
290  int64_t key_size = topic.second.total_key_size_bytes();
291  int64_t value_size = topic.second.total_value_size_bytes();
292  Value key_size_json(PrettyPrinter::Print(key_size, TUnit::BYTES).c_str(),
293  document->GetAllocator());
294  topic_json.AddMember("key_size", key_size_json, document->GetAllocator());
295  Value value_size_json(PrettyPrinter::Print(value_size, TUnit::BYTES).c_str(),
296  document->GetAllocator());
297  topic_json.AddMember("value_size", value_size_json, document->GetAllocator());
298  Value total_size_json(
299  PrettyPrinter::Print(key_size + value_size, TUnit::BYTES).c_str(),
300  document->GetAllocator());
301  topic_json.AddMember("total_size", total_size_json, document->GetAllocator());
302  topics.PushBack(topic_json, document->GetAllocator());
303  }
304  document->AddMember("topics", topics, document->GetAllocator());
305 }
306 
308  Document* document) {
309  lock_guard<mutex> l(subscribers_lock_);
310  Value subscribers(kArrayType);
311  BOOST_FOREACH(const SubscriberMap::value_type& subscriber, subscribers_) {
312  Value sub_json(kObjectType);
313 
314  Value subscriber_id(subscriber.second->id().c_str(), document->GetAllocator());
315  sub_json.AddMember("id", subscriber_id, document->GetAllocator());
316 
317  Value address(lexical_cast<string>(subscriber.second->network_address()).c_str(),
318  document->GetAllocator());
319  sub_json.AddMember("address", address, document->GetAllocator());
320 
321  sub_json.AddMember("num_topics", subscriber.second->subscribed_topics().size(),
322  document->GetAllocator());
323  sub_json.AddMember("num_transient", subscriber.second->transient_entries().size(),
324  document->GetAllocator());
325 
326  Value registration_id(PrintId(subscriber.second->registration_id()).c_str(),
327  document->GetAllocator());
328  sub_json.AddMember("registration_id", registration_id, document->GetAllocator());
329 
330  subscribers.PushBack(sub_json, document->GetAllocator());
331  }
332  document->AddMember("subscribers", subscribers, document->GetAllocator());
333 }
334 
337  if (threadpool->GetQueueSize() >= STATESTORE_MAX_SUBSCRIBERS
338  || !threadpool->Offer(update)) {
339  stringstream ss;
340  ss << "Maximum subscriber limit reached: " << STATESTORE_MAX_SUBSCRIBERS;
341  lock_guard<mutex> l(subscribers_lock_);
342  SubscriberMap::iterator subscriber_it = subscribers_.find(update.second);
343  DCHECK(subscriber_it != subscribers_.end());
344  subscribers_.erase(subscriber_it);
345  LOG(ERROR) << ss.str();
346  return Status(ss.str());
347  }
348 
349  return Status::OK;
350 }
351 
353  const TNetworkAddress& location,
354  const vector<TTopicRegistration>& topic_registrations, TUniqueId* registration_id) {
355  if (subscriber_id.empty()) return Status("Subscriber ID cannot be empty string");
356 
357  // Create any new topics first, so that when the subscriber is first sent a topic update
358  // by the worker threads its topics are guaranteed to exist.
359  {
360  lock_guard<mutex> l(topic_lock_);
361  BOOST_FOREACH(const TTopicRegistration& topic, topic_registrations) {
362  TopicMap::iterator topic_it = topics_.find(topic.topic_name);
363  if (topic_it == topics_.end()) {
364  LOG(INFO) << "Creating new topic: ''" << topic.topic_name
365  << "' on behalf of subscriber: '" << subscriber_id;
366  topics_.insert(make_pair(topic.topic_name, Topic(topic.topic_name,
368  }
369  }
370  }
371  LOG(INFO) << "Registering: " << subscriber_id;
372  {
373  lock_guard<mutex> l(subscribers_lock_);
374  SubscriberMap::iterator subscriber_it = subscribers_.find(subscriber_id);
375  if (subscriber_it != subscribers_.end()) {
376  UnregisterSubscriber(subscriber_it->second.get());
377  }
378 
379  UUIDToTUniqueId(subscriber_uuid_generator_(), registration_id);
380  shared_ptr<Subscriber> current_registration(
381  new Subscriber(subscriber_id, *registration_id, location, topic_registrations));
382  subscribers_.insert(make_pair(subscriber_id, current_registration));
383  failure_detector_->UpdateHeartbeat(
384  PrintId(current_registration->registration_id()), true);
385  num_subscribers_metric_->set_value(subscribers_.size());
386  subscriber_set_metric_->Add(subscriber_id);
387  }
388 
389  // Add the subscriber to the update queue, with an immediate schedule.
390  ScheduledSubscriberUpdate update = make_pair(0L, subscriber_id);
393 
394  LOG(INFO) << "Subscriber '" << subscriber_id << "' registered (registration id: "
395  << PrintId(*registration_id) << ")";
396  return Status::OK;
397 }
398 
399 Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) {
400  // Time any successful RPCs (i.e. those for which UpdateState() completed, even though
401  // it may have returned an error.)
403  sw.Start();
404 
405  // First thing: make a list of updates to send
406  TUpdateStateRequest update_state_request;
407  GatherTopicUpdates(*subscriber, &update_state_request);
408 
409  // Set the expected registration ID, so that the subscriber can reject this update if
410  // they have moved on to a new registration instance.
411  update_state_request.__set_registration_id(subscriber->registration_id());
412 
413  // Second: try and send it
414  Status status;
416  subscriber->network_address(), &status);
417  RETURN_IF_ERROR(status);
418 
419  TUpdateStateResponse response;
420  RETURN_IF_ERROR(client.DoRpc(
421  &StatestoreSubscriberClient::UpdateState, update_state_request, &response));
422 
423  status = Status(response.status);
424  if (!status.ok()) {
425  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
426  return status;
427  }
428 
429  *update_skipped = (response.__isset.skipped && response.skipped);
430  if (*update_skipped) {
431  // The subscriber skipped processing this update. We don't consider this a failure
432  // - subscribers can decide what they do with any update - so, return OK and set
433  // update_skipped so the caller can compensate.
434  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
435  return Status::OK;
436  }
437 
438  // At this point the updates are assumed to have been successfully processed by the
439  // subscriber. Update the subscriber's max version of each topic.
440  map<TopicEntryKey, TTopicDelta>::const_iterator topic_delta =
441  update_state_request.topic_deltas.begin();
442  for (; topic_delta != update_state_request.topic_deltas.end(); ++topic_delta) {
443  subscriber->SetLastTopicVersionProcessed(topic_delta->first,
444  topic_delta->second.to_version);
445  }
446 
447  // Thirdly: perform any / all updates returned by the subscriber
448  {
449  lock_guard<mutex> l(topic_lock_);
450  BOOST_FOREACH(const TTopicDelta& update, response.topic_updates) {
451  TopicMap::iterator topic_it = topics_.find(update.topic_name);
452  if (topic_it == topics_.end()) {
453  VLOG(1) << "Received update for unexpected topic:" << update.topic_name;
454  continue;
455  }
456 
457  // The subscriber sent back their from_version which indicates that they want to
458  // reset their max version for this topic to this value. The next update sent will
459  // be from this version.
460  if (update.__isset.from_version) {
461  LOG(INFO) << "Received request for different delta base of topic: "
462  << update.topic_name << " from: " << subscriber->id()
463  << " subscriber from_version: " << update.from_version;
464  subscriber->SetLastTopicVersionProcessed(topic_it->first, update.from_version);
465  }
466 
467  Topic* topic = &topic_it->second;
468  BOOST_FOREACH(const TTopicItem& item, update.topic_entries) {
469  TopicEntry::Version version = topic->Put(item.key, item.value);
470  subscriber->AddTransientUpdate(update.topic_name, item.key, version);
471  }
472 
473  BOOST_FOREACH(const string& key, update.topic_deletions) {
474  TopicEntry::Version version =
476  subscriber->AddTransientUpdate(update.topic_name, key, version);
477  }
478  }
479  }
480  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
481  return Status::OK;
482 }
483 
485  TUpdateStateRequest* update_state_request) {
486  {
487  lock_guard<mutex> l(topic_lock_);
488  BOOST_FOREACH(const Subscriber::Topics::value_type& subscribed_topic,
489  subscriber.subscribed_topics()) {
490  TopicMap::const_iterator topic_it = topics_.find(subscribed_topic.first);
491  DCHECK(topic_it != topics_.end());
492 
493  TopicEntry::Version last_processed_version =
494  subscriber.LastTopicVersionProcessed(topic_it->first);
495  const Topic& topic = topic_it->second;
496 
497  TTopicDelta& topic_delta =
498  update_state_request->topic_deltas[subscribed_topic.first];
499  topic_delta.topic_name = subscribed_topic.first;
500 
501  // If the subscriber version is > 0, send this update as a delta. Otherwise, this is
502  // a new subscriber so send them a non-delta update that includes all items in the
503  // topic.
504  topic_delta.is_delta = last_processed_version > Subscriber::TOPIC_INITIAL_VERSION;
505  topic_delta.__set_from_version(last_processed_version);
506 
507  if (!topic_delta.is_delta &&
509  int64_t topic_size =
511  VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
512  << " topic update for " << subscriber.id() << ". Size = "
513  << PrettyPrinter::Print(topic_size, TUnit::BYTES);
514  }
515 
516  TopicUpdateLog::const_iterator next_update =
517  topic.topic_update_log().upper_bound(last_processed_version);
518 
519  for (; next_update != topic.topic_update_log().end(); ++next_update) {
520  TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second);
521  DCHECK(itr != topic.entries().end());
522  const TopicEntry& topic_entry = itr->second;
523  if (topic_entry.value() == Statestore::TopicEntry::NULL_VALUE) {
524  topic_delta.topic_deletions.push_back(itr->first);
525  } else {
526  topic_delta.topic_entries.push_back(TTopicItem());
527  TTopicItem& topic_item = topic_delta.topic_entries.back();
528  topic_item.key = itr->first;
529  // TODO: Does this do a needless copy?
530  topic_item.value = topic_entry.value();
531  }
532  }
533 
534  if (topic.topic_update_log().size() > 0) {
535  // The largest version for this topic will be the last item in the version history
536  // map.
537  topic_delta.__set_to_version(topic.topic_update_log().rbegin()->first);
538  } else {
539  // There are no updates in the version history
540  topic_delta.__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
541  }
542  }
543  }
544 
545  // Fill in the min subscriber topic version. This must be done after releasing
546  // topic_lock_.
547  lock_guard<mutex> l(subscribers_lock_);
548  typedef map<TopicId, TTopicDelta> TopicDeltaMap;
549  BOOST_FOREACH(TopicDeltaMap::value_type& topic_delta,
550  update_state_request->topic_deltas) {
551  topic_delta.second.__set_min_subscriber_topic_version(
552  GetMinSubscriberTopicVersion(topic_delta.first));
553  }
554 }
555 
557  const TopicId& topic_id, SubscriberId* subscriber_id) {
558  TopicEntry::Version min_topic_version = numeric_limits<int64_t>::max();
559  bool found = false;
560  // Find the minimum version processed for this topic across all topic subscribers.
561  BOOST_FOREACH(const SubscriberMap::value_type& subscriber, subscribers_) {
562  if (subscriber.second->subscribed_topics().find(topic_id) !=
563  subscriber.second->subscribed_topics().end()) {
564  found = true;
565  TopicEntry::Version last_processed_version =
566  subscriber.second->LastTopicVersionProcessed(topic_id);
567  if (last_processed_version < min_topic_version) {
568  min_topic_version = last_processed_version;
569  if (subscriber_id != NULL) *subscriber_id = subscriber.second->id();
570  }
571  }
572  }
573  return found ? min_topic_version : Subscriber::TOPIC_INITIAL_VERSION;
574 }
575 
577  lock_guard<mutex> l(exit_flag_lock_);
578  return exit_flag_;
579 }
580 
582  lock_guard<mutex> l(exit_flag_lock_);
583  exit_flag_ = true;
585 }
586 
589  sw.Start();
590 
591  Status status;
593  subscriber->network_address(), &status);
594  RETURN_IF_ERROR(status);
595 
596  THeartbeatRequest request;
597  THeartbeatResponse response;
598  request.__set_registration_id(subscriber->registration_id());
600  client.DoRpc(&StatestoreSubscriberClient::Heartbeat, request, &response));
601 
602  heartbeat_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
603  return Status::OK;
604 }
605 
606 void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
607  const ScheduledSubscriberUpdate& update) {
608  int64_t update_deadline = update.first;
609  const string hb_type = is_heartbeat ? "heartbeat" : "topic update";
610  if (update_deadline != 0L) {
611  // Wait until deadline.
612  int64_t diff_ms = update_deadline - UnixMillis();
613  while (diff_ms > 0) {
614  SleepForMs(diff_ms);
615  diff_ms = update_deadline - UnixMillis();
616  }
617  diff_ms = abs(diff_ms);
618  VLOG(3) << "Sending " << hb_type << " message to: " << update.second
619  << " (deadline accuracy: " << diff_ms << "ms)";
620 
621  if (diff_ms > DEADLINE_MISS_THRESHOLD_MS) {
622  // TODO: This should be a healthcheck in a monitored metric in CM, which would
623  // require a 'rate' metric type.
624  const string& msg = Substitute("Missed subscriber ($0) $1 deadline by $2ms, "
625  "consider increasing --$3 (currently $4)", update.second, hb_type, diff_ms,
626  is_heartbeat ? "statestore_heartbeat_frequency_ms" :
627  "statestore_update_frequency_ms",
628  is_heartbeat ? FLAGS_statestore_heartbeat_frequency_ms :
629  FLAGS_statestore_update_frequency_ms);
630  if (is_heartbeat) {
631  LOG(WARNING) << msg;
632  } else {
633  VLOG_QUERY << msg;
634  }
635  }
636  } else {
637  // The first update is scheduled immediately and has a deadline of 0. There's no need
638  // to wait.
639  VLOG(3) << "Initial " << hb_type << " message for: " << update.second;
640  }
641  shared_ptr<Subscriber> subscriber;
642  {
643  lock_guard<mutex> l(subscribers_lock_);
644  SubscriberMap::iterator it = subscribers_.find(update.second);
645  if (it == subscribers_.end()) return;
646  subscriber = it->second;
647  }
648  // Send the right message type, and compute the next deadline
649  int64_t deadline_ms = 0;
650  Status status;
651  if (is_heartbeat) {
652  status = SendHeartbeat(subscriber.get());
653  if (status.code() == TErrorCode::RPC_TIMEOUT) {
654  // Rewrite status to make it more useful, while preserving the stack
655  status.SetErrorMsg(ErrorMsg(TErrorCode::RPC_TIMEOUT, Substitute(
656  "Subscriber $0 ($1) timed-out during heartbeat RPC. Timeout is $2s.",
657  subscriber->id(), lexical_cast<string>(subscriber->network_address()),
658  FLAGS_statestore_heartbeat_tcp_timeout_seconds)));
659  }
660 
661  deadline_ms = UnixMillis() + FLAGS_statestore_heartbeat_frequency_ms;
662  } else {
663  bool update_skipped;
664  status = SendTopicUpdate(subscriber.get(), &update_skipped);
665  if (status.code() == TErrorCode::RPC_TIMEOUT) {
666  // Rewrite status to make it more useful, while preserving the stack
667  status.SetErrorMsg(ErrorMsg(TErrorCode::RPC_TIMEOUT, Substitute(
668  "Subscriber $0 ($1) timed-out during topic-update RPC. Timeout is $2s.",
669  subscriber->id(), lexical_cast<string>(subscriber->network_address()),
670  FLAGS_statestore_update_tcp_timeout_seconds)));
671  }
672  // If the subscriber responded that it skipped the last update sent, we assume that
673  // it was busy doing something else, and back off slightly before sending another.
674  int64_t update_interval = update_skipped ?
675  (2 * FLAGS_statestore_update_frequency_ms) :
676  FLAGS_statestore_update_frequency_ms;
677  deadline_ms = UnixMillis() + update_interval;
678  }
679 
680  {
681  lock_guard<mutex> l(subscribers_lock_);
682  // Check again if this registration has been removed while we were processing the
683  // message.
684  SubscriberMap::iterator it = subscribers_.find(update.second);
685  if (it == subscribers_.end()) return;
686  if (!status.ok()) {
687  LOG(INFO) << "Unable to send " << hb_type << " message to subscriber "
688  << update.second << ", received error: " << status.GetDetail();
689  }
690 
691  const string& registration_id = PrintId(subscriber->registration_id());
692  FailureDetector::PeerState state = is_heartbeat ?
693  failure_detector_->UpdateHeartbeat(registration_id, status.ok()) :
694  failure_detector_->GetPeerState(registration_id);
695 
696  if (state == FailureDetector::FAILED) {
697  if (is_heartbeat) {
698  // TODO: Consider if a metric to track the number of failures would be useful.
699  LOG(INFO) << "Subscriber '" << subscriber->id() << "' has failed, disconnected "
700  << "or re-registered (last known registration ID: " << update.second
701  << ")";
702  UnregisterSubscriber(subscriber.get());
703  }
704  } else {
705  // Schedule the next message.
706  VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " deadline for: "
707  << subscriber->id() << " is in " << deadline_ms << "ms";
708  OfferUpdate(make_pair(deadline_ms, subscriber->id()), is_heartbeat ?
710  }
711  }
712 }
713 
715  SubscriberMap::const_iterator it = subscribers_.find(subscriber->id());
716  if (it == subscribers_.end() ||
717  it->second->registration_id() != subscriber->registration_id()) {
718  // Already failed and / or replaced with a new registration
719  return;
720  }
721 
722  // Close all active clients so that the next attempt to use them causes a Reopen()
723  update_state_client_cache_->CloseConnections(subscriber->network_address());
724  heartbeat_client_cache_->CloseConnections(subscriber->network_address());
725 
726  // Prevent the failure detector from growing without bound
727  failure_detector_->EvictPeer(PrintId(subscriber->registration_id()));
728 
729  // Delete all transient entries
730  lock_guard<mutex> topic_lock(topic_lock_);
731  BOOST_FOREACH(Statestore::Subscriber::TransientEntryMap::value_type entry,
732  subscriber->transient_entries()) {
733  Statestore::TopicMap::iterator topic_it = topics_.find(entry.first.first);
734  DCHECK(topic_it != topics_.end());
735  topic_it->second.DeleteIfVersionsMatch(entry.second, // version
736  entry.first.second); // key
737  }
738  num_subscribers_metric_->Increment(-1L);
739  subscriber_set_metric_->Remove(subscriber->id());
740  subscribers_.erase(subscriber->id());
741 }
742 
745  return Status::OK;
746 }
Status RegisterSubscriber(const SubscriberId &subscriber_id, const TNetworkAddress &location, const std::vector< TTopicRegistration > &topic_registrations, TUniqueId *registration_id)
Definition: statestore.cc:352
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > heartbeat_client_cache_
Definition: statestore.h:405
static const Value NULL_VALUE
Representation of an empty Value. Must have size() == 0.
Definition: statestore.h:141
const TNetworkAddress & network_address() const
Definition: statestore.h:287
IntGauge * topic_size_metric_
Definition: statestore.h:422
const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES
Definition: statestore.cc:79
const std::string GetDetail() const
Definition: status.cc:184
std::pair< int64_t, SubscriberId > ScheduledSubscriberUpdate
Definition: statestore.h:363
int64_t total_value_size_bytes() const
Definition: statestore.h:209
void Remove(const T &item)
Remove an item from this set by value.
std::string Value
A Value is a string of bytes, for which std::string is a convenient representation.
Definition: statestore.h:130
Statestore(MetricGroup *metrics)
The only constructor; initialises member variables only.
Definition: statestore.cc:211
int128_t abs(const int128_t &x)
boost::function< void(const ArgumentMap &args, rapidjson::Document *json)> UrlCallback
Definition: webserver.h:38
void RegisterWebpages(Webserver *webserver)
Definition: statestore.cc:253
const TopicUpdateLog & topic_update_log() const
Definition: statestore.h:207
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
M * RegisterMetric(M *metric)
Definition: metrics.h:211
const Value & value() const
Definition: statestore.h:151
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
void RegisterUrlCallback(const std::string &path, const std::string &template_filename, const UrlCallback &callback, bool is_on_nav_bar=true)
Only one callback may be registered per URL.
Definition: webserver.cc:412
void Add(const T &item)
Put an item in this set.
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > update_state_client_cache_
Definition: statestore.h:399
const string STATESTORE_TOTAL_KEY_SIZE_BYTES
Definition: statestore.cc:77
void TopicsHandler(const Webserver::ArgumentMap &args, rapidjson::Document *document)
Definition: statestore.cc:265
std::string SubscriberId
Definition: statestore.h:83
std::string TopicEntryKey
A TopicEntryKey uniquely identifies a single entry in a topic.
Definition: statestore.h:89
DEFINE_int32(statestore_max_missed_heartbeats, 10,"Maximum number of consecutive ""heartbeat messages an impalad can miss before being declared failed by the ""statestore.")
SetMetric< std::string > * subscriber_set_metric_
Definition: statestore.h:417
const string STATESTORE_LIVE_SUBSCRIBERS_LIST
Definition: statestore.cc:76
const SubscriberId & id() const
Definition: statestore.h:288
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
const TransientEntryMap & transient_entries() const
Definition: statestore.h:304
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
Definition: time.cc:21
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
std::map< std::string, std::string > ArgumentMap
Definition: webserver.h:36
const TopicEntryMap & entries() const
Definition: statestore.h:205
int64_t UnixMillis()
Definition: time.h:51
bool Offer(const T &work)
Definition: thread-pool.h:74
StatsMetric< double > * heartbeat_duration_metric_
Same as above, but for SendHeartbeat() RPCs.
Definition: statestore.h:430
static const TopicEntry::Version TOPIC_INITIAL_VERSION
The Version value used to initialize new Topic subscriptions for this Subscriber. ...
Definition: statestore.h:284
#define VLOG_QUERY
Definition: logging.h:57
bool ShouldExit()
True if the shutdown flag has been set true, false otherwise.
Definition: statestore.cc:576
boost::uuids::random_generator subscriber_uuid_generator_
Used to generate unique IDs for each new registration.
Definition: statestore.h:358
ThreadPool< ScheduledSubscriberUpdate > subscriber_topic_update_threadpool_
Definition: statestore.h:393
std::string TopicId
A TopicId uniquely identifies a single topic.
Definition: statestore.h:86
virtual void RegisterSubscriber(TRegisterSubscriberResponse &response, const TRegisterSubscriberRequest &params)
Definition: statestore.cc:108
Status SendTopicUpdate(Subscriber *subscriber, bool *update_skipped)
Definition: statestore.cc:399
void AddTransientUpdate(const TopicId &topic_id, const TopicEntryKey &topic_key, TopicEntry::Version version)
Definition: statestore.cc:189
IntGauge * num_subscribers_metric_
Metric that tracks the registered, non-failed subscribers.
Definition: statestore.h:416
void SubscribersHandler(const Webserver::ArgumentMap &args, rapidjson::Document *document)
Definition: statestore.cc:307
TErrorCode::type code() const
Definition: status.h:226
IntGauge * value_size_metric_
Definition: statestore.h:421
boost::mutex exit_flag_lock_
Definition: statestore.h:250
int64_t total_key_size_bytes() const
Definition: statestore.h:208
StatsMetric< double > * topic_update_duration_metric_
Definition: statestore.h:427
void ToThrift(TStatus *status) const
Convert into TStatus.
Definition: status.cc:188
Statestore * statestore_
Definition: statestore.cc:117
IntGauge * key_size_metric_
Metrics shared across all topics to sum the size in bytes of keys, values and both.
Definition: statestore.h:420
void SetLastTopicVersionProcessed(const TopicId &topic_id, TopicEntry::Version version)
Definition: statestore.cc:206
boost::shared_ptr< StatestoreServiceIf > thrift_iface_
Thrift API implementation which proxies requests onto this Statestore.
Definition: statestore.h:408
void Update(const T &value)
Metric whose value is a set of items.
TopicEntry::Version last_version() const
Definition: statestore.h:206
void GatherTopicUpdates(const Subscriber &subscriber, TUpdateStateRequest *update_state_request)
Definition: statestore.cc:484
boost::scoped_ptr< MissedHeartbeatFailureDetector > failure_detector_
Definition: statestore.h:413
static int64_t NULL_VALUE[]
Definition: hash-table.cc:68
void UnregisterSubscriber(Subscriber *subscriber)
Definition: statestore.cc:714
const string STATESTORE_TOTAL_VALUE_SIZE_BYTES
Definition: statestore.cc:78
const uint32_t DEADLINE_MISS_THRESHOLD_MS
Definition: statestore.cc:97
const string STATESTORE_UPDATE_DURATION
Definition: statestore.cc:80
uint64_t ElapsedTime() const
Returns time in nanosecond.
Definition: stopwatch.h:105
const int32_t STATESTORE_MAX_SUBSCRIBERS
Definition: statestore.cc:94
uint32_t GetQueueSize() const
Definition: thread-pool.h:96
SubscriberMap subscribers_
Definition: statestore.h:355
TopicEntry::Version Put(const TopicEntryKey &key, const TopicEntry::Value &bytes)
Must be called holding the topic lock.
Definition: statestore.cc:127
const string STATESTORE_LIVE_SUBSCRIBERS
Definition: statestore.cc:75
StatestoreThriftIf(Statestore *statestore)
Definition: statestore.cc:103
Status MainLoop()
The main processing loop. Blocks until the exit flag is set.
Definition: statestore.cc:743
void SetErrorMsg(const ErrorMsg &m)
Definition: status.h:197
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
Definition: metrics.h:223
void UUIDToTUniqueId(const boost::uuids::uuid &uuid, T *unique_id)
Definition: uid-util.h:38
void DoSubscriberUpdate(bool is_heartbeat, int thread_id, const ScheduledSubscriberUpdate &update)
Definition: statestore.cc:606
boost::mutex topic_lock_
Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock.
Definition: statestore.h:254
static const Status OK
Definition: status.h:87
ThreadPool< ScheduledSubscriberUpdate > subscriber_heartbeat_threadpool_
Definition: statestore.h:395
const Topics & subscribed_topics() const
Definition: statestore.h:286
Status OfferUpdate(const ScheduledSubscriberUpdate &update, ThreadPool< ScheduledSubscriberUpdate > *thread_pool)
Definition: statestore.cc:335
boost::mutex subscribers_lock_
Definition: statestore.h:343
const TopicEntry::Version LastTopicVersionProcessed(const TopicId &topic_id) const
Definition: statestore.cc:199
const TopicEntry::Version GetMinSubscriberTopicVersion(const TopicId &topic_id, SubscriberId *subscriber_id=NULL)
Must be called holding the subscribers_ lock.
Definition: statestore.cc:556
ClientConnection< StatestoreSubscriberClient > StatestoreSubscriberConnection
Definition: statestore.cc:99
bool ok() const
Definition: status.h:172
Status SendHeartbeat(Subscriber *subscriber)
Definition: statestore.cc:587
const TUniqueId & registration_id() const
Definition: statestore.h:289
const string STATESTORE_HEARTBEAT_DURATION
Definition: statestore.cc:81