Impala
Impala is the open source, native analytic database for Apache Hadoop.
#include <statestore.h>
Classes | |
class | Subscriber |
class | Topic |
class | TopicEntry |
Public Types | |
typedef std::string | SubscriberId |
typedef std::string | TopicId |
A TopicId uniquely identifies a single topic. More... | |
typedef std::string | TopicEntryKey |
A TopicEntryKey uniquely identifies a single entry in a topic. More... | |
Public Member Functions | |
Statestore (MetricGroup *metrics) | |
The only constructor; initialises member variables only. More... | |
Status | RegisterSubscriber (const SubscriberId &subscriber_id, const TNetworkAddress &location, const std::vector< TTopicRegistration > &topic_registrations, TUniqueId *registration_id) |
void | RegisterWebpages (Webserver *webserver) |
Status | MainLoop () |
The main processing loop. Blocks until the exit flag is set. More... | |
const boost::shared_ptr < StatestoreServiceIf > & | thrift_iface () const |
Returns the Thrift API interface that proxies requests onto the local Statestore. More... | |
void | SetExitFlag () |
Private Types | |
typedef boost::unordered_map < TopicEntryKey, TopicEntry > | TopicEntryMap |
Map from TopicEntryKey to TopicEntry, maintained by a Topic object. More... | |
typedef std::map < TopicEntry::Version, TopicEntryKey > | TopicUpdateLog |
typedef boost::unordered_map < TopicId, Topic > | TopicMap |
The entire set of topics tracked by the statestore. More... | |
typedef boost::unordered_map < SubscriberId, boost::shared_ptr< Subscriber > > | SubscriberMap |
typedef std::pair< int64_t, SubscriberId > | ScheduledSubscriberUpdate |
Private Member Functions | |
Status | OfferUpdate (const ScheduledSubscriberUpdate &update, ThreadPool< ScheduledSubscriberUpdate > *thread_pool) |
void | DoSubscriberUpdate (bool is_heartbeat, int thread_id, const ScheduledSubscriberUpdate &update) |
Status | SendTopicUpdate (Subscriber *subscriber, bool *update_skipped) |
Status | SendHeartbeat (Subscriber *subscriber) |
void | UnregisterSubscriber (Subscriber *subscriber) |
void | GatherTopicUpdates (const Subscriber &subscriber, TUpdateStateRequest *update_state_request) |
const TopicEntry::Version | GetMinSubscriberTopicVersion (const TopicId &topic_id, SubscriberId *subscriber_id=NULL) |
Must be called holding subscribers_lock_. More... | |
bool | ShouldExit () |
True if the shutdown flag has been set true, false otherwise. More... | |
void | TopicsHandler (const Webserver::ArgumentMap &args, rapidjson::Document *document) |
void | SubscribersHandler (const Webserver::ArgumentMap &args, rapidjson::Document *document) |
Private Attributes | |
boost::mutex | exit_flag_lock_ |
bool | exit_flag_ |
boost::mutex | topic_lock_ |
Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock. More... | |
TopicMap | topics_ |
boost::mutex | subscribers_lock_ |
SubscriberMap | subscribers_ |
boost::uuids::random_generator | subscriber_uuid_generator_ |
Used to generate unique IDs for each new registration. More... | |
ThreadPool < ScheduledSubscriberUpdate > | subscriber_topic_update_threadpool_ |
ThreadPool < ScheduledSubscriberUpdate > | subscriber_heartbeat_threadpool_ |
boost::scoped_ptr< ClientCache < StatestoreSubscriberClient > > | update_state_client_cache_ |
boost::scoped_ptr< ClientCache < StatestoreSubscriberClient > > | heartbeat_client_cache_ |
boost::shared_ptr < StatestoreServiceIf > | thrift_iface_ |
Thrift API implementation which proxies requests onto this Statestore. More... | |
boost::scoped_ptr < MissedHeartbeatFailureDetector > | failure_detector_ |
IntGauge * | num_subscribers_metric_ |
Metric that tracks the registered, non-failed subscribers. More... | |
SetMetric< std::string > * | subscriber_set_metric_ |
IntGauge * | key_size_metric_ |
Metrics shared across all topics to sum the size in bytes of keys, values and both. More... | |
IntGauge * | value_size_metric_ |
IntGauge * | topic_size_metric_ |
StatsMetric< double > * | topic_update_duration_metric_ |
StatsMetric< double > * | heartbeat_duration_metric_ |
Same as above, but for SendHeartbeat() RPCs. More... | |
The Statestore is a soft-state key-value store that maintains a set of Topics, which are maps from string keys to byte-array values. Topics are subscribed to by subscribers: remote clients of the statestore that express an interest in some set of Topics. The statestore sends topic updates to subscribers via periodic 'update' messages, and also sends periodic 'heartbeat' messages, which are used to detect the liveness of a subscriber.
In response to 'update' messages, subscribers send topic updates to the statestore to merge with the current topic. These updates are then sent to all other subscribers in their next update message. The next message is scheduled for FLAGS_statestore_update_frequency_ms in the future, unless the subscriber indicated that it skipped processing an update, in which case the statestore will back off slightly before re-sending the same update.
Topic entries usually have human-readable keys, and values which are some serialised representation of a data structure, e.g. a Thrift struct. The contents of a value's byte string are opaque to the statestore, which maintains no information about how to deserialise it. Subscribers must use convention to interpret each other's updates.
A subscriber may have marked some updates that it made as 'transient', which implies that those entries should be deleted once the subscriber is no longer connected (this is judged by the statestore's failure detector, which will mark a subscriber as failed when it has not responded to a number of successive heartbeat messages). Transience is tracked per-topic-per-subscriber, so two different subscribers may treat the same topic differently with respect to the transience of their updates.
The statestore tracks the history of updates to each topic, with each topic update getting a sequentially increasing version number that is unique across the topic. Subscribers also track the max version of each topic which they have successfully processed. The statestore can use this information to send a delta of updates to a subscriber, rather than all items in the topic. For non-delta updates, the statestore will send an update that includes all values in the topic.
Definition at line 79 of file statestore.h.
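The versioning and delta behaviour described above can be illustrated with a minimal sketch. SketchTopic, Put() and DeltaSince() here are hypothetical names chosen for illustration, and std:: containers stand in for the boost ones; this is not Impala's Topic class, only one plausible way to pair an entry map with a version-ordered log.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical, simplified stand-in for a statestore topic.
class SketchTopic {
 public:
  using Version = int64_t;

  // Stores or overwrites an entry and returns the version assigned to it.
  Version Put(const std::string& key, const std::string& value) {
    auto it = entries_.find(key);
    // An overwritten key keeps only its newest record in the log.
    if (it != entries_.end()) log_.erase(it->second.version);
    Version v = ++last_version_;
    entries_[key] = Entry{value, v};
    log_[v] = key;
    assert(log_.size() == entries_.size());  // one log record per live key
    return v;
  }

  // Returns the keys changed after 'last_processed', in version order.
  // Passing 0 yields every key, i.e. a non-delta update.
  std::vector<std::string> DeltaSince(Version last_processed) const {
    std::vector<std::string> keys;
    for (auto it = log_.upper_bound(last_processed); it != log_.end(); ++it) {
      keys.push_back(it->second);
    }
    return keys;
  }

  Version last_version() const { return last_version_; }

 private:
  struct Entry {
    std::string value;
    Version version;
  };
  std::unordered_map<std::string, Entry> entries_;  // key -> latest value
  std::map<Version, std::string> log_;              // version -> key
  Version last_version_ = 0;
};
```

A subscriber that last processed version 2 would be sent only the keys returned by DeltaSince(2).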
typedef std::pair< int64_t, SubscriberId > impala::Statestore::ScheduledSubscriberUpdate |
private |
Work item passed to both kinds of subscriber update threads. First entry is the earliest time (in microseconds since epoch) that the next message should be sent, the second entry is the subscriber to send it to.
Definition at line 363 of file statestore.h.
typedef std::string impala::Statestore::SubscriberId |
A SubscriberId uniquely identifies a single subscriber, and is provided by the subscriber at registration time.
Definition at line 83 of file statestore.h.
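typedef boost::unordered_map< SubscriberId, boost::shared_ptr< Subscriber > > impala::Statestore::SubscriberMap |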
private |
Map of subscribers currently connected; upon failure their entry is removed from this map. Subscribers must only be removed by UnregisterSubscriber() which ensures that the correct cleanup is done. If a subscriber re-registers, it must be unregistered prior to re-entry into this map. Subscribers are held in shared_ptrs so that RegisterSubscriber() may overwrite their entry in this map while UpdateSubscriber() tries to update an existing registration without risk of use-after-free.
Definition at line 354 of file statestore.h.
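The reason for holding subscribers in shared pointers can be shown with a small sketch. SketchSubscriber and SketchSubscriberMap are hypothetical names, and std::shared_ptr and std::mutex are used in place of the boost equivalents; the point is only that a reference obtained before a re-registration remains valid after the map entry is overwritten.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for a subscriber record.
struct SketchSubscriber {
  std::string id;
  int registration;  // stand-in for a registration ID
};

class SketchSubscriberMap {
 public:
  // Installs a registration, replacing any previous one for the same ID.
  void Register(const std::string& id, int registration) {
    assert(!id.empty());
    std::lock_guard<std::mutex> l(lock_);
    subscribers_[id] =
        std::make_shared<SketchSubscriber>(SketchSubscriber{id, registration});
  }

  // Returns a shared reference that stays valid even if the map entry is
  // replaced later; returns null when the ID is not registered.
  std::shared_ptr<SketchSubscriber> Find(const std::string& id) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = subscribers_.find(id);
    if (it == subscribers_.end()) return nullptr;
    return it->second;
  }

 private:
  std::mutex lock_;
  std::unordered_map<std::string, std::shared_ptr<SketchSubscriber>> subscribers_;
};
```

A worker that called Find() before a second Register() still sees the old registration through its own pointer, while new lookups see the replacement.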
typedef std::string impala::Statestore::TopicEntryKey |
A TopicEntryKey uniquely identifies a single entry in a topic.
Definition at line 89 of file statestore.h.
typedef boost::unordered_map< TopicEntryKey, TopicEntry > impala::Statestore::TopicEntryMap |
private |
Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
Definition at line 167 of file statestore.h.
typedef std::string impala::Statestore::TopicId |
A TopicId uniquely identifies a single topic.
Definition at line 86 of file statestore.h.
typedef boost::unordered_map< TopicId, Topic > impala::Statestore::TopicMap |
private |
The entire set of topics tracked by the statestore.
Definition at line 257 of file statestore.h.
typedef std::map< TopicEntry::Version, TopicEntryKey > impala::Statestore::TopicUpdateLog |
private |
Map from Version to TopicEntryKey, maintained by a Topic object. Effectively a log of the updates made to a Topic, ordered by version.
Definition at line 171 of file statestore.h.
Statestore::Statestore | ( | MetricGroup * | metrics | ) |
The only constructor; initialises member variables only.
Definition at line 211 of file statestore.cc.
References impala::MetricGroup::AddGauge(), heartbeat_client_cache_, heartbeat_duration_metric_, key_size_metric_, num_subscribers_metric_, impala::MetricGroup::RegisterMetric(), STATESTORE_HEARTBEAT_DURATION, STATESTORE_LIVE_SUBSCRIBERS, STATESTORE_LIVE_SUBSCRIBERS_LIST, STATESTORE_TOTAL_KEY_SIZE_BYTES, STATESTORE_TOTAL_TOPIC_SIZE_BYTES, STATESTORE_TOTAL_VALUE_SIZE_BYTES, STATESTORE_UPDATE_DURATION, subscriber_set_metric_, topic_size_metric_, topic_update_duration_metric_, update_state_client_cache_, and value_size_metric_.
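void Statestore::DoSubscriberUpdate (bool is_heartbeat, int thread_id, const ScheduledSubscriberUpdate &update) |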
private |
Sends either a heartbeat or topic update message to the subscriber in 'update' at the closest possible time to the first member of 'update'. If is_heartbeat is true, sends a heartbeat update, otherwise the set of pending topic updates is sent. Once complete, the next update is scheduled and added to the appropriate queue.
Definition at line 606 of file statestore.cc.
References impala::abs(), impala::Status::code(), DEADLINE_MISS_THRESHOLD_MS, impala::FailureDetector::FAILED, failure_detector_, impala::Status::GetDetail(), OfferUpdate(), impala::Status::ok(), impala::PrintId(), SendHeartbeat(), SendTopicUpdate(), impala::Status::SetErrorMsg(), impala::SleepForMs(), subscriber_heartbeat_threadpool_, subscriber_topic_update_threadpool_, subscribers_, subscribers_lock_, impala::UnixMillis(), UnregisterSubscriber(), and VLOG_QUERY.
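The scheduling arithmetic around a work item can be sketched as follows. WaitBeforeSend() and ScheduleNext() are hypothetical helpers, not functions in statestore.cc, and the time unit is whatever the caller uses consistently for the deadline and the clock.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Mirrors the documented typedef: (earliest send time, subscriber ID).
using ScheduledSubscriberUpdate = std::pair<int64_t, std::string>;

// How long a worker should wait before sending; never negative, so a
// deadline that has already passed is serviced immediately.
inline int64_t WaitBeforeSend(const ScheduledSubscriberUpdate& update,
                              int64_t now) {
  return update.first > now ? update.first - now : 0;
}

// Builds the follow-up work item once a message has been sent at 'sent_at'.
inline ScheduledSubscriberUpdate ScheduleNext(
    const ScheduledSubscriberUpdate& update, int64_t sent_at, int64_t period) {
  assert(period > 0);
  return ScheduledSubscriberUpdate(sent_at + period, update.second);
}
```

Basing the next deadline on the actual send time rather than the original deadline means a late message does not cause a burst of catch-up messages; whether the real implementation makes the same choice is not stated here.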
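void Statestore::GatherTopicUpdates (const Subscriber &subscriber, TUpdateStateRequest *update_state_request) |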
private |
Populates a TUpdateStateRequest with the update state for this subscriber. Iterates over all updates in all subscribed topics, populating the given TUpdateStateRequest object. Takes the topic_lock_ and subscribers_lock_.
Definition at line 484 of file statestore.cc.
References impala::Statestore::Topic::entries(), GetMinSubscriberTopicVersion(), impala::Statestore::Subscriber::id(), impala::Statestore::Topic::last_version(), impala::Statestore::Subscriber::LastTopicVersionProcessed(), impala::Statestore::TopicEntry::NULL_VALUE, impala::PrettyPrinter::Print(), impala::Statestore::Subscriber::subscribed_topics(), subscribers_lock_, impala::Statestore::Subscriber::TOPIC_INITIAL_VERSION, topic_lock_, impala::Statestore::Topic::topic_update_log(), topics_, impala::Statestore::Topic::total_key_size_bytes(), impala::Statestore::Topic::total_value_size_bytes(), impala::Statestore::TopicEntry::value(), and VLOG_QUERY.
Referenced by SendTopicUpdate().
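const TopicEntry::Version Statestore::GetMinSubscriberTopicVersion (const TopicId &topic_id, SubscriberId *subscriber_id=NULL) |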
private |
Must be called holding subscribers_lock_.
Returns the minimum last processed topic version across all subscribers for the given topic ID. Calculated by enumerating all subscribers and looking at their LastTopicVersionProcessed() for this topic. The value returned will always be <= topics_[topic_id].last_version_. Returns TOPIC_INITIAL_VERSION if no subscribers are registered to the topic. The subscriber ID to whom the min version belongs can also be retrieved using the optional subscriber_id output parameter. If multiple subscribers have the same min version, the subscriber_id may be set to any one of the matching subscribers.
TODO: Update the min subscriber version only when a topic is updated, rather than each time a subscriber is updated. One way to do this would be to keep a priority queue in Topic of each subscriber's last processed version of the topic.
Definition at line 556 of file statestore.cc.
References subscribers_, and impala::Statestore::Subscriber::TOPIC_INITIAL_VERSION.
Referenced by GatherTopicUpdates(), and TopicsHandler().
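The enumeration described above might look like the following sketch. SubscriberVersions and MinSubscriberTopicVersion() are hypothetical, a nested std::map stands in for the Subscriber objects, and the value 0 for kTopicInitialVersion is an assumption made for the example.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Assumed value of the initial version, for illustration only.
const int64_t kTopicInitialVersion = 0;

// subscriber ID -> (topic ID -> last version that subscriber processed)
using SubscriberVersions =
    std::map<std::string, std::map<std::string, int64_t>>;

// Scans every subscriber and returns the smallest last-processed version
// for 'topic_id', or kTopicInitialVersion if nobody subscribes to it.
// Optionally reports which subscriber holds that minimum.
inline int64_t MinSubscriberTopicVersion(const SubscriberVersions& subs,
                                         const std::string& topic_id,
                                         std::string* subscriber_id = nullptr) {
  bool found = false;
  int64_t min_version = kTopicInitialVersion;
  for (const auto& sub : subs) {
    auto it = sub.second.find(topic_id);
    if (it == sub.second.end()) continue;  // not subscribed to this topic
    assert(it->second >= kTopicInitialVersion);
    if (!found || it->second < min_version) {
      found = true;
      min_version = it->second;
      if (subscriber_id != nullptr) *subscriber_id = sub.first;
    }
  }
  return min_version;
}
```

The scan is linear in the number of subscribers on every call, which is the cost the TODO above proposes to avoid.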
Status Statestore::MainLoop | ( | ) |
The main processing loop. Blocks until the exit flag is set.
Returns OK unless there is an unrecoverable error.
Definition at line 743 of file statestore.cc.
References impala::ThreadPool< T >::Join(), impala::Status::OK, and subscriber_topic_update_threadpool_.
Referenced by impala::InProcessStatestore::Start().
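Status Statestore::OfferUpdate (const ScheduledSubscriberUpdate &update, ThreadPool< ScheduledSubscriberUpdate > *thread_pool) |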
private |
Utility method to add an update to the given thread pool, and to fail if the thread pool is already at capacity.
Definition at line 335 of file statestore.cc.
References impala::ThreadPool< T >::GetQueueSize(), impala::ThreadPool< T >::Offer(), impala::Status::OK, STATESTORE_MAX_SUBSCRIBERS, subscribers_, and subscribers_lock_.
Referenced by DoSubscriberUpdate(), and RegisterSubscriber().
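The "fail if already at capacity" behaviour can be sketched with a bounded queue. SketchBoundedQueue is a hypothetical class and does not reproduce impala::ThreadPool; it returns a bool where the real method returns a Status.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>

// Hypothetical bounded work queue; rejects new items instead of blocking.
class SketchBoundedQueue {
 public:
  explicit SketchBoundedQueue(size_t capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Returns false, leaving the queue unchanged, when the queue is full.
  bool Offer(const std::string& item) {
    std::lock_guard<std::mutex> l(lock_);
    if (items_.size() >= capacity_) return false;
    items_.push_back(item);
    return true;
  }

  size_t size() const {
    std::lock_guard<std::mutex> l(lock_);
    return items_.size();
  }

 private:
  const size_t capacity_;
  mutable std::mutex lock_;
  std::deque<std::string> items_;
};
```

Rejecting rather than blocking lets the caller report the overload as an error instead of stalling a registration or worker thread.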
Status Statestore::RegisterSubscriber | ( | const SubscriberId & | subscriber_id, |
const TNetworkAddress & | location, | ||
const std::vector< TTopicRegistration > & | topic_registrations, | ||
TUniqueId * | registration_id | ||
) |
Registers a new subscriber with the given unique subscriber ID, running a subscriber service at the given location, with the provided list of topic subscriptions. The registration_id output parameter is the unique ID for this registration, used to distinguish old registrations from new ones for the same subscriber. If a registration already exists for this subscriber, the old registration is removed and a new one is created. Subscribers may receive an update intended for the old registration, since one may be in flight when a new RegisterSubscriber() is received.
Definition at line 352 of file statestore.cc.
References impala::SetMetric< T >::Add(), failure_detector_, key_size_metric_, num_subscribers_metric_, OfferUpdate(), impala::Status::OK, impala::PrintId(), RETURN_IF_ERROR, subscriber_heartbeat_threadpool_, subscriber_set_metric_, subscriber_topic_update_threadpool_, subscriber_uuid_generator_, subscribers_, subscribers_lock_, topic_lock_, topic_size_metric_, topics_, UnregisterSubscriber(), impala::UUIDToTUniqueId(), and value_size_metric_.
void Statestore::RegisterWebpages | ( | Webserver * | webserver | ) |
Definition at line 253 of file statestore.cc.
References impala::Webserver::RegisterUrlCallback(), SubscribersHandler(), and TopicsHandler().
Referenced by main().
Status Statestore::SendHeartbeat (Subscriber *subscriber) |
private |
Sends a heartbeat message to the subscriber. Returns a non-OK status if there was an error performing the RPC.
Definition at line 587 of file statestore.cc.
References impala::MonotonicStopWatch::ElapsedTime(), heartbeat_client_cache_, heartbeat_duration_metric_, impala::Statestore::Subscriber::network_address(), impala::Status::OK, impala::Statestore::Subscriber::registration_id(), RETURN_IF_ERROR, impala::MonotonicStopWatch::Start(), and impala::StatsMetric< T >::Update().
Referenced by DoSubscriberUpdate().
Status Statestore::SendTopicUpdate (Subscriber *subscriber, bool *update_skipped) |
private |
Does the work of updating a single subscriber, by calling UpdateState() on the client to send a list of topic deltas to the subscriber. If that call fails (either because the RPC could not be completed, or the subscriber indicated an error), this method returns a non-OK status immediately without further processing. The subscriber may indicate that it skipped processing the message, either because it was not ready to do so or because it was busy. In that case, the UpdateState() RPC will return OK (since there was no error) and the output parameter update_skipped is set to true. Otherwise, any updates returned by the subscriber are applied to their target topics.
Definition at line 399 of file statestore.cc.
References impala::Statestore::Subscriber::AddTransientUpdate(), impala::MonotonicStopWatch::ElapsedTime(), GatherTopicUpdates(), impala::Statestore::Subscriber::id(), impala::Statestore::Subscriber::network_address(), impala::Statestore::TopicEntry::NULL_VALUE, impala::Status::OK, impala::Statestore::Topic::Put(), impala::Statestore::Subscriber::registration_id(), RETURN_IF_ERROR, impala::Statestore::Subscriber::SetLastTopicVersionProcessed(), impala::MonotonicStopWatch::Start(), topic_lock_, topic_update_duration_metric_, topics_, impala::StatsMetric< T >::Update(), and update_state_client_cache_.
Referenced by DoSubscriberUpdate().
void Statestore::SetExitFlag | ( | ) |
Tells the Statestore to shut down. Does not wait for the processing loop to exit before returning.
Definition at line 581 of file statestore.cc.
References exit_flag_, exit_flag_lock_, impala::ThreadPool< T >::Shutdown(), and subscriber_topic_update_threadpool_.
bool Statestore::ShouldExit () |
private |
True if the shutdown flag has been set true, false otherwise.
Definition at line 576 of file statestore.cc.
References exit_flag_, and exit_flag_lock_.
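void Statestore::SubscribersHandler (const Webserver::ArgumentMap &args, rapidjson::Document *document) |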
private |
Webpage handler: upon return, 'document' will contain a list of subscribers as follows:
"subscribers": [
  {
    "id": "henry-impala:26000",
    "address": "henry-impala:23020",
    "num_topics": 1,
    "num_transient": 0,
    "registration_id": "414d28c84930d987:abcffd70b3346fb7"
  }
]
Definition at line 307 of file statestore.cc.
References impala::PrintId(), subscribers_, and subscribers_lock_.
Referenced by RegisterWebpages().
const boost::shared_ptr< StatestoreServiceIf >& Statestore::thrift_iface () const |
inline |
Returns the Thrift API interface that proxies requests onto the local Statestore.
Definition at line 115 of file statestore.h.
References thrift_iface_.
void Statestore::TopicsHandler (const Webserver::ArgumentMap &args, rapidjson::Document *document) |
private |
Webpage handler: upon return, 'document' will contain a list of topics as follows:
"topics": [
  {
    "topic_id": "catalog-update",
    "num_entries": 1165,
    "version": 2476,
    "oldest_version": 2476,
    "oldest_id": "henry-impala:26000",
    "key_size": "42.94 KB",
    "value_size": "9.54 MB",
    "total_size": "9.58 MB"
  }
]
Definition at line 265 of file statestore.cc.
References GetMinSubscriberTopicVersion(), impala::PrettyPrinter::Print(), subscribers_lock_, topic_lock_, and topics_.
Referenced by RegisterWebpages().
void Statestore::UnregisterSubscriber (Subscriber *subscriber) |
private |
Unregister a subscriber, removing all of its transient entries and evicting it from the subscriber map. Callers must hold subscribers_lock_ prior to calling this method.
Definition at line 714 of file statestore.cc.
References failure_detector_, heartbeat_client_cache_, impala::Statestore::Subscriber::id(), impala::Statestore::Subscriber::network_address(), num_subscribers_metric_, impala::PrintId(), impala::Statestore::Subscriber::registration_id(), impala::SetMetric< T >::Remove(), subscriber_set_metric_, subscribers_, topic_lock_, topics_, impala::Statestore::Subscriber::transient_entries(), and update_state_client_cache_.
Referenced by DoSubscriberUpdate(), and RegisterSubscriber().
bool impala::Statestore::exit_flag_ |
private |
Definition at line 251 of file statestore.h.
Referenced by SetExitFlag(), and ShouldExit().
boost::mutex impala::Statestore::exit_flag_lock_ |
private |
Note on locking: Subscribers and Topics should be accessed under their own coarse locks, and worker threads will use worker_lock_ to ensure safe access to the subscriber work queue.
Protects access to exit_flag_, but is used mostly to ensure visibility of updates between threads.
Definition at line 250 of file statestore.h.
Referenced by SetExitFlag(), and ShouldExit().
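A mutex-guarded flag of the kind described above can be sketched as follows. SketchExitFlag and RunUntilExit() are hypothetical names, and std::mutex stands in for boost::mutex; taking the lock on both the write and the read is what makes a flag set on one thread visible to a loop polling on another.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical exit flag whose reads and writes both go through a mutex.
class SketchExitFlag {
 public:
  void Set() {
    std::lock_guard<std::mutex> l(lock_);
    flag_ = true;
  }

  bool ShouldExit() {
    std::lock_guard<std::mutex> l(lock_);
    return flag_;
  }

 private:
  std::mutex lock_;
  bool flag_ = false;
};

// Performs up to 'max_iterations' units of work, stopping early once the
// flag is set. Returns the number of iterations completed.
inline int RunUntilExit(SketchExitFlag& flag, int max_iterations) {
  assert(max_iterations >= 0);
  int done = 0;
  while (done < max_iterations && !flag.ShouldExit()) ++done;
  return done;
}
```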
boost::scoped_ptr< MissedHeartbeatFailureDetector > impala::Statestore::failure_detector_ |
private |
Failure detector for subscribers. If a subscriber misses a configurable number of consecutive heartbeat messages, it is considered failed and a) its transient topic entries are removed and b) its entry in the subscriber map is erased.
Definition at line 413 of file statestore.h.
Referenced by DoSubscriberUpdate(), RegisterSubscriber(), and UnregisterSubscriber().
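The "configurable number of consecutive missed heartbeats" rule can be sketched with a per-peer counter. SketchMissedHeartbeatDetector is a hypothetical class and does not reproduce the interface of impala::MissedHeartbeatFailureDetector.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical detector: a peer is failed after 'max_missed' consecutive
// missed heartbeats; any successful heartbeat resets its count.
class SketchMissedHeartbeatDetector {
 public:
  explicit SketchMissedHeartbeatDetector(int max_missed)
      : max_missed_(max_missed) {
    assert(max_missed > 0);
  }

  // Records the outcome of one heartbeat attempt. Returns true if the peer
  // is considered failed after this update.
  bool Update(const std::string& peer, bool seen) {
    int& missed = missed_[peer];
    missed = seen ? 0 : missed + 1;
    return missed >= max_missed_;
  }

  // Forgets a peer, e.g. once it has been unregistered.
  void Evict(const std::string& peer) { missed_.erase(peer); }

 private:
  const int max_missed_;
  std::map<std::string, int> missed_;
};
```

Because only consecutive misses count, a single slow response does not cause a subscriber's transient entries to be dropped.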
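boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > impala::Statestore::heartbeat_client_cache_ |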
private |
Cache of subscriber clients used for Heartbeat() RPCs. Separate from update_state_client_cache_ because we enable TCP-level timeouts for these calls, whereas they are not safe for UpdateState() RPCs which can take an unbounded amount of time.
Definition at line 405 of file statestore.h.
Referenced by SendHeartbeat(), Statestore(), and UnregisterSubscriber().
StatsMetric< double >* impala::Statestore::heartbeat_duration_metric_ |
private |
Same as topic_update_duration_metric_, but for SendHeartbeat() RPCs.
Definition at line 430 of file statestore.h.
Referenced by SendHeartbeat(), and Statestore().
IntGauge* impala::Statestore::key_size_metric_ |
private |
Metrics shared across all topics to sum the size in bytes of keys, values and both.
Definition at line 420 of file statestore.h.
Referenced by RegisterSubscriber(), and Statestore().
IntGauge* impala::Statestore::num_subscribers_metric_ |
private |
Metric that tracks the registered, non-failed subscribers.
Definition at line 416 of file statestore.h.
Referenced by RegisterSubscriber(), Statestore(), and UnregisterSubscriber().
ThreadPool< ScheduledSubscriberUpdate > impala::Statestore::subscriber_heartbeat_threadpool_ |
private |
Definition at line 395 of file statestore.h.
Referenced by DoSubscriberUpdate(), and RegisterSubscriber().
SetMetric< std::string >* impala::Statestore::subscriber_set_metric_ |
private |
Definition at line 417 of file statestore.h.
Referenced by RegisterSubscriber(), Statestore(), and UnregisterSubscriber().
ThreadPool< ScheduledSubscriberUpdate > impala::Statestore::subscriber_topic_update_threadpool_ |
private |
The statestore has two pools of threads that send messages to subscribers one-by-one. One pool deals with 'heartbeat' messages that update failure detection state, and the other pool sends 'topic update' messages which contain the actual topic data that a subscriber does not yet have.
Each message is scheduled for some time in the future, and each worker thread will sleep until that time has passed to rate-limit messages. Subscribers are placed back into the queue once they have been processed. A subscriber may have many entries in a queue, but no more than one for each registration associated with that subscriber. Since at most one registration is considered 'live' per subscriber, this guarantees that there are at most subscribers_.size() - 1 'live' subscribers ahead of any subscriber in the queue.
Messages may be delayed for any number of reasons, including scheduler interference, lock unfairness when submitting to the thread pool, and head-of-line blocking when threads are occupied sending messages to slow subscribers (subscribers are not guaranteed to be in the queue in next-update order). Subscribers are therefore not processed in lock-step, and one subscriber may have seen many more messages than another during the same interval (if the second subscriber runs slow for any reason).
Delays for heartbeat messages can result in the subscriber that is kept waiting assuming that the statestore has failed. Correct configuration of heartbeat message frequency and subscriber timeout is therefore very important, and depends upon the cluster size. See --statestore_heartbeat_frequency_ms and --statestore_subscriber_timeout_seconds. We expect that the provided defaults will work up to clusters of several hundred nodes.
Definition at line 393 of file statestore.h.
Referenced by DoSubscriberUpdate(), MainLoop(), RegisterSubscriber(), and SetExitFlag().
boost::uuids::random_generator impala::Statestore::subscriber_uuid_generator_ |
private |
Used to generate unique IDs for each new registration.
Definition at line 358 of file statestore.h.
Referenced by RegisterSubscriber().
SubscriberMap impala::Statestore::subscribers_ |
private |
Definition at line 355 of file statestore.h.
Referenced by DoSubscriberUpdate(), GetMinSubscriberTopicVersion(), OfferUpdate(), RegisterSubscriber(), SubscribersHandler(), and UnregisterSubscriber().
boost::mutex impala::Statestore::subscribers_lock_ |
private |
Protects access to subscribers_ and subscriber_uuid_generator_. Must be taken before topic_lock_.
Definition at line 343 of file statestore.h.
Referenced by DoSubscriberUpdate(), GatherTopicUpdates(), OfferUpdate(), RegisterSubscriber(), SubscribersHandler(), and TopicsHandler().
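The lock-ordering rule (subscribers_lock_ before topic_lock_, never the reverse) can be sketched as below. SketchState and Subscribe() are hypothetical, and std::mutex stands in for boost::mutex; the sketch only shows code that needs both structures acquiring the locks in the documented order.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <set>
#include <string>

// Hypothetical miniature of the two lock-protected structures.
struct SketchState {
  std::mutex subscribers_lock;  // always taken first
  std::mutex topic_lock;        // always taken second
  std::set<std::string> subscribers;
  std::map<std::string, std::set<std::string>> topic_subscribers;
};

// Touches both structures, so it takes both locks in the documented order.
// If every code path follows the same order, two threads can never each
// hold one lock while waiting for the other.
inline void Subscribe(SketchState& state, const std::string& subscriber,
                      const std::string& topic) {
  assert(!subscriber.empty() && !topic.empty());
  std::lock_guard<std::mutex> s(state.subscribers_lock);
  std::lock_guard<std::mutex> t(state.topic_lock);
  state.subscribers.insert(subscriber);
  state.topic_subscribers[topic].insert(subscriber);
}
```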
boost::shared_ptr< StatestoreServiceIf > impala::Statestore::thrift_iface_ |
private |
Thrift API implementation which proxies requests onto this Statestore.
Definition at line 408 of file statestore.h.
Referenced by thrift_iface().
boost::mutex impala::Statestore::topic_lock_ |
private |
Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock.
Definition at line 254 of file statestore.h.
Referenced by GatherTopicUpdates(), RegisterSubscriber(), SendTopicUpdate(), TopicsHandler(), and UnregisterSubscriber().
IntGauge* impala::Statestore::topic_size_metric_ |
private |
Definition at line 422 of file statestore.h.
Referenced by RegisterSubscriber(), and Statestore().
StatsMetric< double >* impala::Statestore::topic_update_duration_metric_ |
private |
Tracks the distribution of topic-update durations: precisely, the time spent calling the UpdateState() RPC, which allows us to measure the network transmission cost as well as the subscriber-side processing time.
Definition at line 427 of file statestore.h.
Referenced by SendTopicUpdate(), and Statestore().
TopicMap impala::Statestore::topics_ |
private |
Definition at line 258 of file statestore.h.
Referenced by GatherTopicUpdates(), RegisterSubscriber(), SendTopicUpdate(), TopicsHandler(), and UnregisterSubscriber().
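boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > impala::Statestore::update_state_client_cache_ |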
private |
Cache of subscriber clients used for UpdateState() RPCs. Only one client per subscriber should be used, but the cache helps with the client lifecycle on failure.
Definition at line 399 of file statestore.h.
Referenced by SendTopicUpdate(), Statestore(), and UnregisterSubscriber().
IntGauge* impala::Statestore::value_size_metric_ |
private |
Definition at line 421 of file statestore.h.
Referenced by RegisterSubscriber(), and Statestore().