Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-scanner.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/hdfs-scanner.h"
16 
17 #include <sstream>
18 #include <boost/algorithm/string.hpp>
19 
20 #include "codegen/codegen-anyval.h"
21 #include "codegen/llvm-codegen.h"
22 #include "common/logging.h"
23 #include "common/object-pool.h"
24 #include "exec/text-converter.h"
25 #include "exec/hdfs-scan-node.h"
26 #include "exec/read-write-util.h"
28 #include "exprs/expr-context.h"
29 #include "runtime/descriptors.h"
30 #include "runtime/hdfs-fs-cache.h"
31 #include "runtime/runtime-state.h"
32 #include "runtime/mem-pool.h"
33 #include "runtime/raw-value.h"
34 #include "runtime/row-batch.h"
35 #include "runtime/string-value.h"
36 #include "runtime/tuple-row.h"
37 #include "runtime/tuple.h"
38 #include "util/codec.h"
39 #include "util/debug-util.h"
40 #include "util/runtime-profile.h"
41 #include "util/sse-util.h"
42 #include "util/string-parser.h"
43 #include "gen-cpp/PlanNodes_types.h"
44 
45 #include "common/names.h"
46 
47 using namespace impala;
48 using namespace llvm;
49 
// Names of the LLVM IR types corresponding to FieldLocation and HdfsScanner. Codegen
// looks types up by these names, e.g. codegen->GetType(HdfsScanner::LLVM_CLASS_NAME)
// in CodegenWriteCompleteTuple; they presumably must match the struct/class names
// emitted into the IR module — verify against the IR build.
50 const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation";
51 const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
52 
54  : scan_node_(scan_node),
55  state_(state),
56  context_(NULL),
57  tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
58  tuple_(NULL),
59  batch_(NULL),
60  num_errors_in_file_(0),
61  num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()),
62  decompression_type_(THdfsCompression::NONE),
63  data_buffer_pool_(new MemPool(scan_node->mem_tracker())),
64  write_tuples_fn_(NULL) {
65 }
66 
68  DCHECK(batch_ == NULL);
69 }
70 
72  context_ = context;
73  stream_ = context->GetStream();
78  decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecompressionTime");
79  return Status::OK;
80 }
81 
83  if (decompressor_.get() != NULL) decompressor_->Close();
85 }
86 
88  THdfsFileFormat::type type, const string& scanner_name) {
89  if (!scan_node_->tuple_desc()->string_slots().empty()
90  && partition->escape_char() != '\0') {
91  // Cannot use codegen if there are strings slots and we need to
92  // compact (i.e. copy) the data.
94  return Status::OK;
95  }
96 
97  write_tuples_fn_ = reinterpret_cast<WriteTuplesFn>(scan_node_->GetCodegenFn(type));
98  if (write_tuples_fn_ == NULL) {
100  return Status::OK;
101  }
102  VLOG(2) << scanner_name << "(node_id=" << scan_node_->id()
103  << ") using llvm codegend functions.";
105  return Status::OK;
106 }
107 
111  tuple_mem_ =
113 }
114 
115 int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem) {
116  DCHECK(batch_ != NULL);
117  DCHECK_GT(batch_->capacity(), batch_->num_rows());
118  *pool = batch_->tuple_data_pool();
119  *tuple_mem = reinterpret_cast<Tuple*>(tuple_mem_);
120  *tuple_row_mem = batch_->GetRow(batch_->AddRow());
121  return batch_->capacity() - batch_->num_rows();
122 }
123 
125  DCHECK(batch_ != NULL);
126  DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows());
127  batch_->CommitRows(num_rows);
128  tuple_mem_ += scan_node_->tuple_desc()->byte_size() * num_rows;
129 
130  // We need to pass the row batch to the scan node if there is too much memory attached,
131  // which can happen if the query is very selective.
133  context_->ReleaseCompletedResources(batch_, /* done */ false);
136  }
137 
138  if (context_->cancelled()) return Status::CANCELLED;
140  // Free local expr allocations for this thread
142  return Status::OK;
143 }
144 
146  DCHECK(batch_ != NULL);
147  context_->ReleaseCompletedResources(batch_, /* done */ true);
149  batch_ = NULL;
150 }
151 
152 // In this code path, no slots were materialized from the input files. The only
153 // slots are from partition keys. This lets us simplify writing out the batches.
154 // 1. template_tuple_ is the complete tuple.
155 // 2. Eval conjuncts against the tuple.
156 // 3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
157 int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
158  DCHECK_GT(num_tuples, 0);
159 
160  if (template_tuple_ == NULL) {
161  // No slots from partitions keys or slots. This is count(*). Just add the
162  // number of rows to the batch.
163  row_batch->AddRows(num_tuples);
164  row_batch->CommitRows(num_tuples);
165  } else {
166  // Make a row and evaluate the row
167  int row_idx = row_batch->AddRow();
168 
169  TupleRow* current_row = row_batch->GetRow(row_idx);
170  current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
171  if (!EvalConjuncts(current_row)) return 0;
172  // Add first tuple
173  row_batch->CommitLastRow();
174  --num_tuples;
175 
176  DCHECK_LE(num_tuples, row_batch->capacity() - row_batch->num_rows());
177 
178  for (int n = 0; n < num_tuples; ++n) {
179  DCHECK(!row_batch->AtCapacity());
180  row_idx = row_batch->AddRow();
181  DCHECK(row_idx != RowBatch::INVALID_ROW_INDEX);
182  TupleRow* current_row = row_batch->GetRow(row_idx);
183  current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
184  row_batch->CommitLastRow();
185  }
186  }
187  return num_tuples;
188 }
189 
190 // In this code path, no slots were materialized from the input files. The only
191 // slots are from partition keys. This lets us simplify writing out the batches.
192 // 1. template_tuple_ is the complete tuple.
193 // 2. Eval conjuncts against the tuple.
194 // 3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
196  TupleRow* row, int num_tuples) {
197  DCHECK_GE(num_tuples, 0);
198  if (num_tuples == 0) return 0;
199 
200  if (template_tuple_ == NULL) {
201  // Must be conjuncts on constant exprs.
202  if (!EvalConjuncts(row)) return 0;
203  return num_tuples;
204  } else {
206  if (!EvalConjuncts(row)) return 0;
207  row = next_row(row);
208 
209  for (int n = 1; n < num_tuples; ++n) {
211  row = next_row(row);
212  }
213  }
214  return num_tuples;
215 }
216 
218  Tuple* tuple, TupleRow* tuple_row, Tuple* template_tuple,
219  uint8_t* error_fields, uint8_t* error_in_row) {
220  *error_in_row = false;
221  // Initialize tuple before materializing slots
222  InitTuple(template_tuple, tuple);
223 
224  for (int i = 0; i < scan_node_->materialized_slots().size(); ++i) {
225  int need_escape = false;
226  int len = fields[i].len;
227  if (UNLIKELY(len < 0)) {
228  len = -len;
229  need_escape = true;
230  }
231 
233  bool error = !text_converter_->WriteSlot(desc, tuple,
234  fields[i].start, len, false, need_escape, pool);
235  error_fields[i] = error;
236  *error_in_row |= error;
237  }
238 
239  tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
240  return EvalConjuncts(tuple_row);
241 }
242 
243 // Codegen for WriteTuple(above). The signature matches WriteTuple (except for the
244 // this* first argument). For writing out and evaluating a single string slot:
245 // define i1 @WriteCompleteTuple(%"class.impala::HdfsScanner"* %this,
246 // %"class.impala::MemPool"* %pool,
247 // %"struct.impala::FieldLocation"* %fields,
248 // %"class.impala::Tuple"* %tuple,
249 // %"class.impala::TupleRow"* %tuple_row,
250 // %"class.impala::Tuple"* %template,
251 // i8* %error_fields, i8* %error_in_row) #20 {
252 // entry:
253 // %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple
254 // to { i8, %"struct.impala::StringValue" }*
255 // %tuple_ptr1 = bitcast %"class.impala::Tuple"* %template
256 // to { i8, %"struct.impala::StringValue" }*
257 // %null_byte = getelementptr inbounds
258 // { i8, %"struct.impala::StringValue" }* %tuple_ptr, i32 0, i32 0
259 // store i8 0, i8* %null_byte
260 // %0 = bitcast %"class.impala::TupleRow"* %tuple_row
261 // to { i8, %"struct.impala::StringValue" }**
262 // %1 = getelementptr { i8, %"struct.impala::StringValue" }** %0, i32 0
263 // store { i8, %"struct.impala::StringValue" }* %tuple_ptr,
264 // { i8, %"struct.impala::StringValue" }** %1
265 // br label %parse
266 //
267 // parse: ; preds = %entry
268 // %data_ptr = getelementptr %"struct.impala::FieldLocation"* %fields, i32 0, i32 0
269 // %len_ptr = getelementptr %"struct.impala::FieldLocation"* %fields, i32 0, i32 1
270 // %slot_error_ptr = getelementptr i8* %error_fields, i32 0
271 // %data = load i8** %data_ptr
272 // %len = load i32* %len_ptr
273 // %2 = call i1 @WriteSlot({ i8, %"struct.impala::StringValue" }* %tuple_ptr,
274 // i8* %data, i32 %len)
275 // %slot_parse_error = xor i1 %2, true
276 // %error_in_row2 = or i1 false, %slot_parse_error
277 // %3 = zext i1 %slot_parse_error to i8
278 // store i8 %3, i8* %slot_error_ptr
279 // %4 = call %"class.impala::ExprContext"* @GetConjunctCtx(
280 // %"class.impala::HdfsScanner"* %this, i32 0)
281 // %conjunct_eval = call i16 @Eq_StringVal_StringValWrapper1(
282 // %"class.impala::ExprContext"* %4, %"class.impala::TupleRow"* %tuple_row)
283 // %5 = ashr i16 %conjunct_eval, 8
284 // %6 = trunc i16 %5 to i8
285 // %val = trunc i8 %6 to i1
286 // br i1 %val, label %parse3, label %eval_fail
287 //
288 // parse3: ; preds = %parse
289 // %7 = zext i1 %error_in_row2 to i8
290 // store i8 %7, i8* %error_in_row
291 // ret i1 true
292 //
293 // eval_fail: ; preds = %parse
294 // ret i1 false
295 // }
297  HdfsScanNode* node, LlvmCodeGen* codegen, const vector<ExprContext*>& conjunct_ctxs) {
298  SCOPED_TIMER(codegen->codegen_timer());
299  RuntimeState* state = node->runtime_state();
300 
301  // TODO: Timestamp is not yet supported
302  for (int i = 0; i < node->materialized_slots().size(); ++i) {
303  SlotDescriptor* slot_desc = node->materialized_slots()[i];
304  if (slot_desc->type().type == TYPE_TIMESTAMP) return NULL;
305  if (slot_desc->type().type == TYPE_DECIMAL) return NULL;
306  }
307 
308  // Cast away const-ness. The codegen only sets the cached typed llvm struct.
309  TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
310  vector<Function*> slot_fns;
311  for (int i = 0; i < node->materialized_slots().size(); ++i) {
312  SlotDescriptor* slot_desc = node->materialized_slots()[i];
313  Function* fn = TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc,
314  node->hdfs_table()->null_column_value().data(),
315  node->hdfs_table()->null_column_value().size(), true);
316  if (fn == NULL) return NULL;
317  slot_fns.push_back(fn);
318  }
319 
320  // Compute order to materialize slots. BE assumes that conjuncts should
321  // be evaluated in the order specified (optimization is already done by FE)
322  vector<int> materialize_order;
323  node->ComputeSlotMaterializationOrder(&materialize_order);
324 
325  // Get types to construct matching function signature to WriteCompleteTuple
326  PointerType* uint8_ptr_type = PointerType::get(codegen->GetType(TYPE_TINYINT), 0);
327 
328  StructType* field_loc_type = reinterpret_cast<StructType*>(
330  Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
331  Type* tuple_opaque_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
332  Type* mem_pool_type = codegen->GetType(MemPool::LLVM_CLASS_NAME);
333  Type* hdfs_scanner_type = codegen->GetType(HdfsScanner::LLVM_CLASS_NAME);
334 
335  DCHECK(tuple_opaque_type != NULL);
336  DCHECK(tuple_row_type != NULL);
337  DCHECK(field_loc_type != NULL);
338  DCHECK(hdfs_scanner_type != NULL);
339 
340  PointerType* field_loc_ptr_type = PointerType::get(field_loc_type, 0);
341  PointerType* tuple_opaque_ptr_type = PointerType::get(tuple_opaque_type, 0);
342  PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
343  PointerType* mem_pool_ptr_type = PointerType::get(mem_pool_type, 0);
344  PointerType* hdfs_scanner_ptr_type = PointerType::get(hdfs_scanner_type, 0);
345 
346  // Generate the typed llvm struct for the output tuple
347  StructType* tuple_type = tuple_desc->GenerateLlvmStruct(codegen);
348  if (tuple_type == NULL) return NULL;
349  PointerType* tuple_ptr_type = PointerType::get(tuple_type, 0);
350 
351  // Initialize the function prototype. This needs to match
352  // HdfsScanner::WriteCompleteTuple's signature identically.
353  LlvmCodeGen::FnPrototype prototype(
354  codegen, "WriteCompleteTuple", codegen->GetType(TYPE_BOOLEAN));
355  prototype.AddArgument(LlvmCodeGen::NamedVariable("this", hdfs_scanner_ptr_type));
356  prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mem_pool_ptr_type));
357  prototype.AddArgument(LlvmCodeGen::NamedVariable("fields", field_loc_ptr_type));
358  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_opaque_ptr_type));
359  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple_row", tuple_row_ptr_type));
360  prototype.AddArgument(LlvmCodeGen::NamedVariable("template", tuple_opaque_ptr_type));
361  prototype.AddArgument(LlvmCodeGen::NamedVariable("error_fields", uint8_ptr_type));
362  prototype.AddArgument(LlvmCodeGen::NamedVariable("error_in_row", uint8_ptr_type));
363 
364  LLVMContext& context = codegen->context();
365  LlvmCodeGen::LlvmBuilder builder(context);
366  Value* args[8];
367  Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
368 
369  BasicBlock* parse_block = BasicBlock::Create(context, "parse", fn);
370  BasicBlock* eval_fail_block = BasicBlock::Create(context, "eval_fail", fn);
371 
372  // Extract the input args
373  Value* this_arg = args[0];
374  Value* fields_arg = args[2];
375  Value* tuple_arg = builder.CreateBitCast(args[3], tuple_ptr_type, "tuple_ptr");
376  Value* tuple_row_arg = args[4];
377  Value* template_arg = builder.CreateBitCast(args[5], tuple_ptr_type, "tuple_ptr");
378  Value* errors_arg = args[6];
379  Value* error_in_row_arg = args[7];
380 
381  // Codegen for function body
382  Value* error_in_row = codegen->false_value();
383  // Initialize tuple
384  if (node->num_materialized_partition_keys() == 0) {
385  // No partition key slots, just zero the NULL bytes.
386  for (int i = 0; i < tuple_desc->num_null_bytes(); ++i) {
387  Value* null_byte = builder.CreateStructGEP(tuple_arg, i, "null_byte");
388  builder.CreateStore(codegen->GetIntConstant(TYPE_TINYINT, 0), null_byte);
389  }
390  } else {
391  // Copy template tuple.
392  // TODO: only copy what's necessary from the template tuple.
393  codegen->CodegenMemcpy(&builder, tuple_arg, template_arg, tuple_desc->byte_size());
394  }
395 
396  // Put tuple in tuple_row
397  Value* tuple_row_typed =
398  builder.CreateBitCast(tuple_row_arg, PointerType::get(tuple_ptr_type, 0));
399  Value* tuple_row_idxs[] = { codegen->GetIntConstant(TYPE_INT, node->tuple_idx()) };
400  Value* tuple_in_row_addr = builder.CreateGEP(tuple_row_typed, tuple_row_idxs);
401  builder.CreateStore(tuple_arg, tuple_in_row_addr);
402  builder.CreateBr(parse_block);
403 
404  // Loop through all the conjuncts in order and materialize slots as necessary to
405  // evaluate the conjuncts (e.g. conjunct_ctxs[0] will have the slots it references
406  // first).
407  // materialized_order[slot_idx] represents the first conjunct which needs that slot.
408  // Slots are only materialized if its order matches the current conjunct being
409  // processed. This guarantees that each slot is materialized once when it is first
410  // needed and that at the end of the materialize loop, the conjunct has everything
411  // it needs (either from this iteration or previous iterations).
412  builder.SetInsertPoint(parse_block);
413  for (int conjunct_idx = 0; conjunct_idx <= conjunct_ctxs.size(); ++conjunct_idx) {
414  for (int slot_idx = 0; slot_idx < materialize_order.size(); ++slot_idx) {
415  // If they don't match, it means either the slot has already been
416  // materialized for a previous conjunct or will be materialized later for
417  // another conjunct. Either case, the slot does not need to be materialized
418  // yet.
419  if (materialize_order[slot_idx] != conjunct_idx) continue;
420 
421  // Materialize slots[slot_idx] to evaluate conjunct_ctxs[conjunct_idx]
422  // All slots[i] with materialized_order[i] < conjunct_idx have already been
423  // materialized by prior iterations through the outer loop
424 
425  // Extract ptr/len from fields
426  Value* data_idxs[] = {
427  codegen->GetIntConstant(TYPE_INT, slot_idx),
428  codegen->GetIntConstant(TYPE_INT, 0),
429  };
430  Value* len_idxs[] = {
431  codegen->GetIntConstant(TYPE_INT, slot_idx),
432  codegen->GetIntConstant(TYPE_INT, 1),
433  };
434  Value* error_idxs[] = {
435  codegen->GetIntConstant(TYPE_INT, slot_idx),
436  };
437  Value* data_ptr = builder.CreateGEP(fields_arg, data_idxs, "data_ptr");
438  Value* len_ptr = builder.CreateGEP(fields_arg, len_idxs, "len_ptr");
439  Value* error_ptr = builder.CreateGEP(errors_arg, error_idxs, "slot_error_ptr");
440  Value* data = builder.CreateLoad(data_ptr, "data");
441  Value* len = builder.CreateLoad(len_ptr, "len");
442 
443  // Call slot parse function
444  Function* slot_fn = slot_fns[slot_idx];
445  Value* slot_parsed = builder.CreateCall3(slot_fn, tuple_arg, data, len);
446  Value* slot_error = builder.CreateNot(slot_parsed, "slot_parse_error");
447  error_in_row = builder.CreateOr(error_in_row, slot_error, "error_in_row");
448  slot_error = builder.CreateZExt(slot_error, codegen->GetType(TYPE_TINYINT));
449  builder.CreateStore(slot_error, error_ptr);
450  }
451 
452  if (conjunct_idx == conjunct_ctxs.size()) {
453  // In this branch, we've just materialized slots not referenced by any conjunct.
454  // This slots are the last to get materialized. If we are in this branch, the
455  // tuple passed all conjuncts and should be added to the row batch.
456  Value* error_ret = builder.CreateZExt(error_in_row, codegen->GetType(TYPE_TINYINT));
457  builder.CreateStore(error_ret, error_in_row_arg);
458  builder.CreateRet(codegen->true_value());
459  } else {
460  // All slots for conjunct_ctxs[conjunct_idx] are materialized, evaluate the partial
461  // tuple against that conjunct and start a new parse_block for the next conjunct
462  parse_block = BasicBlock::Create(context, "parse", fn, eval_fail_block);
463  Function* conjunct_fn;
464  Status status =
465  conjunct_ctxs[conjunct_idx]->root()->GetCodegendComputeFn(state, &conjunct_fn);
466  if (!status.ok()) {
467  stringstream ss;
468  ss << "Failed to codegen conjunct: " << status.GetDetail();
469  state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
470  fn->eraseFromParent();
471  return NULL;
472  }
473 
474  Function* get_ctx_fn =
475  codegen->GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_CTX);
476  Value* ctx = builder.CreateCall2(
477  get_ctx_fn, this_arg, codegen->GetIntConstant(TYPE_INT, conjunct_idx));
478 
479  Value* conjunct_args[] = {ctx, tuple_row_arg};
481  codegen, &builder, TYPE_BOOLEAN, conjunct_fn, conjunct_args, "conjunct_eval");
482  builder.CreateCondBr(result.GetVal(), parse_block, eval_fail_block);
483  builder.SetInsertPoint(parse_block);
484  }
485  }
486 
487  // Block if eval failed.
488  builder.SetInsertPoint(eval_fail_block);
489  builder.CreateRet(codegen->false_value());
490 
491  codegen->OptimizeFunctionWithExprs(fn);
492  return codegen->FinalizeFunction(fn);
493 }
494 
496  LlvmCodeGen* codegen, Function* write_complete_tuple_fn) {
497  SCOPED_TIMER(codegen->codegen_timer());
498  DCHECK(write_complete_tuple_fn != NULL);
499 
500  Function* write_tuples_fn =
501  codegen->GetFunction(IRFunction::HDFS_SCANNER_WRITE_ALIGNED_TUPLES);
502  DCHECK(write_tuples_fn != NULL);
503 
504  int replaced = 0;
505  write_tuples_fn = codegen->ReplaceCallSites(write_tuples_fn, false,
506  write_complete_tuple_fn, "WriteCompleteTuple", &replaced);
507  DCHECK_EQ(replaced, 1) << "One call site should be replaced.";
508  DCHECK(write_tuples_fn != NULL);
509 
510  return codegen->FinalizeFunction(write_tuples_fn);
511 }
512 
513 Status HdfsScanner::UpdateDecompressor(const THdfsCompression::type& compression) {
514  // Check whether the file in the stream has different compression from the last one.
515  if (compression != decompression_type_) {
516  if (decompression_type_ != THdfsCompression::NONE) {
517  // Close the previous decompressor before creating a new one.
518  DCHECK(decompressor_.get() != NULL);
519  decompressor_->Close();
520  decompressor_.reset(NULL);
521  }
522  // The LZO-compression scanner is implemented in a dynamically linked library and it
523  // is not created at Codec::CreateDecompressor().
524  if (compression != THdfsCompression::NONE && compression != THdfsCompression::LZO) {
526  scan_node_->tuple_desc()->string_slots().empty(), compression, &decompressor_));
527  }
528  decompression_type_ = compression;
529  }
530  return Status::OK;
531 }
532 
533 Status HdfsScanner::UpdateDecompressor(const string& codec) {
534  map<const string, const THdfsCompression::type>::const_iterator
535  type = Codec::CODEC_MAP.find(codec);
536 
537  if (type == Codec::CODEC_MAP.end()) {
538  stringstream ss;
539  ss << Codec::UNKNOWN_CODEC_ERROR << codec;
540  return Status(ss.str());
541  }
542  RETURN_IF_ERROR(UpdateDecompressor(type->second));
543  return Status::OK;
544 }
545 
546 bool HdfsScanner::ReportTupleParseError(FieldLocation* fields, uint8_t* errors,
547  int row_idx) {
548  for (int i = 0; i < scan_node_->materialized_slots().size(); ++i) {
549  if (errors[i]) {
550  const SlotDescriptor* desc = scan_node_->materialized_slots()[i];
551  ReportColumnParseError(desc, fields[i].start, fields[i].len);
552  errors[i] = false;
553  }
554  }
555 
556  // Call into subclass to log a more accurate error message.
557  if (state_->LogHasSpace()) {
558  stringstream ss;
559  ss << "file: " << stream_->filename() << endl << "record: ";
560  LogRowParseError(row_idx, &ss);
561  state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
562  }
563 
565  if (state_->abort_on_error()) {
567  DCHECK(!parse_status_.ok());
568  }
569  return parse_status_.ok();
570 }
571 
572 void HdfsScanner::LogRowParseError(int row_idx, stringstream* ss) {
573  // This is only called for text and seq files which should override this function.
574  DCHECK(false);
575 }
576 
578  const char* data, int len) {
579  // len < 0 is used to indicate the data contains escape characters. We don't care
580  // about that here and can just output the raw string.
581  if (len < 0) len = -len;
582 
583  if (state_->LogHasSpace() || state_->abort_on_error()) {
584  stringstream ss;
585  ss << "Error converting column: "
586  << desc->col_pos() - scan_node_->num_partition_keys()
587  << " TO " << desc->type()
588  << " (Data is: " << string(data,len) << ")";
589  if (state_->LogHasSpace()) {
590  state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
591  }
592 
593  if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = Status(ss.str());
594  }
595 }
const std::vector< SlotDescriptor * > & materialized_slots() const
const std::string & null_column_value() const
Definition: descriptors.h:233
static const CodecMap CODEC_MAP
Definition: codec.h:52
int id() const
Definition: exec-node.h:154
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
Definition: hdfs-scanner.h:198
void ReportColumnParseError(const SlotDescriptor *desc, const char *data, int len)
static const char * LLVM_CLASS_NAME
Definition: hdfs-scanner.h:61
int num_rows() const
Definition: row-batch.h:215
HdfsScanNode * scan_node_
The scan node that started this scanner.
Definition: hdfs-scanner.h:141
virtual void LogRowParseError(int row_idx, std::stringstream *)
const std::string GetDetail() const
Definition: status.cc:184
int AddRows(int n)
Definition: row-batch.h:94
static const char * LLVM_CLASS_NAME
Definition: hdfs-scanner.h:137
static CodegenAnyVal CreateCallWrapped(LlvmCodeGen *cg, LlvmCodeGen::LlvmBuilder *builder, const ColumnType &type, llvm::Function *fn, llvm::ArrayRef< llvm::Value * > args, const char *name="", llvm::Value *result_ptr=NULL)
Same as above but wraps the result in a CodegenAnyVal.
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
ScannerContext * context_
Context for this scanner.
Definition: hdfs-scanner.h:147
RuntimeProfile::Counter * codegen_timer()
Definition: llvm-codegen.h:135
int tuple_byte_size_
Fixed size of each tuple, in bytes.
Definition: hdfs-scanner.h:167
boost::scoped_ptr< MemPool > data_buffer_pool_
Definition: hdfs-scanner.h:205
static llvm::Function * CodegenWriteCompleteTuple(HdfsScanNode *, LlvmCodeGen *, const std::vector< ExprContext * > &conjunct_ctxs)
MemTracker * mem_tracker()
Definition: exec-node.h:162
static Status CreateDecompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *decompressor)
boost::scoped_ptr< TextConverter > text_converter_
Helper class for converting text to other types.
Definition: hdfs-scanner.h:186
WriteTuplesFn write_tuples_fn_
Jitted write tuples function pointer. Null if codegen is disabled.
Definition: hdfs-scanner.h:215
Utility struct that wraps a variable name and llvm type.
Definition: llvm-codegen.h:149
uint8_t * tuple_mem_
The tuple memory of batch_.
Definition: hdfs-scanner.h:180
const HdfsTableDescriptor * hdfs_table()
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
int num_completed_io_buffers() const
void ReleaseCompletedResources(RowBatch *batch, bool done)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
std::vector< ExprContext * > conjunct_ctxs_
Definition: hdfs-scanner.h:154
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
int WriteEmptyTuples(RowBatch *row_batch, int num_tuples)
bool LogHasSpace()
Returns true if the error log has not reached max_errors_.
#define ADD_TIMER(profile, name)
virtual ~HdfsScanner()
Definition: hdfs-scanner.cc:67
bool AtCapacity()
Definition: row-batch.h:120
int num_null_bytes() const
Definition: descriptors.h:301
int byte_size() const
Definition: descriptors.h:300
bool cancelled() const
If true, the ScanNode has been cancelled and the scanner thread should finish up. ...
llvm::StructType * GenerateLlvmStruct(LlvmCodeGen *codegen)
Definition: descriptors.cc:556
const RowDescriptor & row_desc() const
Definition: exec-node.h:156
TupleRow * next_row(TupleRow *r) const
Definition: hdfs-scanner.h:368
#define SCOPED_TIMER(c)
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
void StartNewRowBatch()
Set batch_ to a new row batch and update tuple_mem_ accordingly.
THdfsCompression::type decompression_type_
The most recently used decompression type.
Definition: hdfs-scanner.h:201
const std::vector< SlotDescriptor * > & string_slots() const
Definition: descriptors.h:303
virtual void Close()
Definition: hdfs-scanner.cc:82
void IncNumScannersCodegenEnabled()
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
RuntimeState * state_
RuntimeState for error reporting.
Definition: hdfs-scanner.h:144
static const char * LLVM_CLASS_NAME
Definition: tuple-row.h:76
PrimitiveType type
Definition: types.h:60
void AddArgument(const NamedVariable &var)
Add argument.
Definition: llvm-codegen.h:171
Status UpdateDecompressor(const THdfsCompression::type &compression)
static const char * LLVM_CLASS_NAME
For C++/IR interop, we need to be able to look up types by name.
Definition: tuple.h:134
bool LogError(const ErrorMsg &msg)
void InitTuple(Tuple *template_tuple, Tuple *tuple)
Definition: hdfs-scanner.h:355
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
void ComputeSlotMaterializationOrder(std::vector< int > *order) const
const ColumnType & type() const
Definition: descriptors.h:78
int num_errors_in_file_
number of errors in current file
Definition: hdfs-scanner.h:183
void ReportFileErrors(const std::string &file_name, int num_errors)
Report that num_errors occurred while parsing file_name.
void CodegenMemcpy(LlvmBuilder *, llvm::Value *dst, llvm::Value *src, int size)
ObjectPool pool
Status CommitRows(int num_rows)
HdfsScanner(HdfsScanNode *scan_node, RuntimeState *state)
Definition: hdfs-scanner.cc:53
llvm::Function * GetFunction(IRFunction::Type)
int col_pos() const
Definition: descriptors.h:84
RuntimeState * runtime_state()
void AddMaterializedRowBatch(RowBatch *row_batch)
bool WriteCompleteTuple(MemPool *pool, FieldLocation *fields, Tuple *tuple, TupleRow *tuple_row, Tuple *template_tuple, uint8_t *error_fields, uint8_t *error_in_row)
static const char *const UNKNOWN_CODEC_ERROR
Definition: codec.h:48
void CommitLastRow()
Definition: row-batch.h:109
static const char * LLVM_CLASS_NAME
Definition: mem-pool.h:177
void * GetCodegenFn(THdfsFileFormat::type)
llvm::Value * true_value()
Returns true/false constants (bool type)
Definition: llvm-codegen.h:380
static const Status CANCELLED
Definition: status.h:88
int batch_size() const
Definition: runtime-state.h:98
bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow *row)
Definition: hdfs-scanner.h:266
MemPool * tuple_data_pool()
Definition: row-batch.h:148
static llvm::Function * CodegenWriteAlignedTuples(HdfsScanNode *, LlvmCodeGen *, llvm::Function *write_tuple_fn)
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
int capacity() const
Definition: row-batch.h:216
#define UNLIKELY(expr)
Definition: compiler-util.h:33
llvm::Value * false_value()
Definition: llvm-codegen.h:381
llvm::Value * GetVal(const char *name="val")
static const Status OK
Definition: status.h:87
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
Metadata for a single partition inside an Hdfs table.
Definition: descriptors.h:177
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
llvm::Function * FinalizeFunction(llvm::Function *function)
Stream * GetStream(int idx=0)
int tuple_idx() const
bool abort_on_error() const
Definition: runtime-state.h:99
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
bool ok() const
Definition: status.h:172
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
Definition: hdfs-scanner.h:208
llvm::LLVMContext & context()
Definition: llvm-codegen.h:214
Status InitializeWriteTuplesFn(HdfsPartitionDescriptor *partition, THdfsFileFormat::type type, const std::string &scanner_name)
Definition: hdfs-scanner.cc:87
HdfsPartitionDescriptor * partition_descriptor()
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
int num_materialized_partition_keys() const
Returns number of materialized partition key slots.
void CommitRows(int n)
Definition: row-batch.h:102
Tuple * InitTemplateTuple(RuntimeState *state, const std::vector< ExprContext * > &value_ctxs)
void IncNumScannersCodegenDisabled()
const std::vector< ExprContext * > & partition_key_value_ctxs() const
Definition: descriptors.h:185
bool ReportTupleParseError(FieldLocation *fields, uint8_t *errors, int row_idx)
ScannerContext::Stream * stream_
The first stream for context_.
Definition: hdfs-scanner.h:150
uint8_t * Allocate(int size)
Definition: mem-pool.h:92
int(* WriteTuplesFn)(HdfsScanner *, MemPool *, TupleRow *, int, FieldLocation *, int, int, int, int)
Definition: hdfs-scanner.h:212
const TupleDescriptor * tuple_desc()
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
Definition: hdfs-scanner.cc:71
RuntimeProfile * runtime_profile()
Definition: exec-node.h:161
Status GetConjunctCtxs(std::vector< ExprContext * > *ctxs)
static llvm::Function * CodegenWriteSlot(LlvmCodeGen *codegen, TupleDescriptor *tuple_desc, SlotDescriptor *slot_desc, const char *null_col_val, int len, bool check_null)
static const int INVALID_ROW_INDEX
Definition: row-batch.h:87