20 #include "gen-cpp/ImpalaInternalService_types.h"
23 #include <boost/foreach.hpp>
24 #include <boost/algorithm/string.hpp>
25 #include <gutil/strings/substitute.h>
29 using boost::algorithm::iequals;
30 using boost::algorithm::is_any_of;
31 using boost::algorithm::token_compress_on;
32 using boost::algorithm::split;
33 using boost::algorithm::trim;
34 using namespace impala;
35 using namespace strings;
43 return Status(
"Failed to parse " + key +
" from '" + value +
"'.");
46 return Status(
"Invalid " + key +
" with percent '" + value +
"'.");
54 map<int, const char*>::const_iterator itr =
55 _TImpalaQueryOptions_VALUES_TO_NAMES.begin();
56 for (; itr != _TImpalaQueryOptions_VALUES_TO_NAMES.end(); ++itr) {
57 if (iequals(key, (*itr).second)) {
65 map<string, string>* configuration) {
66 map<int, const char*>::const_iterator itr =
67 _TImpalaQueryOptions_VALUES_TO_NAMES.begin();
68 for (; itr != _TImpalaQueryOptions_VALUES_TO_NAMES.end(); ++itr) {
71 case TImpalaQueryOptions::ABORT_ON_ERROR:
72 val << query_options.abort_on_error;
74 case TImpalaQueryOptions::MAX_ERRORS:
75 val << query_options.max_errors;
77 case TImpalaQueryOptions::DISABLE_CODEGEN:
78 val << query_options.disable_codegen;
81 val << query_options.batch_size;
83 case TImpalaQueryOptions::MEM_LIMIT:
84 val << query_options.mem_limit;
86 case TImpalaQueryOptions::NUM_NODES:
87 val << query_options.num_nodes;
89 case TImpalaQueryOptions::MAX_SCAN_RANGE_LENGTH:
90 val << query_options.max_scan_range_length;
92 case TImpalaQueryOptions::MAX_IO_BUFFERS:
93 val << query_options.max_io_buffers;
95 case TImpalaQueryOptions::NUM_SCANNER_THREADS:
96 val << query_options.num_scanner_threads;
98 case TImpalaQueryOptions::ALLOW_UNSUPPORTED_FORMATS:
99 val << query_options.allow_unsupported_formats;
101 case TImpalaQueryOptions::DEFAULT_ORDER_BY_LIMIT:
102 val << query_options.default_order_by_limit;
104 case TImpalaQueryOptions::DEBUG_ACTION:
105 val << query_options.debug_action;
107 case TImpalaQueryOptions::ABORT_ON_DEFAULT_LIMIT_EXCEEDED:
108 val << query_options.abort_on_default_limit_exceeded;
110 case TImpalaQueryOptions::COMPRESSION_CODEC:
111 val << query_options.compression_codec;
113 case TImpalaQueryOptions::SEQ_COMPRESSION_MODE:
114 val << query_options.seq_compression_mode;
116 case TImpalaQueryOptions::HBASE_CACHING:
117 val << query_options.hbase_caching;
119 case TImpalaQueryOptions::HBASE_CACHE_BLOCKS:
120 val << query_options.hbase_cache_blocks;
122 case TImpalaQueryOptions::PARQUET_FILE_SIZE:
123 val << query_options.parquet_file_size;
125 case TImpalaQueryOptions::EXPLAIN_LEVEL:
126 val << query_options.explain_level;
128 case TImpalaQueryOptions::SYNC_DDL:
129 val << query_options.sync_ddl;
131 case TImpalaQueryOptions::REQUEST_POOL:
132 val << query_options.request_pool;
134 case TImpalaQueryOptions::V_CPU_CORES:
135 val << query_options.v_cpu_cores;
137 case TImpalaQueryOptions::RESERVATION_REQUEST_TIMEOUT:
138 val << query_options.reservation_request_timeout;
140 case TImpalaQueryOptions::DISABLE_CACHED_READS:
141 val << query_options.disable_cached_reads;
143 case TImpalaQueryOptions::DISABLE_OUTERMOST_TOPN:
144 val << query_options.disable_outermost_topn;
146 case TImpalaQueryOptions::RM_INITIAL_MEM:
147 val << query_options.rm_initial_mem;
149 case TImpalaQueryOptions::QUERY_TIMEOUT_S:
150 val << query_options.query_timeout_s;
152 case TImpalaQueryOptions::MAX_BLOCK_MGR_MEMORY:
153 val << query_options.max_block_mgr_memory;
155 case TImpalaQueryOptions::APPX_COUNT_DISTINCT:
156 val << query_options.appx_count_distinct;
158 case TImpalaQueryOptions::DISABLE_UNSAFE_SPILLS:
159 val << query_options.disable_unsafe_spills;
161 case TImpalaQueryOptions::EXEC_SINGLE_NODE_ROWS_THRESHOLD:
162 val << query_options.exec_single_node_rows_threshold;
167 LOG(ERROR) <<
"Missing exec option implementation: " << itr->second;
170 (*configuration)[itr->second] = val.str();
175 TQueryOptions* query_options) {
178 return Status(Substitute(
"Ignoring invalid configuration option: $0", key));
181 case TImpalaQueryOptions::ABORT_ON_ERROR:
182 query_options->__set_abort_on_error(
183 iequals(value,
"true") || iequals(value,
"1"));
185 case TImpalaQueryOptions::MAX_ERRORS:
186 query_options->__set_max_errors(atoi(value.c_str()));
188 case TImpalaQueryOptions::DISABLE_CODEGEN:
189 query_options->__set_disable_codegen(
190 iequals(value,
"true") || iequals(value,
"1"));
193 query_options->__set_batch_size(atoi(value.c_str()));
195 case TImpalaQueryOptions::MEM_LIMIT: {
199 query_options->__set_mem_limit(bytes_limit);
202 case TImpalaQueryOptions::NUM_NODES:
203 query_options->__set_num_nodes(atoi(value.c_str()));
205 case TImpalaQueryOptions::MAX_SCAN_RANGE_LENGTH:
206 query_options->__set_max_scan_range_length(atol(value.c_str()));
208 case TImpalaQueryOptions::MAX_IO_BUFFERS:
209 query_options->__set_max_io_buffers(atoi(value.c_str()));
211 case TImpalaQueryOptions::NUM_SCANNER_THREADS:
212 query_options->__set_num_scanner_threads(atoi(value.c_str()));
214 case TImpalaQueryOptions::ALLOW_UNSUPPORTED_FORMATS:
215 query_options->__set_allow_unsupported_formats(
216 iequals(value,
"true") || iequals(value,
"1"));
218 case TImpalaQueryOptions::DEFAULT_ORDER_BY_LIMIT:
219 query_options->__set_default_order_by_limit(atoi(value.c_str()));
221 case TImpalaQueryOptions::DEBUG_ACTION:
222 query_options->__set_debug_action(value.c_str());
224 case TImpalaQueryOptions::SEQ_COMPRESSION_MODE: {
225 if (iequals(value,
"block")) {
226 query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::BLOCK);
227 }
else if (iequals(value,
"record")) {
228 query_options->__set_seq_compression_mode(THdfsSeqCompressionMode::RECORD);
231 ss <<
"Invalid sequence file compression mode: " << value;
236 case TImpalaQueryOptions::COMPRESSION_CODEC: {
237 if (value.empty())
break;
238 if (iequals(value,
"none")) {
239 query_options->__set_compression_codec(THdfsCompression::NONE);
240 }
else if (iequals(value,
"gzip")) {
241 query_options->__set_compression_codec(THdfsCompression::GZIP);
242 }
else if (iequals(value,
"bzip2")) {
243 query_options->__set_compression_codec(THdfsCompression::BZIP2);
244 }
else if (iequals(value,
"default")) {
245 query_options->__set_compression_codec(THdfsCompression::DEFAULT);
246 }
else if (iequals(value,
"snappy")) {
247 query_options->__set_compression_codec(THdfsCompression::SNAPPY);
248 }
else if (iequals(value,
"snappy_blocked")) {
249 query_options->__set_compression_codec(THdfsCompression::SNAPPY_BLOCKED);
252 ss <<
"Invalid compression codec: " << value;
257 case TImpalaQueryOptions::ABORT_ON_DEFAULT_LIMIT_EXCEEDED:
258 query_options->__set_abort_on_default_limit_exceeded(
259 iequals(value,
"true") || iequals(value,
"1"));
261 case TImpalaQueryOptions::HBASE_CACHING:
262 query_options->__set_hbase_caching(atoi(value.c_str()));
264 case TImpalaQueryOptions::HBASE_CACHE_BLOCKS:
265 query_options->__set_hbase_cache_blocks(
266 iequals(value,
"true") || iequals(value,
"1"));
268 case TImpalaQueryOptions::PARQUET_FILE_SIZE: {
271 query_options->__set_parquet_file_size(file_size);
274 case TImpalaQueryOptions::EXPLAIN_LEVEL:
275 if (iequals(value,
"minimal") || iequals(value,
"0")) {
276 query_options->__set_explain_level(TExplainLevel::MINIMAL);
277 }
else if (iequals(value,
"standard") || iequals(value,
"1")) {
278 query_options->__set_explain_level(TExplainLevel::STANDARD);
279 }
else if (iequals(value,
"extended") || iequals(value,
"2")) {
280 query_options->__set_explain_level(TExplainLevel::EXTENDED);
281 }
else if (iequals(value,
"verbose") || iequals(value,
"3")) {
282 query_options->__set_explain_level(TExplainLevel::VERBOSE);
284 return Status(Substitute(
"Invalid explain level '$0'. Valid levels are"
285 " MINIMAL(0), STANDARD(1), EXTENDED(2) and VERBOSE(3).", value));
288 case TImpalaQueryOptions::SYNC_DDL:
289 query_options->__set_sync_ddl(iequals(value,
"true") || iequals(value,
"1"));
291 case TImpalaQueryOptions::REQUEST_POOL:
292 query_options->__set_request_pool(value);
294 case TImpalaQueryOptions::V_CPU_CORES:
295 query_options->__set_v_cpu_cores(atoi(value.c_str()));
297 case TImpalaQueryOptions::RESERVATION_REQUEST_TIMEOUT:
298 query_options->__set_reservation_request_timeout(atoi(value.c_str()));
300 case TImpalaQueryOptions::DISABLE_CACHED_READS:
301 query_options->__set_disable_cached_reads(
302 iequals(value,
"true") || iequals(value,
"1"));
304 case TImpalaQueryOptions::DISABLE_OUTERMOST_TOPN:
305 query_options->__set_disable_outermost_topn(
306 iequals(value,
"true") || iequals(value,
"1"));
308 case TImpalaQueryOptions::RM_INITIAL_MEM: {
309 int64_t reservation_size;
311 query_options->__set_rm_initial_mem(reservation_size);
314 case TImpalaQueryOptions::QUERY_TIMEOUT_S:
315 query_options->__set_query_timeout_s(atoi(value.c_str()));
317 case TImpalaQueryOptions::MAX_BLOCK_MGR_MEMORY: {
320 query_options->__set_max_block_mgr_memory(mem);
323 case TImpalaQueryOptions::APPX_COUNT_DISTINCT: {
324 query_options->__set_appx_count_distinct(
325 iequals(value,
"true") || iequals(value,
"1"));
328 case TImpalaQueryOptions::DISABLE_UNSAFE_SPILLS: {
329 query_options->__set_disable_unsafe_spills(
330 iequals(value,
"true") || iequals(value,
"1"));
333 case TImpalaQueryOptions::EXEC_SINGLE_NODE_ROWS_THRESHOLD:
334 query_options->__set_exec_single_node_rows_threshold(atoi(value.c_str()));
339 LOG(ERROR) <<
"Missing exec option implementation: " << key;
349 vector<string> kv_pairs;
350 split(kv_pairs, options, is_any_of(
","), token_compress_on);
351 BOOST_FOREACH(
string& kv_string, kv_pairs) {
353 if (kv_string.length() == 0)
continue;
354 vector<string> key_value;
355 split(key_value, kv_string, is_any_of(
"="), token_compress_on);
356 if (key_value.size() != 2) {
357 return Status(Substitute(
"Ignoring invalid configuration option $0: bad format "
358 "(expected 'key=value')", kv_string));
void TQueryOptionsToMap(const TQueryOptions &query_options, std::map< std::string, std::string > *configuration)
Converts a TQueryOptions struct into a map of key, value pairs.
int GetQueryOptionForKey(const string &key)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
static int64_t physical_mem()
Get total physical memory in bytes (ignores cgroups memory limits).
static int64_t ParseMemSpec(const std::string &mem_spec_str, bool *is_percent, int64_t relative_reference)
static Status ParseMemValue(const string &value, const string &key, int64_t *result)
Status ParseQueryOptions(const std::string &options, TQueryOptions *query_options)
void SetQueryOption(TImpalaQueryOptions::type opt, const T &opt_val, TExecuteStatementReq *exec_stmt_req)