Impala
Impala is the open source, native analytic database for Apache Hadoop.
hdfs-sequence-table-writer.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/hdfs-sequence-table-writer.h"

#include "exec/exec-node.h"
#include "util/hdfs-util.h"
#include "util/uid-util.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/hdfs-fs-cache.h"

#include <vector>
#include <hdfs.h>
#include <boost/scoped_ptr.hpp>
#include <stdlib.h>
#include <codec.h>

#include "common/names.h"
namespace impala {

uint8_t HdfsSequenceTableWriter::SEQ6_CODE[4] = {'S', 'E', 'Q', 6};
const char* HdfsSequenceTableWriter::VALUE_CLASS_NAME = "org.apache.hadoop.io.Text";

HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
    RuntimeState* state, OutputPartition* output,
    const HdfsPartitionDescriptor* partition,
    const HdfsTableDescriptor* table_desc,
    const vector<ExprContext*>& output_exprs)
  : HdfsTableWriter(parent, state, output, partition, table_desc, output_exprs),
    mem_pool_(new MemPool(parent->mem_tracker())), compress_flag_(false),
    unflushed_rows_(0), record_compression_(false) {
  approx_block_size_ = 64 * 1024 * 1024;
  parent->mem_tracker()->Consume(approx_block_size_);
  field_delim_ = partition->field_delim();
  escape_char_ = partition->escape_char();
}

Status HdfsSequenceTableWriter::Init() {
  THdfsCompression::type codec = THdfsCompression::SNAPPY_BLOCKED;
  const TQueryOptions& query_options = state_->query_options();
  if (query_options.__isset.compression_codec) {
    codec = query_options.compression_codec;
    if (codec == THdfsCompression::SNAPPY) {
      // Seq file (and in general things that use hadoop.io.codec) always
      // mean snappy_blocked.
      codec = THdfsCompression::SNAPPY_BLOCKED;
    }
  }
  if (codec != THdfsCompression::NONE) {
    compress_flag_ = true;
    if (query_options.__isset.seq_compression_mode) {
      record_compression_ =
          query_options.seq_compression_mode == THdfsSeqCompressionMode::RECORD;
    }
    RETURN_IF_ERROR(Codec::GetHadoopCodecClassName(codec, &codec_name_));
    RETURN_IF_ERROR(Codec::CreateCompressor(
        mem_pool_.get(), true, codec, &compressor_));
    DCHECK(compressor_.get() != NULL);
  }

  // Create the sync marker.
  string uuid = GenerateUUIDString();
  uint8_t sync_neg1[20];

  ReadWriteUtil::PutInt(sync_neg1, static_cast<uint32_t>(-1));
  DCHECK(uuid.size() == 16);
  memcpy(sync_neg1 + sizeof(int32_t), uuid.data(), uuid.size());
  neg1_sync_marker_ = string(reinterpret_cast<char*>(sync_neg1), 20);
  sync_marker_ = uuid;

  return Status::OK;
}
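
// Illustrative note (editor addition, not in the original source): the two
// markers built above have the following byte layout, as constructed by Init():
//
//   sync_marker_      : the 16-byte UUID
//   neg1_sync_marker_ : int -1 (0xFFFFFFFF) followed by the 16-byte UUID
//
// Uncompressed files emit neg1_sync_marker_ after each row batch (see
// AppendRowBatch below) so that readers can resynchronize at record boundaries.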

Status HdfsSequenceTableWriter::AppendRowBatch(RowBatch* batch,
    const vector<int32_t>& row_group_indices,
    bool* new_file) {
  int32_t limit;
  if (row_group_indices.empty()) {
    limit = batch->num_rows();
  } else {
    limit = row_group_indices.size();
  }
  COUNTER_ADD(parent_->rows_inserted_counter(), limit);

  bool all_rows = row_group_indices.empty();
  int num_non_partition_cols =
      table_desc_->num_cols() - table_desc_->num_clustering_cols();
  DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();

  {
    SCOPED_TIMER(parent_->encode_timer());
    if (all_rows) {
      for (int row_idx = 0; row_idx < limit; ++row_idx) {
        RETURN_IF_ERROR(ConsumeRow(batch->GetRow(row_idx)));
      }
    } else {
      for (int row_idx = 0; row_idx < limit; ++row_idx) {
        TupleRow* row = batch->GetRow(row_group_indices[row_idx]);
        RETURN_IF_ERROR(ConsumeRow(row));
      }
    }
  }

  if (!compress_flag_) {
    // Write the -1 prefixed sync marker between uncompressed row batches.
    out_.WriteBytes(neg1_sync_marker_.size(), neg1_sync_marker_.data());
  }

  if (out_.Size() >= approx_block_size_) Flush();
  *new_file = false;
  return Status::OK;
}

Status HdfsSequenceTableWriter::WriteFileHeader() {
  out_.WriteBytes(sizeof(SEQ6_CODE), reinterpret_cast<uint8_t*>(SEQ6_CODE));

  // Output an empty KeyClassName field
  out_.WriteEmptyText();

  // Setup to be correct value class
  out_.WriteText(strlen(VALUE_CLASS_NAME),
      reinterpret_cast<const uint8_t*>(VALUE_CLASS_NAME));

  // Flag for if compression is used
  out_.WriteBoolean(compress_flag_);
  // Only valid if compression is used. Indicates if block compression is used.
  out_.WriteBoolean(compress_flag_ && !record_compression_);

  // Output the name of our compression codec, parsed by readers
  if (compress_flag_) {
    out_.WriteText(codec_name_.size(),
        reinterpret_cast<const uint8_t*>(codec_name_.data()));
  }

  // Metadata is formatted as an integer N followed by N*2 strings,
  // which are key-value pairs. Hive does not write metadata, so neither does Impala.
  out_.WriteInt(0);

  // write the sync marker
  out_.WriteBytes(sync_marker_.size(), sync_marker_.data());

  string text = out_.String();
  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(text.c_str()), text.size()));
  out_.Clear();
  return Status::OK;
}
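
// Illustrative note (editor addition, not in the original source): the header
// written above follows the SequenceFile version 6 layout:
//
//   bytes   : 'S' 'E' 'Q' 6                 (SEQ6_CODE magic)
//   Text    : ""                            (key class name, unused)
//   Text    : "org.apache.hadoop.io.Text"   (value class name)
//   boolean : compress_flag_                (are values compressed?)
//   boolean : is block compression in use?
//   Text    : codec class name              (only when compressed)
//   int     : 0                             (number of metadata pairs)
//   bytes   : 16-byte sync marker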

Status HdfsSequenceTableWriter::WriteCompressedBlock() {
  WriteStream header;
  DCHECK(compress_flag_);

  // add a sync marker to start of the block
  header.WriteBytes(sync_marker_.size(), sync_marker_.data());

  header.WriteVLong(unflushed_rows_);

  // Write (empty) key lengths and key values
  header.WriteEmptyText();
  header.WriteEmptyText();

  // Output an empty string for value lengths
  header.WriteEmptyText();

  uint8_t* output;
  int64_t output_length;
  string text = out_.String();
  {
    SCOPED_TIMER(parent_->compress_timer());
    RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
        reinterpret_cast<uint8_t*>(&text[0]), &output_length, &output));
  }

  header.WriteVInt(output_length);
  string head = header.String();
  RETURN_IF_ERROR(Write(reinterpret_cast<const uint8_t*>(head.data()),
      head.size()));
  RETURN_IF_ERROR(Write(output, output_length));
  return Status::OK;
}
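
// Illustrative note (editor addition, not in the original source): each
// compressed block written above is laid out as:
//
//   bytes : 16-byte sync marker
//   VLong : number of records in the block (unflushed_rows_)
//   Text  : ""                             (key lengths, empty)
//   Text  : ""                             (keys, empty)
//   Text  : ""                             (value lengths, empty)
//   VInt  : size of the compressed payload
//   bytes : compressed concatenation of <VLong length><row text> records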

void HdfsSequenceTableWriter::WriteEscapedString(const StringValue* str_val,
    WriteStream* buf) {
  for (int i = 0; i < str_val->len; ++i) {
    if (str_val->ptr[i] == field_delim_ || str_val->ptr[i] == escape_char_) {
      buf->WriteByte(escape_char_);
    }
    buf->WriteByte(str_val->ptr[i]);
  }
}
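
// Worked example (editor addition, not in the original source): with
// field_delim_ = ',' and escape_char_ = '\', the value  a,b\c  is written
// as  a\,b\\c : every occurrence of the delimiter or of the escape character
// itself is preceded by the escape character.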

void HdfsSequenceTableWriter::EncodeRow(TupleRow* row, WriteStream* buf) {
  // TODO Unify with text table writer
  int num_non_partition_cols =
      table_desc_->num_cols() - table_desc_->num_clustering_cols();
  DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();
  for (int j = 0; j < num_non_partition_cols; ++j) {
    void* value = output_expr_ctxs_[j]->GetValue(row);
    if (value != NULL) {
      if (output_expr_ctxs_[j]->root()->type().type == TYPE_STRING) {
        WriteEscapedString(reinterpret_cast<const StringValue*>(value), buf);
      } else {
        string str;
        output_expr_ctxs_[j]->PrintValue(value, &str);
        buf->WriteBytes(str.size(), str.data());
      }
    } else {
      // NULLs in Hive are encoded based on the 'serialization.null.format' property.
      const string& null_val = table_desc_->null_column_value();
      buf->WriteBytes(null_val.size(), null_val.data());
    }
    // Append the field delimiter between fields (not after the last one).
    if (j + 1 < num_non_partition_cols) {
      buf->WriteByte(field_delim_);
    }
  }
}
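
// Worked example (editor addition, not in the original source): for a row
// (id=1, s="a,b", t=NULL) with field_delim_ = ',' and escape_char_ = '\',
// and assuming the table's null format is Hive's default "\N", EncodeRow
// produces the delimited text
//
//   1,a\,b,\N
//
// Only string columns pass through WriteEscapedString; other types are
// printed with PrintValue and written verbatim.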

Status HdfsSequenceTableWriter::ConsumeRow(TupleRow* row) {
  ++unflushed_rows_;
  row_buf_.Clear();
  if (compress_flag_ && !record_compression_) {
    // Output row for a block compressed sequence file:
    // write the length as a vlong and then write the contents.
    EncodeRow(row, &row_buf_);
    out_.WriteVLong(row_buf_.Size());
    out_.WriteBytes(row_buf_.Size(), row_buf_.String().data());
    return Status::OK;
  }

  EncodeRow(row, &row_buf_);

  string value_text;
  const uint8_t* value_bytes;
  int64_t value_length;
  if (record_compression_) {
    // Apply compression to row_buf_.
    // The length of the buffer must be prefixed to the buffer prior to compression.
    //
    // TODO this incurs copy overhead to place the length in front of the
    // buffer prior to compression. We may want to rewrite to avoid copying.
    string text = row_buf_.String();
    row_buf_.Clear();
    // encoding as "Text" writes the length before the text
    row_buf_.WriteText(text.size(), reinterpret_cast<const uint8_t*>(text.data()));
    text = row_buf_.String();
    uint8_t* tmp;
    {
      SCOPED_TIMER(parent_->compress_timer());
      RETURN_IF_ERROR(compressor_->ProcessBlock(false, text.size(),
          reinterpret_cast<uint8_t*>(&text[0]), &value_length, &tmp));
    }
    value_bytes = tmp;
  } else {
    value_length = row_buf_.Size();
    // Keep the row text alive in a local until it is written below.
    value_text = row_buf_.String();
    value_bytes = reinterpret_cast<const uint8_t*>(value_text.data());
  }

  int rec_len = value_length;
  // If the record is compressed, the length is part of the compressed text;
  // if not, then we need to write the length (below) and account for its size.
  if (!record_compression_) rec_len += ReadWriteUtil::VLongRequiredBytes(value_length);

  // Length of the record (incl. key length and value length)
  out_.WriteInt(rec_len);

  // Write length of the key (Impala/Hive doesn't write a key)
  out_.WriteInt(0);

  // If the record is compressed, the length is part of the compressed text.
  if (!record_compression_) out_.WriteVLong(value_length);

  // write out the value (possibly compressed)
  out_.WriteBytes(value_length, value_bytes);
  return Status::OK;
}
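
// Illustrative note (editor addition, not in the original source): in
// uncompressed and record-compressed files, each record appended to out_
// above has the SequenceFile record layout
//
//   int   : record length (the value bytes, plus the VLong length prefix
//           when record compression is off)
//   int   : key length, always 0 (Impala/Hive writes no key)
//   VLong : value length (omitted under record compression, where the
//           Text-encoded length is inside the compressed payload)
//   bytes : the row text, compressed when record_compression_ is on
//
// In block-compressed files, ConsumeRow returns early: rows are only
// Text-encoded into out_ and framed later by WriteCompressedBlock().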

Status HdfsSequenceTableWriter::Flush() {
  if (unflushed_rows_ == 0) return Status::OK;

  SCOPED_TIMER(parent_->hdfs_write_timer());

  if (compress_flag_ && !record_compression_) {
    RETURN_IF_ERROR(WriteCompressedBlock());
  } else {
    string out_str = out_.String();
    RETURN_IF_ERROR(
        Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size()));
  }
  out_.Clear();
  unflushed_rows_ = 0;
  return Status::OK;
}

} // namespace impala
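
The WriteVLong/WriteVInt/VLongRequiredBytes helpers used throughout this file implement Hadoop's variable-length integer encoding (WritableUtils.writeVLong), which SequenceFile readers expect. The following self-contained sketch shows that encoding; it is an editor's illustration based on the Hadoop format, and the function name hadoop_write_vlong is hypothetical, not part of Impala.

#include <cstdint>
#include <string>

// Sketch of Hadoop's WritableUtils.writeVLong encoding (assumed semantics):
// values in [-112, 127] are stored in a single byte; otherwise a marker byte
// encodes the sign and byte count, followed by the magnitude in big-endian order.
void hadoop_write_vlong(int64_t i, std::string* out) {
  if (i >= -112 && i <= 127) {
    out->push_back(static_cast<char>(i));
    return;
  }
  int len = -112;
  if (i < 0) {
    i = ~i;       // one's complement; the sign is carried by the marker byte
    len = -120;
  }
  for (int64_t tmp = i; tmp != 0; tmp >>= 8) --len;  // count magnitude bytes
  out->push_back(static_cast<char>(len));
  int num_bytes = (len < -120) ? -(len + 120) : -(len + 112);
  for (int idx = num_bytes - 1; idx >= 0; --idx) {
    out->push_back(static_cast<char>((i >> (8 * idx)) & 0xFF));
  }
}

A decoder inverts this by inspecting the first byte: values >= -112 are literal; otherwise the marker byte gives the sign and the count of big-endian magnitude bytes that follow.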