15 #ifndef STATESTORE_STATESTORE_H
16 #define STATESTORE_STATESTORE_H
22 #include <boost/unordered_map.hpp>
23 #include <boost/scoped_ptr.hpp>
24 #include <boost/thread/condition_variable.hpp>
25 #include <boost/uuid/uuid_generators.hpp>
27 #include "gen-cpp/Types_types.h"
28 #include "gen-cpp/StatestoreSubscriber.h"
29 #include "gen-cpp/StatestoreService.h"
41 using namespace impala;
103 const TNetworkAddress& location,
104 const std::vector<TTopicRegistration>& topic_registrations,
105 TUniqueId* registration_id);
257 typedef boost::unordered_map<TopicId, Topic>
TopicMap;
281 typedef boost::unordered_map<TopicId, TopicState>
Topics;
353 typedef boost::unordered_map<SubscriberId, boost::shared_ptr<Subscriber> >
468 TUpdateStateRequest* update_state_request);
516 rapidjson::Document* document);
Status RegisterSubscriber(const SubscriberId &subscriber_id, const TNetworkAddress &location, const std::vector< TTopicRegistration > &topic_registrations, TUniqueId *registration_id)
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > heartbeat_client_cache_
TopicEntry::Version last_version_
static const Value NULL_VALUE
Representation of an empty Value. Must have size() == 0.
const TNetworkAddress & network_address() const
IntGauge * topic_size_metric_
std::pair< int64_t, SubscriberId > ScheduledSubscriberUpdate
int64_t total_value_size_bytes() const
std::string Value
A Value is a string of bytes, for which std::string is a convenient representation.
Statestore(MetricGroup *metrics)
The only constructor; initialises member variables only.
void SetValue(const Value &bytes, Version version)
const TUniqueId registration_id_
boost::unordered_map< SubscriberId, boost::shared_ptr< Subscriber > > SubscriberMap
boost::unordered_map< std::pair< TopicId, TopicEntryKey >, TopicEntry::Version > TransientEntryMap
void RegisterWebpages(Webserver *webserver)
const TopicUpdateLog & topic_update_log() const
const Value & value() const
MetricGroups may be organised hierarchically as a tree.
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > update_state_client_cache_
void TopicsHandler(const Webserver::ArgumentMap &args, rapidjson::Document *document)
const SubscriberId subscriber_id_
TransientEntryMap transient_entries_
int64_t total_key_size_bytes_
Total memory occupied by the key strings, in bytes.
Subscriber(const SubscriberId &subscriber_id, const TUniqueId &registration_id, const TNetworkAddress &network_address, const std::vector< TTopicRegistration > &subscribed_topics)
IntGauge * key_size_metric_
Metrics shared across all topics to sum the size in bytes of keys, values and both.
std::string TopicEntryKey
A TopicEntryKey uniquely identifies a single entry in a topic.
SetMetric< std::string > * subscriber_set_metric_
const SubscriberId & id() const
int64_t total_value_size_bytes_
Total memory occupied by the value byte strings, in bytes.
const TransientEntryMap & transient_entries() const
boost::unordered_map< TopicId, Topic > TopicMap
The entire set of topics tracked by the statestore.
std::map< std::string, std::string > ArgumentMap
const TopicEntryMap & entries() const
boost::unordered_map< TopicEntryKey, TopicEntry > TopicEntryMap
Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
StatsMetric< double > * heartbeat_duration_metric_
Same as above, but for SendHeartbeat() RPCs.
static const TopicEntry::Version TOPIC_INITIAL_VERSION
The Version value used to initialize new Topic subscriptions for this Subscriber. ...
bool ShouldExit()
True if the shutdown flag has been set true, false otherwise.
Topic(const TopicId &topic_id, IntGauge *key_size_metric, IntGauge *value_size_metric, IntGauge *topic_size_metric)
boost::uuids::random_generator subscriber_uuid_generator_
Used to generate unique IDs for each new registration.
ThreadPool< ScheduledSubscriberUpdate > subscriber_topic_update_threadpool_
std::string TopicId
A TopicId uniquely identifies a single topic.
Status SendTopicUpdate(Subscriber *subscriber, bool *update_skipped)
IntGauge * topic_size_metric_
void AddTransientUpdate(const TopicId &topic_id, const TopicEntryKey &topic_key, TopicEntry::Version version)
IntGauge * num_subscribers_metric_
Metric that tracks the registered, non-failed subscribers.
void SubscribersHandler(const Webserver::ArgumentMap &args, rapidjson::Document *document)
IntGauge * value_size_metric_
boost::mutex exit_flag_lock_
int64_t total_key_size_bytes() const
StatsMetric< double > * topic_update_duration_metric_
IntGauge * key_size_metric_
Metrics shared across all topics to sum the size in bytes of keys, values and both.
void DeleteIfVersionsMatch(TopicEntry::Version version, const TopicEntryKey &key)
Must be called holding the topic lock.
void SetLastTopicVersionProcessed(const TopicId &topic_id, TopicEntry::Version version)
boost::shared_ptr< StatestoreServiceIf > thrift_iface_
Thrift API implementation which proxies requests onto this Statestore.
TopicEntry::Version last_version() const
void GatherTopicUpdates(const Subscriber &subscriber, TUpdateStateRequest *update_state_request)
boost::scoped_ptr< MissedHeartbeatFailureDetector > failure_detector_
void UnregisterSubscriber(Subscriber *subscriber)
TopicEntry::Version last_version
boost::unordered_map< TopicId, TopicState > Topics
TopicEntryMap entries_
Map from topic entry key to topic entry.
SubscriberMap subscribers_
TopicEntry::Version Put(const TopicEntryKey &key, const TopicEntry::Value &bytes)
Must be called holding the topic lock.
Status MainLoop()
The main processing loop. Blocks until the exit flag is set.
TopicUpdateLog topic_update_log_
const TopicId & id() const
void DoSubscriberUpdate(bool is_heartbeat, int thread_id, const ScheduledSubscriberUpdate &update)
boost::mutex topic_lock_
Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock.
const boost::shared_ptr< StatestoreServiceIf > & thrift_iface() const
Returns the Thrift API interface that proxies requests onto the local Statestore. ...
ThreadPool< ScheduledSubscriberUpdate > subscriber_heartbeat_threadpool_
const TopicId topic_id_
Unique identifier for this topic. Should be human-readable.
const Topics & subscribed_topics() const
Status OfferUpdate(const ScheduledSubscriberUpdate &update, ThreadPool< ScheduledSubscriberUpdate > *thread_pool)
std::map< TopicEntry::Version, TopicEntryKey > TopicUpdateLog
boost::mutex subscribers_lock_
const TopicEntry::Version LastTopicVersionProcessed(const TopicId &topic_id) const
const TopicEntry::Version GetMinSubscriberTopicVersion(const TopicId &topic_id, SubscriberId *subscriber_id=NULL)
Must be called holding the subscribers_ lock.
Topics subscribed_topics_
Status SendHeartbeat(Subscriber *subscriber)
const TUniqueId & registration_id() const
IntGauge * value_size_metric_
const TNetworkAddress network_address_
The location of the subscriber service that this subscriber runs.
static const Version TOPIC_ENTRY_INITIAL_VERSION
The Version value used to initialize a new TopicEntry.