20 #include <boost/algorithm/string/join.hpp>
21 #include <boost/foreach.hpp>
22 #include <boost/date_time/posix_time/posix_time.hpp>
23 #include <boost/thread/shared_mutex.hpp>
24 #include <gutil/strings/substitute.h>
29 #include "gen-cpp/StatestoreService_types.h"
37 using boost::posix_time::seconds;
38 using namespace apache::thrift;
39 using namespace strings;
// Flag statestore_subscriber_timeout_seconds, declared with the value 30: how long, in
// seconds, the connection with the statestore may go quiet before it is treated as lost.
// It is read later in this file as FLAGS_statestore_subscriber_timeout_seconds, once at
// its full value and once divided by two (integer division, since the flag is an int32).
41 DEFINE_int32(statestore_subscriber_timeout_seconds, 30,
"The amount of time (in seconds)"
42 " that may elapse before the connection with the statestore is considered lost.");
// Flag statestore_subscriber_cnxn_attempts, declared with the value 10: how many times an
// RPC connection to the statestore is retried. Per the help text, 0 means retry
// indefinitely.
43 DEFINE_int32(statestore_subscriber_cnxn_attempts, 10,
"The number of times to retry an "
44 "RPC connection to the statestore. A setting of 0 means retry indefinitely");
// Flag statestore_subscriber_cnxn_retry_interval_ms, declared with the value 3000: the
// wait, in milliseconds, between attempts to make an RPC connection to the statestore.
45 DEFINE_int32(statestore_subscriber_cnxn_retry_interval_ms, 3000,
"The interval, in ms, "
46 "to wait between attempts to make an RPC connection to the statestore.");
67 : subscriber_(subscriber) { DCHECK(subscriber != NULL); }
69 const TUpdateStateRequest& params) {
70 TUniqueId registration_id;
71 if (params.__isset.registration_id) {
72 registration_id = params.registration_id;
75 subscriber_->UpdateState(params.topic_deltas, registration_id,
76 &response.topic_updates, &response.skipped).ToThrift(&response.status);
78 response.__set_skipped(response.skipped);
81 virtual void Heartbeat(THeartbeatResponse& response,
const THeartbeatRequest& request) {
82 subscriber_->Heartbeat(request.registration_id);
89 StatestoreSubscriber::StatestoreSubscriber(
const std::string& subscriber_id,
90 const TNetworkAddress& heartbeat_address,
const TNetworkAddress& statestore_address,
92 : subscriber_id_(subscriber_id), heartbeat_address_(heartbeat_address),
93 statestore_address_(statestore_address),
96 seconds(FLAGS_statestore_subscriber_timeout_seconds),
97 seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
98 is_registered_(false),
100 FLAGS_statestore_subscriber_cnxn_retry_interval_ms)),
101 metrics_(metrics->GetChildGroup(
"statestore-subscriber")) {
105 "statestore-subscriber.last-recovery-duration", 0.0);
107 "statestore-subscriber.last-recovery-time",
"N/A");
119 "statestore-subscriber.registration-id",
"N/A",
120 "The most recent registration ID for this subscriber with the statestore. Set to "
121 "'N/A' if no registration has been completed");
123 client_cache_->InitMetrics(metrics,
"statestore-subscriber.statestore");
128 lock_guard<mutex> l(
lock_);
146 TRegisterSubscriberRequest request;
149 TTopicRegistration thrift_topic;
150 thrift_topic.topic_name = topic.first;
152 request.topic_registrations.push_back(thrift_topic);
157 TRegisterSubscriberResponse response;
159 client.DoRpc(&StatestoreServiceClient::RegisterSubscriber, request, &response));
162 if (response.__isset.registration_id) {
167 VLOG(1) <<
"Subscriber registration ID: " << registration_string;
169 VLOG(1) <<
"No subscriber registration ID received from statestore";
182 lock_guard<mutex> l(
lock_);
183 LOG(INFO) <<
"Starting statestore subscriber";
186 shared_ptr<TProcessor> processor(
new StatestoreSubscriberProcessor(
thrift_iface_));
187 shared_ptr<TProcessorEventHandler> event_handler(
189 processor->setEventHandler(event_handler);
194 LOG(INFO) <<
"Registering with statestore";
198 LOG(INFO) <<
"statestore registration successful";
200 LOG(INFO) <<
"statestore registration unsuccessful: " << status.
GetDetail();
221 lock_guard<mutex> l(
lock_);
223 recovery_timer.
Start();
226 <<
": Connection with statestore lost, entering recovery mode";
227 uint32_t attempt_count = 1;
229 LOG(INFO) <<
"Trying to re-register with statestore, attempt: "
236 LOG(INFO) <<
"Reconnected to statestore. Exiting recovery mode";
242 LOG(WARNING) <<
"Failed to re-register with statestore: "
247 recovery_timer.
ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
255 recovery_timer.
ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
271 return Status(Substitute(
"Unexpected registration ID: $0, was expecting $1",
291 const TUniqueId& registration_id, vector<TTopicDelta>* subscriber_topic_updates,
306 try_mutex::scoped_try_lock l(
lock_);
320 bool found_unexpected_delta =
false;
321 BOOST_FOREACH(
const TopicDeltaMap::value_type& delta, incoming_topic_deltas) {
324 if (delta.second.is_delta && delta.second.from_version != itr->second) {
325 LOG(ERROR) <<
"Unexpected delta update to topic '" << delta.first <<
"' of "
326 <<
"version range (" << delta.second.from_version <<
":"
327 << delta.second.to_version <<
"]. Expected delta start version: "
330 subscriber_topic_updates->push_back(TTopicDelta());
331 TTopicDelta& update = subscriber_topic_updates->back();
332 update.topic_name = delta.second.topic_name;
333 update.__set_from_version(itr->second);
334 found_unexpected_delta =
true;
343 if (!found_unexpected_delta) {
347 BOOST_FOREACH(
const UpdateCallback& callback, callbacks.second.callbacks) {
350 callback(incoming_topic_deltas, subscriber_topic_updates);
352 callbacks.second.processing_time_metric->Update(
MetricGroup * metrics_
MetricGroup instance that all metrics are registered in. Not owned by this class. ...
boost::shared_ptr< StatestoreSubscriberIf > thrift_iface_
const std::string GetDetail() const
void Heartbeat(const TUniqueId &registration_id)
Called when the statestore sends a heartbeat message. Updates the failure detector.
std::vector< UpdateCallback > callbacks
List of callbacks to invoke for this topic.
TNetworkAddress statestore_address_
Address of the statestore.
const string CALLBACK_METRIC_PATTERN
TODO: Consider allowing fragment IDs as category parameters.
ClientConnection< StatestoreServiceClient > StatestoreConnection
boost::scoped_ptr< Thread > recovery_mode_thread_
Thread in which RecoveryModeChecker runs.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
M * RegisterMetric(M *metric)
MetricGroups may be organised hierarchically as a tree.
Status CheckRegistrationId(const TUniqueId &registration_id)
virtual void UpdateState(TUpdateStateResponse &response, const TUpdateStateRequest &params)
boost::scoped_ptr< impala::TimeoutFailureDetector > failure_detector_
Failure detector that tracks heartbeat messages from the statestore.
StatsMetric< double > * processing_time_metric
string PrintId(const TUniqueId &id, const string &separator)
const int32_t SLEEP_INTERVAL_MS
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
UpdateCallbacks update_callbacks_
DoubleGauge * last_recovery_duration_metric_
Amount of time last spent in recovery mode.
StatestoreSubscriber * subscriber_
uint64_t Reset()
Restarts the timer. Returns the elapsed time until this point.
boost::mutex registration_id_lock_
TNetworkAddress heartbeat_address_
Address that the heartbeat service should be started on.
StringProperty * registration_id_metric_
Current registration ID, in string form.
boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
StringProperty * last_recovery_time_metric_
When the last recovery happened.
std::string TopicId
A TopicId uniquely identifies a single topic.
SimpleMetric< T, TMetricKind::PROPERTY > * AddProperty(const std::string &key, const T &value, const std::string &description="")
MonotonicStopWatch topic_update_interval_timer_
Tracks the time between topic-update messages.
boost::scoped_ptr< StatestoreClientCache > client_cache_
statestore client cache - only one client is ever used.
DEFINE_int32(statestore_subscriber_timeout_seconds, 30,"The amount of time (in seconds)"" that may elapse before the connection with the statestore is considered lost.")
std::string DebugString(const T &val)
std::map< Statestore::TopicId, bool > topic_registrations_
void Update(const T &value)
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
const std::string subscriber_id_
Unique, but opaque, identifier for this subscriber.
Status UpdateState(const TopicDeltaMap &incoming_topic_deltas, const TUniqueId &registration_id, std::vector< TTopicDelta > *subscriber_topic_updates, bool *skipped)
uint64_t ElapsedTime() const
Returns time in nanosecond.
virtual void Heartbeat(THeartbeatResponse &response, const THeartbeatRequest &request)
StatsMetric< double > * topic_update_duration_metric_
void RecoveryModeChecker()
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
TopicVersionMap current_topic_versions_
TUniqueId registration_id_
StatsMetric< double > * topic_update_interval_metric_
Accumulated statistics on the frequency of topic-update messages.
BooleanProperty * connected_to_statestore_metric_
Metric to indicate if we are successfully registered with the statestore.
StatsMetric< double > * heartbeat_interval_metric_
Accumulated statistics on the frequency of heartbeat messages.
StatestoreSubscriberThriftIf(StatestoreSubscriber *subscriber)
const string STATESTORE_ID
Status Start()
Returns OK unless some error occurred, like a failure to connect.
Status AddTopic(const Statestore::TopicId &topic_id, bool is_transient, const UpdateCallback &callback)
boost::shared_ptr< ThriftServer > heartbeat_server_
Container for the heartbeat server.
MonotonicStopWatch heartbeat_interval_timer_
Tracks the time between heartbeat messages.
static TimestampValue LocalTime()