17 #include <avro/errors.h>
18 #include <avro/legacy.h>
19 #include <avro/schema.h>
20 #include <boost/foreach.hpp>
36 using namespace impala;
// Evaluates x and, when it is false, returns parse_status_ from the enclosing
// function. The expansion ends without a trailing semicolon, so the call site
// supplies it. UNLIKELY is presumably a branch-prediction hint marking the
// failure path as cold -- confirm against its definition.
48 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
52 avro_schema_incref(schema);
57 avro_schema_decref(schema);
63 avro_schema_decref(schema);
71 if (
this == &other)
return *
this;
72 avro_schema_decref(schema);
74 avro_schema_incref(schema);
80 codegend_decode_avro_data_(NULL) {
84 const vector<ExprContext*>& conjunct_ctxs) {
89 if (materialize_tuple_fn == NULL)
return NULL;
108 ss <<
"Invalid AVRO_VERSION_HEADER: '"
131 if (num_entries < 1)
return Status(
"File header metadata has no data");
133 while (num_entries != 0) {
134 DCHECK_GT(num_entries, 0);
135 for (
int i = 0; i < num_entries; ++i) {
141 DCHECK_GE(key_len, 0);
143 key = string(reinterpret_cast<char*>(key_buf), key_len);
149 DCHECK_GE(value_len, 0);
154 int error = avro_schema_from_json_length(
155 reinterpret_cast<char*>(value), value_len, &raw_file_schema);
159 ss <<
"Failed to parse file schema: " << avro_strerror();
164 DCHECK_GT(table_schema_str.size(), 0);
166 error = avro_schema_from_json_length(
167 table_schema_str.c_str(), table_schema_str.size(), &raw_table_schema);
171 ss <<
"Failed to parse table schema: " << avro_strerror();
180 avro_schema_equal(table_schema.get(), file_schema.
get());
183 string avro_codec(reinterpret_cast<char*>(value), value_len);
194 return Status(
"Unknown Avro compression codec: " + avro_codec);
198 VLOG_ROW <<
"Skipping metadata entry: " << key;
208 return Status(
"Schema not found in file header metadata");
222 if (table_schema->type != AVRO_RECORD) {
223 return Status(
"Table schema is not a record");
225 if (file_schema->type != AVRO_RECORD) {
226 return Status(
"File schema is not a record");
229 int num_table_fields = avro_schema_record_size(table_schema);
230 DCHECK_GT(num_table_fields, 0);
233 int max_materialized_col_idx = -1;
238 if (num_table_fields < num_cols) {
240 ss <<
"The table has " << num_cols <<
" non-partition columns "
241 <<
"but the table's Avro schema has " << num_table_fields <<
" fields.";
244 if (num_table_fields <= max_materialized_col_idx) {
245 return Status(
"Cannot read column that doesn't appear in table schema");
249 bool file_field_found[num_table_fields];
250 memset(&file_field_found, 0, num_table_fields);
252 int num_file_fields = avro_schema_record_size(file_schema);
253 DCHECK_GT(num_file_fields, 0);
254 for (
int i = 0; i < num_file_fields; ++i) {
255 avro_datum_t file_field = avro_schema_record_field_get_by_index(file_schema, i);
257 if (is_avro_complex_type(element.
schema.
get())) {
259 ss <<
"Complex Avro data types (records, enums, arrays, maps, unions, and fixed) "
260 <<
"are not supported. Got type: " << avro_type_name(element.
schema->type);
264 const char* field_name = avro_schema_record_field_name(file_schema, i);
265 int table_field_idx = avro_schema_record_field_get_index(table_schema, field_name);
266 if (table_field_idx < 0) {
272 file_field_found[table_field_idx] =
true;
277 int slot_idx = table_field_idx < num_cols ?
299 avro_schema_record_field_get_by_index(table_schema, table_field_idx);
313 if (file_field_found[col_idx])
continue;
315 avro_datum_t default_value = avro_schema_record_field_default(table_schema, col_idx);
316 if (default_value == NULL) {
318 ss <<
"Field " << avro_schema_record_field_name(table_schema, col_idx)
319 <<
" is missing from file and does not have a default value";
330 switch(default_value->type) {
333 if (avro_boolean_get(default_value, &v)) DCHECK(
false);
339 if (avro_int32_get(default_value, &v)) DCHECK(
false);
345 if (avro_int64_get(default_value, &v)) DCHECK(
false);
351 if (avro_float_get(default_value, &v)) DCHECK(
false);
357 if (avro_double_get(default_value, &v)) DCHECK(
false);
367 if (avro_string_get(default_value, &v)) DCHECK(
false);
390 avro_schema_incref(schema);
394 if (element.
schema->type == AVRO_UNION) {
395 int num_fields = avro_schema_union_size(schema);
396 DCHECK_GT(num_fields, 0);
397 if (num_fields == 2) {
400 int null_position = -1;
401 if (child0->type == AVRO_NULL) {
403 }
else if (child1->type == AVRO_NULL) {
407 if (null_position != -1) {
408 avro_schema_t non_null_child = null_position == 0 ? child1 : child0;
427 avro_obj_t* schema) {
428 switch (schema->type) {
431 if (slot_desc->
type().
scale != avro_schema_decimal_scale(schema)) {
432 const string& col_name =
436 <<
"' has a scale that does not match the table metadata scale."
437 <<
" File metadata scale: " << avro_schema_decimal_scale(schema)
438 <<
" Table metadata scale: " << slot_desc->
type().
scale;
441 if (slot_desc->
type().
precision != avro_schema_decimal_precision(schema)) {
442 const string& col_name =
446 <<
"' has a precision that does not match the table metadata precision."
447 <<
" File metadata precision: " << avro_schema_decimal_precision(schema)
448 <<
" Table metadata precision: " << slot_desc->
type().
precision;
487 ss <<
"Unresolvable column types (column " << slot_desc->
col_pos() <<
"): "
488 <<
"declared type = " << slot_desc->
type() <<
", "
489 <<
"Avro type = " << avro_type_name(schema->type);
509 VLOG(2) <<
"HdfsAvroScanner (node_id=" <<
scan_node_->
id()
510 <<
") using llvm codegend functions.";
520 uint8_t* compressed_data;
521 int64_t compressed_size;
527 DCHECK_GE(num_records, 0);
529 DCHECK_GE(compressed_size, 0);
543 VLOG_FILE <<
"Decompressed " << compressed_size <<
" to " << size;
545 data = compressed_data;
549 while (num_records > 0) {
555 int max_tuples =
GetMemory(&pool, &tuple, &tuple_row);
556 max_tuples = min(num_records, static_cast<int64_t>(max_tuples));
564 this, max_tuples, pool, &data, tuple, tuple_row);
566 num_to_commit =
DecodeAvroData(max_tuples, pool, &data, tuple, tuple_row);
570 num_records -= max_tuples;
588 bool write_slot =
false;
591 if (slot_desc != NULL) {
597 avro_type_t type = element.
schema->type;
633 int slot_byte_size = 0;
634 if (slot_desc != NULL) {
642 DCHECK(
false) <<
"Unsupported SchemaElement: " << type;
700 if (table_schema_str.empty())
return NULL;
703 int error = avro_schema_from_json_length(
704 table_schema_str.c_str(), table_schema_str.size(), &raw_table_schema);
708 ss <<
"Failed to parse table schema: " << avro_strerror();
712 int num_fields = avro_schema_record_size(table_schema.get());
713 DCHECK_GT(num_fields, 0);
715 for (
int field_idx = 0; field_idx < num_fields; ++field_idx) {
721 LOG(INFO) <<
"Avro codegen skipped because CHAR is not supported.";
727 LLVMContext& context = codegen->
context();
731 DCHECK(this_type != NULL);
732 PointerType* this_ptr_type = PointerType::get(this_type, 0);
736 Type* tuple_ptr_type = PointerType::get(tuple_type, 0);
739 PointerType* tuple_opaque_ptr_type = PointerType::get(tuple_opaque_type, 0);
741 Type* data_ptr_type = PointerType::get(codegen->
ptr_type(), 0);
750 Function* fn = prototype.GeneratePrototype(&builder, args);
752 Value* this_val = args[0];
753 Value* pool_val = args[1];
754 Value* data_val = args[2];
755 Value* opaque_tuple_val = args[3];
757 Value* tuple_val = builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type,
"tuple_ptr");
761 for (
int field_idx = 0; field_idx < num_fields; ++field_idx) {
763 avro_schema_record_field_get_by_index(table_schema.get(), field_idx);
769 builder.SetInsertPoint(&fn->back());
772 BasicBlock* read_field_block = BasicBlock::Create(context,
"read_field", fn);
774 if (element.null_union_position != -1) {
776 BasicBlock* null_block = BasicBlock::Create(context,
"null_field", fn);
777 BasicBlock* endif_block = BasicBlock::Create(context,
"endif", fn);
778 Function* read_union_fn =
780 Value* null_union_pos_val =
782 Value* is_not_null_val = builder.CreateCall3(
783 read_union_fn, this_val, null_union_pos_val, data_val,
"is_not_null");
784 builder.CreateCondBr(is_not_null_val, read_field_block, null_block);
787 builder.SetInsertPoint(read_field_block);
788 builder.CreateBr(endif_block);
791 builder.SetInsertPoint(null_block);
795 DCHECK(set_null_fn != NULL);
796 builder.CreateCall(set_null_fn, tuple_val);
799 builder.CreateBr(endif_block);
802 builder.CreateBr(read_field_block);
811 builder.SetInsertPoint(read_field_block, read_field_block->begin());
812 Function* read_field_fn;
813 switch (element.schema->type) {
815 read_field_fn = codegen->
GetFunction(IRFunction::READ_AVRO_BOOLEAN);
818 read_field_fn = codegen->
GetFunction(IRFunction::READ_AVRO_INT32);
821 read_field_fn = codegen->
GetFunction(IRFunction::READ_AVRO_INT64);
824 read_field_fn = codegen->
GetFunction(IRFunction::READ_AVRO_FLOAT);
827 read_field_fn = codegen->
GetFunction(IRFunction::READ_AVRO_DOUBLE);
833 read_field_fn = codegen->
GetFunction(IRFunction::READ_AVRO_VARCHAR);
835 read_field_fn = codegen->
GetFunction(IRFunction::READ_AVRO_STRING);
840 VLOG(1) <<
"Failed to codegen MaterializeTuple() due to unsupported type: "
841 << element.schema->type;
842 fn->eraseFromParent();
847 Value* write_slot_val = builder.getFalse();
848 Value* slot_type_val = builder.getInt32(0);
852 write_slot_val = builder.getTrue();
857 slot_type_val = builder.getInt32(slot_desc->
type().
type);
860 builder.CreateStructGEP(tuple_val, slot_desc->
field_idx(),
"slot");
862 builder.CreateBitCast(slot_val, codegen->
ptr_type(),
"opaque_slot");
870 Value* fixed_len = builder.getInt32(slot_desc->
type().
len);
871 Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val,
872 write_slot_val, opaque_slot_val, pool_val};
873 builder.CreateCall(read_field_fn, read_field_args);
875 Value* read_field_args[] =
876 {this_val, slot_type_val, data_val, write_slot_val, opaque_slot_val, pool_val};
877 builder.CreateCall(read_field_fn, read_field_args);
880 builder.SetInsertPoint(&fn->back());
881 builder.CreateRetVoid();
886 Function* materialize_tuple_fn,
const vector<ExprContext*>& conjunct_ctxs) {
890 DCHECK(materialize_tuple_fn != NULL);
892 Function* decode_avro_data_fn = codegen->
GetFunction(IRFunction::DECODE_AVRO_DATA);
895 materialize_tuple_fn,
"MaterializeTuple", &replaced);
896 DCHECK_EQ(replaced, 1);
900 eval_conjuncts_fn,
"EvalConjuncts", &replaced);
901 DCHECK_EQ(replaced, 1);
902 decode_avro_data_fn->setName(
"DecodeAvroData");
905 DCHECK(decode_avro_data_fn != NULL);
906 return decode_avro_data_fn;
int(* DecodeAvroDataFn)(HdfsAvroScanner *, int, MemPool *, uint8_t **, Tuple *, TupleRow *)
const std::vector< SlotDescriptor * > & materialized_slots() const
virtual Status ProcessRange()
avro_schema_t get() const
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
void ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
HdfsScanNode * scan_node_
The scan node that started this scanner.
ScopedAvroSchemaT(avro_schema_t s=NULL)
void SetNull(const NullIndicatorOffset &offset)
virtual Status ReadFileHeader()
TODO: check that file schema matches metadata schema.
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
ScannerContext * context_
Context for this scanner.
RuntimeProfile::Counter * codegen_timer()
bool only_parsing_header_
If true, this scanner object is only for processing the header.
int64_t total_bytes_returned()
Returns the total number of bytes returned.
FileHeader * header_
File header for this scan range. This is not owned by the parent scan node.
#define RETURN_IF_FALSE(x)
boost::scoped_ptr< MemPool > data_buffer_pool_
MemTracker * mem_tracker()
int GetMaterializedSlotIdx(const std::vector< int > &path) const
static const int SYNC_HASH_SIZE
Size of the sync hash field.
ScopedAvroSchemaT & operator=(const ScopedAvroSchemaT &)
Utility struct that wraps a variable name and llvm type.
static const std::string AVRO_SNAPPY_CODEC
const HdfsTableDescriptor * hdfs_table()
int precision
Only set if type == TYPE_DECIMAL.
struct avro_obj_t * avro_schema_t
A tuple with 0 materialised slots is represented as NULL.
void ReadAvroBoolean(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
static SchemaElement ConvertSchema(const avro_schema_t &schema)
Utility function that maps the Avro library's type representation to our own.
virtual FileHeader * AllocateFileHeader()
Implementation of BaseSequenceScanner super class methods.
int WriteEmptyTuples(RowBatch *row_batch, int num_tuples)
Status VerifyTypesMatch(SlotDescriptor *slot_desc, avro_obj_t *schema)
void * GetSlot(int offset)
RuntimeProfile::Counter * rows_read_counter() const
bool ReadZLong(int64_t *val, Status *)
Read a zigzag encoded long.
Tuple * InitEmptyTemplateTuple()
llvm::StructType * GenerateLlvmStruct(LlvmCodeGen *codegen)
#define COUNTER_ADD(c, v)
const NullIndicatorOffset & null_indicator_offset() const
llvm::Value * null_ptr_value()
Status ParseMetadata()
Utility function for decoding and parsing file header metadata.
void IncNumScannersCodegenEnabled()
LLVM code generator. This is the top level object to generate jitted code.
bool IsStringType() const
RuntimeState * state_
RuntimeState for error reporting.
void ReadAvroDecimal(int slot_byte_size, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
void AddArgument(const NamedVariable &var)
Add argument.
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
void MaterializeTuple(MemPool *pool, uint8_t **data, Tuple *tuple)
Materializes a single tuple from serialized record data.
Status UpdateDecompressor(const THdfsCompression::type &compression)
static const char * LLVM_CLASS_NAME
For C++/IR interop, we need to be able to look up types by name.
bool LogError(const ErrorMsg &msg)
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
const ColumnType & type() const
static llvm::Function * CodegenMaterializeTuple(HdfsScanNode *node, LlvmCodeGen *codegen)
static const uint8_t AVRO_VERSION_HEADER[4]
void ReadAvroDouble(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
int DecodeAvroData(int max_tuples, MemPool *pool, uint8_t **data, Tuple *tuple, TupleRow *tuple_row)
int GetByteSize() const
Returns the byte size of this type. Returns 0 for variable length types.
Status CommitRows(int num_rows)
static const int SKIP_COLUMN
llvm::Function * GetFunction(IRFunction::Type)
int len
Only set if type == TYPE_CHAR or type == TYPE_VARCHAR.
avro_schema_t schema
If not NULL, this owns a reference to schema.
llvm::Function * CodegenUpdateNull(LlvmCodeGen *, llvm::StructType *tuple, bool set_null)
static void Write(const void *value, Tuple *tuple, const SlotDescriptor *slot_desc, MemPool *pool)
RuntimeState * runtime_state()
Wrapper for avro_schema_t's that handles decrementing the ref count.
virtual Status InitNewRange()
Reset internal state for a new scan range.
void ReadAvroChar(PrimitiveType type, int max_len, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
static const char * LLVM_CLASS_NAME
void * GetCodegenFn(THdfsFileFormat::type)
const std::vector< std::string > & col_names() const
static const std::string AVRO_SCHEMA_KEY
Metadata keys.
void AttachPool(MemPool *pool, bool commit_batch)
const SlotDescriptor * slot_desc
void ReadAvroFloat(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
void TransferToScanNodePool(MemPool *pool)
Acquires all allocations from pool into scan_node_pool_. Thread-safe.
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
static const char * LLVM_CLASS_NAME
HdfsAvroScanner(HdfsScanNode *scan_node, RuntimeState *state)
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
int field_idx() const
Returns the field index in the generated llvm struct for this slot's tuple.
void ReadAvroInt64(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static llvm::Function * Codegen(HdfsScanNode *, const std::vector< ExprContext * > &conjunct_ctxs)
Codegen parsing records, writing tuples and evaluating predicates.
void ReadAvroInt32(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
llvm::Function * FinalizeFunction(llvm::Function *function)
static std::string HexDump(const uint8_t *buf, int64_t length)
Dump the first length bytes of buf to a Hex string.
static llvm::Function * CodegenDecodeAvroData(RuntimeState *state, llvm::Function *materialize_tuple_fn, const std::vector< ExprContext * > &conjunct_ctxs)
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
DecodeAvroDataFn codegend_decode_avro_data_
The codegen'd version of DecodeAvroData() if available, NULL otherwise.
bool ReadBytes(int64_t length, uint8_t **buf, Status *, bool peek=false)
ScopedAvroSchemaT schema
The record field schema from the file.
void ReadAvroString(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
Status ResolveSchemas(const avro_schema_t &table_root, const avro_schema_t &file_root)
llvm::LLVMContext & context()
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
static const std::string AVRO_NULL_CODEC
Supported codecs, as they appear in the metadata.
const std::string & avro_schema() const
static const std::string AVRO_CODEC_KEY
void IncNumScannersCodegenDisabled()
static const std::string AVRO_DEFLATE_CODEC
ScannerContext::Stream * stream_
The first stream for context_.
static const uint TRAILING_CHECKSUM_LEN
const TupleDescriptor * tuple_desc()
AvroFileHeader * avro_header_
llvm::PointerType * ptr_type()
RuntimeProfile::Counter * materialize_tuple_timer() const
bool ReadUnionType(int null_union_position, uint8_t **data)