Impala
Impala is the open source, native analytic database for Apache Hadoop.
impala::StatestoreSubscriber Class Reference

#include <statestore-subscriber.h>

Classes

struct  Callbacks
 

Public Types

typedef std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
 A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
 
typedef boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
 

Public Member Functions

 StatestoreSubscriber (const std::string &subscriber_id, const TNetworkAddress &heartbeat_address, const TNetworkAddress &statestore_address, MetricGroup *metrics)
 statestore_address - the address of the statestore to register with
 
Status AddTopic (const Statestore::TopicId &topic_id, bool is_transient, const UpdateCallback &callback)
 
Status Start ()
 Returns OK unless some error occurred, like a failure to connect.
 
const std::string & id () const
 

Private Types

typedef boost::unordered_map< Statestore::TopicId, Callbacks > UpdateCallbacks
 
typedef boost::unordered_map< Statestore::TopicId, int64_t > TopicVersionMap
 

Private Member Functions

Status UpdateState (const TopicDeltaMap &incoming_topic_deltas, const TUniqueId &registration_id, std::vector< TTopicDelta > *subscriber_topic_updates, bool *skipped)
 
void Heartbeat (const TUniqueId &registration_id)
 Called when the statestore sends a heartbeat message. Updates the failure detector.
 
void RecoveryModeChecker ()
 
Status Register ()
 
Status CheckRegistrationId (const TUniqueId &registration_id)
 

Private Attributes

const std::string subscriber_id_
 Unique, but opaque, identifier for this subscriber.
 
TNetworkAddress heartbeat_address_
 Address that the heartbeat service should be started on.
 
TNetworkAddress statestore_address_
 Address of the statestore.
 
boost::shared_ptr< StatestoreSubscriberIf > thrift_iface_
 
boost::shared_ptr< ThriftServer > heartbeat_server_
 Container for the heartbeat server.
 
boost::scoped_ptr< impala::TimeoutFailureDetector > failure_detector_
 Failure detector that tracks heartbeat messages from the statestore.
 
boost::scoped_ptr< Thread > recovery_mode_thread_
 Thread in which RecoveryModeChecker runs.
 
boost::mutex lock_
 
bool is_registered_
 
boost::mutex registration_id_lock_
 
TUniqueId registration_id_
 
UpdateCallbacks update_callbacks_
 
std::map< Statestore::TopicId, bool > topic_registrations_
 
TopicVersionMap current_topic_versions_
 
boost::scoped_ptr< StatestoreClientCache > client_cache_
 statestore client cache - only one client is ever used.
 
MetricGroup * metrics_
 MetricGroup instance that all metrics are registered in. Not owned by this class.
 
BooleanProperty * connected_to_statestore_metric_
 Metric to indicate if we are successfully registered with the statestore.
 
DoubleGauge * last_recovery_duration_metric_
 Amount of time last spent in recovery mode.
 
StringProperty * last_recovery_time_metric_
 When the last recovery happened.
 
StatsMetric< double > * topic_update_interval_metric_
 Accumulated statistics on the frequency of topic-update messages.
 
MonotonicStopWatch topic_update_interval_timer_
 Tracks the time between topic-update messages.
 
StatsMetric< double > * topic_update_duration_metric_
 
MonotonicStopWatch heartbeat_interval_timer_
 Tracks the time between heartbeat messages.
 
StatsMetric< double > * heartbeat_interval_metric_
 Accumulated statistics on the frequency of heartbeat messages.
 
StringProperty * registration_id_metric_
 Current registration ID, in string form.
 

Friends

class StatestoreSubscriberThriftIf
 Subscriber thrift implementation, needs to access UpdateState.
 

Detailed Description

A StatestoreSubscriber communicates with a statestore periodically through the exchange of topic update messages. These messages contain updates from the statestore to a list of 'topics' that the subscriber is interested in; in response the subscriber sends a list of changes that it wishes to make to a topic. The statestore also sends more frequent 'heartbeat' messages that confirm the connection between statestore and subscriber is still active.

Clients of the subscriber register topics of interest, and a function to call once an update has been received. Each callback may optionally add one or more updates to a list of topic updates to be sent back to the statestore. See AddTopic for the requirements placed on these callbacks.

Topics must be subscribed to before the subscriber is connected to the statestore: there is no way to add a new subscription after the subscriber has successfully registered.

If the subscriber does not receive heartbeats from the statestore within a configurable period of time, the subscriber enters 'recovery mode', where it continually attempts to re-register with the statestore. Recovery mode is not triggered if a heartbeat takes a long time to process locally.

Definition at line 64 of file statestore-subscriber.h.
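
The recovery-mode trigger described above amounts to a timeout on the most recent heartbeat. The following is a minimal sketch of that idea only. HeartbeatTimeoutSketch and its methods are hypothetical names invented for illustration; they are not impala::TimeoutFailureDetector and do not reflect its interface. Time points are passed in explicitly so that the behaviour is deterministic.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical stand-in for a timeout-based failure detector. It is NOT
// impala::TimeoutFailureDetector and makes no claim about that class's API.
class HeartbeatTimeoutSketch {
 public:
  using Clock = std::chrono::steady_clock;

  // 'timeout' plays the role of the configurable period mentioned in the text.
  HeartbeatTimeoutSketch(Clock::duration timeout, Clock::time_point now)
      : timeout_(timeout), last_heartbeat_(now) {
    assert(timeout > Clock::duration::zero());
  }

  // Records that a heartbeat arrived at time 'now'.
  void OnHeartbeat(Clock::time_point now) { last_heartbeat_ = now; }

  // True if no heartbeat has arrived within the timeout window ending at 'now'.
  bool ShouldEnterRecovery(Clock::time_point now) const {
    return now - last_heartbeat_ > timeout_;
  }

 private:
  Clock::duration timeout_;
  Clock::time_point last_heartbeat_;
};
```

Because only the arrival time is recorded, a heartbeat that is slow to process locally does not by itself push the detector over the timeout, which matches the last sentence of the description.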

Member Typedef Documentation

typedef std::map<Statestore::TopicId, TTopicDelta> impala::StatestoreSubscriber::TopicDeltaMap

A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.

Definition at line 79 of file statestore-subscriber.h.

typedef boost::unordered_map<Statestore::TopicId, int64_t> impala::StatestoreSubscriber::TopicVersionMap
private

Mapping of TopicId to the last version of the topic this subscriber successfully processed.

Definition at line 190 of file statestore-subscriber.h.

typedef boost::function<void (const TopicDeltaMap& state, std::vector<TTopicDelta>* topic_updates)> impala::StatestoreSubscriber::UpdateCallback

Function called to update a service with new state. Called in a separate thread to the one in which it is registered. Every UpdateCallback is invoked every time that an update is received from the statestore. Therefore the callback should not assume that the TopicDeltaMap contains an entry for their particular topic of interest. If a delta for a particular topic does not have the 'is_delta' flag set, clients should assume that the delta contains the entire known state for that topic. This occurs particularly after statestore failure, and usually clients will need to republish any local state that is missing. Callbacks may publish new updates to any topic via the topic_updates parameter, although updates for unknown topics (i.e. those with no subscribers) will be ignored.

Definition at line 99 of file statestore-subscriber.h.
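
The two rules above (a callback may be invoked without an entry for its own topic, and a delta without 'is_delta' carries the full state) can be illustrated with a small sketch. TopicDeltaSketch, TopicDeltaMapSketch, UpdateCallbackSketch and MakeMembershipCallback are hypothetical stand-ins, and the field names are assumptions; the real TTopicDelta is a Thrift-generated type and std::function is used here in place of boost::function.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <set>
#include <string>
#include <vector>

// Simplified stand-in for the Thrift-generated TTopicDelta (assumed fields).
struct TopicDeltaSketch {
  std::string topic_name;
  std::vector<std::string> additions;
  std::vector<std::string> deletions;
  bool is_delta = true;
};

using TopicDeltaMapSketch = std::map<std::string, TopicDeltaSketch>;
using UpdateCallbackSketch = std::function<void(
    const TopicDeltaMapSketch& state, std::vector<TopicDeltaSketch>* topic_updates)>;

// Builds a callback that mirrors the keys of one topic into 'members'.
// 'members' must outlive the returned callback.
UpdateCallbackSketch MakeMembershipCallback(const std::string& topic,
                                            std::set<std::string>* members) {
  assert(members != nullptr);
  return [topic, members](const TopicDeltaMapSketch& state,
                          std::vector<TopicDeltaSketch>* /*topic_updates*/) {
    auto it = state.find(topic);
    if (it == state.end()) return;  // This update has nothing for our topic.
    const TopicDeltaSketch& delta = it->second;
    if (!delta.is_delta) members->clear();  // Full state: drop the old view.
    for (const auto& key : delta.deletions) members->erase(key);
    for (const auto& key : delta.additions) members->insert(key);
  };
}
```

This callback publishes nothing back; a callback that needs to republish local state after a full (non-delta) update would append entries to topic_updates at the point where the old view is cleared.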

typedef boost::unordered_map<Statestore::TopicId, Callbacks> impala::StatestoreSubscriber::UpdateCallbacks
private

Mapping of topic ids to their associated callbacks. Because this mapping stores a pointer to an UpdateCallback, memory errors will occur if an UpdateCallback is deleted before being unregistered. The UpdateCallback destructor checks for such problems, so that we will have an assertion failure rather than a memory error.

Definition at line 177 of file statestore-subscriber.h.

Constructor & Destructor Documentation

impala::StatestoreSubscriber::StatestoreSubscriber ( const std::string &  subscriber_id,
const TNetworkAddress &  heartbeat_address,
const TNetworkAddress &  statestore_address,
MetricGroup *  metrics 
)

Only constructor.

subscriber_id - should be unique across the cluster; identifies this subscriber
heartbeat_address - the local address on which the heartbeat service that communicates with the statestore should be started
statestore_address - the address of the statestore to register with

Definition at line 89 of file statestore-subscriber.cc.

References impala::MetricGroup::AddGauge(), impala::MetricGroup::AddProperty(), client_cache_, connected_to_statestore_metric_, heartbeat_interval_metric_, last_recovery_duration_metric_, last_recovery_time_metric_, metrics_, impala::MetricGroup::RegisterMetric(), registration_id_metric_, topic_update_duration_metric_, and topic_update_interval_metric_.

Member Function Documentation

Status impala::StatestoreSubscriber::AddTopic ( const Statestore::TopicId &  topic_id,
bool  is_transient,
const UpdateCallback &  callback 
)

Adds a topic to the set of topics that updates will be received for. When a topic update is received, the supplied UpdateCallback will be invoked. Therefore clients should ensure that it is safe to invoke callback for the entire lifetime of the subscriber; in particular this means that the subscriber should be torn-down before any objects that own callbacks. Must be called before Start(), in which case it will return Status::OK. Otherwise an error will be returned.

Definition at line 126 of file statestore-subscriber.cc.

References impala::CALLBACK_METRIC_PATTERN, impala::StatestoreSubscriber::Callbacks::callbacks, is_registered_, lock_, metrics_, impala::Status::OK, impala::StatestoreSubscriber::Callbacks::processing_time_metric, impala::MetricGroup::RegisterMetric(), topic_registrations_, and update_callbacks_.

Referenced by impala::ImpalaServer::ImpalaServer(), and impala::AdmissionController::Init().
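
The ordering requirement on AddTopic() and Start() can be sketched in miniature as follows. SubscriberSketch is a hypothetical class written for illustration, not the Impala implementation: a bool return stands in for Status, the callback takes no arguments, and Start() is assumed to always succeed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical miniature of the subscribe-then-start contract.
class SubscriberSketch {
 public:
  using Callback = std::function<void()>;

  // Returns true (standing in for Status::OK) only if called before Start().
  bool AddTopic(const std::string& topic_id, bool is_transient, Callback cb) {
    assert(cb != nullptr);  // The callback must be callable.
    if (is_registered_) return false;
    callbacks_[topic_id].push_back(std::move(cb));
    topic_registrations_[topic_id] = is_transient;
    return true;
  }

  // Pretends that registration succeeds, after which the topic set is frozen.
  bool Start() {
    is_registered_ = true;
    return true;
  }

  std::size_t num_topics() const { return topic_registrations_.size(); }

 private:
  bool is_registered_ = false;
  std::map<std::string, std::vector<Callback>> callbacks_;
  std::map<std::string, bool> topic_registrations_;
};
```

Since the stored callbacks may be invoked for as long as the subscriber exists, an owner of both would typically declare the subscriber after the objects its callbacks capture, so that the subscriber is destroyed first.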

Status impala::StatestoreSubscriber::CheckRegistrationId ( const TUniqueId &  registration_id)
private

Returns OK if registration_id == registration_id_, or if registration_id_ is not yet set, an error otherwise. Used to confirm that RPCs from the statestore are intended for the current registration epoch.

Definition at line 263 of file statestore-subscriber.cc.

References impala::Status::OK, impala::PrintId(), registration_id_, and registration_id_lock_.

Referenced by Heartbeat(), and UpdateState().
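
The check described above can be sketched as below. UniqueIdSketch and RegistrationCheckSketch are hypothetical names; the real TUniqueId is Thrift-generated, the real method returns a Status rather than a bool, and the explicit is_set_ flag is an assumption about how "not yet set" might be represented.

```cpp
#include <cstdint>
#include <mutex>

// Stand-in for TUniqueId (assumed to be a pair of 64-bit integers).
struct UniqueIdSketch {
  std::int64_t hi;
  std::int64_t lo;
};

inline bool operator==(const UniqueIdSketch& a, const UniqueIdSketch& b) {
  return a.hi == b.hi && a.lo == b.lo;
}

class RegistrationCheckSketch {
 public:
  // Called when a (re-)registration completes; starts a new epoch.
  void SetRegistrationId(const UniqueIdSketch& id) {
    std::lock_guard<std::mutex> l(registration_id_lock_);
    registration_id_ = id;
    is_set_ = true;
  }

  // True if 'id' matches the current epoch, or if no epoch has been set yet.
  bool Check(const UniqueIdSketch& id) {
    std::lock_guard<std::mutex> l(registration_id_lock_);
    return !is_set_ || id == registration_id_;
  }

 private:
  std::mutex registration_id_lock_;
  UniqueIdSketch registration_id_{0, 0};
  bool is_set_ = false;
};
```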

void impala::StatestoreSubscriber::Heartbeat ( const TUniqueId &  registration_id)
private

Called when the statestore sends a heartbeat message. Updates the failure detector.

const std::string& impala::StatestoreSubscriber::id ( ) const
inline

Definition at line 120 of file statestore-subscriber.h.

References subscriber_id_.

void impala::StatestoreSubscriber::RecoveryModeChecker ( )
private

Run in a separate thread. In a loop, check failure_detector_ to see if the statestore is still sending heartbeat messages. If not, enter 'recovery mode' where a reconnection is repeatedly attempted. Once reconnected, all existing subscriptions and services are reregistered and normal operation resumes. During recovery mode, any public methods that are started will block on lock_, which is only released when recovery finishes. In practice, all registrations are made early in the life of an impalad before the statestore could be detected as failed.

Definition at line 211 of file statestore-subscriber.cc.

References connected_to_statestore_metric_, impala_udf::DebugString(), impala::MonotonicStopWatch::ElapsedTime(), impala::FailureDetector::FAILED, failure_detector_, impala::Status::GetDetail(), last_recovery_duration_metric_, last_recovery_time_metric_, impala::TimestampValue::LocalTime(), lock_, impala::Status::ok(), Register(), impala::SLEEP_INTERVAL_MS, impala::SleepForMs(), impala::MonotonicStopWatch::Start(), impala::STATESTORE_ID, and subscriber_id_.

Referenced by Start().
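
The retry behaviour of recovery mode can be sketched as a loop that keeps attempting registration and sleeps between failures. RecoverSketch and RecoveryResultSketch are hypothetical; the registration and sleep steps are injected as callables, and max_attempts is an addition that exists only so the sketch terminates (the description above implies the real loop retries until it succeeds).

```cpp
#include <functional>

// Outcome of one pass through recovery mode in this sketch.
struct RecoveryResultSketch {
  int attempts;
  bool recovered;
};

// Repeatedly calls 'try_register' until it returns true or 'max_attempts'
// attempts have been made, calling 'sleep' after each failed attempt.
RecoveryResultSketch RecoverSketch(const std::function<bool()>& try_register,
                                   const std::function<void()>& sleep,
                                   int max_attempts) {
  RecoveryResultSketch result{0, false};
  while (result.attempts < max_attempts) {
    ++result.attempts;
    if (try_register()) {
      result.recovered = true;
      break;
    }
    sleep();  // Back off before the next reconnection attempt.
  }
  return result;
}
```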

Status impala::StatestoreSubscriber::Register ( )
private

Creates a client of the remote statestore and sends a list of topics to register for. Returns OK unless there is some problem connecting, or the statestore reports an error.

Definition at line 141 of file statestore-subscriber.cc.

References client_cache_, connected_to_statestore_metric_, heartbeat_address_, heartbeat_interval_timer_, impala::PrintId(), registration_id_, registration_id_lock_, registration_id_metric_, RETURN_IF_ERROR, impala::MonotonicStopWatch::Start(), statestore_address_, subscriber_id_, topic_registrations_, topic_update_interval_timer_, and update_callbacks_.

Referenced by RecoveryModeChecker(), and Start().

Status impala::StatestoreSubscriber::Start ( )

Returns OK unless some error occurred, like a failure to connect.

Registers this subscriber with the statestore, and starts the heartbeat service, as well as a thread to check for failure and initiate recovery mode.

Definition at line 176 of file statestore-subscriber.cc.

References impala::Status::GetDetail(), heartbeat_address_, heartbeat_server_, is_registered_, lock_, metrics_, impala::Status::ok(), recovery_mode_thread_, RecoveryModeChecker(), Register(), RETURN_IF_ERROR, and thrift_iface_.

Status impala::StatestoreSubscriber::UpdateState ( const TopicDeltaMap &  incoming_topic_deltas,
const TUniqueId &  registration_id,
std::vector< TTopicDelta > *  subscriber_topic_updates,
bool *  skipped 
)
private

Called when the statestore sends a topic update. Each registered callback is called in turn with the given map of incoming_topic_deltas from the statestore. Each TTopicDelta sent from the statestore to the subscriber will contain the topic name, a list of additions to the topic, a list of deletions from the topic, and the version range the update covers. A from_version of 0 indicates a non-delta update.

In response, any updates to the topic by the subscriber are aggregated in subscriber_topic_updates and returned to the statestore. Each update is a TTopicDelta that contains a list of additions to the topic and a list of deletions from the topic. Additionally, if a subscriber has received an unexpected delta update version range, it can request a new delta update by setting the "from_version" field of the TTopicDelta response. The next statestore update will be based on the version the subscriber responds with.

If the subscriber is in recovery mode, this method returns immediately. Returns an error if some error was encountered (e.g. the supplied registration ID was unexpected), and OK otherwise.

The output parameter 'skipped' is set to true if the subscriber chose not to process this topic-update (if, for example, a concurrent update was being processed, or if the subscriber currently believes it is recovering). Doing so indicates that no topics were updated during this call.

Definition at line 290 of file statestore-subscriber.cc.

References CheckRegistrationId(), current_topic_versions_, impala::MonotonicStopWatch::ElapsedTime(), lock_, impala::Status::OK, impala::MonotonicStopWatch::Reset(), RETURN_IF_ERROR, impala::MonotonicStopWatch::Start(), impala::MonotonicStopWatch::Stop(), topic_update_duration_metric_, topic_update_interval_metric_, topic_update_interval_timer_, impala::StatsMetric< T >::Update(), and update_callbacks_.
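
The version-range handling and the 'skipped' output described above can be sketched as follows. VersionedDeltaSketch and UpdateStateSketch are hypothetical; additions, deletions, callbacks and the Status return are omitted, and a plain busy flag stands in for both "a concurrent update holds the lock" and "the subscriber is recovering" so that the sketch can be exercised from a single thread.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Stand-in for the version-related fields of TTopicDelta (assumed names).
struct VersionedDeltaSketch {
  std::string topic_name;
  std::int64_t from_version = 0;  // 0 indicates a non-delta (full) update.
  std::int64_t to_version = 0;
};

class UpdateStateSketch {
 public:
  // Simulates a concurrent update in progress or recovery mode.
  void set_busy(bool busy) { busy_ = busy; }

  void Update(const std::vector<VersionedDeltaSketch>& incoming,
              std::vector<VersionedDeltaSketch>* responses, bool* skipped) {
    *skipped = busy_;
    if (busy_) return;  // Nothing is processed; no topic versions change.
    for (const auto& delta : incoming) {
      std::int64_t& current = versions_[delta.topic_name];  // Starts at 0.
      if (delta.from_version != 0 && delta.from_version != current) {
        // Unexpected range: ask for an update based on the version we hold.
        VersionedDeltaSketch request;
        request.topic_name = delta.topic_name;
        request.from_version = current;
        responses->push_back(request);
        continue;
      }
      current = delta.to_version;
    }
  }

  std::int64_t version(const std::string& topic) const {
    auto it = versions_.find(topic);
    return it == versions_.end() ? 0 : it->second;
  }

 private:
  bool busy_ = false;
  std::map<std::string, std::int64_t> versions_;
};
```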

Friends And Related Function Documentation

friend class StatestoreSubscriberThriftIf
friend

Subscriber thrift implementation, needs to access UpdateState.

Definition at line 228 of file statestore-subscriber.h.

Member Data Documentation

boost::scoped_ptr<StatestoreClientCache> impala::StatestoreSubscriber::client_cache_
private

statestore client cache - only one client is ever used.

Definition at line 194 of file statestore-subscriber.h.

Referenced by Register(), and StatestoreSubscriber().

BooleanProperty* impala::StatestoreSubscriber::connected_to_statestore_metric_
private

Metric to indicate if we are successfully registered with the statestore.

Definition at line 200 of file statestore-subscriber.h.

Referenced by RecoveryModeChecker(), Register(), and StatestoreSubscriber().

TopicVersionMap impala::StatestoreSubscriber::current_topic_versions_
private

Definition at line 191 of file statestore-subscriber.h.

Referenced by UpdateState().

boost::scoped_ptr<impala::TimeoutFailureDetector> impala::StatestoreSubscriber::failure_detector_
private

Failure detector that tracks heartbeat messages from the statestore.

Definition at line 140 of file statestore-subscriber.h.

Referenced by Heartbeat(), and RecoveryModeChecker().

TNetworkAddress impala::StatestoreSubscriber::heartbeat_address_
private

Address that the heartbeat service should be started on.

Definition at line 127 of file statestore-subscriber.h.

Referenced by Register(), and Start().

StatsMetric<double>* impala::StatestoreSubscriber::heartbeat_interval_metric_
private

Accumulated statistics on the frequency of heartbeat messages.

Definition at line 222 of file statestore-subscriber.h.

Referenced by Heartbeat(), and StatestoreSubscriber().

MonotonicStopWatch impala::StatestoreSubscriber::heartbeat_interval_timer_
private

Tracks the time between heartbeat messages.

Definition at line 219 of file statestore-subscriber.h.

Referenced by Heartbeat(), and Register().

boost::shared_ptr<ThriftServer> impala::StatestoreSubscriber::heartbeat_server_
private

Container for the heartbeat server.

Definition at line 137 of file statestore-subscriber.h.

Referenced by Start().

bool impala::StatestoreSubscriber::is_registered_
private

Set to true after Register(...) is successful, after which no more topics may be subscribed to.

Definition at line 151 of file statestore-subscriber.h.

Referenced by AddTopic(), and Start().

DoubleGauge* impala::StatestoreSubscriber::last_recovery_duration_metric_
private

Amount of time last spent in recovery mode.

Definition at line 203 of file statestore-subscriber.h.

Referenced by RecoveryModeChecker(), and StatestoreSubscriber().

StringProperty* impala::StatestoreSubscriber::last_recovery_time_metric_
private

When the last recovery happened.

Definition at line 206 of file statestore-subscriber.h.

Referenced by RecoveryModeChecker(), and StatestoreSubscriber().

boost::mutex impala::StatestoreSubscriber::lock_
private

Class-wide lock. Protects all subsequent members. Most private methods must be called holding this lock; this is noted in the method comments.

Definition at line 147 of file statestore-subscriber.h.

Referenced by AddTopic(), RecoveryModeChecker(), Start(), and UpdateState().

MetricGroup* impala::StatestoreSubscriber::metrics_
private

MetricGroup instance that all metrics are registered in. Not owned by this class.

Definition at line 197 of file statestore-subscriber.h.

Referenced by AddTopic(), Start(), and StatestoreSubscriber().

boost::scoped_ptr<Thread> impala::StatestoreSubscriber::recovery_mode_thread_
private

Thread in which RecoveryModeChecker runs.

Definition at line 143 of file statestore-subscriber.h.

Referenced by Start().

TUniqueId impala::StatestoreSubscriber::registration_id_
private

Set during Register(), this is the unique ID of the current registration with the statestore. If this subscriber must recover, or disconnects and then reconnects, the registration_id_ will change after Register() is called again. This allows the subscriber to reject communication from the statestore that pertains to a previous registration.

Definition at line 162 of file statestore-subscriber.h.

Referenced by CheckRegistrationId(), and Register().

boost::mutex impala::StatestoreSubscriber::registration_id_lock_
private

Protects registration_id_. Must be taken after lock_ if both are to be taken together.

Definition at line 155 of file statestore-subscriber.h.

Referenced by CheckRegistrationId(), and Register().
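
The lock-ordering rule stated above (lock_ first, then registration_id_lock_) can be sketched as below. LockOrderSketch is a hypothetical class, std::mutex is used in place of boost::mutex, and a string stands in for TUniqueId. Acquiring the two locks in one fixed order on every code path is the usual way to rule out deadlock between them.

```cpp
#include <mutex>
#include <string>

class LockOrderSketch {
 public:
  // Path that needs both locks: class-wide lock first, then the ID lock.
  void Reregister(const std::string& new_id) {
    std::lock_guard<std::mutex> l(lock_);
    std::lock_guard<std::mutex> r(registration_id_lock_);
    registration_id_ = new_id;
  }

  // Path that needs only the ID lock; it never takes lock_ afterwards.
  std::string registration_id() {
    std::lock_guard<std::mutex> r(registration_id_lock_);
    return registration_id_;
  }

 private:
  std::mutex lock_;
  std::mutex registration_id_lock_;
  std::string registration_id_;
};
```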

StringProperty* impala::StatestoreSubscriber::registration_id_metric_
private

Current registration ID, in string form.

Definition at line 225 of file statestore-subscriber.h.

Referenced by Register(), and StatestoreSubscriber().

TNetworkAddress impala::StatestoreSubscriber::statestore_address_
private

Address of the statestore.

Definition at line 130 of file statestore-subscriber.h.

Referenced by Register().

const std::string impala::StatestoreSubscriber::subscriber_id_
private

Unique, but opaque, identifier for this subscriber.

Definition at line 124 of file statestore-subscriber.h.

Referenced by id(), RecoveryModeChecker(), and Register().

boost::shared_ptr<StatestoreSubscriberIf> impala::StatestoreSubscriber::thrift_iface_
private

Implementation of the heartbeat thrift interface, which proxies calls onto this object.

Definition at line 134 of file statestore-subscriber.h.

Referenced by Start().

std::map<Statestore::TopicId, bool> impala::StatestoreSubscriber::topic_registrations_
private

One entry for every topic subscribed to. The value is whether this subscriber considers this topic to be 'transient', that is any updates it makes will be deleted upon failure or disconnection.

Definition at line 186 of file statestore-subscriber.h.

Referenced by AddTopic(), and Register().

StatsMetric<double>* impala::StatestoreSubscriber::topic_update_duration_metric_
private

Accumulated statistics on the time taken to process each topic-update message from the statestore (that is, to call all callbacks)

Definition at line 216 of file statestore-subscriber.h.

Referenced by StatestoreSubscriber(), and UpdateState().

StatsMetric<double>* impala::StatestoreSubscriber::topic_update_interval_metric_
private

Accumulated statistics on the frequency of topic-update messages.

Definition at line 209 of file statestore-subscriber.h.

Referenced by StatestoreSubscriber(), and UpdateState().

MonotonicStopWatch impala::StatestoreSubscriber::topic_update_interval_timer_
private

Tracks the time between topic-update messages.

Definition at line 212 of file statestore-subscriber.h.

Referenced by Register(), and UpdateState().

UpdateCallbacks impala::StatestoreSubscriber::update_callbacks_
private

Callback for all services that have registered for updates (indexed by the associated SubscriptionId), and associated lock.

Definition at line 181 of file statestore-subscriber.h.

Referenced by AddTopic(), Register(), and UpdateState().


The documentation for this class was generated from the following files:

statestore-subscriber.h
statestore-subscriber.cc