Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
state-store-2.0-test.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <vector>
16 
17 #include <boost/date_time/posix_time/posix_time.hpp>
18 #include <boost/foreach.hpp>
19 #include <boost/scoped_ptr.hpp>
20 #include <boost/shared_ptr.hpp>
21 #include <boost/thread/barrier.hpp>
22 #include <boost/thread/thread_time.hpp>
23 #include <boost/unordered_set.hpp>
24 #include <gtest/gtest.h>
25 
26 #include "common/daemon.h"
27 #include "common/status.h"
28 #include "statestore/state-store-subscriber-2.0.h"
30 #include "util/cpu-info.h"
31 #include "util/metrics.h"
32 #include "util/network-util.h"
33 #include "util/thrift-util.h"
34 #include "gen-cpp/StatestoreTypes_types.h"
35 #include "gen-cpp/Types_types.h"
36 
37 using namespace apache::thrift;
38 using namespace apache::thrift::transport;
39 using namespace boost;
40 using namespace std;
41 
42 DECLARE_int32(rpc_cnxn_attempts);
43 DECLARE_int32(rpc_cnxn_retry_interval_ms);
44 DECLARE_int32(statestore_max_missed_heartbeats);
45 
46 namespace impala {
47 
49  public:
50  class Callbacks {
51  public:
52  virtual void Topic1(const StateStoreSubscriber2::TopicDeltaMap& state,
53  vector<TTopicUpdate>* topic_updates) {
54  }
55 
56  virtual void Topic2(const StateStoreSubscriber2::TopicDeltaMap& state,
57  vector<TTopicUpdate>* topic_updates) {
58  }
59  };
60 
61  protected:
63  public:
64  TopicCallbacks(Callbacks* cb) : callbacks_(cb) { }
65 
66  void Topic1(const StateStoreSubscriber2::TopicDeltaMap& state,
67  vector<TTopicUpdate>* topic_updates) {
68  callbacks_->Topic1(state, topic_updates);
69  }
70 
71  void Topic2(const StateStoreSubscriber2::TopicDeltaMap& state,
72  vector<TTopicUpdate>* topic_updates) {
73  callbacks_->Topic2(state, topic_updates);
74  }
75 
76  StateStoreSubscriber2::UpdateCallback topic1_callback() {
77  return bind<void>(mem_fn(&TopicCallbacks::Topic1), this, _1, _2);
78  }
79 
80  StateStoreSubscriber2::UpdateCallback topic2_callback() {
81  return bind<void>(mem_fn(&TopicCallbacks::Topic2), this, _1, _2);
82  }
83 
85  delete callbacks_;
86  }
87 
88  private:
90  };
91 
92  static int next_port_;
93  scoped_ptr<InProcessStateStore> state_store_;
94  static const char* ipaddress_;
95 
96 
97  vector<shared_ptr<StateStoreSubscriber2> > subscribers_;
98  vector<TopicCallbacks*> callbacks_;
99 
101  : state_store_(new InProcessStateStore(next_port_++, next_port_++)) {
102  }
103 
104  virtual void SetUp() {
105  LOG(INFO) << "SETTING UP";
106 
107  Status status = state_store_->Start();
108  VLOG(1) << status.GetErrorMsg();
109  EXPECT_TRUE(status.ok());
110  }
111 
112  virtual void TearDown() {
113  EXPECT_TRUE(state_store_->StopForTesting().ok());
114  // TODO: delete all the callbacks
115  }
116 
117  shared_ptr<StateStoreSubscriber2> StartStateStoreSubscriber(Callbacks* cb) {
118  TNetworkAddress statestore = MakeNetworkAddress("localhost", state_store_->port());
119  TNetworkAddress subscriber = MakeNetworkAddress("localhost", next_port_++);
120  stringstream subscriber_id;
121  subscriber_id << subscriber;
122  subscribers_.push_back(shared_ptr<StateStoreSubscriber2>(
123  new StateStoreSubscriber2(subscriber_id.str(), subscriber, statestore)));
124 
125  callbacks_.push_back(new TopicCallbacks(cb));
126  subscribers_.back()->AddTopic("topic_1", false, callbacks_.back()->topic1_callback());
127  subscribers_.back()->AddTopic("topic_2", true, callbacks_.back()->topic2_callback());
128 
129  EXPECT_TRUE(subscribers_.back()->Start().ok());
130  EXPECT_TRUE(WaitForServer("localhost", subscriber.port, 10, 100).ok());
131  return subscribers_.back();
132  }
133 
135  // TODO: Stop for testing?
136  }
137 };
138 
139 int StateStore2Test::next_port_ = 27000;
140 
141 Status WaitForCV(unique_lock<mutex>* lock, condition_variable* cv) {
142  system_time deadline = get_system_time() +
143  posix_time::milliseconds(5000);
144 
145  if (!cv->timed_wait(*lock, deadline)) {
146  return Status("CV deadline expired");
147  }
148 
149  return Status::OK;
150 }
151 
152 
154  public:
155  virtual void Topic1(const StateStoreSubscriber2::TopicDeltaMap& state,
156  vector<TTopicUpdate>* topic_updates) {
157  cv_.notify_all();
158  }
159 
160  mutex* lock1() { return &cv_mutex_; }
161  condition_variable* cv1() { return &cv_; }
162 
163  protected:
164  mutex cv_mutex_;
165  condition_variable cv_;
166 
167 };
168 
169 TEST_F(StateStore2Test, TestRegistration) {
171  unique_lock<mutex> l(*cb->lock1());
172  shared_ptr<StateStoreSubscriber2> running_subscriber =
173  StartStateStoreSubscriber(cb);
174  EXPECT_TRUE(WaitForCV(&l, cb->cv1()).ok());
175 }
176 
178  virtual void Topic1(const StateStoreSubscriber2::TopicDeltaMap& state,
179  vector<TTopicUpdate>* topic_updates) {
180  StateStoreSubscriber2::TopicDeltaMap::const_iterator topic =
181  state.find("topic_1");
182  if (topic == state.end() || topic->second.topic_entries.size() == 0) {
183  topic_updates->push_back(TTopicUpdate());
184  TTopicUpdate& update = topic_updates->back();
185  update.topic_name = "topic_1";
186  update.topic_updates.push_back(TTopicItem());
187  TTopicItem& item = update.topic_updates.back();
188  item.key = "hello world";
189  item.value = "value";
190  } else {
191  const TTopicDelta& delta = topic->second;
192  if (delta.topic_entries.size() > 0 && delta.topic_entries[0].key == "hello world") {
193  cv_.notify_all();
194  }
195  }
196  }
197 };
198 
199 TEST_F(StateStore2Test, TestReceiveUpdates) {
201  unique_lock<mutex> l(*cb->lock1());
202  shared_ptr<StateStoreSubscriber2> running_subscriber =
203  StartStateStoreSubscriber(cb);
204  EXPECT_TRUE(WaitForCV(&l, cb->cv1()).ok());
205 }
206 
207 }
208 
209 using namespace impala;
210 
211 int main(int argc, char **argv) {
212  InitDaemon(argc, argv);
213 
214  ::testing::InitGoogleTest(&argc, argv);
215  return RUN_ALL_TESTS();
216 }
TEST_F(StateStore2Test, TestReceiveUpdates)
StateStoreSubscriber2::UpdateCallback topic2_callback()
virtual void Topic1(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
StateStoreSubscriber2::UpdateCallback topic1_callback()
vector< shared_ptr< StateStoreSubscriber2 > > subscribers_
vector< TopicCallbacks * > callbacks_
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
Definition: network-util.cc:96
static const char * ipaddress_
virtual void Topic2(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
void Topic1(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
shared_ptr< StateStoreSubscriber2 > StartStateStoreSubscriber(Callbacks *cb)
void Topic2(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
scoped_ptr< InProcessStateStore > state_store_
Status WaitForCV(unique_lock< mutex > *lock, condition_variable *cv)
Status WaitForServer(const string &host, int port, int num_retries, int retry_interval_ms)
Definition: thrift-util.cc:121
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
int main(int argc, char **argv)
DECLARE_int32(rpc_cnxn_attempts)
virtual void Topic1(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
virtual void Topic1(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
bool ok() const
Definition: status.h:172