Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
impalad-query-executor.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <boost/algorithm/string/join.hpp>
18 #include <boost/algorithm/string.hpp>
19 
20 #include "common/logging.h"
21 #include "rpc/thrift-client.h"
22 #include "rpc/thrift-util.h"
23 
24 DEFINE_string(impalad, "localhost:21000", "host:port of impalad process");
25 DECLARE_int32(num_nodes);
26 
27 #include "common/names.h"
28 
29 using boost::algorithm::is_any_of;
30 using boost::algorithm::split;
31 using namespace Apache::Hadoop::Hive;
32 using namespace beeswax;
33 
34 namespace impala {
35 
36 ImpaladQueryExecutor::ImpaladQueryExecutor()
37  : query_in_progress_(false),
38  current_row_(0),
39  eos_(false) {
40 }
41 
43  Close();
44 }
45 
47  DCHECK(!FLAGS_impalad.empty());
48  vector<string> elems;
49  split(elems, FLAGS_impalad, is_any_of(":"));
50  DCHECK_EQ(elems.size(), 2);
51  int port = atoi(elems[1].c_str());
52  DCHECK_GT(port, 0);
53 
54  client_.reset(new ThriftClient<ImpalaServiceClient>(elems[0], port));
55 
56  // Wait for up to 10s for the server to start, polling at 50ms intervals
57  RETURN_IF_ERROR(WaitForServer(elems[0], port, 200, 50));
58 
59  RETURN_IF_ERROR(client_->Open());
60 
61  return Status::OK;
62 }
63 
65  if (!query_in_progress_) return Status::OK;
66  try {
67  client_->iface()->close(query_handle_);
68  } catch (BeeswaxException& e) {
69  stringstream ss;
70  ss << e.SQLState << ": " << e.message;
71  return Status(ss.str());
72  }
73  query_in_progress_ = false;
74  return Status::OK;
75 }
76 
78  const string& query_string, vector<FieldSchema>* col_schema) {
79  // close anything that ran previously
80  Close();
81  Query query;
82  query.query = query_string;
83  query.configuration = exec_options_;
84  query.hadoop_user = "impala_test_user";
85  query_results_.data.clear();
86 
87  // TODO: catch exception and return error code
88  // LogContextId of "" will ask the Beeswax service to assign a new id but Beeswax
89  // does not provide a constant for it.
90  ResultsMetadata resultsMetadata;
91  try {
92  client_->iface()->executeAndWait(query_handle_, query, "");
93  client_->iface()->get_results_metadata(resultsMetadata, query_handle_);
94  } catch (BeeswaxException& e) {
95  stringstream ss;
96  ss << e.SQLState << ": " << e.message;
97  return Status(ss.str());
98  }
99  current_row_ = 0;
100  query_in_progress_ = true;
101  if (col_schema != NULL) *col_schema = resultsMetadata.schema.fieldSchemas;
102  return Status::OK;
103 }
104 
106  return Status::OK;
107 }
108 
110  // If we have not fetched any data, or we've returned all the data, fetch more rows
111  // from ImpalaServer
112  if (!query_results_.__isset.data || current_row_ >= query_results_.data.size()) {
113  client_->iface()->fetch(query_results_, query_handle_, false, 0);
114  current_row_ = 0;
115  }
116 
117  DCHECK(query_results_.ready);
118 
119  // Set the return row if we have data
120  if (query_results_.data.size() > 0) {
121  *row = query_results_.data.at(current_row_);
122  ++current_row_;
123  } else {
124  *row = "";
125  }
126 
127  // Set eos_ to true after the we have returned the last row from the last batch.
128  if (current_row_ >= query_results_.data.size() && !query_results_.has_more) {
129  eos_ = true;
130  }
131 
132  return Status::OK;
133 }
134 
135 Status ImpaladQueryExecutor::FetchResult(vector<void*>* row) {
136  return Status("ImpaladQueryExecutor::FetchResult(vector<void*>) not supported");
137 }
138 
140  return "";
141 }
142 
144  return "";
145 }
146 
147 // Return the explain plan for the query
148 Status ImpaladQueryExecutor::Explain(const string& query_string, string* explain_plan) {
149  Query query;
150  query.query = query_string;
151 
152  try {
153  client_->iface()->explain(query_explanation_, query);
154  *explain_plan = query_explanation_.textual;
155  } catch (BeeswaxException& e) {
156  stringstream ss;
157  ss << e.SQLState << ": " << e.message;
158  return Status(ss.str());
159  }
160  return Status::OK;
161 }
162 
164  // TODO: make query profile part of TFetchResultsResult so that we can
165  // return it here
166  return NULL;
167 }
168 
169 }
Status Exec(const std::string &query_string, std::vector< Apache::Hadoop::Hive::FieldSchema > *col_types)
#define RETURN_IF_ERROR(stmt)
Some generally useful macros.
Definition: status.h:242
beeswax::QueryHandle query_handle_
Beeswax query handle and result.
Status Close()
Calls beeswax.close() for the current query, if one is in progress.
boost::scoped_ptr< ThriftClient< ImpalaServiceClient > > client_
fe service-related
beeswax::QueryExplanation query_explanation_
DECLARE_int32(num_nodes)
std::vector< std::string > exec_options_
Execution options.
Status WaitForServer(const string &host, int port, int num_retries, int retry_interval_ms)
Definition: thrift-util.cc:121
RuntimeProfile * query_profile()
Returns the counters for the entire query.
std::string FileErrors() const
Returns a string representation of the file_errors_.
static const Status OK
Definition: status.h:87
DEFINE_string(impalad,"localhost:21000","host:port of impalad process")
Status FetchResult(RowBatch **batch)
Status Explain(const std::string &query_string, std::string *explain_plan)
Return the explain plan for the query.