Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-avro-scanner-ir.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "exec/hdfs-avro-scanner.h"
16 #include "exec/read-write-util.h"
17 #include <algorithm>
18 
19 using namespace impala;
20 
21 // Functions in this file are cross-compiled to IR with clang.
22 
23 int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** data,
24  Tuple* tuple, TupleRow* tuple_row) {
25  int num_to_commit = 0;
26  for (int i = 0; i < max_tuples; ++i) {
27  InitTuple(template_tuple_, tuple);
28  MaterializeTuple(pool, data, tuple);
29  tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
30  if (EvalConjuncts(tuple_row)) {
31  ++num_to_commit;
32  tuple_row = next_row(tuple_row);
33  tuple = next_tuple(tuple);
34  }
35  }
36  return num_to_commit;
37 }
38 
39 bool HdfsAvroScanner::ReadUnionType(int null_union_position, uint8_t** data) {
40  DCHECK(null_union_position == 0 || null_union_position == 1);
41  int8_t union_position = **data;
42  // Union position is varlen zig-zag encoded
43  DCHECK(union_position == 0 || union_position == 2);
44  // "Decode" zig-zag encoding
45  if (union_position == 2) union_position = 1;
46  *data += 1;
47  return union_position != null_union_position;
48 }
49 
50 void HdfsAvroScanner::ReadAvroBoolean(PrimitiveType type, uint8_t** data, bool write_slot,
51  void* slot, MemPool* pool) {
52  if (write_slot) {
53  DCHECK_EQ(type, TYPE_BOOLEAN);
54  *reinterpret_cast<bool*>(slot) = *reinterpret_cast<bool*>(*data);
55  }
56  *data += 1;
57 }
58 
59 void HdfsAvroScanner::ReadAvroInt32(PrimitiveType type, uint8_t** data, bool write_slot,
60  void* slot, MemPool* pool) {
61  int32_t val = ReadWriteUtil::ReadZInt(data);
62  if (write_slot) {
63  if (type == TYPE_INT) {
64  *reinterpret_cast<int32_t*>(slot) = val;
65  } else if (type == TYPE_BIGINT) {
66  *reinterpret_cast<int64_t*>(slot) = val;
67  } else if (type == TYPE_FLOAT) {
68  *reinterpret_cast<float*>(slot) = val;
69  } else if (type == TYPE_DOUBLE) {
70  *reinterpret_cast<double*>(slot) = val;
71  } else {
72  DCHECK(false);
73  }
74  }
75 }
76 
77 void HdfsAvroScanner::ReadAvroInt64(PrimitiveType type, uint8_t** data, bool write_slot,
78  void* slot, MemPool* pool) {
79  int64_t val = ReadWriteUtil::ReadZLong(data);
80  if (write_slot) {
81  if (type == TYPE_BIGINT) {
82  *reinterpret_cast<int64_t*>(slot) = val;
83  } else if (type == TYPE_FLOAT) {
84  *reinterpret_cast<float*>(slot) = val;
85  } else if (type == TYPE_DOUBLE) {
86  *reinterpret_cast<double*>(slot) = val;
87  } else {
88  DCHECK(false);
89  }
90  }
91 }
92 
93 void HdfsAvroScanner::ReadAvroFloat(PrimitiveType type, uint8_t** data, bool write_slot,
94  void* slot, MemPool* pool) {
95  if (write_slot) {
96  float val = *reinterpret_cast<float*>(*data);
97  if (type == TYPE_FLOAT) {
98  *reinterpret_cast<float*>(slot) = val;
99  } else if (type == TYPE_DOUBLE) {
100  *reinterpret_cast<double*>(slot) = val;
101  } else {
102  DCHECK(false);
103  }
104  }
105  *data += 4;
106 }
107 
108 void HdfsAvroScanner::ReadAvroDouble(PrimitiveType type, uint8_t** data, bool write_slot,
109  void* slot, MemPool* pool) {
110  if (write_slot) {
111  DCHECK_EQ(type, TYPE_DOUBLE);
112  *reinterpret_cast<double*>(slot) = *reinterpret_cast<double*>(*data);
113  }
114  *data += 8;
115 }
116 
117 void HdfsAvroScanner::ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t** data,
118  bool write_slot, void* slot, MemPool* pool) {
119  int64_t len = ReadWriteUtil::ReadZLong(data);
120  if (write_slot) {
121  DCHECK(type == TYPE_VARCHAR);
122  StringValue* sv = reinterpret_cast<StringValue*>(slot);
123  int str_len = std::min(static_cast<int>(len), max_len);
124  DCHECK(str_len >= 0);
125  sv->len = str_len;
126  sv->ptr = reinterpret_cast<char*>(*data);
127  }
128  *data += len;
129 }
130 
131 void HdfsAvroScanner::ReadAvroChar(PrimitiveType type, int max_len, uint8_t** data,
132  bool write_slot, void* slot, MemPool* pool) {
133  int64_t len = ReadWriteUtil::ReadZLong(data);
134  if (write_slot) {
135  DCHECK(type == TYPE_CHAR);
136  ColumnType ctype = ColumnType::CreateCharType(max_len);
137  int str_len = std::min(static_cast<int>(len), max_len);
138  if (ctype.IsVarLen()) {
139  StringValue* sv = reinterpret_cast<StringValue*>(slot);
140  sv->ptr = reinterpret_cast<char*>(pool->Allocate(max_len));
141  sv->len = max_len;
142  memcpy(sv->ptr, *data, str_len);
143  StringValue::PadWithSpaces(sv->ptr, max_len, str_len);
144  } else {
145  memcpy(slot, *data, str_len);
146  StringValue::PadWithSpaces(reinterpret_cast<char*>(slot), max_len, str_len);
147  }
148  }
149  *data += len;
150 }
151 
153  bool write_slot, void* slot, MemPool* pool) {
154  int64_t len = ReadWriteUtil::ReadZLong(data);
155  if (write_slot) {
156  DCHECK(type == TYPE_STRING);
157  StringValue* sv = reinterpret_cast<StringValue*>(slot);
158  sv->len = len;
159  sv->ptr = reinterpret_cast<char*>(*data);
160  }
161  *data += len;
162 }
163 
164 void HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data,
165  bool write_slot, void* slot, MemPool* pool) {
166  int64_t len = ReadWriteUtil::ReadZLong(data);
167  if (write_slot) {
168  // Decimals are encoded as big-endian integers. Copy the decimal into the most
169  // significant bytes and then shift down to the correct position to sign-extend the
170  // decimal.
171  DCHECK_LE(len, slot_byte_size);
172  int bytes_to_fill = slot_byte_size - len;
173 #if __BYTE_ORDER == __LITTLE_ENDIAN
174  BitUtil::ByteSwap(reinterpret_cast<uint8_t*>(slot) + bytes_to_fill, *data, len);
175 #else
176  memcpy(slot, *data, len);
177 #endif
178  switch (slot_byte_size) {
179  case 4: {
180  int32_t* decimal = reinterpret_cast<int32_t*>(slot);
181  *decimal >>= bytes_to_fill * 8;
182  break;
183  }
184  case 8: {
185  int64_t* decimal = reinterpret_cast<int64_t*>(slot);
186  *decimal >>= bytes_to_fill * 8;
187  break;
188  }
189  case 16: {
190  int128_t* decimal = reinterpret_cast<int128_t*>(slot);
191  *decimal >>= bytes_to_fill * 8;
192  break;
193  }
194  default:
195  DCHECK(false) << "Decimal slots can't be this size: " << slot_byte_size;
196  }
197  }
198  *data += len;
199 }
bool IsVarLen() const
Definition: types.h:172
void ReadAvroVarchar(PrimitiveType type, int max_len, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
HdfsScanNode * scan_node_
The scan node that started this scanner.
Definition: hdfs-scanner.h:141
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
void ReadAvroBoolean(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
static void PadWithSpaces(char *cptr, int64_t cptr_len, int64_t num_chars)
TupleRow * next_row(TupleRow *r) const
Definition: hdfs-scanner.h:368
static int64_t ByteSwap(int64_t value)
Swaps the byte order (i.e. endianness)
Definition: bit-util.h:149
void ReadAvroDecimal(int slot_byte_size, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
void MaterializeTuple(MemPool *pool, uint8_t **data, Tuple *tuple)
Materializes a single tuple from serialized record data.
void InitTuple(Tuple *template_tuple, Tuple *tuple)
Definition: hdfs-scanner.h:355
static int64_t ReadZLong(uint8_t **buf)
PrimitiveType
Definition: types.h:27
void ReadAvroDouble(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
static ColumnType CreateCharType(int len)
Definition: types.h:85
ObjectPool pool
int DecodeAvroData(int max_tuples, MemPool *pool, uint8_t **data, Tuple *tuple, TupleRow *tuple_row)
static int32_t ReadZInt(uint8_t **buf)
Read a zig-zag encoded int.
void ReadAvroChar(PrimitiveType type, int max_len, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow *row)
Definition: hdfs-scanner.h:266
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
void ReadAvroFloat(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
void ReadAvroInt64(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
void ReadAvroInt32(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
int tuple_idx() const
void ReadAvroString(PrimitiveType type, uint8_t **data, bool write_slot, void *slot, MemPool *pool)
uint8_t * Allocate(int size)
Definition: mem-pool.h:92
Tuple * next_tuple(Tuple *t) const
Definition: hdfs-scanner.h:363
__int128_t int128_t
We use the C++ int128_t type. This is stored using 16 bytes and is very performant.
bool ReadUnionType(int null_union_position, uint8_t **data)