18 #include <boost/algorithm/string.hpp>
43 #include "gen-cpp/PlanNodes_types.h"
47 using namespace impala;
54 : scan_node_(scan_node),
57 tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
60 num_errors_in_file_(0),
61 num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()),
62 decompression_type_(THdfsCompression::NONE),
63 data_buffer_pool_(new
MemPool(scan_node->mem_tracker())),
64 write_tuples_fn_(NULL) {
88 THdfsFileFormat::type type,
const string& scanner_name) {
102 VLOG(2) << scanner_name <<
"(node_id=" <<
scan_node_->
id()
103 <<
") using llvm codegend functions.";
158 DCHECK_GT(num_tuples, 0);
163 row_batch->
AddRows(num_tuples);
167 int row_idx = row_batch->
AddRow();
178 for (
int n = 0; n < num_tuples; ++n) {
180 row_idx = row_batch->
AddRow();
197 DCHECK_GE(num_tuples, 0);
198 if (num_tuples == 0)
return 0;
209 for (
int n = 1; n < num_tuples; ++n) {
219 uint8_t* error_fields, uint8_t* error_in_row) {
220 *error_in_row =
false;
225 int need_escape =
false;
226 int len = fields[i].
len;
234 fields[i].start, len,
false, need_escape, pool);
235 error_fields[i] = error;
236 *error_in_row |= error;
310 vector<Function*> slot_fns;
316 if (fn == NULL)
return NULL;
317 slot_fns.push_back(fn);
322 vector<int> materialize_order;
328 StructType* field_loc_type =
reinterpret_cast<StructType*
>(
335 DCHECK(tuple_opaque_type != NULL);
336 DCHECK(tuple_row_type != NULL);
337 DCHECK(field_loc_type != NULL);
338 DCHECK(hdfs_scanner_type != NULL);
340 PointerType* field_loc_ptr_type = PointerType::get(field_loc_type, 0);
341 PointerType* tuple_opaque_ptr_type = PointerType::get(tuple_opaque_type, 0);
342 PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
343 PointerType* mem_pool_ptr_type = PointerType::get(mem_pool_type, 0);
344 PointerType* hdfs_scanner_ptr_type = PointerType::get(hdfs_scanner_type, 0);
348 if (tuple_type == NULL)
return NULL;
349 PointerType* tuple_ptr_type = PointerType::get(tuple_type, 0);
364 LLVMContext& context = codegen->
context();
367 Function* fn = prototype.GeneratePrototype(&builder, &args[0]);
369 BasicBlock* parse_block = BasicBlock::Create(context,
"parse", fn);
370 BasicBlock* eval_fail_block = BasicBlock::Create(context,
"eval_fail", fn);
373 Value* this_arg = args[0];
374 Value* fields_arg = args[2];
375 Value* tuple_arg = builder.CreateBitCast(args[3], tuple_ptr_type,
"tuple_ptr");
376 Value* tuple_row_arg = args[4];
377 Value* template_arg = builder.CreateBitCast(args[5], tuple_ptr_type,
"tuple_ptr");
378 Value* errors_arg = args[6];
379 Value* error_in_row_arg = args[7];
387 Value* null_byte = builder.CreateStructGEP(tuple_arg, i,
"null_byte");
397 Value* tuple_row_typed =
398 builder.CreateBitCast(tuple_row_arg, PointerType::get(tuple_ptr_type, 0));
400 Value* tuple_in_row_addr = builder.CreateGEP(tuple_row_typed, tuple_row_idxs);
401 builder.CreateStore(tuple_arg, tuple_in_row_addr);
402 builder.CreateBr(parse_block);
412 builder.SetInsertPoint(parse_block);
413 for (
int conjunct_idx = 0; conjunct_idx <= conjunct_ctxs.size(); ++conjunct_idx) {
414 for (
int slot_idx = 0; slot_idx < materialize_order.size(); ++slot_idx) {
419 if (materialize_order[slot_idx] != conjunct_idx)
continue;
426 Value* data_idxs[] = {
430 Value* len_idxs[] = {
434 Value* error_idxs[] = {
437 Value* data_ptr = builder.CreateGEP(fields_arg, data_idxs,
"data_ptr");
438 Value* len_ptr = builder.CreateGEP(fields_arg, len_idxs,
"len_ptr");
439 Value* error_ptr = builder.CreateGEP(errors_arg, error_idxs,
"slot_error_ptr");
440 Value* data = builder.CreateLoad(data_ptr,
"data");
441 Value* len = builder.CreateLoad(len_ptr,
"len");
444 Function* slot_fn = slot_fns[slot_idx];
445 Value* slot_parsed = builder.CreateCall3(slot_fn, tuple_arg, data, len);
446 Value* slot_error = builder.CreateNot(slot_parsed,
"slot_parse_error");
447 error_in_row = builder.CreateOr(error_in_row, slot_error,
"error_in_row");
449 builder.CreateStore(slot_error, error_ptr);
452 if (conjunct_idx == conjunct_ctxs.size()) {
457 builder.CreateStore(error_ret, error_in_row_arg);
462 parse_block = BasicBlock::Create(context,
"parse", fn, eval_fail_block);
463 Function* conjunct_fn;
465 conjunct_ctxs[conjunct_idx]->root()->GetCodegendComputeFn(state, &conjunct_fn);
468 ss <<
"Failed to codegen conjunct: " << status.
GetDetail();
469 state->LogError(
ErrorMsg(TErrorCode::GENERAL, ss.str()));
470 fn->eraseFromParent();
474 Function* get_ctx_fn =
475 codegen->
GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_CTX);
476 Value* ctx = builder.CreateCall2(
479 Value* conjunct_args[] = {ctx, tuple_row_arg};
481 codegen, &builder,
TYPE_BOOLEAN, conjunct_fn, conjunct_args,
"conjunct_eval");
482 builder.CreateCondBr(result.
GetVal(), parse_block, eval_fail_block);
483 builder.SetInsertPoint(parse_block);
488 builder.SetInsertPoint(eval_fail_block);
496 LlvmCodeGen* codegen, Function* write_complete_tuple_fn) {
498 DCHECK(write_complete_tuple_fn != NULL);
500 Function* write_tuples_fn =
501 codegen->
GetFunction(IRFunction::HDFS_SCANNER_WRITE_ALIGNED_TUPLES);
502 DCHECK(write_tuples_fn != NULL);
506 write_complete_tuple_fn,
"WriteCompleteTuple", &replaced);
507 DCHECK_EQ(replaced, 1) <<
"One call site should be replaced.";
508 DCHECK(write_tuples_fn != NULL);
524 if (compression != THdfsCompression::NONE && compression != THdfsCompression::LZO) {
534 map<const string, const THdfsCompression::type>::const_iterator
578 const char* data,
int len) {
581 if (len < 0) len = -len;
585 ss <<
"Error converting column: "
587 <<
" TO " << desc->
type()
588 <<
" (Data is: " << string(data,len) <<
")";
const std::vector< SlotDescriptor * > & materialized_slots() const
const std::string & null_column_value() const
static const CodecMap CODEC_MAP
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
void ReportColumnParseError(const SlotDescriptor *desc, const char *data, int len)
static const char * LLVM_CLASS_NAME
HdfsScanNode * scan_node_
The scan node that started this scanner.
virtual void LogRowParseError(int row_idx, std::stringstream *)
const std::string GetDetail() const
static const char * LLVM_CLASS_NAME
static CodegenAnyVal CreateCallWrapped(LlvmCodeGen *cg, LlvmCodeGen::LlvmBuilder *builder, const ColumnType &type, llvm::Function *fn, llvm::ArrayRef< llvm::Value * > args, const char *name="", llvm::Value *result_ptr=NULL)
Same as above but wraps the result in a CodegenAnyVal.
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
ScannerContext * context_
Context for this scanner.
RuntimeProfile::Counter * codegen_timer()
int tuple_byte_size_
Fixed size of each tuple, in bytes.
boost::scoped_ptr< MemPool > data_buffer_pool_
static llvm::Function * CodegenWriteCompleteTuple(HdfsScanNode *, LlvmCodeGen *, const std::vector< ExprContext * > &conjunct_ctxs)
MemTracker * mem_tracker()
static Status CreateDecompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *decompressor)
boost::scoped_ptr< TextConverter > text_converter_
Helper class for converting text to other types.
WriteTuplesFn write_tuples_fn_
Jitted write tuples function pointer. Null if codegen is disabled.
Utility struct that wraps a variable name and llvm type.
uint8_t * tuple_mem_
The tuple memory of batch_.
const HdfsTableDescriptor * hdfs_table()
A tuple with 0 materialised slots is represented as NULL.
int num_completed_io_buffers() const
void ReleaseCompletedResources(RowBatch *batch, bool done)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
std::vector< ExprContext * > conjunct_ctxs_
TupleRow * GetRow(int row_idx)
int WriteEmptyTuples(RowBatch *row_batch, int num_tuples)
bool LogHasSpace()
Returns true if the error log has not reached max_errors_.
#define ADD_TIMER(profile, name)
int num_null_bytes() const
bool cancelled() const
If true, the ScanNode has been cancelled and the scanner thread should finish up. ...
llvm::StructType * GenerateLlvmStruct(LlvmCodeGen *codegen)
const RowDescriptor & row_desc() const
TupleRow * next_row(TupleRow *r) const
static void Close(const std::vector< ExprContext * > &ctxs, RuntimeState *state)
Convenience function for closing multiple expr trees.
void StartNewRowBatch()
Set batch_ to a new row batch and update tuple_mem_ accordingly.
THdfsCompression::type decompression_type_
The most recently used decompression type.
const std::vector< SlotDescriptor * > & string_slots() const
void IncNumScannersCodegenEnabled()
LLVM code generator. This is the top level object to generate jitted code.
RuntimeState * state_
RuntimeState for error reporting.
static const char * LLVM_CLASS_NAME
void AddArgument(const NamedVariable &var)
Add argument.
Status UpdateDecompressor(const THdfsCompression::type &compression)
static const char * LLVM_CLASS_NAME
For C++/IR interop, we need to be able to look up types by name.
bool LogError(const ErrorMsg &msg)
void InitTuple(Tuple *template_tuple, Tuple *tuple)
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
void ComputeSlotMaterializationOrder(std::vector< int > *order) const
const ColumnType & type() const
int num_errors_in_file_
number of errors in current file
void ReportFileErrors(const std::string &file_name, int num_errors)
Report that num_errors occurred while parsing file_name.
void CodegenMemcpy(LlvmBuilder *, llvm::Value *dst, llvm::Value *src, int size)
Status CommitRows(int num_rows)
HdfsScanner(HdfsScanNode *scan_node, RuntimeState *state)
llvm::Function * GetFunction(IRFunction::Type)
RuntimeState * runtime_state()
void AddMaterializedRowBatch(RowBatch *row_batch)
bool WriteCompleteTuple(MemPool *pool, FieldLocation *fields, Tuple *tuple, TupleRow *tuple_row, Tuple *template_tuple, uint8_t *error_fields, uint8_t *error_in_row)
static const char *const UNKNOWN_CODEC_ERROR
static const char * LLVM_CLASS_NAME
void * GetCodegenFn(THdfsFileFormat::type)
llvm::Value * true_value()
Returns true/false constants (bool type)
static const Status CANCELLED
bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow *row)
MemPool * tuple_data_pool()
static llvm::Function * CodegenWriteAlignedTuples(HdfsScanNode *, LlvmCodeGen *, llvm::Function *write_tuple_fn)
void SetTuple(int tuple_idx, Tuple *tuple)
void FreeLocalAllocations()
llvm::Value * false_value()
llvm::Value * GetVal(const char *name="val")
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
Metadata for a single partition inside an Hdfs table.
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
llvm::Function * FinalizeFunction(llvm::Function *function)
Stream * GetStream(int idx=0)
bool abort_on_error() const
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
llvm::LLVMContext & context()
Status InitializeWriteTuplesFn(HdfsPartitionDescriptor *partition, THdfsFileFormat::type type, const std::string &scanner_name)
HdfsPartitionDescriptor * partition_descriptor()
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
int num_materialized_partition_keys() const
Returns number of materialized partition key slots.
Tuple * InitTemplateTuple(RuntimeState *state, const std::vector< ExprContext * > &value_ctxs)
void IncNumScannersCodegenDisabled()
const std::vector< ExprContext * > & partition_key_value_ctxs() const
bool ReportTupleParseError(FieldLocation *fields, uint8_t *errors, int row_idx)
ScannerContext::Stream * stream_
The first stream for context_.
uint8_t * Allocate(int size)
int(* WriteTuplesFn)(HdfsScanner *, MemPool *, TupleRow *, int, FieldLocation *, int, int, int, int)
const TupleDescriptor * tuple_desc()
virtual Status Prepare(ScannerContext *context)
One-time initialisation of state that is constant across scan ranges.
RuntimeProfile * runtime_profile()
Status GetConjunctCtxs(std::vector< ExprContext * > *ctxs)
static llvm::Function * CodegenWriteSlot(LlvmCodeGen *codegen, TupleDescriptor *tuple_desc, SlotDescriptor *slot_desc, const char *null_col_val, int len, bool check_null)
static const int INVALID_ROW_INDEX