Impala
Impala is the open source, native analytic database for Apache Hadoop.
hdfs-text-table-writer.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/hdfs-text-table-writer.h"
#include "exec/exec-node.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/hdfs-fs-cache.h"
#include "util/codec.h"
#include "util/compress.h"
#include "util/hdfs-util.h"

#include <hdfs.h>
#include <stdlib.h>

#include "common/names.h"

// Hdfs block size for compressed text.
static const int64_t COMPRESSED_BLOCK_SIZE = 64 * 1024 * 1024;

// Size to buffer before compression. We want this to be less than the block size
// (compressed text is not splittable).
static const int64_t COMPRESSED_BUFFERED_SIZE = 60 * 1024 * 1024;

namespace impala {

HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent,
    RuntimeState* state, OutputPartition* output,
    const HdfsPartitionDescriptor* partition,
    const HdfsTableDescriptor* table_desc,
    const vector<ExprContext*>& output_expr_ctxs)
    : HdfsTableWriter(
        parent, state, output, partition, table_desc, output_expr_ctxs) {
  tuple_delim_ = partition->line_delim();
  field_delim_ = partition->field_delim();
  escape_char_ = partition->escape_char();
  flush_size_ = HDFS_FLUSH_WRITE_SIZE;

  // The default stringstream output precision is not very high, making it impossible
  // to properly output doubles (they get rounded to ints). Set a more reasonable
  // precision.
  rowbatch_stringstream_.precision(RawValue::ASCII_PRECISION);
}

Status HdfsTextTableWriter::Init() {
  const TQueryOptions& query_options = state_->query_options();
  codec_ = THdfsCompression::NONE;
  if (query_options.__isset.compression_codec) {
    codec_ = query_options.compression_codec;
    if (codec_ == THdfsCompression::SNAPPY) {
      // hadoop.io.codec always means SNAPPY_BLOCKED. Alias the two.
      codec_ = THdfsCompression::SNAPPY_BLOCKED;
    }
  }

  if (codec_ != THdfsCompression::NONE) {
    mem_pool_.reset(new MemPool(parent_->mem_tracker()));
    RETURN_IF_ERROR(Codec::CreateCompressor(
        mem_pool_.get(), true, codec_, &compressor_));
    flush_size_ = COMPRESSED_BUFFERED_SIZE;
  } else {
    flush_size_ = HDFS_FLUSH_WRITE_SIZE;
  }
  parent_->mem_tracker()->Consume(flush_size_);
  return Status::OK;
}

void HdfsTextTableWriter::Close() {
  parent_->mem_tracker()->Release(flush_size_);
  if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
}

uint64_t HdfsTextTableWriter::default_block_size() const {
  return compressor_.get() == NULL ? 0 : COMPRESSED_BLOCK_SIZE;
}

string HdfsTextTableWriter::file_extension() const {
  if (compressor_.get() == NULL) return "";
  return compressor_->file_extension();
}

Status HdfsTextTableWriter::AppendRowBatch(RowBatch* batch,
    const vector<int32_t>& row_group_indices,
    bool* new_file) {
  int32_t limit;
  if (row_group_indices.empty()) {
    limit = batch->num_rows();
  } else {
    limit = row_group_indices.size();
  }
  COUNTER_ADD(parent_->rows_inserted_counter(), limit);

  bool all_rows = row_group_indices.empty();
  int num_non_partition_cols =
      table_desc_->num_cols() - table_desc_->num_clustering_cols();
  DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();

  {
    SCOPED_TIMER(parent_->encode_timer());
    for (int row_idx = 0; row_idx < limit; ++row_idx) {
      TupleRow* current_row = all_rows ?
          batch->GetRow(row_idx) : batch->GetRow(row_group_indices[row_idx]);

      // There might be a select expr for partition cols as well, but we shouldn't be
      // writing their values to the row. Since there must be at least
      // num_non_partition_cols select exprs, and we assume that by convention any
      // partition col exprs are the last in output exprs, it's ok to just write
      // the first num_non_partition_cols values.
      for (int j = 0; j < num_non_partition_cols; ++j) {
        void* value = output_expr_ctxs_[j]->GetValue(current_row);
        if (value != NULL) {
          const ColumnType& type = output_expr_ctxs_[j]->root()->type();
          if (type.type == TYPE_CHAR) {
            char* val_ptr = StringValue::CharSlotToPtr(value, type);
            StringValue sv(val_ptr, StringValue::UnpaddedCharLength(val_ptr, type.len));
            PrintEscaped(&sv);
          } else if (type.IsVarLen()) {
            PrintEscaped(reinterpret_cast<const StringValue*>(value));
          } else {
            output_expr_ctxs_[j]->PrintValue(value, &rowbatch_stringstream_);
          }
        } else {
          // NULLs in hive are encoded based on the 'serialization.null.format' property.
          rowbatch_stringstream_ << table_desc_->null_column_value();
        }
        // Append field delimiter.
        if (j + 1 < num_non_partition_cols) {
          rowbatch_stringstream_ << field_delim_;
        }
      }
      // Append tuple delimiter.
      rowbatch_stringstream_ << tuple_delim_;
      ++output_->num_rows;
    }
  }

  *new_file = false;
  if (rowbatch_stringstream_.tellp() >= flush_size_) {
    RETURN_IF_ERROR(Flush());

    // If compressed, start a new file (compressed data is not splittable).
    *new_file = compressor_.get() != NULL;
  }

  return Status::OK;
}

Status HdfsTextTableWriter::Finalize() {
  return Flush();
}

Status HdfsTextTableWriter::Flush() {
  string rowbatch_string = rowbatch_stringstream_.str();
  rowbatch_stringstream_.str(string());
  const uint8_t* uncompressed_data =
      reinterpret_cast<const uint8_t*>(rowbatch_string.data());
  int64_t uncompressed_len = rowbatch_string.size();
  const uint8_t* data = uncompressed_data;
  int64_t len = uncompressed_len;

  if (compressor_.get() != NULL) {
    SCOPED_TIMER(parent_->compress_timer());
    uint8_t* compressed_data;
    int64_t compressed_len;
    RETURN_IF_ERROR(compressor_->ProcessBlock(false,
        uncompressed_len, uncompressed_data,
        &compressed_len, &compressed_data));
    data = compressed_data;
    len = compressed_len;
  }

  {
    SCOPED_TIMER(parent_->hdfs_write_timer());
    RETURN_IF_ERROR(Write(data, len));
  }

  return Status::OK;
}

inline void HdfsTextTableWriter::PrintEscaped(const StringValue* str_val) {
  for (int i = 0; i < str_val->len; ++i) {
    if (UNLIKELY(str_val->ptr[i] == field_delim_ || str_val->ptr[i] == escape_char_)) {
      rowbatch_stringstream_ << escape_char_;
    }
    rowbatch_stringstream_ << str_val->ptr[i];
  }
}

}
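
As a reading aid, here is a minimal standalone sketch (not Impala code; EscapeField is a hypothetical name) of the escaping rule PrintEscaped implements: any byte equal to the field delimiter or the escape character is prefixed with the escape character, so fields can be split unambiguously when the file is read back.

#include <iostream>
#include <sstream>
#include <string>

// Hypothetical helper mirroring PrintEscaped: escape the field delimiter
// and the escape character itself; all other bytes pass through unchanged.
static std::string EscapeField(const std::string& val, char field_delim,
    char escape_char) {
  std::stringstream out;
  for (char c : val) {
    if (c == field_delim || c == escape_char) out << escape_char;
    out << c;
  }
  return out.str();
}

int main() {
  // With ',' as the field delimiter and '\' as the escape character,
  // "a,b\c" is emitted as "a\,b\\c".
  std::cout << EscapeField("a,b\\c", ',', '\\') << std::endl;
  return 0;
}

Note that, matching the source above, the tuple delimiter itself is not escaped by PrintEscaped.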
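
The constructor's comment about stringstream precision is easy to reproduce in isolation. This minimal sketch (plain C++, independent of Impala; the value 16 is only illustrative of what RawValue::ASCII_PRECISION buys) shows the default six-significant-digit formatting that would mangle doubles:

#include <iostream>
#include <sstream>

int main() {
  std::stringstream ss;
  ss << 123456789.25 << '\n';  // default precision (6): "1.23457e+08"
  ss.precision(16);            // illustrative higher precision
  ss << 123456789.25 << '\n';  // "123456789.25"
  std::cout << ss.str();
  return 0;
}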
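
Finally, the sizing constants at the top encode the writer's flush policy: because compressed text is not splittable, rows are buffered up to COMPRESSED_BUFFERED_SIZE, each flush is compressed as one unit, and a new file is started so that a compressed file stays within a single COMPRESSED_BLOCK_SIZE HDFS block. A hedged sketch of that buffering pattern, with hypothetical names and no real compression or HDFS I/O:

#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical buffered writer illustrating the flush policy used above:
// accumulate rows in a stringstream and flush once the buffered size
// reaches a threshold chosen to sit below the HDFS block size.
class BufferedTextWriter {
 public:
  explicit BufferedTextWriter(int64_t flush_size) : flush_size_(flush_size) {}

  // Returns true if the caller should start a new file after this append,
  // mirroring the *new_file flag in AppendRowBatch.
  bool AppendRow(const std::string& row) {
    buffer_ << row << '\n';
    if (static_cast<int64_t>(buffer_.tellp()) >= flush_size_) {
      Flush();
      return true;  // compressed output: one file per flushed block
    }
    return false;
  }

  void Flush() {
    // In the real writer this is where the buffered text is (optionally)
    // compressed and handed to the HDFS file; here we just reset the buffer.
    buffer_.str(std::string());
  }

 private:
  std::stringstream buffer_;
  int64_t flush_size_;
};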