45 const vector<ExprContext*>& output_expr_ctxs)
47 parent, state, output, partition, table_desc, output_expr_ctxs) {
61 codec_ = THdfsCompression::NONE;
62 if (query_options.__isset.compression_codec) {
63 codec_ = query_options.compression_codec;
64 if (
codec_ == THdfsCompression::SNAPPY) {
66 codec_ = THdfsCompression::SNAPPY_BLOCKED;
70 if (
codec_ != THdfsCompression::NONE) {
97 const vector<int32_t>& row_group_indices,
100 if (row_group_indices.empty()) {
103 limit = row_group_indices.size();
107 bool all_rows = row_group_indices.empty();
108 int num_non_partition_cols =
114 for (
int row_idx = 0; row_idx < limit; ++row_idx) {
116 batch->
GetRow(row_idx) : batch->
GetRow(row_group_indices[row_idx]);
123 for (
int j = 0; j < num_non_partition_cols; ++j) {
132 PrintEscaped(reinterpret_cast<const StringValue*>(value));
141 if (j + 1 < num_non_partition_cols) {
169 const uint8_t* uncompressed_data =
170 reinterpret_cast<const uint8_t*
>(rowbatch_string.data());
171 int64_t uncompressed_len = rowbatch_string.size();
172 const uint8_t* data = uncompressed_data;
173 int64_t len = uncompressed_len;
177 uint8_t* compressed_data;
178 int64_t compressed_len;
180 uncompressed_len, uncompressed_data,
181 &compressed_len, &compressed_data));
182 data = compressed_data;
183 len = compressed_len;
195 for (
int i = 0; i < str_val->
len; ++i) {
static const int HDFS_FLUSH_WRITE_SIZE
const std::string & null_column_value() const
HdfsTextTableWriter(HdfsTableSink *parent, RuntimeState *state, OutputPartition *output, const HdfsPartitionDescriptor *partition, const HdfsTableDescriptor *table_desc, const std::vector< ExprContext * > &output_expr_ctxs)
virtual Status Init()
Do initialization of writer.
HdfsTableSink * parent_
Parent table sink object.
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
RuntimeState * state_
Runtime state.
static const int ASCII_PRECISION
Ascii output precision for double/float.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
TupleRow * GetRow(int row_idx)
Status Write(const char *data, int32_t len)
Write to the current hdfs file.
MemTracker * mem_tracker()
static const int64_t COMPRESSED_BLOCK_SIZE
virtual uint64_t default_block_size() const
const HdfsTableDescriptor * table_desc_
Table descriptor of table to be written.
#define COUNTER_ADD(c, v)
int64_t flush_size_
Size in rowbatch_stringstream_ before we call flush.
std::string DebugString() const
static int64_t UnpaddedCharLength(const char *cptr, int64_t len)
Returns number of characters in a char array (ignores trailing spaces)
const TQueryOptions & query_options() const
virtual std::string file_extension() const
Returns the file extension for this writer.
int num_clustering_cols() const
Status AppendRowBatch(RowBatch *current_row, const std::vector< int32_t > &row_group_indices, bool *new_file)
int len
Only set if type == TYPE_CHAR or type == TYPE_VARCHAR.
std::vector< ExprContext * > output_expr_ctxs_
Expressions that materialize output values.
void PrintEscaped(const StringValue *str_val)
boost::scoped_ptr< MemPool > mem_pool_
Memory pool to use with compressor_.
void Release(int64_t bytes)
Decreases consumption of this tracker and its ancestors by 'bytes'.
virtual Status Finalize()
int64_t num_rows
Records number of rows appended to the current file in this partition.
char escape_char_
Escape character.
RuntimeProfile::Counter * hdfs_write_timer()
std::stringstream rowbatch_stringstream_
RuntimeProfile::Counter * rows_inserted_counter()
RuntimeProfile::Counter * encode_timer()
void Consume(int64_t bytes)
Increases consumption of this tracker and its ancestors by 'bytes'.
static char * CharSlotToPtr(void *slot, const ColumnType &type)
Metadata for a single partition inside an Hdfs table.
boost::scoped_ptr< Codec > compressor_
Compressor if compression is enabled.
char field_delim_
Character delimiting fields (to become slots).
static const int64_t COMPRESSED_BUFFERED_SIZE
char tuple_delim_
Character delimiting tuples.
virtual void Close()
Called once when this writer should cleanup any resources.
RuntimeProfile::Counter * compress_timer()
OutputPartition * output_
Structure describing partition written to by this writer.
THdfsCompression::type codec_
Compression codec.