17 #include <boost/foreach.hpp>
18 #include <boost/thread.hpp>
19 #include <thrift/Thrift.h>
20 #include <gutil/strings/substitute.h>
23 #include "gen-cpp/StatestoreService_types.h"
33 using namespace apache::thrift;
34 using namespace impala;
35 using namespace rapidjson;
36 using namespace strings;
38 DEFINE_int32(statestore_max_missed_heartbeats, 10,
"Maximum number of consecutive "
39 "heartbeat messages an impalad can miss before being declared failed by the "
42 DEFINE_int32(statestore_num_update_threads, 10,
"(Advanced) Number of threads used to "
43 " send topic updates in parallel to all registered subscribers.");
44 DEFINE_int32(statestore_update_frequency_ms, 2000,
"(Advanced) Frequency (in ms) with"
45 " which the statestore sends topic updates to subscribers.");
47 DEFINE_int32(statestore_num_heartbeat_threads, 10,
"(Advanced) Number of threads used to "
48 " send heartbeats in parallel to all registered subscribers.");
49 DEFINE_int32(statestore_heartbeat_frequency_ms, 1000,
"(Advanced) Frequency (in ms) with"
50 " which the statestore sends heartbeat heartbeats to subscribers.");
52 DEFINE_int32(state_store_port, 24000,
"port where StatestoreService is running");
54 DEFINE_int32(statestore_heartbeat_tcp_timeout_seconds, 3,
"(Advanced) The time after "
55 "which a heartbeat RPC to a subscriber will timeout. This setting protects against "
56 "badly hung machines that are not able to respond to the heartbeat RPC in short "
68 DEFINE_int32(statestore_update_tcp_timeout_seconds, 300,
"(Advanced) The time after "
69 "which an update RPC to a subscriber will timeout. This setting protects against "
70 "badly hung machines that are not able to respond to the update RPC in short "
104 : statestore_(statestore) {
105 DCHECK(statestore_ != NULL);
109 const TRegisterSubscriberRequest& params) {
110 TUniqueId registration_id;
111 Status status = statestore_->RegisterSubscriber(params.subscriber_id,
112 params.subscriber_location, params.topic_registrations, &registration_id);
114 response.__set_registration_id(registration_id);
129 TopicEntryMap::iterator entry_it = entries_.find(key);
130 int64_t key_size_delta = 0L;
131 int64_t value_size_delta = 0L;
132 if (entry_it == entries_.end()) {
133 entry_it = entries_.insert(make_pair(key,
TopicEntry())).first;
134 key_size_delta += key.size();
139 topic_update_log_.erase(entry_it->second.version());
140 value_size_delta -= entry_it->second.value().size();
142 value_size_delta += bytes.size();
144 entry_it->second.SetValue(bytes, ++last_version_);
145 topic_update_log_.insert(make_pair(entry_it->second.version(), key));
147 total_key_size_bytes_ += key_size_delta;
148 total_value_size_bytes_ += value_size_delta;
149 DCHECK_GE(total_key_size_bytes_, 0L);
150 DCHECK_GE(total_value_size_bytes_, 0L);
151 key_size_metric_->Increment(key_size_delta);
152 value_size_metric_->Increment(value_size_delta);
153 topic_size_metric_->Increment(key_size_delta + value_size_delta);
155 return entry_it->second.version();
160 TopicEntryMap::iterator entry_it = entries_.find(key);
161 if (entry_it != entries_.end() && entry_it->second.version() == version) {
164 topic_update_log_.erase(version);
165 topic_update_log_.insert(make_pair(++last_version_, key));
166 total_value_size_bytes_ -= entry_it->second.value().size();
167 DCHECK_GE(total_value_size_bytes_, 0L);
169 value_size_metric_->Increment(entry_it->second.value().size());
170 topic_size_metric_->Increment(entry_it->second.value().size());
176 const TUniqueId& registration_id,
const TNetworkAddress& network_address,
177 const vector<TTopicRegistration>& subscribed_topics)
178 : subscriber_id_(subscriber_id),
179 registration_id_(registration_id),
180 network_address_(network_address) {
181 BOOST_FOREACH(
const TTopicRegistration& topic, subscribed_topics) {
192 const Topics::const_iterator topic_it = subscribed_topics_.find(topic_id);
193 DCHECK(topic_it != subscribed_topics_.end());
194 if (topic_it->second.is_transient ==
true) {
195 transient_entries_[make_pair(topic_id, topic_key)] = version;
200 const TopicId& topic_id)
const {
201 Topics::const_iterator itr = subscribed_topics_.find(topic_id);
202 return itr == subscribed_topics_.end() ?
203 TOPIC_INITIAL_VERSION : itr->second.last_version;
208 subscribed_topics_[topic_id].last_version = version;
214 "subscriber-update-worker",
215 FLAGS_statestore_num_update_threads,
219 "subscriber-heartbeat-worker",
220 FLAGS_statestore_num_heartbeat_threads,
224 FLAGS_statestore_update_tcp_timeout_seconds * 1000,
225 FLAGS_statestore_update_tcp_timeout_seconds * 1000)),
227 FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000,
228 FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000)),
231 FLAGS_statestore_max_missed_heartbeats,
232 FLAGS_statestore_max_missed_heartbeats / 2)) {
234 DCHECK(metrics != NULL);
262 subscribers_callback);
266 Document* document) {
270 Value topics(kArrayType);
272 BOOST_FOREACH(
const TopicMap::value_type& topic,
topics_) {
273 Value topic_json(kObjectType);
275 Value topic_id(topic.second.id().c_str(), document->GetAllocator());
276 topic_json.AddMember(
"topic_id", topic_id, document->GetAllocator());
277 topic_json.AddMember(
"num_entries", topic.second.entries().size(),
278 document->GetAllocator());
279 topic_json.AddMember(
"version", topic.second.last_version(), document->GetAllocator());
285 topic_json.AddMember(
"oldest_version", oldest_subscriber_version,
286 document->GetAllocator());
287 Value oldest_id(oldest_subscriber_id.c_str(), document->GetAllocator());
288 topic_json.AddMember(
"oldest_id", oldest_id, document->GetAllocator());
290 int64_t key_size = topic.second.total_key_size_bytes();
291 int64_t value_size = topic.second.total_value_size_bytes();
293 document->GetAllocator());
294 topic_json.AddMember(
"key_size", key_size_json, document->GetAllocator());
296 document->GetAllocator());
297 topic_json.AddMember(
"value_size", value_size_json, document->GetAllocator());
298 Value total_size_json(
300 document->GetAllocator());
301 topic_json.AddMember(
"total_size", total_size_json, document->GetAllocator());
302 topics.PushBack(topic_json, document->GetAllocator());
304 document->AddMember(
"topics", topics, document->GetAllocator());
308 Document* document) {
310 Value subscribers(kArrayType);
311 BOOST_FOREACH(
const SubscriberMap::value_type& subscriber,
subscribers_) {
312 Value sub_json(kObjectType);
314 Value subscriber_id(subscriber.second->id().c_str(), document->GetAllocator());
315 sub_json.AddMember(
"id", subscriber_id, document->GetAllocator());
317 Value address(lexical_cast<string>(subscriber.second->network_address()).c_str(),
318 document->GetAllocator());
319 sub_json.AddMember(
"address", address, document->GetAllocator());
321 sub_json.AddMember(
"num_topics", subscriber.second->subscribed_topics().size(),
322 document->GetAllocator());
323 sub_json.AddMember(
"num_transient", subscriber.second->transient_entries().size(),
324 document->GetAllocator());
326 Value registration_id(
PrintId(subscriber.second->registration_id()).c_str(),
327 document->GetAllocator());
328 sub_json.AddMember(
"registration_id", registration_id, document->GetAllocator());
330 subscribers.PushBack(sub_json, document->GetAllocator());
332 document->AddMember(
"subscribers", subscribers, document->GetAllocator());
338 || !threadpool->
Offer(update)) {
342 SubscriberMap::iterator subscriber_it =
subscribers_.find(update.second);
345 LOG(ERROR) << ss.str();
353 const TNetworkAddress& location,
354 const vector<TTopicRegistration>& topic_registrations, TUniqueId* registration_id) {
355 if (subscriber_id.empty())
return Status(
"Subscriber ID cannot be empty string");
361 BOOST_FOREACH(
const TTopicRegistration& topic, topic_registrations) {
362 TopicMap::iterator topic_it =
topics_.find(topic.topic_name);
363 if (topic_it ==
topics_.end()) {
364 LOG(INFO) <<
"Creating new topic: ''" << topic.topic_name
365 <<
"' on behalf of subscriber: '" << subscriber_id;
366 topics_.insert(make_pair(topic.topic_name,
Topic(topic.topic_name,
371 LOG(INFO) <<
"Registering: " << subscriber_id;
374 SubscriberMap::iterator subscriber_it =
subscribers_.find(subscriber_id);
380 shared_ptr<Subscriber> current_registration(
381 new Subscriber(subscriber_id, *registration_id, location, topic_registrations));
382 subscribers_.insert(make_pair(subscriber_id, current_registration));
384 PrintId(current_registration->registration_id()),
true);
394 LOG(INFO) <<
"Subscriber '" << subscriber_id <<
"' registered (registration id: "
395 <<
PrintId(*registration_id) <<
")";
406 TUpdateStateRequest update_state_request;
411 update_state_request.__set_registration_id(subscriber->
registration_id());
419 TUpdateStateResponse response;
421 &StatestoreSubscriberClient::UpdateState, update_state_request, &response));
423 status =
Status(response.status);
429 *update_skipped = (response.__isset.skipped && response.skipped);
430 if (*update_skipped) {
440 map<TopicEntryKey, TTopicDelta>::const_iterator topic_delta =
441 update_state_request.topic_deltas.begin();
442 for (; topic_delta != update_state_request.topic_deltas.end(); ++topic_delta) {
444 topic_delta->second.to_version);
450 BOOST_FOREACH(
const TTopicDelta& update, response.topic_updates) {
451 TopicMap::iterator topic_it =
topics_.find(update.topic_name);
452 if (topic_it ==
topics_.end()) {
453 VLOG(1) <<
"Received update for unexpected topic:" << update.topic_name;
460 if (update.__isset.from_version) {
461 LOG(INFO) <<
"Received request for different delta base of topic: "
462 << update.topic_name <<
" from: " << subscriber->
id()
463 <<
" subscriber from_version: " << update.from_version;
467 Topic* topic = &topic_it->second;
468 BOOST_FOREACH(
const TTopicItem& item, update.topic_entries) {
473 BOOST_FOREACH(
const string& key, update.topic_deletions) {
485 TUpdateStateRequest* update_state_request) {
488 BOOST_FOREACH(
const Subscriber::Topics::value_type& subscribed_topic,
490 TopicMap::const_iterator topic_it =
topics_.find(subscribed_topic.first);
491 DCHECK(topic_it !=
topics_.end());
495 const Topic& topic = topic_it->second;
497 TTopicDelta& topic_delta =
498 update_state_request->topic_deltas[subscribed_topic.first];
499 topic_delta.topic_name = subscribed_topic.first;
505 topic_delta.__set_from_version(last_processed_version);
507 if (!topic_delta.is_delta &&
511 VLOG_QUERY <<
"Preparing initial " << topic_delta.topic_name
512 <<
" topic update for " << subscriber.
id() <<
". Size = "
516 TopicUpdateLog::const_iterator next_update =
520 TopicEntryMap::const_iterator itr = topic.
entries().find(next_update->second);
521 DCHECK(itr != topic.
entries().end());
524 topic_delta.topic_deletions.push_back(itr->first);
526 topic_delta.topic_entries.push_back(TTopicItem());
527 TTopicItem& topic_item = topic_delta.topic_entries.back();
528 topic_item.key = itr->first;
530 topic_item.
value = topic_entry.value();
540 topic_delta.__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
548 typedef map<TopicId, TTopicDelta> TopicDeltaMap;
549 BOOST_FOREACH(TopicDeltaMap::value_type& topic_delta,
550 update_state_request->topic_deltas) {
551 topic_delta.second.__set_min_subscriber_topic_version(
561 BOOST_FOREACH(
const SubscriberMap::value_type& subscriber,
subscribers_) {
562 if (subscriber.second->subscribed_topics().find(topic_id) !=
563 subscriber.second->subscribed_topics().end()) {
566 subscriber.second->LastTopicVersionProcessed(topic_id);
567 if (last_processed_version < min_topic_version) {
568 min_topic_version = last_processed_version;
569 if (subscriber_id != NULL) *subscriber_id = subscriber.second->id();
596 THeartbeatRequest request;
597 THeartbeatResponse response;
600 client.DoRpc(&StatestoreSubscriberClient::Heartbeat, request, &response));
608 int64_t update_deadline = update.first;
609 const string hb_type = is_heartbeat ?
"heartbeat" :
"topic update";
610 if (update_deadline != 0L) {
612 int64_t diff_ms = update_deadline -
UnixMillis();
613 while (diff_ms > 0) {
617 diff_ms =
abs(diff_ms);
618 VLOG(3) <<
"Sending " << hb_type <<
" message to: " << update.second
619 <<
" (deadline accuracy: " << diff_ms <<
"ms)";
624 const string& msg = Substitute(
"Missed subscriber ($0) $1 deadline by $2ms, "
625 "consider increasing --$3 (currently $4)", update.second, hb_type, diff_ms,
626 is_heartbeat ?
"statestore_heartbeat_frequency_ms" :
627 "statestore_update_frequency_ms",
628 is_heartbeat ? FLAGS_statestore_heartbeat_frequency_ms :
629 FLAGS_statestore_update_frequency_ms);
639 VLOG(3) <<
"Initial " << hb_type <<
" message for: " << update.second;
641 shared_ptr<Subscriber> subscriber;
644 SubscriberMap::iterator it =
subscribers_.find(update.second);
646 subscriber = it->second;
649 int64_t deadline_ms = 0;
653 if (status.
code() == TErrorCode::RPC_TIMEOUT) {
656 "Subscriber $0 ($1) timed-out during heartbeat RPC. Timeout is $2s.",
657 subscriber->id(), lexical_cast<
string>(subscriber->network_address()),
658 FLAGS_statestore_heartbeat_tcp_timeout_seconds)));
661 deadline_ms =
UnixMillis() + FLAGS_statestore_heartbeat_frequency_ms;
665 if (status.
code() == TErrorCode::RPC_TIMEOUT) {
668 "Subscriber $0 ($1) timed-out during topic-update RPC. Timeout is $2s.",
669 subscriber->id(), lexical_cast<
string>(subscriber->network_address()),
670 FLAGS_statestore_update_tcp_timeout_seconds)));
674 int64_t update_interval = update_skipped ?
675 (2 * FLAGS_statestore_update_frequency_ms) :
676 FLAGS_statestore_update_frequency_ms;
684 SubscriberMap::iterator it =
subscribers_.find(update.second);
687 LOG(INFO) <<
"Unable to send " << hb_type <<
" message to subscriber "
688 << update.second <<
", received error: " << status.
GetDetail();
691 const string& registration_id =
PrintId(subscriber->registration_id());
699 LOG(INFO) <<
"Subscriber '" << subscriber->id() <<
"' has failed, disconnected "
700 <<
"or re-registered (last known registration ID: " << update.second
706 VLOG(3) <<
"Next " << (is_heartbeat ?
"heartbeat" :
"update") <<
" deadline for: "
707 << subscriber->id() <<
" is in " << deadline_ms <<
"ms";
708 OfferUpdate(make_pair(deadline_ms, subscriber->id()), is_heartbeat ?
715 SubscriberMap::const_iterator it =
subscribers_.find(subscriber->
id());
731 BOOST_FOREACH(Statestore::Subscriber::TransientEntryMap::value_type entry,
733 Statestore::TopicMap::iterator topic_it =
topics_.find(entry.first.first);
734 DCHECK(topic_it !=
topics_.end());
735 topic_it->second.DeleteIfVersionsMatch(entry.second,
Status RegisterSubscriber(const SubscriberId &subscriber_id, const TNetworkAddress &location, const std::vector< TTopicRegistration > &topic_registrations, TUniqueId *registration_id)
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > heartbeat_client_cache_
static const Value NULL_VALUE
Representation of an empty Value. Must have size() == 0.
const TNetworkAddress & network_address() const
IntGauge * topic_size_metric_
const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES
const std::string GetDetail() const
std::pair< int64_t, SubscriberId > ScheduledSubscriberUpdate
int64_t total_value_size_bytes() const
void Remove(const T &item)
Remove an item from this set by value.
std::string Value
A Value is a string of bytes, for which std::string is a convenient representation.
Statestore(MetricGroup *metrics)
The only constructor; initialises member variables only.
int128_t abs(const int128_t &x)
boost::function< void(const ArgumentMap &args, rapidjson::Document *json)> UrlCallback
void RegisterWebpages(Webserver *webserver)
const TopicUpdateLog & topic_update_log() const
#define RETURN_IF_ERROR(stmt)
some generally useful macros
M * RegisterMetric(M *metric)
const Value & value() const
MetricGroups may be organised hierarchically as a tree.
void RegisterUrlCallback(const std::string &path, const std::string &template_filename, const UrlCallback &callback, bool is_on_nav_bar=true)
Only one callback may be registered per URL.
void Add(const T &item)
Put an item in this set.
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > update_state_client_cache_
const string STATESTORE_TOTAL_KEY_SIZE_BYTES
void TopicsHandler(const Webserver::ArgumentMap &args, rapidjson::Document *document)
std::string TopicEntryKey
A TopicEntryKey uniquely identifies a single entry in a topic.
DEFINE_int32(statestore_max_missed_heartbeats, 10,"Maximum number of consecutive ""heartbeat messages an impalad can miss before being declared failed by the ""statestore.")
SetMetric< std::string > * subscriber_set_metric_
const string STATESTORE_LIVE_SUBSCRIBERS_LIST
const SubscriberId & id() const
string PrintId(const TUniqueId &id, const string &separator)
const TransientEntryMap & transient_entries() const
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
std::map< std::string, std::string > ArgumentMap
const TopicEntryMap & entries() const
bool Offer(const T &work)
StatsMetric< double > * heartbeat_duration_metric_
Same as above, but for SendHeartbeat() RPCs.
static const TopicEntry::Version TOPIC_INITIAL_VERSION
The Version value used to initialize new Topic subscriptions for this Subscriber. ...
bool ShouldExit()
True if the shutdown flag has been set true, false otherwise.
boost::uuids::random_generator subscriber_uuid_generator_
Used to generate unique IDs for each new registration.
ThreadPool< ScheduledSubscriberUpdate > subscriber_topic_update_threadpool_
std::string TopicId
A TopicId uniquely identifies a single topic.
virtual void RegisterSubscriber(TRegisterSubscriberResponse &response, const TRegisterSubscriberRequest &params)
Status SendTopicUpdate(Subscriber *subscriber, bool *update_skipped)
void AddTransientUpdate(const TopicId &topic_id, const TopicEntryKey &topic_key, TopicEntry::Version version)
IntGauge * num_subscribers_metric_
Metric that tracks the registered, non-failed subscribers.
void SubscribersHandler(const Webserver::ArgumentMap &args, rapidjson::Document *document)
TErrorCode::type code() const
IntGauge * value_size_metric_
boost::mutex exit_flag_lock_
int64_t total_key_size_bytes() const
StatsMetric< double > * topic_update_duration_metric_
void ToThrift(TStatus *status) const
Convert into TStatus.
IntGauge * key_size_metric_
Metrics shared across all topics to sum the size in bytes of keys, values and both.
void SetLastTopicVersionProcessed(const TopicId &topic_id, TopicEntry::Version version)
boost::shared_ptr< StatestoreServiceIf > thrift_iface_
Thrift API implementation which proxies requests onto this Statestore.
void Update(const T &value)
Metric whose value is a set of items.
TopicEntry::Version last_version() const
void GatherTopicUpdates(const Subscriber &subscriber, TUpdateStateRequest *update_state_request)
boost::scoped_ptr< MissedHeartbeatFailureDetector > failure_detector_
static int64_t NULL_VALUE[]
void UnregisterSubscriber(Subscriber *subscriber)
const string STATESTORE_TOTAL_VALUE_SIZE_BYTES
TopicEntry::Version last_version
const uint32_t DEADLINE_MISS_THRESHOLD_MS
const string STATESTORE_UPDATE_DURATION
uint64_t ElapsedTime() const
Returns time in nanosecond.
const int32_t STATESTORE_MAX_SUBSCRIBERS
uint32_t GetQueueSize() const
SubscriberMap subscribers_
TopicEntry::Version Put(const TopicEntryKey &key, const TopicEntry::Value &bytes)
Must be called holding the topic lock.
const string STATESTORE_LIVE_SUBSCRIBERS
StatestoreThriftIf(Statestore *statestore)
Status MainLoop()
The main processing loop. Blocks until the exit flag is set.
void SetErrorMsg(const ErrorMsg &m)
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
void UUIDToTUniqueId(const boost::uuids::uuid &uuid, T *unique_id)
void DoSubscriberUpdate(bool is_heartbeat, int thread_id, const ScheduledSubscriberUpdate &update)
boost::mutex topic_lock_
Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock.
ThreadPool< ScheduledSubscriberUpdate > subscriber_heartbeat_threadpool_
const Topics & subscribed_topics() const
Status OfferUpdate(const ScheduledSubscriberUpdate &update, ThreadPool< ScheduledSubscriberUpdate > *thread_pool)
boost::mutex subscribers_lock_
const TopicEntry::Version LastTopicVersionProcessed(const TopicId &topic_id) const
const TopicEntry::Version GetMinSubscriberTopicVersion(const TopicId &topic_id, SubscriberId *subscriber_id=NULL)
Must be called holding the subscribers_ lock.
ClientConnection< StatestoreSubscriberClient > StatestoreSubscriberConnection
Topics subscribed_topics_
Status SendHeartbeat(Subscriber *subscriber)
const TUniqueId & registration_id() const
const string STATESTORE_HEARTBEAT_DURATION