Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
query-options.cc
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "service/query-options.h"
16 
17 #include "util/debug-util.h"
18 #include "util/mem-info.h"
19 #include "util/parse-util.h"
20 #include "gen-cpp/ImpalaInternalService_types.h"
21 
22 #include <sstream>
23 #include <boost/foreach.hpp>
24 #include <boost/algorithm/string.hpp>
25 #include <gutil/strings/substitute.h>
26 
27 #include "common/names.h"
28 
29 using boost::algorithm::iequals;
30 using boost::algorithm::is_any_of;
31 using boost::algorithm::token_compress_on;
32 using boost::algorithm::split;
33 using boost::algorithm::trim;
34 using namespace impala;
35 using namespace strings;
36 
37 // Utility method to wrap ParseUtil::ParseMemSpec() by returning a Status instead of an
38 // int.
39 static Status ParseMemValue(const string& value, const string& key, int64_t* result) {
40  bool is_percent;
41  *result = ParseUtil::ParseMemSpec(value, &is_percent, MemInfo::physical_mem());
42  if (*result < 0) {
43  return Status("Failed to parse " + key + " from '" + value + "'.");
44  }
45  if (is_percent) {
46  return Status("Invalid " + key + " with percent '" + value + "'.");
47  }
48  return Status::OK;
49 }
50 
51 // Returns the TImpalaQueryOptions enum for the given "key". Input is case insensitive.
52 // Return -1 if the input is an invalid option.
53 int GetQueryOptionForKey(const string& key) {
54  map<int, const char*>::const_iterator itr =
55  _TImpalaQueryOptions_VALUES_TO_NAMES.begin();
56  for (; itr != _TImpalaQueryOptions_VALUES_TO_NAMES.end(); ++itr) {
57  if (iequals(key, (*itr).second)) {
58  return itr->first;
59  }
60  }
61  return -1;
62 }
63 
64 void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
65  map<string, string>* configuration) {
66  map<int, const char*>::const_iterator itr =
67  _TImpalaQueryOptions_VALUES_TO_NAMES.begin();
68  for (; itr != _TImpalaQueryOptions_VALUES_TO_NAMES.end(); ++itr) {
69  stringstream val;
70  switch (itr->first) {
71  case TImpalaQueryOptions::ABORT_ON_ERROR:
72  val << query_options.abort_on_error;
73  break;
74  case TImpalaQueryOptions::MAX_ERRORS:
75  val << query_options.max_errors;
76  break;
77  case TImpalaQueryOptions::DISABLE_CODEGEN:
78  val << query_options.disable_codegen;
79  break;
81  val << query_options.batch_size;
82  break;
83  case TImpalaQueryOptions::MEM_LIMIT:
84  val << query_options.mem_limit;
85  break;
86  case TImpalaQueryOptions::NUM_NODES:
87  val << query_options.num_nodes;
88  break;
89  case TImpalaQueryOptions::MAX_SCAN_RANGE_LENGTH:
90  val << query_options.max_scan_range_length;
91  break;
92  case TImpalaQueryOptions::MAX_IO_BUFFERS:
93  val << query_options.max_io_buffers;
94  break;
95  case TImpalaQueryOptions::NUM_SCANNER_THREADS:
96  val << query_options.num_scanner_threads;
97  break;
98  case TImpalaQueryOptions::ALLOW_UNSUPPORTED_FORMATS:
99  val << query_options.allow_unsupported_formats;
100  break;
101  case TImpalaQueryOptions::DEFAULT_ORDER_BY_LIMIT:
102  val << query_options.default_order_by_limit;
103  break;
104  case TImpalaQueryOptions::DEBUG_ACTION:
105  val << query_options.debug_action;
106  break;
107  case TImpalaQueryOptions::ABORT_ON_DEFAULT_LIMIT_EXCEEDED:
108  val << query_options.abort_on_default_limit_exceeded;
109  break;
110  case TImpalaQueryOptions::COMPRESSION_CODEC:
111  val << query_options.compression_codec;
112  break;
113  case TImpalaQueryOptions::SEQ_COMPRESSION_MODE:
114  val << query_options.seq_compression_mode;
115  break;
116  case TImpalaQueryOptions::HBASE_CACHING:
117  val << query_options.hbase_caching;
118  break;
119  case TImpalaQueryOptions::HBASE_CACHE_BLOCKS:
120  val << query_options.hbase_cache_blocks;
121  break;
122  case TImpalaQueryOptions::PARQUET_FILE_SIZE:
123  val << query_options.parquet_file_size;
124  break;
125  case TImpalaQueryOptions::EXPLAIN_LEVEL:
126  val << query_options.explain_level;
127  break;
128  case TImpalaQueryOptions::SYNC_DDL:
129  val << query_options.sync_ddl;
130  break;
131  case TImpalaQueryOptions::REQUEST_POOL:
132  val << query_options.request_pool;
133  break;
134  case TImpalaQueryOptions::V_CPU_CORES:
135  val << query_options.v_cpu_cores;
136  break;
137  case TImpalaQueryOptions::RESERVATION_REQUEST_TIMEOUT:
138  val << query_options.reservation_request_timeout;
139  break;
140  case TImpalaQueryOptions::DISABLE_CACHED_READS:
141  val << query_options.disable_cached_reads;
142  break;
143  case TImpalaQueryOptions::DISABLE_OUTERMOST_TOPN:
144  val << query_options.disable_outermost_topn;
145  break;
146  case TImpalaQueryOptions::RM_INITIAL_MEM:
147  val << query_options.rm_initial_mem;
148  break;
149  case TImpalaQueryOptions::QUERY_TIMEOUT_S:
150  val << query_options.query_timeout_s;
151  break;
152  case TImpalaQueryOptions::MAX_BLOCK_MGR_MEMORY:
153  val << query_options.max_block_mgr_memory;
154  break;
155  case TImpalaQueryOptions::APPX_COUNT_DISTINCT:
156  val << query_options.appx_count_distinct;
157  break;
158  case TImpalaQueryOptions::DISABLE_UNSAFE_SPILLS:
159  val << query_options.disable_unsafe_spills;
160  break;
161  case TImpalaQueryOptions::EXEC_SINGLE_NODE_ROWS_THRESHOLD:
162  val << query_options.exec_single_node_rows_threshold;
163  break;
164  default:
165  // We hit this DCHECK(false) if we forgot to add the corresponding entry here
166  // when we add a new query option.
167  LOG(ERROR) << "Missing exec option implementation: " << itr->second;
168  DCHECK(false);
169  }
170  (*configuration)[itr->second] = val.str();
171  }
172 }
173 
174 Status impala::SetQueryOption(const string& key, const string& value,
175  TQueryOptions* query_options) {
176  int option = GetQueryOptionForKey(key);
177  if (option < 0) {
178  return Status(Substitute("Ignoring invalid configuration option: $0", key));
179  } else {
180  switch (option) {
181  case TImpalaQueryOptions::ABORT_ON_ERROR:
182  query_options->__set_abort_on_error(
183  iequals(value, "true") || iequals(value, "1"));
184  break;
185  case TImpalaQueryOptions::MAX_ERRORS:
186  query_options->__set_max_errors(atoi(value.c_str()));
187  break;
188  case TImpalaQueryOptions::DISABLE_CODEGEN:
189  query_options->__set_disable_codegen(
190  iequals(value, "true") || iequals(value, "1"));
191  break;
193  query_options->__set_batch_size(atoi(value.c_str()));
194  break;
195  case TImpalaQueryOptions::MEM_LIMIT: {
196  // Parse the mem limit spec and validate it.
197  int64_t bytes_limit;
198  RETURN_IF_ERROR(ParseMemValue(value, "query memory limit", &bytes_limit));
199  query_options->__set_mem_limit(bytes_limit);
200  break;
201  }
202  case TImpalaQueryOptions::NUM_NODES:
203  query_options->__set_num_nodes(atoi(value.c_str()));
204  break;
205  case TImpalaQueryOptions::MAX_SCAN_RANGE_LENGTH:
206  query_options->__set_max_scan_range_length(atol(value.c_str()));
207  break;
208  case TImpalaQueryOptions::MAX_IO_BUFFERS:
209  query_options->__set_max_io_buffers(atoi(value.c_str()));
210  break;
211  case TImpalaQueryOptions::NUM_SCANNER_THREADS:
212  query_options->__set_num_scanner_threads(atoi(value.c_str()));
213  break;
214  case TImpalaQueryOptions::ALLOW_UNSUPPORTED_FORMATS:
215  query_options->__set_allow_unsupported_formats(
216  iequals(value, "true") || iequals(value, "1"));
217  break;
218  case TImpalaQueryOptions::DEFAULT_ORDER_BY_LIMIT:
219  query_options->__set_default_order_by_limit(atoi(value.c_str()));
220  break;
221  case TImpalaQueryOptions::DEBUG_ACTION:
222  query_options->__set_debug_action(value.c_str());
223  break;
224  case TImpalaQueryOptions::SEQ_COMPRESSION_MODE: {
225  if (iequals(value, "block")) {
226  query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::BLOCK);
227  } else if (iequals(value, "record")) {
228  query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::RECORD);
229  } else {
230  stringstream ss;
231  ss << "Invalid sequence file compression mode: " << value;
232  return Status(ss.str());
233  }
234  break;
235  }
236  case TImpalaQueryOptions::COMPRESSION_CODEC: {
237  if (value.empty()) break;
238  if (iequals(value, "none")) {
239  query_options->__set_compression_codec(THdfsCompression::NONE);
240  } else if (iequals(value, "gzip")) {
241  query_options->__set_compression_codec(THdfsCompression::GZIP);
242  } else if (iequals(value, "bzip2")) {
243  query_options->__set_compression_codec(THdfsCompression::BZIP2);
244  } else if (iequals(value, "default")) {
245  query_options->__set_compression_codec(THdfsCompression::DEFAULT);
246  } else if (iequals(value, "snappy")) {
247  query_options->__set_compression_codec(THdfsCompression::SNAPPY);
248  } else if (iequals(value, "snappy_blocked")) {
249  query_options->__set_compression_codec(THdfsCompression::SNAPPY_BLOCKED);
250  } else {
251  stringstream ss;
252  ss << "Invalid compression codec: " << value;
253  return Status(ss.str());
254  }
255  break;
256  }
257  case TImpalaQueryOptions::ABORT_ON_DEFAULT_LIMIT_EXCEEDED:
258  query_options->__set_abort_on_default_limit_exceeded(
259  iequals(value, "true") || iequals(value, "1"));
260  break;
261  case TImpalaQueryOptions::HBASE_CACHING:
262  query_options->__set_hbase_caching(atoi(value.c_str()));
263  break;
264  case TImpalaQueryOptions::HBASE_CACHE_BLOCKS:
265  query_options->__set_hbase_cache_blocks(
266  iequals(value, "true") || iequals(value, "1"));
267  break;
268  case TImpalaQueryOptions::PARQUET_FILE_SIZE: {
269  int64_t file_size;
270  RETURN_IF_ERROR(ParseMemValue(value, "parquet file size", &file_size));
271  query_options->__set_parquet_file_size(file_size);
272  break;
273  }
274  case TImpalaQueryOptions::EXPLAIN_LEVEL:
275  if (iequals(value, "minimal") || iequals(value, "0")) {
276  query_options->__set_explain_level(TExplainLevel::MINIMAL);
277  } else if (iequals(value, "standard") || iequals(value, "1")) {
278  query_options->__set_explain_level(TExplainLevel::STANDARD);
279  } else if (iequals(value, "extended") || iequals(value, "2")) {
280  query_options->__set_explain_level(TExplainLevel::EXTENDED);
281  } else if (iequals(value, "verbose") || iequals(value, "3")) {
282  query_options->__set_explain_level(TExplainLevel::VERBOSE);
283  } else {
284  return Status(Substitute("Invalid explain level '$0'. Valid levels are"
285  " MINIMAL(0), STANDARD(1), EXTENDED(2) and VERBOSE(3).", value));
286  }
287  break;
288  case TImpalaQueryOptions::SYNC_DDL:
289  query_options->__set_sync_ddl(iequals(value, "true") || iequals(value, "1"));
290  break;
291  case TImpalaQueryOptions::REQUEST_POOL:
292  query_options->__set_request_pool(value);
293  break;
294  case TImpalaQueryOptions::V_CPU_CORES:
295  query_options->__set_v_cpu_cores(atoi(value.c_str()));
296  break;
297  case TImpalaQueryOptions::RESERVATION_REQUEST_TIMEOUT:
298  query_options->__set_reservation_request_timeout(atoi(value.c_str()));
299  break;
300  case TImpalaQueryOptions::DISABLE_CACHED_READS:
301  query_options->__set_disable_cached_reads(
302  iequals(value, "true") || iequals(value, "1"));
303  break;
304  case TImpalaQueryOptions::DISABLE_OUTERMOST_TOPN:
305  query_options->__set_disable_outermost_topn(
306  iequals(value, "true") || iequals(value, "1"));
307  break;
308  case TImpalaQueryOptions::RM_INITIAL_MEM: {
309  int64_t reservation_size;
310  RETURN_IF_ERROR(ParseMemValue(value, "RM memory limit", &reservation_size));
311  query_options->__set_rm_initial_mem(reservation_size);
312  break;
313  }
314  case TImpalaQueryOptions::QUERY_TIMEOUT_S:
315  query_options->__set_query_timeout_s(atoi(value.c_str()));
316  break;
317  case TImpalaQueryOptions::MAX_BLOCK_MGR_MEMORY: {
318  int64_t mem;
319  RETURN_IF_ERROR(ParseMemValue(value, "block mgr memory limit", &mem));
320  query_options->__set_max_block_mgr_memory(mem);
321  break;
322  }
323  case TImpalaQueryOptions::APPX_COUNT_DISTINCT: {
324  query_options->__set_appx_count_distinct(
325  iequals(value, "true") || iequals(value, "1"));
326  break;
327  }
328  case TImpalaQueryOptions::DISABLE_UNSAFE_SPILLS: {
329  query_options->__set_disable_unsafe_spills(
330  iequals(value, "true") || iequals(value, "1"));
331  break;
332  }
333  case TImpalaQueryOptions::EXEC_SINGLE_NODE_ROWS_THRESHOLD:
334  query_options->__set_exec_single_node_rows_threshold(atoi(value.c_str()));
335  break;
336  default:
337  // We hit this DCHECK(false) if we forgot to add the corresponding entry here
338  // when we add a new query option.
339  LOG(ERROR) << "Missing exec option implementation: " << key;
340  DCHECK(false);
341  break;
342  }
343  }
344  return Status::OK;
345 }
346 
347 Status impala::ParseQueryOptions(const string& options, TQueryOptions* query_options) {
348  if (options.length() == 0) return Status::OK;
349  vector<string> kv_pairs;
350  split(kv_pairs, options, is_any_of(","), token_compress_on);
351  BOOST_FOREACH(string& kv_string, kv_pairs) {
352  trim(kv_string);
353  if (kv_string.length() == 0) continue;
354  vector<string> key_value;
355  split(key_value, kv_string, is_any_of("="), token_compress_on);
356  if (key_value.size() != 2) {
357  return Status(Substitute("Ignoring invalid configuration option $0: bad format "
358  "(expected 'key=value')", kv_string));
359  }
360  RETURN_IF_ERROR(SetQueryOption(key_value[0], key_value[1], query_options));
361  }
362  return Status::OK;
363 }
void TQueryOptionsToMap(const TQueryOptions &query_options, std::map< std::string, std::string > *configuration)
Converts a TQueryOptions struct into a map of key, value pairs.
int GetQueryOptionForKey(const string &key)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
static int64_t physical_mem()
Get total physical memory in bytes (ignores cgroups memory limits).
Definition: mem-info.h:36
static int64_t ParseMemSpec(const std::string &mem_spec_str, bool *is_percent, int64_t relative_reference)
Definition: parse-util.cc:23
const int BATCH_SIZE
static Status ParseMemValue(const string &value, const string &key, int64_t *result)
Status ParseQueryOptions(const std::string &options, TQueryOptions *query_options)
static const Status OK
Definition: status.h:87
void SetQueryOption(TImpalaQueryOptions::type opt, const T &opt_val, TExecuteStatementReq *exec_stmt_req)
Definition: child-query.cc:102