10 #include "boost/shared_ptr.hpp"
11 #include "boost/thread.hpp"
12 #include "thrift/protocol/TBinaryProtocol.h"
14 #include "thrift/transport/TTransportUtils.h"
15 #include "thrift/transport/TSocket.h"
16 #include "gen-cpp/TCLIService.h"
18 using namespace apache::hive::service::cli::thrift;
19 using namespace apache::thrift;
20 using namespace apache::thrift::transport;
22 using namespace boost;
29 void CloseOperation(apache::hive::service::cli::thrift::TFetchResultsReq& fetchResultsReq, TCLIServiceClient& impalaClient)
33 TCloseOperationReq closeOperationReq;
34 TCloseOperationResp closeOperationResp;
36 closeOperationReq.operationHandle = fetchResultsReq.operationHandle;
38 impalaClient.CloseOperation(closeOperationResp, closeOperationReq);
40 catch (TException& ex)
42 printf(
"\nClose Operation failed from thread\n\n");
50 void FetchFunction(apache::hive::service::cli::thrift::TOperationHandle opHandle, shared_ptr<TCLIServiceClient> impalaClient,
56 bool hasMoreRows =
true;
58 TFetchResultsResp fetchResultsResp;
60 apache::hive::service::cli::thrift::TFetchResultsReq fetchResultsReq;
61 fetchResultsReq.orientation = TFetchOrientation::FETCH_NEXT;
62 fetchResultsReq.__set_operationHandle(opHandle);
63 fetchResultsReq.maxRows = 1000;
65 unique_lock<mutex> l(*m);
66 impalaClient->FetchResults(fetchResultsResp, fetchResultsReq);
69 if ((TStatusCode::SUCCESS_STATUS == fetchResultsResp.status.statusCode) ||
70 (TStatusCode::SUCCESS_WITH_INFO_STATUS == fetchResultsResp.status.statusCode) )
72 unique_lock<mutex> l(*m);
73 numRows = fetchResultsResp.results.rows.size();
74 hasMoreRows = 0 < numRows;
78 printf(
"\nNo More Rows, closing operation\n\n");
84 unique_lock<mutex> l(*m);
85 printf(
"\nNo Success\n\n");
91 printf(
"\nTransport exception while fetching\n\n");
99 shared_ptr<TTransport> transport;
100 shared_ptr<mutex> mutex(
new mutex);
102 apache::hive::service::cli::thrift::TOpenSessionResp openSessionResp;
103 apache::hive::service::cli::thrift::TExecuteStatementResp executeResponse;
107 shared_ptr<TSocket> socket(
new TSocket(
"localhost", 21050));
109 transport.reset(
new TBufferedTransport(socket));
112 catch (
const TException& e)
114 printf(
"\n Error in the transport open \n\n");
117 shared_ptr<TProtocol>
protocol(
new TBinaryProtocol(transport));
118 shared_ptr<TCLIServiceClient> client(
new TCLIServiceClient(protocol));
120 TOpenSessionReq request;
124 request.__set_username(
"");
125 request.__set_password(
"");
127 client->OpenSession(openSessionResp, request);
129 catch (TException& ex)
131 printf(
"\nOpenSession Failed\n\n");
134 TExecuteStatementReq queryRequest;
138 queryRequest.__set_sessionHandle(openSessionResp.sessionHandle);
139 queryRequest.__set_statement(
"SELECT COUNT(*) FROM tpch.lineitem");
141 client->ExecuteStatement(executeResponse, queryRequest);
146 printf(
"\nTransport error\n\n");
149 thread workerThread(
FetchFunction, executeResponse.operationHandle, client, mutex);
153 unique_lock<mutex> l(*mutex);
154 TGetOperationStatusReq statusReq;
155 TGetOperationStatusResp statusResp;
157 statusReq.__set_operationHandle(executeResponse.operationHandle);
158 printf(
"GET OPERATION STATUS\n");
159 client->GetOperationStatus(statusResp, statusReq);
160 printf(
"OPERATION STATUS RETURNED\n");
161 if ((TOperationState::RUNNING_STATE == statusResp.operationState) ||
162 (TOperationState::INITIALIZED_STATE == statusResp.operationState))
164 TCancelOperationReq cancelReq;
165 TCancelOperationResp cancelResp;
167 cancelReq.__set_operationHandle(executeResponse.operationHandle);
168 client->CancelOperation(cancelResp, cancelReq);
173 printf(
"Cancellation error\n");
179 int main(
int argc,
const char* argv[] )
183 for (
int testLoop = 0; testLoop < 100; testLoop++)
void ExecuteAndCancelOperationTest()
Test method to execute/cancel a SQL statement and fetch the results in a seperate thread...
const StringSearch UrlParser::protocol_search & protocol
void CloseOperation(apache::hive::service::cli::thrift::TFetchResultsReq &fetchResultsReq, TCLIServiceClient &impalaClient)
Method to close a fetch operation handle.
int main(int argc, const char *argv[])
void FetchFunction(apache::hive::service::cli::thrift::TOperationHandle opHandle, shared_ptr< TCLIServiceClient > impalaClient, shared_ptr< mutex > m)
Thread entry point method to demonstrate background-fetching behavior.