Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
statestore.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef STATESTORE_STATESTORE_H
16 #define STATESTORE_STATESTORE_H
17 
18 #include <stdint.h>
19 #include <string>
20 #include <vector>
21 #include <map>
22 #include <boost/unordered_map.hpp>
23 #include <boost/scoped_ptr.hpp>
24 #include <boost/thread/condition_variable.hpp>
25 #include <boost/uuid/uuid_generators.hpp>
26 
27 #include "gen-cpp/Types_types.h"
28 #include "gen-cpp/StatestoreSubscriber.h"
29 #include "gen-cpp/StatestoreService.h"
30 #include "util/metrics.h"
32 #include "rpc/thrift-client.h"
33 #include "util/thread-pool.h"
34 #include "util/webserver.h"
35 #include "runtime/client-cache.h"
38 
39 namespace impala {
40 
// NOTE(review): this using-directive names the namespace it appears in
// (namespace impala is opened just above), so it does not bring any
// additional names into scope.
41 using namespace impala;
42 
// Forward declaration only. In the declarations visible in this header,
// Status is used solely as a function return type.
43 class Status;
44 
47 //
52 //
59 //
64 //
71 //
74 //
// Statestore declares identifier typedefs for subscribers, topics and topic
// entries, the nested TopicEntry, Topic and Subscriber classes, and the
// methods that register subscribers, send topic updates and heartbeats, and
// serve the web handlers.
// NOTE(review): the numbers at the start of each line are the listing's own
// line numbers; they skip values, so some declarations and comments of the
// original header are not shown here.
79 class Statestore {
80  public:
    // Subscriber identifiers are plain strings.
83  typedef std::string SubscriberId;
84 
    // A TopicId uniquely identifies a single topic.
86  typedef std::string TopicId;
87 
    // A TopicEntryKey uniquely identifies a single entry in a topic.
89  typedef std::string TopicEntryKey;
90 
    // The only constructor; initialises member variables only.
92  Statestore(MetricGroup* metrics);
93 
98  //
102  Status RegisterSubscriber(const SubscriberId& subscriber_id,
103  const TNetworkAddress& location,
104  const std::vector<TTopicRegistration>& topic_registrations,
105  TUniqueId* registration_id);
106 
107  void RegisterWebpages(Webserver* webserver);
108 
110  //
    // The main processing loop. Blocks until the exit flag is set.
112  Status MainLoop();
113 
    // Returns the Thrift API interface that proxies requests onto the local
    // Statestore. The shared_ptr is returned by const reference, not copied.
115  const boost::shared_ptr<StatestoreServiceIf>& thrift_iface() const {
116  return thrift_iface_;
117  }
118 
121  void SetExitFlag();
122 
123  private:
    // A single entry in a topic: a Value together with a Version.
    // NOTE(review): the private members (value_, version_) are referenced by
    // the accessors below but their declarations are not shown in this listing.
127  class TopicEntry {
128  public:
    // A Value is a string of bytes, for which std::string is a convenient
    // representation.
130  typedef std::string Value;
131 
    // Versions are unsigned 64-bit integers.
135  typedef uint64_t Version;
136 
139 
    // Representation of an empty Value. Must have size() == 0.
141  static const Value NULL_VALUE;
142 
147  void SetValue(const Value& bytes, Version version);
148 
150 
    // Accessors. version() is declared as uint64_t, which is what the Version
    // typedef above names. length() returns value_.size() converted to
    // uint32_t.
151  const Value& value() const { return value_; }
152  uint64_t version() const { return version_; }
153  uint32_t length() const { return value_.size(); }
154 
155  private:
159 
164  };
165 
    // Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
167  typedef boost::unordered_map<TopicEntryKey, TopicEntry> TopicEntryMap;
168 
    // Ordered map from a TopicEntry::Version to the TopicEntryKey recorded
    // for that version.
171  typedef std::map<TopicEntry::Version, TopicEntryKey> TopicUpdateLog;
172 
175  //
178  class Topic {
179  public:
    // Stores the topic id and the three metric pointers. The last version and
    // both byte totals start at 0.
180  Topic(const TopicId& topic_id, IntGauge* key_size_metric,
181  IntGauge* value_size_metric, IntGauge* topic_size_metric)
182  : topic_id_(topic_id), last_version_(0L), total_key_size_bytes_(0L),
183  total_value_size_bytes_(0L), key_size_metric_(key_size_metric),
184  value_size_metric_(value_size_metric), topic_size_metric_(topic_size_metric) { }
185 
189  //
    // Must be called holding the topic lock.
191  TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes);
192 
197  //
200  //
    // Must be called holding the topic lock.
202  void DeleteIfVersionsMatch(TopicEntry::Version version, const TopicEntryKey& key);
203 
    // Read-only accessors.
    // NOTE(review): listing lines 206, 207 and 209 are not shown; the page's
    // member index places last_version(), topic_update_log() and
    // total_value_size_bytes() there.
204  const TopicId& id() const { return topic_id_; }
205  const TopicEntryMap& entries() const { return entries_; }
208  int64_t total_key_size_bytes() const { return total_key_size_bytes_; }
210 
211  private:
214 
217 
221 
225  //
231 
234 
237 
242  };
243 
247 
250  boost::mutex exit_flag_lock_;
252 
    // Controls access to topics_. Cannot take subscribers_lock_ after
    // acquiring this lock.
254  boost::mutex topic_lock_;
255 
    // The entire set of topics tracked by the statestore.
257  typedef boost::unordered_map<TopicId, Topic> TopicMap;
259 
265  class Subscriber {
266  public:
267  Subscriber(const SubscriberId& subscriber_id, const TUniqueId& registration_id,
268  const TNetworkAddress& network_address,
269  const std::vector<TTopicRegistration>& subscribed_topics);
270 
    // NOTE(review): the fields of TopicState are not shown in this listing.
274  struct TopicState {
277  };
278 
    // Map from TopicId to the TopicState held for that topic.
281  typedef boost::unordered_map<TopicId, TopicState> Topics;
282 
285 
    // Read-only accessors; each returns a const reference to the member.
286  const Topics& subscribed_topics() const { return subscribed_topics_; }
287  const TNetworkAddress& network_address() const { return network_address_; }
288  const SubscriberId& id() const { return subscriber_id_; }
289  const TUniqueId& registration_id() const { return registration_id_; }
290 
296  void AddTransientUpdate(const TopicId& topic_id, const TopicEntryKey& topic_key,
297  TopicEntry::Version version);
298 
    // Map from a (TopicId, TopicEntryKey) pair to a TopicEntry::Version.
    // NOTE(review): the line that names this typedef (listing line 302) is not
    // shown; the page's member index gives the name as TransientEntryMap.
301  typedef boost::unordered_map<std::pair<TopicId, TopicEntryKey>, TopicEntry::Version>
303 
305 
    // Returns the version by value; the top-level const on the return type
    // does not make the returned integer a reference or otherwise shared.
308  const TopicEntry::Version LastTopicVersionProcessed(const TopicId& topic_id) const;
309 
313  void SetLastTopicVersionProcessed(const TopicId& topic_id,
314  TopicEntry::Version version);
315 
316  private:
320 
325  const TUniqueId registration_id_;
326 
    // The location of the subscriber service that this subscriber runs.
328  const TNetworkAddress network_address_;
329 
335 
339  };
340 
343  boost::mutex subscribers_lock_;
344 
349  //
    // Map from SubscriberId to a shared pointer to its Subscriber.
    // NOTE(review): the line that names this typedef (listing line 354) is not
    // shown; the page's member index gives the name as SubscriberMap.
353  typedef boost::unordered_map<SubscriberId, boost::shared_ptr<Subscriber> >
356 
    // Used to generate unique IDs for each new registration.
358  boost::uuids::random_generator subscriber_uuid_generator_;
359 
    // Pair of an int64_t and the SubscriberId the update is for.
    // NOTE(review): the meaning of the int64_t is not stated in this listing.
363  typedef std::pair<int64_t, SubscriberId> ScheduledSubscriberUpdate;
364 
369  //
377  //
382  //
389  //
394 
396 
399  boost::scoped_ptr<ClientCache<StatestoreSubscriberClient> > update_state_client_cache_;
400 
405  boost::scoped_ptr<ClientCache<StatestoreSubscriberClient> > heartbeat_client_cache_;
406 
    // Thrift API implementation which proxies requests onto this Statestore.
408  boost::shared_ptr<StatestoreServiceIf> thrift_iface_;
409 
413  boost::scoped_ptr<MissedHeartbeatFailureDetector> failure_detector_;
414 
418 
423 
428 
431 
436 
441  void DoSubscriberUpdate(bool is_heartbeat, int thread_id,
442  const ScheduledSubscriberUpdate& update);
443 
448  //
454  Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped);
455 
458  Status SendHeartbeat(Subscriber* subscriber);
459 
462  void UnregisterSubscriber(Subscriber* subscriber);
463 
467  void GatherTopicUpdates(const Subscriber& subscriber,
468  TUpdateStateRequest* update_state_request);
469 
478  //
480  //
    // NOTE(review): the first line of this declaration is not shown. The
    // page's member index lists GetMinSubscriberTopicVersion(const TopicId&,
    // SubscriberId* = NULL) with the note "Must be called holding the
    // subscribers_ lock."
485  SubscriberId* subscriber_id = NULL);
486 
    // True if the shutdown flag has been set true, false otherwise.
488  bool ShouldExit();
489 
502  void TopicsHandler(const Webserver::ArgumentMap& args, rapidjson::Document* document);
503 
    // NOTE(review): the first line of this declaration is not shown. The
    // page's member index lists SubscribersHandler(const
    // Webserver::ArgumentMap&, rapidjson::Document*).
516  rapidjson::Document* document);
517 
518 };
519 
520 }
521 
522 #endif
Status RegisterSubscriber(const SubscriberId &subscriber_id, const TNetworkAddress &location, const std::vector< TTopicRegistration > &topic_registrations, TUniqueId *registration_id)
Definition: statestore.cc:352
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > heartbeat_client_cache_
Definition: statestore.h:405
TopicEntry::Version last_version_
Definition: statestore.h:220
static const Value NULL_VALUE
Representation of an empty Value. Must have size() == 0.
Definition: statestore.h:141
const TNetworkAddress & network_address() const
Definition: statestore.h:287
IntGauge * topic_size_metric_
Definition: statestore.h:422
std::pair< int64_t, SubscriberId > ScheduledSubscriberUpdate
Definition: statestore.h:363
int64_t total_value_size_bytes() const
Definition: statestore.h:209
std::string Value
A Value is a string of bytes, for which std::string is a convenient representation.
Definition: statestore.h:130
Statestore(MetricGroup *metrics)
The only constructor; initialises member variables only.
Definition: statestore.cc:211
void SetValue(const Value &bytes, Version version)
Definition: statestore.cc:120
const TUniqueId registration_id_
Definition: statestore.h:325
boost::unordered_map< SubscriberId, boost::shared_ptr< Subscriber > > SubscriberMap
Definition: statestore.h:354
boost::unordered_map< std::pair< TopicId, TopicEntryKey >, TopicEntry::Version > TransientEntryMap
Definition: statestore.h:302
void RegisterWebpages(Webserver *webserver)
Definition: statestore.cc:253
const TopicUpdateLog & topic_update_log() const
Definition: statestore.h:207
const Value & value() const
Definition: statestore.h:151
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
boost::scoped_ptr< ClientCache< StatestoreSubscriberClient > > update_state_client_cache_
Definition: statestore.h:399
void TopicsHandler(const Webserver::ArgumentMap &args, rapidjson::Document *document)
Definition: statestore.cc:265
const SubscriberId subscriber_id_
Definition: statestore.h:319
TransientEntryMap transient_entries_
Definition: statestore.h:338
int64_t total_key_size_bytes_
Total memory occupied by the key strings, in bytes.
Definition: statestore.h:233
Subscriber(const SubscriberId &subscriber_id, const TUniqueId &registration_id, const TNetworkAddress &network_address, const std::vector< TTopicRegistration > &subscribed_topics)
Definition: statestore.cc:175
std::string SubscriberId
Definition: statestore.h:83
IntGauge * key_size_metric_
Metrics shared across all topics to sum the size in bytes of keys, values and both.
Definition: statestore.h:239
std::string TopicEntryKey
A TopicEntryKey uniquely identifies a single entry in a topic.
Definition: statestore.h:89
SetMetric< std::string > * subscriber_set_metric_
Definition: statestore.h:417
const SubscriberId & id() const
Definition: statestore.h:288
int64_t total_value_size_bytes_
Total memory occupied by the value byte strings, in bytes.
Definition: statestore.h:236
const TransientEntryMap & transient_entries() const
Definition: statestore.h:304
boost::unordered_map< TopicId, Topic > TopicMap
The entire set of topics tracked by the statestore.
Definition: statestore.h:257
std::map< std::string, std::string > ArgumentMap
Definition: webserver.h:36
const TopicEntryMap & entries() const
Definition: statestore.h:205
boost::unordered_map< TopicEntryKey, TopicEntry > TopicEntryMap
Map from TopicEntryKey to TopicEntry, maintained by a Topic object.
Definition: statestore.h:167
uint64_t version() const
Definition: statestore.h:152
StatsMetric< double > * heartbeat_duration_metric_
Same as above, but for SendHeartbeat() RPCs.
Definition: statestore.h:430
static const TopicEntry::Version TOPIC_INITIAL_VERSION
The Version value used to initialize new Topic subscriptions for this Subscriber. ...
Definition: statestore.h:284
bool ShouldExit()
True if the shutdown flag has been set true, false otherwise.
Definition: statestore.cc:576
Topic(const TopicId &topic_id, IntGauge *key_size_metric, IntGauge *value_size_metric, IntGauge *topic_size_metric)
Definition: statestore.h:180
boost::uuids::random_generator subscriber_uuid_generator_
Used to generate unique IDs for each new registration.
Definition: statestore.h:358
ThreadPool< ScheduledSubscriberUpdate > subscriber_topic_update_threadpool_
Definition: statestore.h:393
uint32_t length() const
Definition: statestore.h:153
std::string TopicId
A TopicId uniquely identifies a single topic.
Definition: statestore.h:86
Status SendTopicUpdate(Subscriber *subscriber, bool *update_skipped)
Definition: statestore.cc:399
IntGauge * topic_size_metric_
Definition: statestore.h:241
void AddTransientUpdate(const TopicId &topic_id, const TopicEntryKey &topic_key, TopicEntry::Version version)
Definition: statestore.cc:189
IntGauge * num_subscribers_metric_
Metric that tracks the registered, non-failed subscribers.
Definition: statestore.h:416
void SubscribersHandler(const Webserver::ArgumentMap &args, rapidjson::Document *document)
Definition: statestore.cc:307
IntGauge * value_size_metric_
Definition: statestore.h:421
boost::mutex exit_flag_lock_
Definition: statestore.h:250
int64_t total_key_size_bytes() const
Definition: statestore.h:208
StatsMetric< double > * topic_update_duration_metric_
Definition: statestore.h:427
IntGauge * key_size_metric_
Metrics shared across all topics to sum the size in bytes of keys, values and both.
Definition: statestore.h:420
void DeleteIfVersionsMatch(TopicEntry::Version version, const TopicEntryKey &key)
Must be called holding the topic lock.
Definition: statestore.cc:158
void SetLastTopicVersionProcessed(const TopicId &topic_id, TopicEntry::Version version)
Definition: statestore.cc:206
boost::shared_ptr< StatestoreServiceIf > thrift_iface_
Thrift API implementation which proxies requests onto this Statestore.
Definition: statestore.h:408
TopicEntry::Version last_version() const
Definition: statestore.h:206
void GatherTopicUpdates(const Subscriber &subscriber, TUpdateStateRequest *update_state_request)
Definition: statestore.cc:484
boost::scoped_ptr< MissedHeartbeatFailureDetector > failure_detector_
Definition: statestore.h:413
void UnregisterSubscriber(Subscriber *subscriber)
Definition: statestore.cc:714
boost::unordered_map< TopicId, TopicState > Topics
Definition: statestore.h:281
TopicEntryMap entries_
Map from topic entry key to topic entry.
Definition: statestore.h:213
SubscriberMap subscribers_
Definition: statestore.h:355
TopicEntry::Version Put(const TopicEntryKey &key, const TopicEntry::Value &bytes)
Must be called holding the topic lock.
Definition: statestore.cc:127
Status MainLoop()
The main processing loop. Blocks until the exit flag is set.
Definition: statestore.cc:743
TopicUpdateLog topic_update_log_
Definition: statestore.h:230
const TopicId & id() const
Definition: statestore.h:204
void DoSubscriberUpdate(bool is_heartbeat, int thread_id, const ScheduledSubscriberUpdate &update)
Definition: statestore.cc:606
boost::mutex topic_lock_
Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock.
Definition: statestore.h:254
const boost::shared_ptr< StatestoreServiceIf > & thrift_iface() const
Returns the Thrift API interface that proxies requests onto the local Statestore. ...
Definition: statestore.h:115
ThreadPool< ScheduledSubscriberUpdate > subscriber_heartbeat_threadpool_
Definition: statestore.h:395
const TopicId topic_id_
Unique identifier for this topic. Should be human-readable.
Definition: statestore.h:216
const Topics & subscribed_topics() const
Definition: statestore.h:286
Status OfferUpdate(const ScheduledSubscriberUpdate &update, ThreadPool< ScheduledSubscriberUpdate > *thread_pool)
Definition: statestore.cc:335
std::map< TopicEntry::Version, TopicEntryKey > TopicUpdateLog
Definition: statestore.h:171
boost::mutex subscribers_lock_
Definition: statestore.h:343
const TopicEntry::Version LastTopicVersionProcessed(const TopicId &topic_id) const
Definition: statestore.cc:199
const TopicEntry::Version GetMinSubscriberTopicVersion(const TopicId &topic_id, SubscriberId *subscriber_id=NULL)
Must be called holding the subscribers_ lock.
Definition: statestore.cc:556
Status SendHeartbeat(Subscriber *subscriber)
Definition: statestore.cc:587
const TUniqueId & registration_id() const
Definition: statestore.h:289
IntGauge * value_size_metric_
Definition: statestore.h:240
const TNetworkAddress network_address_
The location of the subscriber service that this subscriber runs.
Definition: statestore.h:328
static const Version TOPIC_ENTRY_INITIAL_VERSION
The Version value used to initialize a new TopicEntry.
Definition: statestore.h:138