Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
statestore-subscriber.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef STATESTORE_STATESTORE_SUBSCRIBER_H
17 #define STATESTORE_STATESTORE_SUBSCRIBER_H
18 
19 #include <string>
20 
21 #include <boost/scoped_ptr.hpp>
22 #include <boost/shared_ptr.hpp>
23 #include <boost/thread/mutex.hpp>
24 #include <boost/thread/thread.hpp>
25 
26 #include "statestore/statestore.h"
27 #include "util/stopwatch.h"
28 #include "rpc/thrift-util.h"
29 #include "rpc/thrift-client.h"
30 #include "util/thread.h"
31 #include "util/metrics.h"
32 #include "gen-cpp/StatestoreService.h"
33 #include "gen-cpp/StatestoreSubscriber.h"
34 
35 namespace impala {
36 
37 class TimeoutFailureDetector;
38 class Status;
39 class TNetworkAddress;
41 
43 
50 //
55 //
59 //
65  public:
68  //
71  //
73  StatestoreSubscriber(const std::string& subscriber_id,
74  const TNetworkAddress& heartbeat_address,
75  const TNetworkAddress& statestore_address,
76  MetricGroup* metrics);
77 
79  typedef std::map<Statestore::TopicId, TTopicDelta> TopicDeltaMap;
80 
83  //
88  //
94  //
98  typedef boost::function<void (const TopicDeltaMap& state,
99  std::vector<TTopicDelta>* topic_updates)> UpdateCallback;
100 
107  //
110  Status AddTopic(const Statestore::TopicId& topic_id, bool is_transient,
111  const UpdateCallback& callback);
112 
116  //
118  Status Start();
119 
120  const std::string& id() const { return subscriber_id_; }
121 
122  private:
124  const std::string subscriber_id_;
125 
127  TNetworkAddress heartbeat_address_;
128 
130  TNetworkAddress statestore_address_;
131 
134  boost::shared_ptr<StatestoreSubscriberIf> thrift_iface_;
135 
137  boost::shared_ptr<ThriftServer> heartbeat_server_;
138 
140  boost::scoped_ptr<impala::TimeoutFailureDetector> failure_detector_;
141 
143  boost::scoped_ptr<Thread> recovery_mode_thread_;
144 
147  boost::mutex lock_;
148 
152 
155  boost::mutex registration_id_lock_;
156 
162  TUniqueId registration_id_;
163 
164  struct Callbacks {
168 
170  std::vector<UpdateCallback> callbacks;
171  };
172 
177  typedef boost::unordered_map<Statestore::TopicId, Callbacks> UpdateCallbacks;
178 
182 
186  std::map<Statestore::TopicId, bool> topic_registrations_;
187 
190  typedef boost::unordered_map<Statestore::TopicId, int64_t> TopicVersionMap;
192 
194  boost::scoped_ptr<StatestoreClientCache> client_cache_;
195 
198 
201 
204 
207 
210 
213 
217 
220 
223 
226 
229 
243  //
249  Status UpdateState(const TopicDeltaMap& incoming_topic_deltas,
250  const TUniqueId& registration_id,
251  std::vector<TTopicDelta>* subscriber_topic_updates, bool* skipped);
252 
254  void Heartbeat(const TUniqueId& registration_id);
255 
260  //
264  void RecoveryModeChecker();
265 
269  Status Register();
270 
274  Status CheckRegistrationId(const TUniqueId& registration_id);
275 };
276 
277 }
278 
279 #endif
StatestoreSubscriber(const std::string &subscriber_id, const TNetworkAddress &heartbeat_address, const TNetworkAddress &statestore_address, MetricGroup *metrics)
statestore_address - the address of the statestore to register with
MetricGroup * metrics_
MetricGroup instance that all metrics are registered in. Not owned by this class. ...
boost::shared_ptr< StatestoreSubscriberIf > thrift_iface_
void Heartbeat(const TUniqueId &registration_id)
Called when the statestore sends a heartbeat message. Updates the failure detector.
std::vector< UpdateCallback > callbacks
List of callbacks to invoke for this topic.
TNetworkAddress statestore_address_
Address of the statestore.
class SimpleMetric< std::string, TMetricKind::PROPERTY > StringProperty
Definition: metrics.h:323
ClientCache< StatestoreServiceClient > StatestoreClientCache
boost::scoped_ptr< Thread > recovery_mode_thread_
Thread in which RecoveryModeChecker runs.
const std::string & id() const
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
Status CheckRegistrationId(const TUniqueId &registration_id)
boost::scoped_ptr< impala::TimeoutFailureDetector > failure_detector_
Failure detector that tracks heartbeat messages from the statestore.
DoubleGauge * last_recovery_duration_metric_
Amount of time last spent in recovery mode.
TNetworkAddress heartbeat_address_
Address that the heartbeat service should be started on.
StringProperty * registration_id_metric_
Current registration ID, in string form.
boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
StringProperty * last_recovery_time_metric_
When the last recovery happened.
std::string TopicId
A TopicId uniquely identifies a single topic.
Definition: statestore.h:86
MonotonicStopWatch topic_update_interval_timer_
Tracks the time between topic-update mesages.
boost::scoped_ptr< StatestoreClientCache > client_cache_
statestore client cache - only one client is ever used.
std::map< Statestore::TopicId, bool > topic_registrations_
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
const std::string subscriber_id_
Unique, but opaque, identifier for this subscriber.
Status UpdateState(const TopicDeltaMap &incoming_topic_deltas, const TUniqueId &registration_id, std::vector< TTopicDelta > *subscriber_topic_updates, bool *skipped)
StatsMetric< double > * topic_update_duration_metric_
class SimpleMetric< bool, TMetricKind::PROPERTY > BooleanProperty
Definition: metrics.h:322
boost::unordered_map< Statestore::TopicId, int64_t > TopicVersionMap
class SimpleMetric< double, TMetricKind::GAUGE > DoubleGauge
Definition: metrics.h:319
boost::unordered_map< Statestore::TopicId, Callbacks > UpdateCallbacks
StatsMetric< double > * topic_update_interval_metric_
Accumulated statistics on the frequency of topic-update messages.
BooleanProperty * connected_to_statestore_metric_
Metric to indicate if we are successfully registered with the statestore.
StatsMetric< double > * heartbeat_interval_metric_
Accumulated statistics on the frequency of heartbeat messages.
Status Start()
Returns OK unless some error occurred, like a failure to connect.
Status AddTopic(const Statestore::TopicId &topic_id, bool is_transient, const UpdateCallback &callback)
boost::shared_ptr< ThriftServer > heartbeat_server_
Container for the heartbeat server.
MonotonicStopWatch heartbeat_interval_timer_
Tracks the time between heartbeat mesages.