Impala
Impala is the open source, native analytic database for Apache Hadoop.
impala::Statestore Class Reference

#include <statestore.h>


Classes

class  Subscriber
 
class  Topic
 
class  TopicEntry
 

Public Types

typedef std::string SubscriberId
 
typedef std::string TopicId
 A TopicId uniquely identifies a single topic.
 
typedef std::string TopicEntryKey
 A TopicEntryKey uniquely identifies a single entry in a topic.
 

Public Member Functions

 Statestore (MetricGroup *metrics)
 The only constructor; initialises member variables only.
 
Status RegisterSubscriber (const SubscriberId &subscriber_id, const TNetworkAddress &location, const std::vector< TTopicRegistration > &topic_registrations, TUniqueId *registration_id)
 
void RegisterWebpages (Webserver *webserver)
 
Status MainLoop ()
 The main processing loop. Blocks until the exit flag is set.
 
const boost::shared_ptr< StatestoreServiceIf > & thrift_iface () const
 Returns the Thrift API interface that proxies requests onto the local Statestore.
 
void SetExitFlag ()
 

Private Types

typedef boost::unordered_map< TopicEntryKey, TopicEntry > TopicEntryMap
 Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
 
typedef std::map< TopicEntry::Version, TopicEntryKey > TopicUpdateLog
 
typedef boost::unordered_map< TopicId, Topic > TopicMap
 The entire set of topics tracked by the statestore.
 
typedef boost::unordered_map< SubscriberId, boost::shared_ptr< Subscriber > > SubscriberMap
 
typedef std::pair< int64_t, SubscriberId > ScheduledSubscriberUpdate
 

Private Member Functions

Status OfferUpdate (const ScheduledSubscriberUpdate &update, ThreadPool< ScheduledSubscriberUpdate > *thread_pool)
 
void DoSubscriberUpdate (bool is_heartbeat, int thread_id, const ScheduledSubscriberUpdate &update)
 
Status SendTopicUpdate (Subscriber *subscriber, bool *update_skipped)
 
Status SendHeartbeat (Subscriber *subscriber)
 
void UnregisterSubscriber (Subscriber *subscriber)
 
void GatherTopicUpdates (const Subscriber &subscriber, TUpdateStateRequest *update_state_request)
 
const TopicEntry::Version GetMinSubscriberTopicVersion (const TopicId &topic_id, SubscriberId *subscriber_id=NULL)
 Must be called while holding subscribers_lock_.
 
bool ShouldExit ()
 True if the shutdown flag has been set true, false otherwise.
 
void TopicsHandler (const Webserver::ArgumentMap &args, rapidjson::Document *document)
 
void SubscribersHandler (const Webserver::ArgumentMap &args, rapidjson::Document *document)
 

Private Attributes

boost::mutex exit_flag_lock_
 
bool exit_flag_
 
boost::mutex topic_lock_
 Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock.
 
TopicMap topics_
 
boost::mutex subscribers_lock_
 
SubscriberMap subscribers_
 
boost::uuids::random_generator subscriber_uuid_generator_
 Used to generate unique IDs for each new registration.
 
ThreadPool< ScheduledSubscriberUpdate > subscriber_topic_update_threadpool_
 
ThreadPool< ScheduledSubscriberUpdate > subscriber_heartbeat_threadpool_
 
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > update_state_client_cache_
 
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > heartbeat_client_cache_
 
boost::shared_ptr< StatestoreServiceIf > thrift_iface_
 Thrift API implementation which proxies requests onto this Statestore.
 
boost::scoped_ptr< MissedHeartbeatFailureDetector > failure_detector_
 
IntGauge * num_subscribers_metric_
 Metric that tracks the registered, non-failed subscribers.
 
SetMetric< std::string > * subscriber_set_metric_
 
IntGauge * key_size_metric_
 Metrics shared across all topics to sum the size in bytes of keys, values and both.
 
IntGauge * value_size_metric_
 
IntGauge * topic_size_metric_
 
StatsMetric< double > * topic_update_duration_metric_
 
StatsMetric< double > * heartbeat_duration_metric_
 Same as topic_update_duration_metric_, but for SendHeartbeat() RPCs.
 

Detailed Description

The Statestore is a soft-state key-value store that maintains a set of Topics, which are maps from string keys to byte-array values. Topics are subscribed to by subscribers: remote clients of the statestore that express an interest in some set of Topics. The statestore sends topic updates to subscribers via periodic 'update' messages, and also sends periodic 'heartbeat' messages, which are used to detect the liveness of a subscriber. In response to 'update' messages, subscribers send topic updates to the statestore to merge with the current topic. These updates are then sent to all other subscribers in their next update message. The next message is scheduled for FLAGS_statestore_update_frequency_ms in the future, unless the subscriber indicated that it skipped processing an update, in which case the statestore will back off slightly before re-sending the same update.

Topic entries usually have human-readable keys, and values that are some serialised representation of a data structure, e.g. a Thrift struct. The contents of a value's byte string are opaque to the statestore, which maintains no information about how to deserialise it. Subscribers must rely on convention to interpret each other's updates.

A subscriber may mark some of its updates as 'transient', which means those entries should be deleted once the subscriber is no longer connected. Connectivity is judged by the statestore's failure detector, which marks a subscriber as failed when it has not responded to a number of successive heartbeat messages. Transience is tracked per topic, per subscriber, so two different subscribers may treat the same topic differently with respect to the transience of their updates.

The statestore tracks the history of updates to each topic, with each topic update getting a sequentially increasing version number that is unique across the topic. Subscribers also track the maximum version of each topic that they have successfully processed. The statestore can use this information to send a delta of updates to a subscriber, rather than all items in the topic. For non-delta updates, the statestore will send an update that includes all values in the topic.
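The versioning and delta scheme described above can be illustrated with a minimal, single-threaded sketch. It assumes a simplified topic that stores only a value and a version per key; SimpleTopic, Put() and GetDelta() are hypothetical names, version 0 is assumed to mean "nothing processed yet", and the sketch assumes that overwriting a key drops its older log entry. The real Topic and TopicEntry classes carry more state than this.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical, simplified model of a statestore topic (not Impala's Topic).
class SimpleTopic {
 public:
  using Version = int64_t;

  // Stores 'value' under 'key' with the next version number and returns it.
  Version Put(const std::string& key, const std::string& value) {
    const Version version = ++last_version_;
    auto it = entries_.find(key);
    if (it != entries_.end()) {
      // Assumption of this sketch: the key's older version is superseded,
      // so its log entry is removed and the log keeps one entry per key.
      update_log_.erase(it->second.version);
    }
    entries_[key] = Entry{value, version};
    update_log_[version] = key;
    return version;
  }

  // Returns the (key, value) pairs updated after 'from_version', in version
  // order. from_version == 0 returns every entry (a non-delta update).
  std::vector<std::pair<std::string, std::string>> GetDelta(
      Version from_version) const {
    // A subscriber can never have processed a version the topic lacks.
    assert(from_version <= last_version_);
    std::vector<std::pair<std::string, std::string>> delta;
    for (auto it = update_log_.upper_bound(from_version);
         it != update_log_.end(); ++it) {
      delta.emplace_back(it->second, entries_.at(it->second).value);
    }
    return delta;
  }

 private:
  struct Entry {
    std::string value;
    Version version;
  };

  Version last_version_ = 0;                        // 0 = no updates yet
  std::unordered_map<std::string, Entry> entries_;  // cf. TopicEntryMap
  std::map<Version, std::string> update_log_;       // cf. TopicUpdateLog
};
```

A subscriber that last processed version 2 would only need GetDelta(2), while a new subscriber starting from 0 receives the whole topic, which corresponds to the non-delta case. Keying the log by version is what makes the delta a single ordered range scan.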

Definition at line 79 of file statestore.h.

Member Typedef Documentation

typedef std::pair<int64_t, SubscriberId> impala::Statestore::ScheduledSubscriberUpdate
private

Work item passed to both kinds of subscriber update threads. First entry is the earliest time (in microseconds since epoch) that the next message should be sent, the second entry is the subscriber to send it to.

Definition at line 363 of file statestore.h.

typedef std::string impala::Statestore::SubscriberId

A SubscriberId uniquely identifies a single subscriber, and is provided by the subscriber at registration time.

Definition at line 83 of file statestore.h.

typedef boost::unordered_map<SubscriberId, boost::shared_ptr<Subscriber> > impala::Statestore::SubscriberMap
private

Map of subscribers currently connected; upon failure their entry is removed from this map. Subscribers must only be removed by UnregisterSubscriber() which ensures that the correct cleanup is done. If a subscriber re-registers, it must be unregistered prior to re-entry into this map. Subscribers are held in shared_ptrs so that RegisterSubscriber() may overwrite their entry in this map while UpdateSubscriber() tries to update an existing registration without risk of use-after-free.

Definition at line 354 of file statestore.h.
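The use-after-free argument for holding subscribers in shared_ptrs can be made concrete with a small single-threaded replay of the race. FakeSubscriber, FakeSubscriberMap and ReplayReRegistrationRace are hypothetical; an int stands in for the registration ID, and the locking that the real statestore performs around every map access is omitted.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for Statestore::Subscriber.
struct FakeSubscriber {
  std::string id;
  int registration;  // stands in for the TUniqueId registration ID
};

using FakeSubscriberMap =
    std::unordered_map<std::string, std::shared_ptr<FakeSubscriber>>;

// Single-threaded replay of the race described above. In the real
// statestore each map access would happen under subscribers_lock_.
int ReplayReRegistrationRace(FakeSubscriberMap* subscribers,
                             const std::string& id) {
  // 1. A worker copies the shared_ptr for the current (old) registration.
  std::shared_ptr<FakeSubscriber> held = subscribers->at(id);
  const int old_registration = held->registration;
  // 2. A re-registration overwrites the map entry with a new object.
  (*subscribers)[id] = std::make_shared<FakeSubscriber>(
      FakeSubscriber{id, old_registration + 1});
  // 3. The map no longer owns the old object, but 'held' keeps it alive,
  //    so the worker can still read it safely.
  assert(held.use_count() == 1);
  return held->registration;
}
```

With raw pointers, step 2 would have destroyed the object the worker is still using; with shared_ptr, the old registration is freed only when the worker drops its copy.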

typedef std::string impala::Statestore::TopicEntryKey

A TopicEntryKey uniquely identifies a single entry in a topic.

Definition at line 89 of file statestore.h.

typedef boost::unordered_map<TopicEntryKey, TopicEntry> impala::Statestore::TopicEntryMap
private

Map from TopicEntryKey to TopicEntry, maintained by a Topic object.

Definition at line 167 of file statestore.h.

typedef std::string impala::Statestore::TopicId

A TopicId uniquely identifies a single topic.

Definition at line 86 of file statestore.h.

typedef boost::unordered_map<TopicId, Topic> impala::Statestore::TopicMap
private

The entire set of topics tracked by the statestore.

Definition at line 257 of file statestore.h.

typedef std::map<TopicEntry::Version, TopicEntryKey> impala::Statestore::TopicUpdateLog
private

Map from Version to TopicEntryKey, maintained by a Topic object. Effectively a log of the updates made to a Topic, ordered by version.

Definition at line 171 of file statestore.h.

Constructor & Destructor Documentation

Member Function Documentation

void Statestore::DoSubscriberUpdate (bool is_heartbeat, int thread_id, const ScheduledSubscriberUpdate &update)
private

Sends either a heartbeat or topic update message to the subscriber in 'update' at the closest possible time to the first member of 'update'. If is_heartbeat is true, sends a heartbeat update, otherwise the set of pending topic updates is sent. Once complete, the next update is scheduled and added to the appropriate queue.

Definition at line 606 of file statestore.cc.

References impala::abs(), impala::Status::code(), DEADLINE_MISS_THRESHOLD_MS, impala::FailureDetector::FAILED, failure_detector_, impala::Status::GetDetail(), OfferUpdate(), impala::Status::ok(), impala::PrintId(), SendHeartbeat(), SendTopicUpdate(), impala::Status::SetErrorMsg(), impala::SleepForMs(), subscriber_heartbeat_threadpool_, subscriber_topic_update_threadpool_, subscribers_, subscribers_lock_, impala::UnixMillis(), UnregisterSubscriber(), and VLOG_QUERY.
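The scheduling step of DoSubscriberUpdate() can be sketched as two side-effect-free functions: one decides how long to wait before sending a dequeued work item, and one builds the follow-up work item. PlanSend, NextWorkItem and SendPlan are hypothetical; miss_threshold plays the role of DEADLINE_MISS_THRESHOLD_MS, whose value is not given here, and all times share whatever unit the work item's deadline uses. The sketch does not sleep or send anything itself.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Same shape as ScheduledSubscriberUpdate: (earliest send time, subscriber).
using WorkItem = std::pair<int64_t, std::string>;

// Hypothetical summary of what a worker should do with a dequeued item.
struct SendPlan {
  int64_t sleep_for;     // time to wait before sending; 0 means send now
  bool deadline_missed;  // true if already late by more than the threshold
};

// Decides how long to sleep before sending 'item' at time 'now'. All
// arguments use the same time unit as the work item's deadline.
SendPlan PlanSend(const WorkItem& item, int64_t now, int64_t miss_threshold) {
  assert(miss_threshold >= 0);
  const int64_t deadline = item.first;
  if (deadline > now) return SendPlan{deadline - now, false};
  // Late: send immediately, and report it if the lateness is significant.
  return SendPlan{0, now - deadline > miss_threshold};
}

// Once a send finishes at 'now', schedules the next message for the same
// subscriber 'interval' later, ready to be offered back to the pool.
WorkItem NextWorkItem(const WorkItem& item, int64_t now, int64_t interval) {
  return WorkItem{now + interval, item.second};
}
```

A worker would sleep for plan.sleep_for, log a warning if plan.deadline_missed, send the heartbeat or topic update, and then offer NextWorkItem(...) back to the same pool. Keeping the timing decisions pure makes them easy to test without real clocks.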

void Statestore::GatherTopicUpdates (const Subscriber &subscriber, TUpdateStateRequest *update_state_request)
private

const Statestore::TopicEntry::Version Statestore::GetMinSubscriberTopicVersion (const TopicId &topic_id, SubscriberId *subscriber_id = NULL)
private

Must be called while holding subscribers_lock_.

Returns the minimum last processed topic version across all subscribers for the given topic ID. Calculated by enumerating all subscribers and looking at their LastTopicVersionProcessed() for this topic. The value returned will always be <= topics_[topic_id].last_version_. Returns TOPIC_INITIAL_VERSION if no subscribers are registered to the topic. The subscriber ID to whom the min version belongs can also be retrieved using the optional subscriber_id output parameter. If multiple subscribers have the same min version, subscriber_id may be set to any one of the matching subscribers.

TODO: Update the min subscriber version only when a topic is updated, rather than each time a subscriber is updated. One way to do this would be to keep a priority queue in Topic of each subscriber's last processed version of the topic.

Definition at line 556 of file statestore.cc.

References subscribers_, and impala::Statestore::Subscriber::TOPIC_INITIAL_VERSION.

Referenced by GatherTopicUpdates(), and TopicsHandler().
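The enumeration described above amounts to a linear scan for a minimum with an optional output parameter. In this sketch SubscriberState and MinSubscriberTopicVersion are hypothetical, kTopicInitialVersion is assumed to be 0 as a stand-in for Subscriber::TOPIC_INITIAL_VERSION, and the caller is assumed to hold the subscribers lock.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>

using Version = int64_t;
// Assumed value standing in for Subscriber::TOPIC_INITIAL_VERSION.
constexpr Version kTopicInitialVersion = 0;

// Hypothetical per-subscriber state: the last version processed for each
// topic the subscriber is registered to.
struct SubscriberState {
  std::map<std::string, Version> last_version_processed;
};

// Returns the minimum last-processed version of 'topic_id' across all
// subscribers registered to it, or kTopicInitialVersion if there are none.
// If 'subscriber_id' is non-null, it receives the id of one subscriber that
// holds the minimum (any one, if several tie).
Version MinSubscriberTopicVersion(
    const std::unordered_map<std::string, SubscriberState>& subscribers,
    const std::string& topic_id, std::string* subscriber_id = nullptr) {
  bool found = false;
  Version min_version = kTopicInitialVersion;
  for (const auto& entry : subscribers) {
    const SubscriberState& state = entry.second;
    auto it = state.last_version_processed.find(topic_id);
    if (it == state.last_version_processed.end()) continue;  // not subscribed
    if (!found || it->second < min_version) {
      found = true;
      min_version = it->second;
      if (subscriber_id != nullptr) *subscriber_id = entry.first;
    }
  }
  return min_version;
}
```

This is O(number of subscribers) per call, which is the cost the TODO above proposes to avoid with a per-topic priority queue.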

Status Statestore::MainLoop ( )

The main processing loop. Blocks until the exit flag is set.

Returns OK unless there is an unrecoverable error.

Definition at line 743 of file statestore.cc.

References impala::ThreadPool< T >::Join(), impala::Status::OK, and subscriber_topic_update_threadpool_.

Referenced by impala::InProcessStatestore::Start().

Status Statestore::OfferUpdate (const ScheduledSubscriberUpdate &update, ThreadPool< ScheduledSubscriberUpdate > *thread_pool)
private

Utility method to add an update to the given thread pool, and to fail if the thread pool is already at capacity.

Definition at line 335 of file statestore.cc.

References impala::ThreadPool< T >::GetQueueSize(), impala::ThreadPool< T >::Offer(), impala::Status::OK, STATESTORE_MAX_SUBSCRIBERS, subscribers_, and subscribers_lock_.

Referenced by DoSubscriberUpdate(), and RegisterSubscriber().
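The fail-when-full behaviour can be sketched with a plain queue and a capacity check before the push. The references to GetQueueSize() and STATESTORE_MAX_SUBSCRIBERS suggest a size check of this kind, but OfferUpdateSketch is hypothetical: it uses a std::deque in place of the ThreadPool's work queue, a bool in place of Status, and takes the capacity as a parameter.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>

// Same shape as ScheduledSubscriberUpdate: (earliest send time, subscriber).
using WorkItem = std::pair<int64_t, std::string>;

// Hypothetical analogue of OfferUpdate(): enqueue 'update' unless the queue
// already holds 'max_queue_size' items. Returns false (the sketch's stand-in
// for a non-OK Status) instead of blocking when the queue is full.
bool OfferUpdateSketch(const WorkItem& update, std::deque<WorkItem>* queue,
                       std::size_t max_queue_size) {
  assert(queue != nullptr);
  if (queue->size() >= max_queue_size) return false;
  queue->push_back(update);
  return true;
}
```

Checking the size first turns a saturated queue into an error the caller can report rather than a stalled thread. In a multi-threaded setting, the size check and the push must be serialized by a single lock, or two callers could both pass the check.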

Status Statestore::RegisterSubscriber (const SubscriberId &subscriber_id, const TNetworkAddress &location, const std::vector< TTopicRegistration > &topic_registrations, TUniqueId *registration_id)

Registers a new subscriber with the given unique subscriber ID, running a subscriber service at the given location, with the provided list of topic subscriptions. The registration_id output parameter is the unique ID for this registration, used to distinguish old registrations from new ones for the same subscriber. If a registration already exists for this subscriber, the old registration is removed and a new one is created. Subscribers may receive an update intended for the old registration, since one may be in flight when a new RegisterSubscriber() is received.

Definition at line 352 of file statestore.cc.

References impala::SetMetric< T >::Add(), failure_detector_, key_size_metric_, num_subscribers_metric_, OfferUpdate(), impala::Status::OK, impala::PrintId(), RETURN_IF_ERROR, subscriber_heartbeat_threadpool_, subscriber_set_metric_, subscriber_topic_update_threadpool_, subscriber_uuid_generator_, subscribers_, subscribers_lock_, topic_lock_, topic_size_metric_, topics_, UnregisterSubscriber(), impala::UUIDToTUniqueId(), and value_size_metric_.
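The role of the registration ID can be sketched as a registry that hands out a fresh random ID on every (re-)registration and lets stale work be recognised by comparing IDs. RegistrationRegistry is hypothetical; a 64-bit std::mt19937_64 value stands in for the UUID-based TUniqueId produced by subscriber_uuid_generator_, and the cleanup of the old registration (UnregisterSubscriber()) is not modelled.

```cpp
#include <cassert>
#include <cstdint>
#include <random>
#include <string>
#include <unordered_map>

// Hypothetical registry; 64-bit random IDs stand in for the UUID-based
// TUniqueId that the real code generates.
class RegistrationRegistry {
 public:
  // Registers (or re-registers) 'subscriber_id' and returns a fresh
  // registration ID. Any previous registration is replaced.
  uint64_t Register(const std::string& subscriber_id) {
    uint64_t registration_id;
    do {
      registration_id = rng_();
    } while (registration_id == current_[subscriber_id]);  // never reuse
    current_[subscriber_id] = registration_id;
    return registration_id;
  }

  // True if 'registration_id' is the live registration for the subscriber;
  // work tagged with an older registration can then be recognised as stale.
  bool IsCurrent(const std::string& subscriber_id,
                 uint64_t registration_id) const {
    auto it = current_.find(subscriber_id);
    return it != current_.end() && it->second == registration_id;
  }

 private:
  std::mt19937_64 rng_{std::random_device{}()};
  std::unordered_map<std::string, uint64_t> current_;
};
```

Because an update may already be in flight when RegisterSubscriber() runs, a check like IsCurrent() is what lets either side tell an old registration's message apart from the new one.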

void Statestore::RegisterWebpages (Webserver *webserver)

Definition at line 253 of file statestore.cc.

References impala::Webserver::RegisterUrlCallback(), SubscribersHandler(), and TopicsHandler().

Referenced by main().

Status Statestore::SendHeartbeat (Subscriber *subscriber)
private

Status Statestore::SendTopicUpdate (Subscriber *subscriber, bool *update_skipped)
private

Does the work of updating a single subscriber by calling UpdateState() on the client to send a list of topic deltas to the subscriber. If that call fails (either because the RPC could not be completed, or because the subscriber indicated an error), this method returns a non-OK status immediately without further processing. The subscriber may indicate that it skipped processing the message, either because it was not ready to do so or because it was busy. In that case, the UpdateState() RPC returns OK (since there was no error) and the output parameter update_skipped is set to true. Otherwise, any updates returned by the subscriber are applied to their target topics.

Definition at line 399 of file statestore.cc.

References impala::Statestore::Subscriber::AddTransientUpdate(), impala::MonotonicStopWatch::ElapsedTime(), GatherTopicUpdates(), impala::Statestore::Subscriber::id(), impala::Statestore::Subscriber::network_address(), impala::Statestore::TopicEntry::NULL_VALUE, impala::Status::OK, impala::Statestore::Topic::Put(), impala::Statestore::Subscriber::registration_id(), RETURN_IF_ERROR, impala::Statestore::Subscriber::SetLastTopicVersionProcessed(), impala::MonotonicStopWatch::Start(), topic_lock_, topic_update_duration_metric_, topics_, impala::StatsMetric< T >::Update(), and update_state_client_cache_.

Referenced by DoSubscriberUpdate().
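The update_skipped contract, combined with the scheduling rule in the class description, can be sketched for a single topic as follows. TopicProgress and HandleTopicUpdateReply are hypothetical. The sketch assumes that a processed update advances the subscriber's last processed version to the newest version sent; the reference to SetLastTopicVersionProcessed() suggests bookkeeping of this kind, but the exact rule is not documented here. skip_backoff is an assumed parameter, and the RPC-failure path (a non-OK status) is not modelled.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-subscriber progress for one topic.
struct TopicProgress {
  int64_t last_version_processed = 0;
};

// Handles the reply to a topic update that carried versions up to
// 'sent_up_to' and returns the delay before the next update.
// 'update_frequency' plays the role of FLAGS_statestore_update_frequency_ms;
// 'skip_backoff' is an assumed parameter, since the exact back-off the
// statestore applies is not specified here.
int64_t HandleTopicUpdateReply(bool update_skipped, int64_t sent_up_to,
                               TopicProgress* progress,
                               int64_t update_frequency,
                               int64_t skip_backoff) {
  assert(progress != nullptr);
  if (update_skipped) {
    // Nothing was processed: keep the old version so the same delta is
    // re-sent, and wait for the back-off before trying again.
    return skip_backoff;
  }
  // Processed: the next delta can start after 'sent_up_to'.
  progress->last_version_processed = sent_up_to;
  return update_frequency;
}
```

Leaving the version untouched on a skip is what makes "re-sending the same update" fall out naturally: the next delta is computed from the same starting point.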

void Statestore::SetExitFlag ( )

Tells the Statestore to shut down. Does not wait for the processing loop to exit before returning.

Definition at line 581 of file statestore.cc.

References exit_flag_, exit_flag_lock_, impala::ThreadPool< T >::Shutdown(), and subscriber_topic_update_threadpool_.

bool Statestore::ShouldExit ( )
private

True if the shutdown flag has been set true, false otherwise.

Definition at line 576 of file statestore.cc.

References exit_flag_, and exit_flag_lock_.
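SetExitFlag() and ShouldExit() describe a mutex-protected shutdown flag that worker loops poll. A minimal sketch with std::mutex follows; ExitFlag and RunUntilExit are hypothetical names, and the sketch does not model the thread-pool shutdown that SetExitFlag() is documented as referencing.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the exit_flag_ / exit_flag_lock_ pair.
class ExitFlag {
 public:
  // Like SetExitFlag(): request shutdown and return without waiting.
  void Set() {
    std::lock_guard<std::mutex> l(lock_);
    exit_ = true;
  }

  // Like ShouldExit(): taking the lock makes a Set() performed on another
  // thread visible to this one.
  bool ShouldExit() {
    std::lock_guard<std::mutex> l(lock_);
    return exit_;
  }

 private:
  std::mutex lock_;
  bool exit_ = false;
};

// A worker loop that polls the flag between units of work and returns the
// number of iterations it completed before observing the flag.
int RunUntilExit(ExitFlag* flag) {
  assert(flag != nullptr);
  int iterations = 0;
  while (!flag->ShouldExit()) {
    ++iterations;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  return iterations;
}
```

Since the exit_flag_lock_ documentation says the lock is used mostly to ensure visibility between threads, a std::atomic<bool> would serve equally well for a flag like this.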

void Statestore::SubscribersHandler (const Webserver::ArgumentMap &args, rapidjson::Document *document)
private

Webpage handler: upon return, 'document' will contain a list of subscribers as follows: "subscribers": [ { "id": "henry-impala:26000", "address": "henry-impala:23020", "num_topics": 1, "num_transient": 0, "registration_id": "414d28c84930d987:abcffd70b3346fb7" } ]

Definition at line 307 of file statestore.cc.

References impala::PrintId(), subscribers_, and subscribers_lock_.

Referenced by RegisterWebpages().

const boost::shared_ptr<StatestoreServiceIf>& impala::Statestore::thrift_iface ( ) const
inline

Returns the Thrift API interface that proxies requests onto the local Statestore.

Definition at line 115 of file statestore.h.

References thrift_iface_.

void Statestore::TopicsHandler (const Webserver::ArgumentMap &args, rapidjson::Document *document)
private

Webpage handler: upon return, 'document' will contain a list of topics as follows: "topics": [ { "topic_id": "catalog-update", "num_entries": 1165, "version": 2476, "oldest_version": 2476, "oldest_id": "henry-impala:26000", "key_size": "42.94 KB", "value_size": "9.54 MB", "total_size": "9.58 MB" } ]

Definition at line 265 of file statestore.cc.

References GetMinSubscriberTopicVersion(), impala::PrettyPrinter::Print(), subscribers_lock_, topic_lock_, and topics_.

Referenced by RegisterWebpages().

void Statestore::UnregisterSubscriber (Subscriber *subscriber)
private

Member Data Documentation

bool impala::Statestore::exit_flag_
private

Definition at line 251 of file statestore.h.

Referenced by SetExitFlag(), and ShouldExit().

boost::mutex impala::Statestore::exit_flag_lock_
private

Note on locking: Subscribers and Topics should be accessed under their own coarse locks, and worker threads will use worker_lock_ to ensure safe access to the subscriber work queue. Protects access to exit_flag_, but is used mostly to ensure visibility of updates between threads.

Definition at line 250 of file statestore.h.

Referenced by SetExitFlag(), and ShouldExit().

boost::scoped_ptr<MissedHeartbeatFailureDetector> impala::Statestore::failure_detector_
private

Failure detector for subscribers. If a subscriber misses a configurable number of consecutive heartbeat messages, it is considered failed and a) its transient topic entries are removed and b) its entry in the subscriber map is erased.

Definition at line 413 of file statestore.h.

Referenced by DoSubscriberUpdate(), RegisterSubscriber(), and UnregisterSubscriber().
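The consecutive-miss rule can be modelled with a per-peer counter that any successful heartbeat resets. MissedHeartbeatDetector, UpdateHeartbeat() and EvictPeer() are illustrative names, not the actual interface of Impala's MissedHeartbeatFailureDetector, and max_missed stands in for the configurable miss count. The detector only tracks state; removing transient entries and erasing the subscriber would be the caller's job once kFailed is returned.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical, simplified missed-heartbeat failure detector.
class MissedHeartbeatDetector {
 public:
  enum class State { kOk, kFailed };

  explicit MissedHeartbeatDetector(int max_missed) : max_missed_(max_missed) {
    assert(max_missed > 0);
  }

  // Records the outcome of one heartbeat to 'peer' and returns its state.
  // A successful heartbeat resets the count of consecutive misses.
  State UpdateHeartbeat(const std::string& peer, bool responded) {
    int& missed = missed_[peer];
    missed = responded ? 0 : missed + 1;
    return missed >= max_missed_ ? State::kFailed : State::kOk;
  }

  // Forgets a peer, e.g. once it has been unregistered.
  void EvictPeer(const std::string& peer) { missed_.erase(peer); }

 private:
  const int max_missed_;
  std::unordered_map<std::string, int> missed_;  // consecutive misses
};
```

Counting consecutive misses, rather than total misses, means a subscriber with occasional dropped heartbeats is never declared failed as long as it keeps responding in between.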

boost::scoped_ptr<ClientCache<StatestoreSubscriberClient> > impala::Statestore::heartbeat_client_cache_
private

Cache of subscriber clients used for Heartbeat() RPCs. Separate from update_state_client_cache_ because we enable TCP-level timeouts for these calls, whereas they are not safe for UpdateState() RPCs which can take an unbounded amount of time.

Definition at line 405 of file statestore.h.

Referenced by SendHeartbeat(), Statestore(), and UnregisterSubscriber().

StatsMetric<double>* impala::Statestore::heartbeat_duration_metric_
private

Same as topic_update_duration_metric_, but for SendHeartbeat() RPCs.

Definition at line 430 of file statestore.h.

Referenced by SendHeartbeat(), and Statestore().

IntGauge* impala::Statestore::key_size_metric_
private

Metrics shared across all topics to sum the size in bytes of keys, values and both.

Definition at line 420 of file statestore.h.

Referenced by RegisterSubscriber(), and Statestore().

IntGauge* impala::Statestore::num_subscribers_metric_
private

Metric that tracks the registered, non-failed subscribers.

Definition at line 416 of file statestore.h.

Referenced by RegisterSubscriber(), Statestore(), and UnregisterSubscriber().

ThreadPool<ScheduledSubscriberUpdate> impala::Statestore::subscriber_heartbeat_threadpool_
private

Definition at line 395 of file statestore.h.

Referenced by DoSubscriberUpdate(), and RegisterSubscriber().

SetMetric<std::string>* impala::Statestore::subscriber_set_metric_
private

Definition at line 417 of file statestore.h.

Referenced by RegisterSubscriber(), Statestore(), and UnregisterSubscriber().

ThreadPool<ScheduledSubscriberUpdate> impala::Statestore::subscriber_topic_update_threadpool_
private

The statestore has two pools of threads that send messages to subscribers one-by-one. One pool deals with 'heartbeat' messages that update failure-detection state, and the other pool sends 'topic update' messages, which contain the actual topic data that a subscriber does not yet have.

Each message is scheduled for some time in the future, and each worker thread sleeps until that time has passed, to rate-limit messages. Subscribers are placed back into the queue once they have been processed. A subscriber may have many entries in a queue, but no more than one for each registration associated with that subscriber. Since at most one registration is considered 'live' per subscriber, this guarantees that there are at most subscribers_.size() - 1 'live' subscribers ahead of any subscriber in the queue.

Messages may be delayed for any number of reasons, including scheduler interference, lock unfairness when submitting to the thread pool, and head-of-line blocking when threads are occupied sending messages to slow subscribers (subscribers are not guaranteed to be in the queue in next-update order). Subscribers are therefore not processed in lock-step, and one subscriber may have seen many more messages than another during the same interval (if the second subscriber runs slow for any reason).

Delays for heartbeat messages can cause the subscriber that is kept waiting to assume that the statestore has failed. Correct configuration of heartbeat message frequency and subscriber timeout is therefore very important, and depends upon the cluster size. See --statestore_heartbeat_frequency_ms and --statestore_subscriber_timeout_seconds. We expect that the provided defaults will work for clusters of up to several hundred nodes.

Definition at line 393 of file statestore.h.

Referenced by DoSubscriberUpdate(), MainLoop(), RegisterSubscriber(), and SetExitFlag().

boost::uuids::random_generator impala::Statestore::subscriber_uuid_generator_
private

Used to generate unique IDs for each new registration.

Definition at line 358 of file statestore.h.

Referenced by RegisterSubscriber().

SubscriberMap impala::Statestore::subscribers_
private

boost::mutex impala::Statestore::subscribers_lock_
private

Protects access to subscribers_ and subscriber_uuid_generator_. Must be taken before topic_lock_.

Definition at line 343 of file statestore.h.

Referenced by DoSubscriberUpdate(), GatherTopicUpdates(), OfferUpdate(), RegisterSubscriber(), SubscribersHandler(), and TopicsHandler().
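The rule that subscribers_lock_ must be taken before topic_lock_ (and never the reverse) is the standard lock-ordering technique for avoiding deadlock. In this minimal sketch, LockOrderDemo and its members are hypothetical; only the acquisition order mirrors the documented rule.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical illustration of the documented lock order:
// subscribers_lock first, then topic_lock; never the reverse.
struct LockOrderDemo {
  std::mutex subscribers_lock;
  std::mutex topic_lock;
  int subscribers = 0;
  int topic_entries = 0;

  // Needs both structures, so it takes the locks in the documented order.
  void RegisterAndPublish(int new_entries) {
    assert(new_entries >= 0);
    std::lock_guard<std::mutex> s(subscribers_lock);
    std::lock_guard<std::mutex> t(topic_lock);
    ++subscribers;
    topic_entries += new_entries;
  }

  // Needs only topic state: holding topic_lock alone is fine, but code in
  // this position must not go on to acquire subscribers_lock.
  int CountEntries() {
    std::lock_guard<std::mutex> t(topic_lock);
    return topic_entries;
  }
};
```

If one thread held topic_lock and waited for subscribers_lock while another held subscribers_lock and waited for topic_lock, neither could proceed; a single global order rules out that cycle. C++17's std::scoped_lock can acquire two mutexes deadlock-free in one call, but it does not help a thread that already holds one lock and later needs the other, which is why a documented order is still useful.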

boost::shared_ptr<StatestoreServiceIf> impala::Statestore::thrift_iface_
private

Thrift API implementation which proxies requests onto this Statestore.

Definition at line 408 of file statestore.h.

Referenced by thrift_iface().

boost::mutex impala::Statestore::topic_lock_
private

Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock.

Definition at line 254 of file statestore.h.

Referenced by GatherTopicUpdates(), RegisterSubscriber(), SendTopicUpdate(), TopicsHandler(), and UnregisterSubscriber().

IntGauge* impala::Statestore::topic_size_metric_
private

Definition at line 422 of file statestore.h.

Referenced by RegisterSubscriber(), and Statestore().

StatsMetric<double>* impala::Statestore::topic_update_duration_metric_
private

Tracks the distribution of topic-update durations, specifically the time spent in the UpdateState() RPC, which lets us measure both the network transmission cost and the subscriber-side processing time.

Definition at line 427 of file statestore.h.

Referenced by SendTopicUpdate(), and Statestore().

TopicMap impala::Statestore::topics_
private

boost::scoped_ptr<ClientCache<StatestoreSubscriberClient> > impala::Statestore::update_state_client_cache_
private

Cache of subscriber clients used for UpdateState() RPCs. Only one client per subscriber should be used, but the cache helps with the client lifecycle on failure.

Definition at line 399 of file statestore.h.

Referenced by SendTopicUpdate(), Statestore(), and UnregisterSubscriber().

IntGauge* impala::Statestore::value_size_metric_
private

Definition at line 421 of file statestore.h.

Referenced by RegisterSubscriber(), and Statestore().


The documentation for this class was generated from the following files: