Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-avro-scanner.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/hdfs-avro-scanner.h"
16 
17 #include <avro/errors.h>
18 #include <avro/legacy.h>
19 #include <avro/schema.h>
20 #include <boost/foreach.hpp>
21 
22 #include "codegen/llvm-codegen.h"
23 #include "exec/hdfs-scan-node.h"
24 #include "exec/read-write-util.h"
26 #include "runtime/raw-value.h"
27 #include "runtime/runtime-state.h"
28 #include "util/codec.h"
29 #include "util/decompress.h"
30 #include "util/runtime-profile.h"
31 
32 #include "common/names.h"
33 
34 // Note: the Avro C++ library uses exceptions for error handling. Any Avro
35 // function that may throw an exception must be placed in a try/catch block.
36 using namespace impala;
37 using namespace llvm;
38 
39 const char* HdfsAvroScanner::LLVM_CLASS_NAME = "class.impala::HdfsAvroScanner";
40 const uint8_t HdfsAvroScanner::AVRO_VERSION_HEADER[4] = {'O', 'b', 'j', 1};
41 
42 const string HdfsAvroScanner::AVRO_SCHEMA_KEY("avro.schema");
43 const string HdfsAvroScanner::AVRO_CODEC_KEY("avro.codec");
44 const string HdfsAvroScanner::AVRO_NULL_CODEC("null");
45 const string HdfsAvroScanner::AVRO_SNAPPY_CODEC("snappy");
46 const string HdfsAvroScanner::AVRO_DEFLATE_CODEC("deflate");
47 
48 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_
49 
51  schema = other.schema;
52  avro_schema_incref(schema);
53 }
54 
56  // avro_schema_decref can handle NULL
57  avro_schema_decref(schema);
58 }
59 
61  const avro_schema_t& s) {
62  if (LIKELY(s != schema)) {
63  avro_schema_decref(schema);
64  schema = s;
65  }
66  return *this;
67 }
68 
70  const ScopedAvroSchemaT& other) {
71  if (this == &other) return *this;
72  avro_schema_decref(schema);
73  schema = other.schema;
74  avro_schema_incref(schema);
75  return *this;
76 }
78  : BaseSequenceScanner(scan_node, state),
79  avro_header_(NULL),
80  codegend_decode_avro_data_(NULL) {
81 }
82 
84  const vector<ExprContext*>& conjunct_ctxs) {
85  if (!node->runtime_state()->codegen_enabled()) return NULL;
86  LlvmCodeGen* codegen;
87  if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
88  Function* materialize_tuple_fn = CodegenMaterializeTuple(node, codegen);
89  if (materialize_tuple_fn == NULL) return NULL;
90  return CodegenDecodeAvroData(node->runtime_state(), materialize_tuple_fn, conjunct_ctxs);
91 }
92 
94  AvroFileHeader* header = new AvroFileHeader();
96  return header;
97 }
98 
100  avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
101 
102  // Check version header
103  uint8_t* header;
105  sizeof(AVRO_VERSION_HEADER), &header, &parse_status_));
106  if (memcmp(header, AVRO_VERSION_HEADER, sizeof(AVRO_VERSION_HEADER))) {
107  stringstream ss;
108  ss << "Invalid AVRO_VERSION_HEADER: '"
109  << ReadWriteUtil::HexDump(header, sizeof(AVRO_VERSION_HEADER)) << "'";
110  return Status(ss.str());
111  }
112 
113  // Decode relevant metadata (encoded as Avro map)
115 
116  // Read file sync marker
117  uint8_t* sync;
119  memcpy(header_->sync, sync, SYNC_HASH_SIZE);
120 
122  return Status::OK;
123 }
124 
126  header_->is_compressed = false;
127  header_->compression_type = THdfsCompression::NONE;
128 
129  int64_t num_entries;
131  if (num_entries < 1) return Status("File header metadata has no data");
132 
133  while (num_entries != 0) {
134  DCHECK_GT(num_entries, 0);
135  for (int i = 0; i < num_entries; ++i) {
136  // Decode Avro string-type key
137  string key;
138  uint8_t* key_buf;
139  int64_t key_len;
141  DCHECK_GE(key_len, 0);
142  RETURN_IF_FALSE(stream_->ReadBytes(key_len, &key_buf, &parse_status_));
143  key = string(reinterpret_cast<char*>(key_buf), key_len);
144 
145  // Decode Avro bytes-type value
146  uint8_t* value;
147  int64_t value_len;
149  DCHECK_GE(value_len, 0);
150  RETURN_IF_FALSE(stream_->ReadBytes(value_len, &value, &parse_status_));
151 
152  if (key == AVRO_SCHEMA_KEY) {
153  avro_schema_t raw_file_schema;
154  int error = avro_schema_from_json_length(
155  reinterpret_cast<char*>(value), value_len, &raw_file_schema);
156  ScopedAvroSchemaT file_schema(raw_file_schema);
157  if (error != 0) {
158  stringstream ss;
159  ss << "Failed to parse file schema: " << avro_strerror();
160  return Status(ss.str());
161  }
162 
163  const string& table_schema_str = scan_node_->hdfs_table()->avro_schema();
164  DCHECK_GT(table_schema_str.size(), 0);
165  avro_schema_t raw_table_schema;
166  error = avro_schema_from_json_length(
167  table_schema_str.c_str(), table_schema_str.size(), &raw_table_schema);
168  ScopedAvroSchemaT table_schema(raw_table_schema);
169  if (error != 0) {
170  stringstream ss;
171  ss << "Failed to parse table schema: " << avro_strerror();
172  return Status(ss.str());
173  }
174  RETURN_IF_ERROR(ResolveSchemas(table_schema.get(), file_schema.get()));
175 
176  // We currently codegen a function only for the table schema. If this file's
177  // schema is different from the table schema, don't use the codegen'd function and
178  // use the interpreted path instead.
180  avro_schema_equal(table_schema.get(), file_schema.get());
181 
182  } else if (key == AVRO_CODEC_KEY) {
183  string avro_codec(reinterpret_cast<char*>(value), value_len);
184  if (avro_codec != AVRO_NULL_CODEC) {
185  header_->is_compressed = true;
186  // This scanner doesn't use header_->codec (Avro doesn't use the
187  // Hadoop codec strings), but fill it in for logging
188  header_->codec = avro_codec;
189  if (avro_codec == AVRO_SNAPPY_CODEC) {
190  header_->compression_type = THdfsCompression::SNAPPY;
191  } else if (avro_codec == AVRO_DEFLATE_CODEC) {
192  header_->compression_type = THdfsCompression::DEFLATE;
193  } else {
194  return Status("Unknown Avro compression codec: " + avro_codec);
195  }
196  }
197  } else {
198  VLOG_ROW << "Skipping metadata entry: " << key;
199  }
200  }
202  }
203 
204  VLOG_FILE << stream_->filename() << ": "
205  << (header_->is_compressed ? "compressed" : "not compressed");
207  if (avro_header_->schema.empty()) {
208  return Status("Schema not found in file header metadata");
209  }
210  return Status::OK;
211 }
212 
213 // Schema resolution is performed by first iterating through the file schema's fields to
214 // construct avro_header_->schema, checking that materialized columns have resolvable file
215 // and table schema fields. Next we iterate through the table schema's fields and check
216 // that any materialized columns missing from the file schema have compatible default
217 // values in the table schema.
218 // Note that schema resolution is only performed for materialized columns.
219 // TODO: test unresolvable schemas
221  const avro_schema_t& file_schema) {
222  if (table_schema->type != AVRO_RECORD) {
223  return Status("Table schema is not a record");
224  }
225  if (file_schema->type != AVRO_RECORD) {
226  return Status("File schema is not a record");
227  }
228 
229  int num_table_fields = avro_schema_record_size(table_schema);
230  DCHECK_GT(num_table_fields, 0);
231 
232  int num_cols = scan_node_->hdfs_table()->num_cols() - scan_node_->num_partition_keys();
233  int max_materialized_col_idx = -1;
234  if (!scan_node_->materialized_slots().empty()) {
235  max_materialized_col_idx = scan_node_->materialized_slots().back()->col_pos()
237  }
238  if (num_table_fields < num_cols) {
239  stringstream ss;
240  ss << "The table has " << num_cols << " non-partition columns "
241  << "but the table's Avro schema has " << num_table_fields << " fields.";
242  state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
243  }
244  if (num_table_fields <= max_materialized_col_idx) {
245  return Status("Cannot read column that doesn't appear in table schema");
246  }
247 
248  // Maps table field index -> if a matching file field was found
249  bool file_field_found[num_table_fields];
250  memset(&file_field_found, 0, num_table_fields);
251 
252  int num_file_fields = avro_schema_record_size(file_schema);
253  DCHECK_GT(num_file_fields, 0);
254  for (int i = 0; i < num_file_fields; ++i) {
255  avro_datum_t file_field = avro_schema_record_field_get_by_index(file_schema, i);
256  SchemaElement element = ConvertSchema(file_field);
257  if (is_avro_complex_type(element.schema.get())) {
258  stringstream ss;
259  ss << "Complex Avro data types (records, enums, arrays, maps, unions, and fixed) "
260  << "are not supported. Got type: " << avro_type_name(element.schema->type);
261  return Status(ss.str());
262  }
263 
264  const char* field_name = avro_schema_record_field_name(file_schema, i);
265  int table_field_idx = avro_schema_record_field_get_index(table_schema, field_name);
266  if (table_field_idx < 0) {
267  // File has extra field, ignore
268  element.slot_desc = NULL;
269  avro_header_->schema.push_back(element);
270  continue;
271  }
272  file_field_found[table_field_idx] = true;
273 
274  // The table schema's fields define the table column ordering, and the table schema
275  // can have more fields than the table has columns. Treat extra fields as
276  // unmaterialized columns.
277  int slot_idx = table_field_idx < num_cols ?
279  vector<int>(1, table_field_idx + scan_node_->num_partition_keys()))
281 
282  if (slot_idx != HdfsScanNode::SKIP_COLUMN) {
283  SlotDescriptor* slot_desc = scan_node_->materialized_slots()[slot_idx];
284  element.slot_desc = slot_desc;
285 
286  // Use element.type (rather than file_field->type) so that e.g. "[int, null]" is
287  // treated as an int and not a union
288  RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, element.schema.get()));
289 
290  // Check that the corresponding table field type matches the declared column
291  // type. This check is not strictly necessary since we won't use its default value
292  // (i.e., if the check doesn't pass, we can still process the file normally), but
293  // allowing the table schema to differ from the table columns could lead to
294  // confusing situations. Note that this check is only performed for materialized
295  // columns.
296  // TODO: get rid of separate table schema and table column concepts (i.e. get rid of
297  // table schema and store default values somewhere else)
298  avro_schema_t table_field =
299  avro_schema_record_field_get_by_index(table_schema, table_field_idx);
300  SchemaElement table_element = ConvertSchema(table_field);
301  RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, table_element.schema.get()));
302  } else {
303  element.slot_desc = NULL;
304  }
305  avro_header_->schema.push_back(element);
306  }
307  DCHECK_EQ(avro_header_->schema.size(), num_file_fields);
308 
309  // Check that all materialized fields either appear in the file schema or have a default
310  // value in the table schema
311  BOOST_FOREACH(SlotDescriptor* slot_desc, scan_node_->materialized_slots()) {
312  int col_idx = slot_desc->col_pos() - scan_node_->num_partition_keys();
313  if (file_field_found[col_idx]) continue;
314 
315  avro_datum_t default_value = avro_schema_record_field_default(table_schema, col_idx);
316  if (default_value == NULL) {
317  stringstream ss;
318  ss << "Field " << avro_schema_record_field_name(table_schema, col_idx)
319  << " is missing from file and does not have a default value";
320  return Status(ss.str());
321  }
322  RETURN_IF_ERROR(VerifyTypesMatch(slot_desc, default_value));
323 
324  if (avro_header_->template_tuple == NULL) {
326  template_tuple_ != NULL ?
328  }
329 
330  switch(default_value->type) {
331  case AVRO_BOOLEAN: {
332  int8_t v;
333  if (avro_boolean_get(default_value, &v)) DCHECK(false);
334  RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
335  break;
336  }
337  case AVRO_INT32: {
338  int32_t v;
339  if (avro_int32_get(default_value, &v)) DCHECK(false);
340  RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
341  break;
342  }
343  case AVRO_INT64: {
344  int64_t v;
345  if (avro_int64_get(default_value, &v)) DCHECK(false);
346  RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
347  break;
348  }
349  case AVRO_FLOAT: {
350  float v;
351  if (avro_float_get(default_value, &v)) DCHECK(false);
352  RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
353  break;
354  }
355  case AVRO_DOUBLE: {
356  double v;
357  if (avro_double_get(default_value, &v)) DCHECK(false);
358  RawValue::Write(&v, avro_header_->template_tuple, slot_desc, NULL);
359  break;
360  }
361  case AVRO_STRING:
362  case AVRO_BYTES: {
363  // Mempools aren't thread safe so make a local one and transfer it
364  // to the scan node pool.
366  char* v;
367  if (avro_string_get(default_value, &v)) DCHECK(false);
368  StringValue sv(v);
369  RawValue::Write(&sv, avro_header_->template_tuple, slot_desc, &pool);
371  break;
372  }
373  case AVRO_NULL:
375  break;
376  default:
377  DCHECK(false);
378  }
379  }
380  return Status::OK;
381 }
382 
384  const avro_schema_t& schema) {
385  SchemaElement element;
386  element.schema = schema;
387  // Increment the ref count of 'schema' on behalf of the ScopedAvroSchemaT it was
388  // assigned to. This allows 'schema' to outlive the scope it was passed in from (e.g.,
389  // a parent record schema).
390  avro_schema_incref(schema);
391  element.null_union_position = -1;
392 
393  // Look for special case of [<primitive type>, "null"] union
394  if (element.schema->type == AVRO_UNION) {
395  int num_fields = avro_schema_union_size(schema);
396  DCHECK_GT(num_fields, 0);
397  if (num_fields == 2) {
398  avro_schema_t child0 = avro_schema_union_branch(schema, 0);
399  avro_schema_t child1 = avro_schema_union_branch(schema, 1);
400  int null_position = -1;
401  if (child0->type == AVRO_NULL) {
402  null_position = 0;
403  } else if (child1->type == AVRO_NULL) {
404  null_position = 1;
405  }
406 
407  if (null_position != -1) {
408  avro_schema_t non_null_child = null_position == 0 ? child1 : child0;
409  SchemaElement child = ConvertSchema(non_null_child);
410 
411  // 'schema' is a [<child>, "null"] union. If child is a primitive type (i.e.,
412  // not a complex type nor a [<primitive type>, "null"] union itself), we treat
413  // this node as the same type as child except with null_union_position set
414  // appropriately.
415  if (is_avro_primitive(child.schema.get()) && child.null_union_position == -1) {
416  element = child;
417  element.null_union_position = null_position;
418  }
419  }
420  }
421  }
422  // TODO: populate children of complex types
423  return element;
424 }
425 
427  avro_obj_t* schema) {
428  switch (schema->type) {
429  case AVRO_DECIMAL:
430  if (slot_desc->type().type != TYPE_DECIMAL) break;
431  if (slot_desc->type().scale != avro_schema_decimal_scale(schema)) {
432  const string& col_name =
433  scan_node_->hdfs_table()->col_names()[slot_desc->col_pos()];
434  stringstream ss;
435  ss << "File '" << stream_->filename() << "' column '" << col_name
436  << "' has a scale that does not match the table metadata scale."
437  << " File metadata scale: " << avro_schema_decimal_scale(schema)
438  << " Table metadata scale: " << slot_desc->type().scale;
439  return Status(ss.str());
440  }
441  if (slot_desc->type().precision != avro_schema_decimal_precision(schema)) {
442  const string& col_name =
443  scan_node_->hdfs_table()->col_names()[slot_desc->col_pos()];
444  stringstream ss;
445  ss << "File '" << stream_->filename() << "' column '" << col_name
446  << "' has a precision that does not match the table metadata precision."
447  << " File metadata precision: " << avro_schema_decimal_precision(schema)
448  << " Table metadata precision: " << slot_desc->type().precision;
449  return Status(ss.str());
450  }
451  return Status::OK;
452  case AVRO_NULL:
453  // All Impala types are nullable
454  return Status::OK;
455  case AVRO_STRING:
456  case AVRO_BYTES:
457  if (slot_desc->type().IsStringType()) return Status::OK;
458  break;
459  case AVRO_INT32:
460  if (slot_desc->type().type == TYPE_INT) return Status::OK;
461  // Type promotion
462  if (slot_desc->type().type == TYPE_BIGINT) return Status::OK;
463  if (slot_desc->type().type == TYPE_FLOAT) return Status::OK;
464  if (slot_desc->type().type == TYPE_DOUBLE) return Status::OK;
465  break;
466  case AVRO_INT64:
467  if (slot_desc->type().type == TYPE_BIGINT) return Status::OK;
468  // Type promotion
469  if (slot_desc->type().type == TYPE_FLOAT) return Status::OK;
470  if (slot_desc->type().type == TYPE_DOUBLE) return Status::OK;
471  break;
472  case AVRO_FLOAT:
473  if (slot_desc->type().type == TYPE_FLOAT) return Status::OK;
474  // Type promotion
475  if (slot_desc->type().type == TYPE_DOUBLE) return Status::OK;
476  break;
477  case AVRO_DOUBLE:
478  if (slot_desc->type().type == TYPE_DOUBLE) return Status::OK;
479  break;
480  case AVRO_BOOLEAN:
481  if (slot_desc->type().type == TYPE_BOOLEAN) return Status::OK;
482  break;
483  default:
484  break;
485  }
486  stringstream ss;
487  ss << "Unresolvable column types (column " << slot_desc->col_pos() << "): "
488  << "declared type = " << slot_desc->type() << ", "
489  << "Avro type = " << avro_type_name(schema->type);
490  return Status(ss.str());
491 }
492 
494  DCHECK(header_ != NULL);
495  only_parsing_header_ = false;
496  avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
498  if (header_->is_compressed) {
500  }
501 
503  codegend_decode_avro_data_ = reinterpret_cast<DecodeAvroDataFn>(
504  scan_node_->GetCodegenFn(THdfsFileFormat::AVRO));
505  }
506  if (codegend_decode_avro_data_ == NULL) {
508  } else {
509  VLOG(2) << "HdfsAvroScanner (node_id=" << scan_node_->id()
510  << ") using llvm codegend functions.";
512  }
513 
514  return Status::OK;
515 }
516 
518  while (!finished()) {
519  int64_t num_records;
520  uint8_t* compressed_data;
521  int64_t compressed_size;
522  uint8_t* data;
523 
524  // Read new data block
526  stream_->ReadZLong(&num_records, &parse_status_));
527  DCHECK_GE(num_records, 0);
528  RETURN_IF_FALSE(stream_->ReadZLong(&compressed_size, &parse_status_));
529  DCHECK_GE(compressed_size, 0);
531  compressed_size, &compressed_data, &parse_status_));
532 
533  if (header_->is_compressed) {
534  if (header_->compression_type == THdfsCompression::SNAPPY) {
535  // Snappy-compressed data block includes trailing 4-byte checksum,
536  // decompressor_ doesn't expect this
538  }
539  int64_t size;
541  RETURN_IF_ERROR(decompressor_->ProcessBlock(false, compressed_size, compressed_data,
542  &size, &data));
543  VLOG_FILE << "Decompressed " << compressed_size << " to " << size;
544  } else {
545  data = compressed_data;
546  }
547 
548  // Process block data
549  while (num_records > 0) {
551 
552  MemPool* pool;
553  Tuple* tuple;
554  TupleRow* tuple_row;
555  int max_tuples = GetMemory(&pool, &tuple, &tuple_row);
556  max_tuples = min(num_records, static_cast<int64_t>(max_tuples));
557  int num_to_commit;
558  if (scan_node_->materialized_slots().empty()) {
559  // No slots to materialize (e.g. count(*)), no need to decode data
560  num_to_commit = WriteEmptyTuples(context_, tuple_row, max_tuples);
561  } else {
562  if (codegend_decode_avro_data_ != NULL) {
563  num_to_commit = codegend_decode_avro_data_(
564  this, max_tuples, pool, &data, tuple, tuple_row);
565  } else {
566  num_to_commit = DecodeAvroData(max_tuples, pool, &data, tuple, tuple_row);
567  }
568  }
569  RETURN_IF_ERROR(CommitRows(num_to_commit));
570  num_records -= max_tuples;
571  COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
572 
573  if (scan_node_->ReachedLimit()) return Status::OK;
574  }
575 
576  if (decompressor_.get() != NULL && !decompressor_->reuse_output_buffer()) {
577  AttachPool(data_buffer_pool_.get(), true);
578  }
580  }
581 
582  return Status::OK;
583 }
584 
585 void HdfsAvroScanner::MaterializeTuple(MemPool* pool, uint8_t** data, Tuple* tuple) {
586  BOOST_FOREACH(const SchemaElement& element, avro_header_->schema) {
587  const SlotDescriptor* slot_desc = element.slot_desc;
588  bool write_slot = false;
589  void* slot = NULL;
590  PrimitiveType slot_type = INVALID_TYPE;
591  if (slot_desc != NULL) {
592  write_slot = true;
593  slot = tuple->GetSlot(slot_desc->tuple_offset());
594  slot_type = slot_desc->type().type;
595  }
596 
597  avro_type_t type = element.schema->type;
598  if (element.null_union_position != -1
599  && !ReadUnionType(element.null_union_position, data)) {
600  type = AVRO_NULL;
601  }
602 
603  switch (type) {
604  case AVRO_NULL:
605  if (slot_desc != NULL) tuple->SetNull(slot_desc->null_indicator_offset());
606  break;
607  case AVRO_BOOLEAN:
608  ReadAvroBoolean(slot_type, data, write_slot, slot, pool);
609  break;
610  case AVRO_INT32:
611  ReadAvroInt32(slot_type, data, write_slot, slot, pool);
612  break;
613  case AVRO_INT64:
614  ReadAvroInt64(slot_type, data, write_slot, slot, pool);
615  break;
616  case AVRO_FLOAT:
617  ReadAvroFloat(slot_type, data, write_slot, slot, pool);
618  break;
619  case AVRO_DOUBLE:
620  ReadAvroDouble(slot_type, data, write_slot, slot, pool);
621  break;
622  case AVRO_STRING:
623  case AVRO_BYTES:
624  if (slot_desc != NULL && slot_desc->type().type == TYPE_VARCHAR) {
625  ReadAvroVarchar(slot_type, slot_desc->type().len, data, write_slot, slot, pool);
626  } else if (slot_desc != NULL && slot_desc->type().type == TYPE_CHAR) {
627  ReadAvroChar(slot_type, slot_desc->type().len, data, write_slot, slot, pool);
628  } else {
629  ReadAvroString(slot_type, data, write_slot, slot, pool);
630  }
631  break;
632  case AVRO_DECIMAL: {
633  int slot_byte_size = 0;
634  if (slot_desc != NULL) {
635  DCHECK_EQ(slot_type, TYPE_DECIMAL);
636  slot_byte_size = slot_desc->type().GetByteSize();
637  }
638  ReadAvroDecimal(slot_byte_size, data, write_slot, slot, pool);
639  break;
640  }
641  default:
642  DCHECK(false) << "Unsupported SchemaElement: " << type;
643  }
644  }
645 }
646 
647 // This function produces a codegen'd function equivalent to MaterializeTuple() but
648 // optimized for the table schema. It eliminates the conditionals necessary when
649 // interpreting the type of each element in the schema, instead generating code to handle
650 // each element in a record. Example output:
651 //
652 // define void @MaterializeTuple(%"class.impala::HdfsAvroScanner"* %this,
653 // %"class.impala::MemPool"* %pool, i8** %data, %"class.impala::Tuple"* %tuple) {
654 // entry:
655 // %tuple_ptr = bitcast %"class.impala::Tuple"* %tuple to { i8, i32 }*
656 // %is_not_null = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
657 // %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
658 // br i1 %is_not_null, label %read_field, label %null_field
659 //
660 // read_field: ; preds = %entry
661 // %slot = getelementptr inbounds { i8, i32 }* %tuple_ptr, i32 0, i32 1
662 // %opaque_slot = bitcast i32* %slot to i8*
663 // call void
664 // @_ZN6impala15HdfsAvroScanner13ReadAvroInt32ENS_13PrimitiveTypeEPPhPvPNS_7MemPoolE(
665 // %"class.impala::HdfsAvroScanner"* %this, i32 5, i8** %data,
666 // i8* %opaque_slot, %"class.impala::MemPool"* %pool)
667 // br label %endif
668 //
669 // null_field: ; preds = %entry
670 // call void @SetNull({ i8, i32 }* %tuple_ptr)
671 // br label %endif
672 //
673 // endif: ; preds = %null_field, %read_field
674 // %is_not_null4 = call i1 @_ZN6impala15HdfsAvroScanner13ReadUnionTypeEiPPh(
675 // %"class.impala::HdfsAvroScanner"* %this, i32 1, i8** %data)
676 // br i1 %is_not_null4, label %read_field1, label %null_field2
677 //
678 // read_field1: ; preds = %endif
679 // call void
680 // @_ZN6impala15HdfsAvroScanner15ReadAvroBooleanENS_13PrimitiveTypeEPPhPvPNS_7MemPoolE(
681 // %"class.impala::HdfsAvroScanner"* %this, i32 0, i8** %data,
682 // i8* null, %"class.impala::MemPool"* %pool)
683 // br label %endif3
684 //
685 // null_field2: ; preds = %endif
686 // br label %endif3
687 //
688 // endif3: ; preds = %null_field2, %read_field1
689 // ret void
690 // }
692  LlvmCodeGen* codegen) {
693  const string& table_schema_str = node->hdfs_table()->avro_schema();
694 
695  // HdfsAvroScanner::Codegen() (which calls this function) gets called by HdfsScanNode
696  // regardless of whether the table we're scanning contains Avro files or not. If this
697  // isn't an Avro table, there is no table schema to codegen a function from (and there's
698  // no need to anyway).
699  // TODO: HdfsScanNode shouldn't codegen functions it doesn't need.
700  if (table_schema_str.empty()) return NULL;
701 
702  avro_schema_t raw_table_schema;
703  int error = avro_schema_from_json_length(
704  table_schema_str.c_str(), table_schema_str.size(), &raw_table_schema);
705  ScopedAvroSchemaT table_schema(raw_table_schema);
706  if (error != 0) {
707  stringstream ss;
708  ss << "Failed to parse table schema: " << avro_strerror();
709  node->runtime_state()->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
710  return NULL;
711  }
712  int num_fields = avro_schema_record_size(table_schema.get());
713  DCHECK_GT(num_fields, 0);
714  // Disable Codegen for TYPE_CHAR
715  for (int field_idx = 0; field_idx < num_fields; ++field_idx) {
716  int col_idx = field_idx + node->num_partition_keys();
717  int slot_idx = node->GetMaterializedSlotIdx(vector<int>(1, col_idx));
718  if (slot_idx != HdfsScanNode::SKIP_COLUMN) {
719  SlotDescriptor* slot_desc = node->materialized_slots()[slot_idx];
720  if (slot_desc->type().type == TYPE_CHAR) {
721  LOG(INFO) << "Avro codegen skipped because CHAR is not supported.";
722  return NULL;
723  }
724  }
725  }
726 
727  LLVMContext& context = codegen->context();
728  LlvmCodeGen::LlvmBuilder builder(context);
729 
730  Type* this_type = codegen->GetType(HdfsAvroScanner::LLVM_CLASS_NAME);
731  DCHECK(this_type != NULL);
732  PointerType* this_ptr_type = PointerType::get(this_type, 0);
733 
734  TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc());
735  StructType* tuple_type = tuple_desc->GenerateLlvmStruct(codegen);
736  Type* tuple_ptr_type = PointerType::get(tuple_type, 0);
737 
738  Type* tuple_opaque_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
739  PointerType* tuple_opaque_ptr_type = PointerType::get(tuple_opaque_type, 0);
740 
741  Type* data_ptr_type = PointerType::get(codegen->ptr_type(), 0); // char**
742  Type* mempool_type = PointerType::get(codegen->GetType(MemPool::LLVM_CLASS_NAME), 0);
743 
744  LlvmCodeGen::FnPrototype prototype(codegen, "MaterializeTuple", codegen->void_type());
745  prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_ptr_type));
746  prototype.AddArgument(LlvmCodeGen::NamedVariable("pool", mempool_type));
747  prototype.AddArgument(LlvmCodeGen::NamedVariable("data", data_ptr_type));
748  prototype.AddArgument(LlvmCodeGen::NamedVariable("tuple", tuple_opaque_ptr_type));
749  Value* args[4];
750  Function* fn = prototype.GeneratePrototype(&builder, args);
751 
752  Value* this_val = args[0];
753  Value* pool_val = args[1];
754  Value* data_val = args[2];
755  Value* opaque_tuple_val = args[3];
756 
757  Value* tuple_val = builder.CreateBitCast(opaque_tuple_val, tuple_ptr_type, "tuple_ptr");
758 
759  // Codegen logic for parsing each field and, if necessary, populating a slot with the
760  // result.
761  for (int field_idx = 0; field_idx < num_fields; ++field_idx) {
762  avro_datum_t field =
763  avro_schema_record_field_get_by_index(table_schema.get(), field_idx);
764  SchemaElement element = ConvertSchema(field);
765  int col_idx = field_idx + node->num_partition_keys();
766  int slot_idx = node->GetMaterializedSlotIdx(vector<int>(1, col_idx));
767 
768  // The previous iteration may have left the insert point somewhere else
769  builder.SetInsertPoint(&fn->back());
770 
771  // Block that calls appropriate Read<Type> function
772  BasicBlock* read_field_block = BasicBlock::Create(context, "read_field", fn);
773 
774  if (element.null_union_position != -1) {
775  // Field could be null. Create conditional branch based on ReadUnionType result.
776  BasicBlock* null_block = BasicBlock::Create(context, "null_field", fn);
777  BasicBlock* endif_block = BasicBlock::Create(context, "endif", fn);
778  Function* read_union_fn =
779  codegen->GetFunction(IRFunction::READ_UNION_TYPE);
780  Value* null_union_pos_val =
781  codegen->GetIntConstant(TYPE_INT, element.null_union_position);
782  Value* is_not_null_val = builder.CreateCall3(
783  read_union_fn, this_val, null_union_pos_val, data_val, "is_not_null");
784  builder.CreateCondBr(is_not_null_val, read_field_block, null_block);
785 
786  // Write branch at end of read_field_block, we fill in the rest later
787  builder.SetInsertPoint(read_field_block);
788  builder.CreateBr(endif_block);
789 
790  // Write null field IR
791  builder.SetInsertPoint(null_block);
792  if (slot_idx != HdfsScanNode::SKIP_COLUMN) {
793  SlotDescriptor* slot_desc = node->materialized_slots()[slot_idx];
794  Function* set_null_fn = slot_desc->CodegenUpdateNull(codegen, tuple_type, true);
795  DCHECK(set_null_fn != NULL);
796  builder.CreateCall(set_null_fn, tuple_val);
797  }
798  // LLVM requires all basic blocks to end with a terminating instruction
799  builder.CreateBr(endif_block);
800  } else {
801  // Field is never null, read field unconditionally.
802  builder.CreateBr(read_field_block);
803  }
804 
805  SlotDescriptor* slot_desc = NULL;
806  if (slot_idx != HdfsScanNode::SKIP_COLUMN) {
807  slot_desc = node->materialized_slots()[slot_idx];
808  }
809 
810  // Write read_field_block IR starting at the beginning of the block
811  builder.SetInsertPoint(read_field_block, read_field_block->begin());
812  Function* read_field_fn;
813  switch (element.schema->type) {
814  case AVRO_BOOLEAN:
815  read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_BOOLEAN);
816  break;
817  case AVRO_INT32:
818  read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT32);
819  break;
820  case AVRO_INT64:
821  read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_INT64);
822  break;
823  case AVRO_FLOAT:
824  read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_FLOAT);
825  break;
826  case AVRO_DOUBLE:
827  read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_DOUBLE);
828  break;
829  case AVRO_STRING:
830  case AVRO_BYTES:
831  if ((slot_idx != HdfsScanNode::SKIP_COLUMN) &&
832  (slot_desc->type().type == TYPE_VARCHAR)) {
833  read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_VARCHAR);
834  } else {
835  read_field_fn = codegen->GetFunction(IRFunction::READ_AVRO_STRING);
836  }
837  break;
838  default:
839  // Unsupported type, can't codegen
840  VLOG(1) << "Failed to codegen MaterializeTuple() due to unsupported type: "
841  << element.schema->type;
842  fn->eraseFromParent();
843  return NULL;
844  }
845 
846  // Call appropriate ReadAvro<Type> function
847  Value* write_slot_val = builder.getFalse();
848  Value* slot_type_val = builder.getInt32(0);
849  Value* opaque_slot_val = codegen->null_ptr_value();
850  if (slot_idx != HdfsScanNode::SKIP_COLUMN) {
851  // Field corresponds to a materialized column, fill in relevant arguments
852  write_slot_val = builder.getTrue();
853  if (slot_desc->type().type == TYPE_DECIMAL) {
854  // ReadAvroDecimal() takes slot byte size instead of slot type
855  slot_type_val = builder.getInt32(slot_desc->type().GetByteSize());
856  } else {
857  slot_type_val = builder.getInt32(slot_desc->type().type);
858  }
859  Value* slot_val =
860  builder.CreateStructGEP(tuple_val, slot_desc->field_idx(), "slot");
861  opaque_slot_val =
862  builder.CreateBitCast(slot_val, codegen->ptr_type(), "opaque_slot");
863  }
864 
865  // NOTE: ReadAvroVarchar/Char has different signature than rest of read functions
866  if ((slot_idx != HdfsScanNode::SKIP_COLUMN) &&
867  (slot_desc->type().type == TYPE_VARCHAR ||
868  slot_desc->type().type == TYPE_CHAR)) {
869  // Need to pass an extra argument (the length) to the codegen function
870  Value* fixed_len = builder.getInt32(slot_desc->type().len);
871  Value* read_field_args[] = {this_val, slot_type_val, fixed_len, data_val,
872  write_slot_val, opaque_slot_val, pool_val};
873  builder.CreateCall(read_field_fn, read_field_args);
874  } else {
875  Value* read_field_args[] =
876  {this_val, slot_type_val, data_val, write_slot_val, opaque_slot_val, pool_val};
877  builder.CreateCall(read_field_fn, read_field_args);
878  }
879  }
880  builder.SetInsertPoint(&fn->back());
881  builder.CreateRetVoid();
882  return codegen->FinalizeFunction(fn);
883 }
884 
886  Function* materialize_tuple_fn, const vector<ExprContext*>& conjunct_ctxs) {
887  LlvmCodeGen* codegen;
888  if (!state->GetCodegen(&codegen).ok()) return NULL;
889  SCOPED_TIMER(codegen->codegen_timer());
890  DCHECK(materialize_tuple_fn != NULL);
891 
892  Function* decode_avro_data_fn = codegen->GetFunction(IRFunction::DECODE_AVRO_DATA);
893  int replaced = 0;
894  decode_avro_data_fn = codegen->ReplaceCallSites(decode_avro_data_fn, false,
895  materialize_tuple_fn, "MaterializeTuple", &replaced);
896  DCHECK_EQ(replaced, 1);
897 
898  Function* eval_conjuncts_fn = ExecNode::CodegenEvalConjuncts(state, conjunct_ctxs);
899  decode_avro_data_fn = codegen->ReplaceCallSites(decode_avro_data_fn, false,
900  eval_conjuncts_fn, "EvalConjuncts", &replaced);
901  DCHECK_EQ(replaced, 1);
902  decode_avro_data_fn->setName("DecodeAvroData");
903 
904  decode_avro_data_fn = codegen->OptimizeFunctionWithExprs(decode_avro_data_fn);
905  DCHECK(decode_avro_data_fn != NULL);
906  return decode_avro_data_fn;
907 }
int(* DecodeAvroDataFn)(HdfsAvroScanner *, int, MemPool *, uint8_t **, Tuple *, TupleRow *)
const std::vector< SlotDescriptor * > & materialized_slots() const
virtual Status ProcessRange()
int id() const
Definition: exec-node.h:154
boost::scoped_ptr< Codec > decompressor_
Decompressor class to use, if any.
Definition: hdfs-scanner.h:198
void ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
HdfsScanNode * scan_node_
The scan node that started this scanner.
Definition: hdfs-scanner.h:141
void SetNull(const NullIndicatorOffset &offset)
Definition: tuple.h:101
virtual Status ReadFileHeader()
TODO: check that file schema matches metadata schema.
int num_partition_keys() const
Returns number of partition keys in the table, including non-materialized slots.
ScannerContext * context_
Context for this scanner.
Definition: hdfs-scanner.h:147
RuntimeProfile::Counter * codegen_timer()
Definition: llvm-codegen.h:135
bool only_parsing_header_
If true, this scanner object is only for processing the header.
int64_t total_bytes_returned()
Returns the total number of bytes returned.
FileHeader * header_
File header for this scan range. This is not owned by the parent scan node.
#define RETURN_IF_FALSE(x)
boost::scoped_ptr< MemPool > data_buffer_pool_
Definition: hdfs-scanner.h:205
MemTracker * mem_tracker()
Definition: exec-node.h:162
int GetMaterializedSlotIdx(const std::vector< int > &path) const
static const int SYNC_HASH_SIZE
Size of the sync hash field.
ScopedAvroSchemaT & operator=(const ScopedAvroSchemaT &)
Utility struct that wraps a variable name and llvm type.
Definition: llvm-codegen.h:149
static const std::string AVRO_SNAPPY_CODEC
const HdfsTableDescriptor * hdfs_table()
int precision
Only set if type == TYPE_DECIMAL.
Definition: types.h:68
struct avro_obj_t * avro_schema_t
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
void ReadAvroBoolean(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
static SchemaElement ConvertSchema(const avro_schema_t &schema)
Utility function that maps the Avro library's type representation to our own.
virtual FileHeader * AllocateFileHeader()
Implementation of BaseSequenceScanner super class methods.
int WriteEmptyTuples(RowBatch *row_batch, int num_tuples)
Status VerifyTypesMatch(SlotDescriptor *slot_desc, avro_obj_t *schema)
void * GetSlot(int offset)
Definition: tuple.h:118
RuntimeProfile::Counter * rows_read_counter() const
Definition: scan-node.h:96
bool ReadZLong(int64_t *val, Status *)
Read a zigzag encoded long.
Tuple * InitEmptyTemplateTuple()
llvm::StructType * GenerateLlvmStruct(LlvmCodeGen *codegen)
Definition: descriptors.cc:556
#define COUNTER_ADD(c, v)
const NullIndicatorOffset & null_indicator_offset() const
Definition: descriptors.h:89
bool ReachedLimit()
Definition: exec-node.h:159
THdfsCompression::type compression_type
Enum for compression type.
#define SCOPED_TIMER(c)
llvm::Value * null_ptr_value()
Definition: llvm-codegen.h:382
Status ParseMetadata()
Utility function for decoding and parsing file header metadata.
void IncNumScannersCodegenEnabled()
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
bool IsStringType() const
Definition: types.h:168
RuntimeState * state_
RuntimeState for error reporting.
Definition: hdfs-scanner.h:144
void ReadAvroDecimal(int slot_byte_size, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
PrimitiveType type
Definition: types.h:60
void AddArgument(const NamedVariable &var)
Add argument.
Definition: llvm-codegen.h:171
static llvm::Function * CodegenEvalConjuncts(RuntimeState *state, const std::vector< ExprContext * > &conjunct_ctxs, const char *name="EvalConjuncts")
Definition: exec-node.cc:452
void MaterializeTuple(MemPool *pool, uint8_t **data, Tuple *tuple)
Materializes a single tuple from serialized record data.
Status UpdateDecompressor(const THdfsCompression::type &compression)
static const char * LLVM_CLASS_NAME
For C++/IR interop, we need to be able to look up types by name.
Definition: tuple.h:134
bool LogError(const ErrorMsg &msg)
int GetMemory(MemPool **pool, Tuple **tuple_mem, TupleRow **tuple_row_mem)
const ColumnType & type() const
Definition: descriptors.h:78
static llvm::Function * CodegenMaterializeTuple(HdfsScanNode *node, LlvmCodeGen *codegen)
static const uint8_t AVRO_VERSION_HEADER[4]
PrimitiveType
Definition: types.h:27
void ReadAvroDouble(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
ObjectPool pool
int DecodeAvroData(int max_tuples, MemPool *pool, uint8_t **data, Tuple *tuple, TupleRow *tuple_row)
int GetByteSize() const
Returns the byte size of this type. Returns 0 for variable length types.
Definition: types.h:178
Status CommitRows(int num_rows)
static const int SKIP_COLUMN
llvm::Function * GetFunction(IRFunction::Type)
int len
Only set if type == TYPE_CHAR or type == TYPE_VARCHAR.
Definition: types.h:62
avro_schema_t schema
If not NULL, this owns a reference to schema.
int col_pos() const
Definition: descriptors.h:84
llvm::Function * CodegenUpdateNull(LlvmCodeGen *, llvm::StructType *tuple, bool set_null)
Definition: descriptors.cc:510
static void Write(const void *value, Tuple *tuple, const SlotDescriptor *slot_desc, MemPool *pool)
Definition: raw-value.cc:303
RuntimeState * runtime_state()
Wrapper for avro_schema_t's that handles decrementing the ref count.
#define VLOG_ROW
Definition: logging.h:59
virtual Status InitNewRange()
Reset internal state for a new scan range.
void ReadAvroChar(PrimitiveType type, int max_len, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
static const char * LLVM_CLASS_NAME
Definition: mem-pool.h:177
void * GetCodegenFn(THdfsFileFormat::type)
const std::vector< std::string > & col_names() const
Definition: descriptors.h:165
static const std::string AVRO_SCHEMA_KEY
Metadata keys.
void AttachPool(MemPool *pool, bool commit_batch)
Definition: hdfs-scanner.h:256
void ReadAvroFloat(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
void TransferToScanNodePool(MemPool *pool)
Acquires all allocations from pool into scan_node_pool_. Thread-safe.
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
uint8_t sync[SYNC_HASH_SIZE]
The sync hash for this file.
static const char * LLVM_CLASS_NAME
static const Status OK
Definition: status.h:87
#define LIKELY(expr)
Definition: compiler-util.h:32
HdfsAvroScanner(HdfsScanNode *scan_node, RuntimeState *state)
llvm::Type * GetType(const ColumnType &type)
Returns llvm type for the column type.
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
std::string codec
Codec name if it is compressed.
int tuple_offset() const
Definition: descriptors.h:88
int field_idx() const
Returns the field index in the generated llvm struct for this slot's tuple.
Definition: descriptors.h:87
void ReadAvroInt64(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
llvm::Value * GetIntConstant(PrimitiveType type, int64_t val)
Returns the constant 'val' of 'type'.
static llvm::Function * Codegen(HdfsScanNode *, const std::vector< ExprContext * > &conjunct_ctxs)
Codegen parsing records, writing tuples and evaluating predicates.
void ReadAvroInt32(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
llvm::Function * FinalizeFunction(llvm::Function *function)
static std::string HexDump(const uint8_t *buf, int64_t length)
Dump the first length bytes of buf to a Hex string.
#define VLOG_FILE
Definition: logging.h:58
static llvm::Function * CodegenDecodeAvroData(RuntimeState *state, llvm::Function *materialize_tuple_fn, const std::vector< ExprContext * > &conjunct_ctxs)
llvm::Function * ReplaceCallSites(llvm::Function *caller, bool update_in_place, llvm::Function *new_fn, const std::string &target_name, int *num_replaced)
bool ok() const
Definition: status.h:172
RuntimeProfile::Counter * decompress_timer_
Time spent decompressing bytes.
Definition: hdfs-scanner.h:208
DecodeAvroDataFn codegend_decode_avro_data_
The codegen'd version of DecodeAvroData() if available, NULL otherwise.
bool ReadBytes(int64_t length, uint8_t **buf, Status *, bool peek=false)
ScopedAvroSchemaT schema
The record field schema from the file.
llvm::Type * void_type()
Definition: llvm-codegen.h:394
void ReadAvroString(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
bool is_compressed
true if the file is compressed
Status ResolveSchemas(const avro_schema_t &table_root, const avro_schema_t &file_root)
llvm::LLVMContext & context()
Definition: llvm-codegen.h:214
llvm::Function * OptimizeFunctionWithExprs(llvm::Function *fn)
static const std::string AVRO_NULL_CODEC
Supported codecs, as they appear in the metadata.
const std::string & avro_schema() const
Definition: descriptors.h:234
std::vector< SchemaElement > schema
List of SchemaElements corresponding to the fields of the file schema.
static const std::string AVRO_CODEC_KEY
void IncNumScannersCodegenDisabled()
static const std::string AVRO_DEFLATE_CODEC
ScannerContext::Stream * stream_
The first stream for context_.
Definition: hdfs-scanner.h:150
static const uint TRAILING_CHECKSUM_LEN
Definition: decompress.h:74
const TupleDescriptor * tuple_desc()
AvroFileHeader * avro_header_
llvm::PointerType * ptr_type()
Definition: llvm-codegen.h:393
RuntimeProfile::Counter * materialize_tuple_timer() const
Definition: scan-node.h:104
bool ReadUnionType(int null_union_position, uint8_t **data)