Impala
Impala is the open source, native analytic database for Apache Hadoop.
hdfs-avro-table-writer.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/hdfs-avro-table-writer.h"

#include <vector>
#include <hdfs.h>
#include <boost/scoped_ptr.hpp>
#include <stdlib.h>
#include <codec.h>
#include <gutil/strings/substitute.h>

#include "exec/exec-node.h"
#include "util/compress.h"
#include "util/hdfs-util.h"
#include "util/uid-util.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "write-stream.inline.h"

#include "common/names.h"

using namespace strings;
using namespace impala;

const uint8_t OBJ1[4] = {'O', 'b', 'j', 1};
const char* AVRO_SCHEMA_STR = "avro.schema";
const char* AVRO_CODEC_STR = "avro.codec";
const THdfsCompression::type AVRO_DEFAULT_CODEC = THdfsCompression::SNAPPY;
// Desired size of each Avro block (bytes); the actual block size will vary +/-
// the size of a row. This is the approximate size of the block before compression.
const int DEFAULT_AVRO_BLOCK_SIZE = 64 * 1024;

HdfsAvroTableWriter::HdfsAvroTableWriter(HdfsTableSink* parent,
    RuntimeState* state, OutputPartition* output,
    const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc,
    const vector<ExprContext*>& output_exprs) :
    HdfsTableWriter(parent, state, output, partition, table_desc, output_exprs),
    unflushed_rows_(0) {
  mem_pool_.reset(new MemPool(parent->mem_tracker()));
}

// Processes a single row, appending the encoded fields to out_.
void HdfsAvroTableWriter::ConsumeRow(TupleRow* row) {
  ++unflushed_rows_;
  int num_non_partition_cols =
      table_desc_->num_cols() - table_desc_->num_clustering_cols();
  for (int j = 0; j < num_non_partition_cols; ++j) {
    void* value = output_expr_ctxs_[j]->GetValue(row);
    AppendField(output_expr_ctxs_[j]->root()->type(), value);
  }
}

inline void HdfsAvroTableWriter::AppendField(const ColumnType& type, const void* value) {
  // Each Avro field is written as a union: a ZLong indicating the union branch,
  // followed by the encoded value. Impala/Hive always stores values as a union
  // of [ColumnType, NULL].
  // TODO: check if we want to support [NULL, ColumnType] unions

  if (value == NULL) {
    // indicate the second field of the union
    out_.WriteZLong(1);
    // No bytes are written for a null value.
    return;
  }

  // indicate that we are using the first field of the union
  out_.WriteZLong(0);
  switch (type.type) {
    case TYPE_BOOLEAN:
      out_.WriteByte(*reinterpret_cast<const char*>(value));
      break;
    case TYPE_TINYINT:
      out_.WriteZInt(*reinterpret_cast<const int8_t*>(value));
      break;
    case TYPE_SMALLINT:
      out_.WriteZInt(*reinterpret_cast<const int16_t*>(value));
      break;
    case TYPE_INT:
      out_.WriteZInt(*reinterpret_cast<const int32_t*>(value));
      break;
    case TYPE_BIGINT:
      out_.WriteZLong(*reinterpret_cast<const int64_t*>(value));
      break;
    case TYPE_FLOAT:
      out_.WriteBytes(4, reinterpret_cast<const char*>(value));
      break;
    case TYPE_DOUBLE:
      out_.WriteBytes(8, reinterpret_cast<const char*>(value));
      break;
    case TYPE_STRING: {
      const StringValue& sv = *reinterpret_cast<const StringValue*>(value);
      out_.WriteZLong(sv.len);
      out_.WriteBytes(sv.len, sv.ptr);
      break;
    }
    case TYPE_DECIMAL: {
      int byte_size = ColumnType::GetDecimalByteSize(type.precision);
      out_.WriteZLong(byte_size);
      // Avro decimals are big-endian; byte-swap on little-endian hosts.
#if __BYTE_ORDER == __LITTLE_ENDIAN
      char tmp[16];
      BitUtil::ByteSwap(tmp, value, byte_size);
      out_.WriteBytes(byte_size, tmp);
#else
      out_.WriteBytes(byte_size, reinterpret_cast<const char*>(value));
#endif
      break;
    }
    case TYPE_TIMESTAMP:
    case TYPE_BINARY:
    case INVALID_TYPE:
    case TYPE_NULL:
    case TYPE_DATE:
    case TYPE_DATETIME:
    default:
      DCHECK(false);
  }
}

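// Worked example (illustrative; not part of the original source): appending a
// non-NULL INT with value 1 through AppendField() produces two bytes:
//   out_.WriteZLong(0);  // byte 0x00: branch 0 of the [int, "null"] union
//   out_.WriteZInt(1);   // byte 0x02: zig-zag maps 0,-1,1,-2,... to 0,1,2,3,...
// A NULL value of any type produces the single byte 0x02 (WriteZLong(1):
// branch 1 of the union, no payload).
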
Status HdfsAvroTableWriter::Init() {
  // create the Sync marker
  sync_marker_ = GenerateUUIDString();

  THdfsCompression::type codec = AVRO_DEFAULT_CODEC;
  if (state_->query_options().__isset.compression_codec) {
    codec = state_->query_options().compression_codec;
  }

  // sets codec_name_ and compressor_
  codec_type_ = codec;
  switch (codec) {
    case THdfsCompression::SNAPPY:
      codec_name_ = "snappy";
      break;
    case THdfsCompression::DEFLATE:
      codec_name_ = "deflate";
      break;
    case THdfsCompression::NONE:
      codec_name_ = "null";
      return Status::OK;
    default:
      const char* name = _THdfsCompression_VALUES_TO_NAMES.find(codec)->second;
      return Status(Substitute(
          "Avro only supports NONE, DEFLATE, and SNAPPY codecs; unsupported codec $0",
          name));
  }
  RETURN_IF_ERROR(
      Codec::CreateCompressor(mem_pool_.get(), true, codec, &compressor_));
  DCHECK(compressor_.get() != NULL);

  return Status::OK;
}
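// Note (illustrative): the codec selected above comes from the client's
// COMPRESSION_CODEC query option, e.g. "SET COMPRESSION_CODEC=deflate;" in
// impala-shell before an INSERT; when the option is unset, AVRO_DEFAULT_CODEC
// (SNAPPY) is used.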

Status HdfsAvroTableWriter::AppendRowBatch(RowBatch* batch,
    const vector<int32_t>& row_group_indices, bool* new_file) {
  int32_t limit;
  bool all_rows = row_group_indices.empty();
  if (all_rows) {
    limit = batch->num_rows();
  } else {
    limit = row_group_indices.size();
  }
  COUNTER_ADD(parent_->rows_inserted_counter(), limit);

  {
    SCOPED_TIMER(parent_->encode_timer());
    for (int row_idx = 0; row_idx < limit; ++row_idx) {
      TupleRow* row = all_rows ?
          batch->GetRow(row_idx) : batch->GetRow(row_group_indices[row_idx]);
      ConsumeRow(row);
    }
  }

  // Flush the accumulated block once it exceeds the target block size.
  if (out_.Size() > DEFAULT_AVRO_BLOCK_SIZE) RETURN_IF_ERROR(Flush());
  *new_file = false;
  return Status::OK;
}

Status HdfsAvroTableWriter::WriteFileHeader() {
  out_.Clear();
  out_.WriteBytes(4, reinterpret_cast<const uint8_t*>(OBJ1));

  // Write 'File Metadata' as an encoded avro map
  // number of key/value pairs in the map
  out_.WriteZLong(2);

  // Schema information
  out_.WriteZLong(strlen(AVRO_SCHEMA_STR));
  out_.WriteBytes(strlen(AVRO_SCHEMA_STR), AVRO_SCHEMA_STR);
  const string& avro_schema = table_desc_->avro_schema();
  out_.WriteZLong(avro_schema.size());
  out_.WriteBytes(avro_schema.size(), avro_schema.data());

  // codec information
  out_.WriteZLong(strlen(AVRO_CODEC_STR));
  out_.WriteBytes(strlen(AVRO_CODEC_STR), AVRO_CODEC_STR);
  out_.WriteZLong(codec_name_.size());
  out_.WriteBytes(codec_name_.size(), codec_name_.data());

  // Write end of map marker
  out_.WriteZLong(0);

  out_.WriteBytes(sync_marker_.size(), sync_marker_.data());

  const string& text = out_.String();
  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(text.c_str()),
      text.size()));
  out_.Clear();
  return Status::OK;
}
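// For reference, the header written above follows the Avro object container
// file layout:
//   4 bytes   magic: 'O' 'b' 'j' 0x01  (OBJ1)
//   map       file metadata: {"avro.schema": <schema JSON>, "avro.codec": <name>}
//   16 bytes  sync marker (sync_marker_), repeated after every data block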

Status HdfsAvroTableWriter::Flush() {
  if (unflushed_rows_ == 0) return Status::OK;

  WriteStream header;
  // 1. Count of objects in this block
  header.WriteZLong(unflushed_rows_);

  const uint8_t* output;
  int64_t output_length;
  // Snappy format requires a CRC after the compressed data
  uint32_t crc;
  const string& text = out_.String();

  if (codec_type_ != THdfsCompression::NONE) {
    SCOPED_TIMER(parent_->compress_timer());
    uint8_t* temp;
    RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
        reinterpret_cast<const uint8_t*>(text.data()), &output_length, &temp));
    output = temp;
    if (codec_type_ == THdfsCompression::SNAPPY) {
      crc = SnappyCompressor::ComputeChecksum(
          text.size(), reinterpret_cast<const uint8_t*>(text.data()));
    }
  } else {
    output = reinterpret_cast<const uint8_t*>(text.data());
    output_length = out_.Size();
  }

  // 2. length of serialized objects
  if (codec_type_ == THdfsCompression::SNAPPY) {
    // + 4 for the CRC checksum at the end of the compressed block
    header.WriteZLong(output_length + 4);
  } else {
    header.WriteZLong(output_length);
  }

  const string& head = header.String();
  {
    SCOPED_TIMER(parent_->hdfs_write_timer());
    // Flush (1) and (2) to HDFS
    RETURN_IF_ERROR(
        Write(reinterpret_cast<const uint8_t*>(head.data()), head.size()));
    // 3. serialized objects
    RETURN_IF_ERROR(Write(output, output_length));

    // Write CRC checksum
    if (codec_type_ == THdfsCompression::SNAPPY) {
      RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(&crc), sizeof(uint32_t)));
    }
  }

  // 4. sync marker
  RETURN_IF_ERROR(
      Write(reinterpret_cast<const uint8_t*>(sync_marker_.data()), sync_marker_.size()));

  out_.Clear();
  unflushed_rows_ = 0;
  return Status::OK;
}
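// For reference, each data block emitted by Flush() follows the Avro object
// container spec:
//   (1) long   count of objects in this block
//   (2) long   byte length of the serialized objects
//   (3) bytes  the serialized objects, compressed with avro.codec
//   (4) 16B    sync marker
// For the "snappy" codec, Avro requires a 4-byte CRC32 of the uncompressed
// data appended to the compressed payload, hence the "output_length + 4" above.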