Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
simba.cc
Go to the documentation of this file.
1 // =================================================================================================
7 // =================================================================================================
8 #include <stdio.h>
9 
10 #include "boost/shared_ptr.hpp"
11 #include "boost/thread.hpp"
12 #include "thrift/protocol/TBinaryProtocol.h"
13 //#include "thrift/transport/sasltransport/TSaslClientTransport.h"
14 #include "thrift/transport/TTransportUtils.h"
15 #include "thrift/transport/TSocket.h"
16 #include "gen-cpp/TCLIService.h"
17 
18 using namespace apache::hive::service::cli::thrift;
19 using namespace apache::thrift;
20 using namespace apache::thrift::transport;
21 using namespace apache::thrift::protocol;
22 using namespace boost;
23 
24 
29 void CloseOperation(apache::hive::service::cli::thrift::TFetchResultsReq& fetchResultsReq, TCLIServiceClient& impalaClient)
30 {
31  try
32  {
33  TCloseOperationReq closeOperationReq;
34  TCloseOperationResp closeOperationResp;
35 
36  closeOperationReq.operationHandle = fetchResultsReq.operationHandle;
37 
38  impalaClient.CloseOperation(closeOperationResp, closeOperationReq);
39  }
40  catch (TException& ex)
41  {
42  printf("\nClose Operation failed from thread\n\n");
43  }
44 }
45 
50 void FetchFunction(apache::hive::service::cli::thrift::TOperationHandle opHandle, shared_ptr<TCLIServiceClient> impalaClient,
51  shared_ptr<mutex> m){
52 
53  try
54  {
55  size_t numRows = 0;
56  bool hasMoreRows = true;
57 
58  TFetchResultsResp fetchResultsResp;
59 
60  apache::hive::service::cli::thrift::TFetchResultsReq fetchResultsReq;
61  fetchResultsReq.orientation = TFetchOrientation::FETCH_NEXT;
62  fetchResultsReq.__set_operationHandle(opHandle);
63  fetchResultsReq.maxRows = 1000;
64  {
65  unique_lock<mutex> l(*m);
66  impalaClient->FetchResults(fetchResultsResp, fetchResultsReq);
67  }
68 
69  if ((TStatusCode::SUCCESS_STATUS == fetchResultsResp.status.statusCode) ||
70  (TStatusCode::SUCCESS_WITH_INFO_STATUS == fetchResultsResp.status.statusCode) )
71  {
72  unique_lock<mutex> l(*m);
73  numRows = fetchResultsResp.results.rows.size();
74  hasMoreRows = 0 < numRows;
75 
76  if (!hasMoreRows)
77  {
78  printf("\nNo More Rows, closing operation\n\n");
79  CloseOperation(fetchResultsReq, *impalaClient);
80  }
81  }
82  else
83  {
84  unique_lock<mutex> l(*m);
85  printf("\nNo Success\n\n");
86  CloseOperation(fetchResultsReq, *impalaClient);
87  }
88  }
89  catch (...)
90  {
91  printf("\nTransport exception while fetching\n\n");
92  }
93 }
94 
98 {
99  shared_ptr<TTransport> transport;
100  shared_ptr<mutex> mutex(new mutex);
101 
102  apache::hive::service::cli::thrift::TOpenSessionResp openSessionResp;
103  apache::hive::service::cli::thrift::TExecuteStatementResp executeResponse;
104 
105  try
106  {
107  shared_ptr<TSocket> socket(new TSocket("localhost", 21050));
108 
109  transport.reset(new TBufferedTransport(socket));
110  transport->open();
111  }
112  catch (const TException& e)
113  {
114  printf("\n Error in the transport open \n\n");
115  }
116 
117  shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
118  shared_ptr<TCLIServiceClient> client(new TCLIServiceClient(protocol));
119 
120  TOpenSessionReq request;
121 
122  try
123  {
124  request.__set_username("");
125  request.__set_password("");
126 
127  client->OpenSession(openSessionResp, request);
128  }
129  catch (TException& ex)
130  {
131  printf("\nOpenSession Failed\n\n");
132  }
133 
134  TExecuteStatementReq queryRequest;
135 
136  try
137  {
138  queryRequest.__set_sessionHandle(openSessionResp.sessionHandle);
139  queryRequest.__set_statement("SELECT COUNT(*) FROM tpch.lineitem"); //BIM_REPOSITORY.BIM_InstanceSettings WHERE Setting='BIM_REPOSITORY_VERSION' AND SettingValue='C'");
140 
141  client->ExecuteStatement(executeResponse, queryRequest);
142  }
143 
144  catch (const TTransportException& e)
145  {
146  printf("\nTransport error\n\n");
147  }
148 
149  thread workerThread(FetchFunction, executeResponse.operationHandle, client, mutex);
150 
151  try
152  {
153  unique_lock<mutex> l(*mutex);
154  TGetOperationStatusReq statusReq;
155  TGetOperationStatusResp statusResp;
156 
157  statusReq.__set_operationHandle(executeResponse.operationHandle);
158  printf("GET OPERATION STATUS\n");
159  client->GetOperationStatus(statusResp, statusReq);
160  printf("OPERATION STATUS RETURNED\n");
161  if ((TOperationState::RUNNING_STATE == statusResp.operationState) ||
162  (TOperationState::INITIALIZED_STATE == statusResp.operationState))
163  {
164  TCancelOperationReq cancelReq;
165  TCancelOperationResp cancelResp;
166 
167  cancelReq.__set_operationHandle(executeResponse.operationHandle);
168  client->CancelOperation(cancelResp, cancelReq);
169  }
170  }
171  catch(...)
172  {
173  printf("Cancellation error\n");
174  }
175 }
176 
177 
178 
179 int main( int argc, const char* argv[] )
180 {
181  // Loop to run test behavior repeatedly until the driver experiences a "hang"
182  // This loop of 100 iterations should take < 1 minute; otherwise the driver is hung up
183  for (int testLoop = 0; testLoop < 100; testLoop++)
184  {
186  }
187 }
void ExecuteAndCancelOperationTest()
Test method to execute/cancel a SQL statement and fetch the results in a separate thread...
Definition: simba.cc:97
const StringSearch UrlParser::protocol_search & protocol
Definition: url-parser.cc:36
void CloseOperation(apache::hive::service::cli::thrift::TFetchResultsReq &fetchResultsReq, TCLIServiceClient &impalaClient)
Method to close a fetch operation handle.
Definition: simba.cc:29
int main(int argc, const char *argv[])
Definition: simba.cc:179
void FetchFunction(apache::hive::service::cli::thrift::TOperationHandle opHandle, shared_ptr< TCLIServiceClient > impalaClient, shared_ptr< mutex > m)
Thread entry point method to demonstrate background-fetching behavior.
Definition: simba.cc:50