16 #ifndef STATESTORE_STATESTORE_SUBSCRIBER_H
17 #define STATESTORE_STATESTORE_SUBSCRIBER_H
21 #include <boost/scoped_ptr.hpp>
22 #include <boost/shared_ptr.hpp>
23 #include <boost/thread/mutex.hpp>
24 #include <boost/thread/thread.hpp>
32 #include "gen-cpp/StatestoreService.h"
33 #include "gen-cpp/StatestoreSubscriber.h"
37 class TimeoutFailureDetector;
39 class TNetworkAddress;
74 const TNetworkAddress& heartbeat_address,
75 const TNetworkAddress& statestore_address,
250 const TUniqueId& registration_id,
251 std::vector<TTopicDelta>* subscriber_topic_updates,
bool* skipped);
254 void Heartbeat(
const TUniqueId& registration_id);
StatestoreSubscriber(const std::string &subscriber_id, const TNetworkAddress &heartbeat_address, const TNetworkAddress &statestore_address, MetricGroup *metrics)
statestore_address - the address of the statestore to register with
MetricGroup * metrics_
MetricGroup instance that all metrics are registered in. Not owned by this class. ...
boost::shared_ptr< StatestoreSubscriberIf > thrift_iface_
void Heartbeat(const TUniqueId ®istration_id)
Called when the statestore sends a heartbeat message. Updates the failure detector.
std::vector< UpdateCallback > callbacks
List of callbacks to invoke for this topic.
TNetworkAddress statestore_address_
Address of the statestore.
class SimpleMetric< std::string, TMetricKind::PROPERTY > StringProperty
ClientCache< StatestoreServiceClient > StatestoreClientCache
boost::scoped_ptr< Thread > recovery_mode_thread_
Thread in which RecoveryModeChecker runs.
const std::string & id() const
MetricGroups may be organised hierarchically as a tree.
Status CheckRegistrationId(const TUniqueId ®istration_id)
boost::scoped_ptr< impala::TimeoutFailureDetector > failure_detector_
Failure detector that tracks heartbeat messages from the statestore.
StatsMetric< double > * processing_time_metric
UpdateCallbacks update_callbacks_
DoubleGauge * last_recovery_duration_metric_
Amount of time last spent in recovery mode.
boost::mutex registration_id_lock_
TNetworkAddress heartbeat_address_
Address that the heartbeat service should be started on.
StringProperty * registration_id_metric_
Current registration ID, in string form.
boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
StringProperty * last_recovery_time_metric_
When the last recovery happened.
std::string TopicId
A TopicId uniquely identifies a single topic.
MonotonicStopWatch topic_update_interval_timer_
Tracks the time between topic-update mesages.
boost::scoped_ptr< StatestoreClientCache > client_cache_
statestore client cache - only one client is ever used.
std::map< Statestore::TopicId, bool > topic_registrations_
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
const std::string subscriber_id_
Unique, but opaque, identifier for this subscriber.
Status UpdateState(const TopicDeltaMap &incoming_topic_deltas, const TUniqueId ®istration_id, std::vector< TTopicDelta > *subscriber_topic_updates, bool *skipped)
StatsMetric< double > * topic_update_duration_metric_
class SimpleMetric< bool, TMetricKind::PROPERTY > BooleanProperty
boost::unordered_map< Statestore::TopicId, int64_t > TopicVersionMap
void RecoveryModeChecker()
class SimpleMetric< double, TMetricKind::GAUGE > DoubleGauge
boost::unordered_map< Statestore::TopicId, Callbacks > UpdateCallbacks
TopicVersionMap current_topic_versions_
TUniqueId registration_id_
StatsMetric< double > * topic_update_interval_metric_
Accumulated statistics on the frequency of topic-update messages.
BooleanProperty * connected_to_statestore_metric_
Metric to indicate if we are successfully registered with the statestore.
StatsMetric< double > * heartbeat_interval_metric_
Accumulated statistics on the frequency of heartbeat messages.
Status Start()
Returns OK unless some error occurred, like a failure to connect.
Status AddTopic(const Statestore::TopicId &topic_id, bool is_transient, const UpdateCallback &callback)
boost::shared_ptr< ThriftServer > heartbeat_server_
Container for the heartbeat server.
MonotonicStopWatch heartbeat_interval_timer_
Tracks the time between heartbeat mesages.