16 #ifndef STATESTORE_STATESTORE_SUBSCRIBER_H 
   17 #define STATESTORE_STATESTORE_SUBSCRIBER_H 
   21 #include <boost/scoped_ptr.hpp> 
   22 #include <boost/shared_ptr.hpp> 
   23 #include <boost/thread/mutex.hpp> 
   24 #include <boost/thread/thread.hpp> 
   32 #include "gen-cpp/StatestoreService.h" 
   33 #include "gen-cpp/StatestoreSubscriber.h" 
   37 class TimeoutFailureDetector;
 
   39 class TNetworkAddress;
 
   74       const TNetworkAddress& heartbeat_address,
 
   75       const TNetworkAddress& statestore_address,
 
  250       const TUniqueId& registration_id,
 
  251       std::vector<TTopicDelta>* subscriber_topic_updates, 
bool* skipped);
 
  254   void Heartbeat(
const TUniqueId& registration_id);
 
StatestoreSubscriber(const std::string &subscriber_id, const TNetworkAddress &heartbeat_address, const TNetworkAddress &statestore_address, MetricGroup *metrics)
statestore_address - the address of the statestore to register with 
 
MetricGroup * metrics_
MetricGroup instance that all metrics are registered in. Not owned by this class. ...
 
boost::shared_ptr< StatestoreSubscriberIf > thrift_iface_
 
void Heartbeat(const TUniqueId ®istration_id)
Called when the statestore sends a heartbeat message. Updates the failure detector. 
 
std::vector< UpdateCallback > callbacks
List of callbacks to invoke for this topic. 
 
TNetworkAddress statestore_address_
Address of the statestore. 
 
class SimpleMetric< std::string, TMetricKind::PROPERTY > StringProperty
 
ClientCache< StatestoreServiceClient > StatestoreClientCache
 
boost::scoped_ptr< Thread > recovery_mode_thread_
Thread in which RecoveryModeChecker runs. 
 
const std::string & id() const 
 
MetricGroups may be organised hierarchically as a tree. 
 
Status CheckRegistrationId(const TUniqueId ®istration_id)
 
boost::scoped_ptr< impala::TimeoutFailureDetector > failure_detector_
Failure detector that tracks heartbeat messages from the statestore. 
 
StatsMetric< double > * processing_time_metric
 
UpdateCallbacks update_callbacks_
 
DoubleGauge * last_recovery_duration_metric_
Amount of time last spent in recovery mode. 
 
boost::mutex registration_id_lock_
 
TNetworkAddress heartbeat_address_
Address that the heartbeat service should be started on. 
 
StringProperty * registration_id_metric_
Current registration ID, in string form. 
 
boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
 
StringProperty * last_recovery_time_metric_
When the last recovery happened. 
 
std::string TopicId
A TopicId uniquely identifies a single topic. 
 
MonotonicStopWatch topic_update_interval_timer_
Tracks the time between topic-update mesages. 
 
boost::scoped_ptr< StatestoreClientCache > client_cache_
statestore client cache - only one client is ever used. 
 
std::map< Statestore::TopicId, bool > topic_registrations_
 
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details. 
 
const std::string subscriber_id_
Unique, but opaque, identifier for this subscriber. 
 
Status UpdateState(const TopicDeltaMap &incoming_topic_deltas, const TUniqueId ®istration_id, std::vector< TTopicDelta > *subscriber_topic_updates, bool *skipped)
 
StatsMetric< double > * topic_update_duration_metric_
 
class SimpleMetric< bool, TMetricKind::PROPERTY > BooleanProperty
 
boost::unordered_map< Statestore::TopicId, int64_t > TopicVersionMap
 
void RecoveryModeChecker()
 
class SimpleMetric< double, TMetricKind::GAUGE > DoubleGauge
 
boost::unordered_map< Statestore::TopicId, Callbacks > UpdateCallbacks
 
TopicVersionMap current_topic_versions_
 
TUniqueId registration_id_
 
StatsMetric< double > * topic_update_interval_metric_
Accumulated statistics on the frequency of topic-update messages. 
 
BooleanProperty * connected_to_statestore_metric_
Metric to indicate if we are successfully registered with the statestore. 
 
StatsMetric< double > * heartbeat_interval_metric_
Accumulated statistics on the frequency of heartbeat messages. 
 
Status Start()
Returns OK unless some error occurred, like a failure to connect. 
 
Status AddTopic(const Statestore::TopicId &topic_id, bool is_transient, const UpdateCallback &callback)
 
boost::shared_ptr< ThriftServer > heartbeat_server_
Container for the heartbeat server. 
 
MonotonicStopWatch heartbeat_interval_timer_
Tracks the time between heartbeat mesages.