17 #include <boost/date_time/posix_time/posix_time.hpp>
18 #include <boost/foreach.hpp>
19 #include <boost/scoped_ptr.hpp>
20 #include <boost/shared_ptr.hpp>
21 #include <boost/thread/barrier.hpp>
22 #include <boost/thread/thread_time.hpp>
23 #include <boost/unordered_set.hpp>
24 #include <gtest/gtest.h>
26 #include "common/daemon.h"
28 #include "statestore/state-store-subscriber-2.0.h"
33 #include "util/thrift-util.h"
34 #include "gen-cpp/StatestoreTypes_types.h"
35 #include "gen-cpp/Types_types.h"
37 using namespace apache::thrift;
38 using namespace apache::thrift::transport;
39 using namespace boost;
52 virtual void Topic1(
const StateStoreSubscriber2::TopicDeltaMap& state,
53 vector<TTopicUpdate>* topic_updates) {
56 virtual void Topic2(
const StateStoreSubscriber2::TopicDeltaMap& state,
57 vector<TTopicUpdate>* topic_updates) {
66 void Topic1(
const StateStoreSubscriber2::TopicDeltaMap& state,
67 vector<TTopicUpdate>* topic_updates) {
68 callbacks_->Topic1(state, topic_updates);
71 void Topic2(
const StateStoreSubscriber2::TopicDeltaMap& state,
72 vector<TTopicUpdate>* topic_updates) {
73 callbacks_->Topic2(state, topic_updates);
77 return bind<void>(mem_fn(&TopicCallbacks::Topic1),
this, _1, _2);
81 return bind<void>(mem_fn(&TopicCallbacks::Topic2),
this, _1, _2);
101 : state_store_(new InProcessStateStore(next_port_++, next_port_++)) {
105 LOG(INFO) <<
"SETTING UP";
107 Status status = state_store_->Start();
108 VLOG(1) << status.GetErrorMsg();
109 EXPECT_TRUE(status.
ok());
113 EXPECT_TRUE(state_store_->StopForTesting().ok());
120 stringstream subscriber_id;
121 subscriber_id << subscriber;
122 subscribers_.push_back(shared_ptr<StateStoreSubscriber2>(
123 new StateStoreSubscriber2(subscriber_id.str(), subscriber, statestore)));
126 subscribers_.back()->AddTopic(
"topic_1",
false, callbacks_.back()->topic1_callback());
127 subscribers_.back()->AddTopic(
"topic_2",
true, callbacks_.back()->topic2_callback());
129 EXPECT_TRUE(subscribers_.back()->Start().ok());
130 EXPECT_TRUE(
WaitForServer(
"localhost", subscriber.port, 10, 100).
ok());
131 return subscribers_.back();
139 int StateStore2Test::next_port_ = 27000;
142 system_time deadline = get_system_time() +
143 posix_time::milliseconds(5000);
145 if (!cv->timed_wait(*lock, deadline)) {
146 return Status(
"CV deadline expired");
155 virtual void Topic1(
const StateStoreSubscriber2::TopicDeltaMap& state,
156 vector<TTopicUpdate>* topic_updates) {
160 mutex*
lock1() {
return &cv_mutex_; }
161 condition_variable*
cv1() {
return &cv_; }
171 unique_lock<mutex> l(*cb->
lock1());
172 shared_ptr<StateStoreSubscriber2> running_subscriber =
173 StartStateStoreSubscriber(cb);
178 virtual void Topic1(
const StateStoreSubscriber2::TopicDeltaMap& state,
179 vector<TTopicUpdate>* topic_updates) {
180 StateStoreSubscriber2::TopicDeltaMap::const_iterator topic =
181 state.find(
"topic_1");
182 if (topic == state.end() || topic->second.topic_entries.size() == 0) {
183 topic_updates->push_back(TTopicUpdate());
184 TTopicUpdate& update = topic_updates->back();
185 update.topic_name =
"topic_1";
186 update.topic_updates.push_back(TTopicItem());
187 TTopicItem& item = update.topic_updates.back();
188 item.key =
"hello world";
189 item.value =
"value";
191 const TTopicDelta& delta = topic->second;
192 if (delta.topic_entries.size() > 0 && delta.topic_entries[0].key ==
"hello world") {
201 unique_lock<mutex> l(*cb->
lock1());
202 shared_ptr<StateStoreSubscriber2> running_subscriber =
203 StartStateStoreSubscriber(cb);
209 using namespace impala;
211 int main(
int argc,
char **argv) {
212 InitDaemon(argc, argv);
214 ::testing::InitGoogleTest(&argc, argv);
215 return RUN_ALL_TESTS();
TEST_F(StateStore2Test, TestReceiveUpdates)
StateStoreSubscriber2::UpdateCallback topic2_callback()
virtual void Topic1(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
StateStoreSubscriber2::UpdateCallback topic1_callback()
vector< shared_ptr< StateStoreSubscriber2 > > subscribers_
vector< TopicCallbacks * > callbacks_
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
static const char * ipaddress_
virtual void Topic2(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
void Topic1(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
shared_ptr< StateStoreSubscriber2 > StartStateStoreSubscriber(Callbacks *cb)
void Topic2(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
scoped_ptr< InProcessStateStore > state_store_
void UnregisterAllSubscribers()
Status WaitForCV(unique_lock< mutex > *lock, condition_variable *cv)
Status WaitForServer(const string &host, int port, int num_retries, int retry_interval_ms)
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
int main(int argc, char **argv)
DECLARE_int32(rpc_cnxn_attempts)
virtual void Topic1(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
virtual void Topic1(const StateStoreSubscriber2::TopicDeltaMap &state, vector< TTopicUpdate > *topic_updates)
TopicCallbacks(Callbacks *cb)
condition_variable * cv1()