29 #include <boost/scoped_ptr.hpp>
44 const vector<ExprContext*>& output_exprs)
45 :
HdfsTableWriter(parent, state, output, partition, table_desc, output_exprs),
46 mem_pool_(new
MemPool(parent->mem_tracker())), compress_flag_(false),
47 unflushed_rows_(0), record_compression_(false) {
55 THdfsCompression::type codec = THdfsCompression::SNAPPY_BLOCKED;
57 if (query_options.__isset.compression_codec) {
58 codec = query_options.compression_codec;
59 if (codec == THdfsCompression::SNAPPY) {
62 codec = THdfsCompression::SNAPPY_BLOCKED;
65 if (codec != THdfsCompression::NONE) {
67 if (query_options.__isset.seq_compression_mode) {
69 query_options.seq_compression_mode == THdfsSeqCompressionMode::RECORD;
79 uint8_t sync_neg1[20];
82 DCHECK(uuid.size() == 16);
83 memcpy(sync_neg1 +
sizeof(int32_t), uuid.data(), uuid.size());
91 const vector<int32_t>& row_group_indices,
94 if (row_group_indices.empty()) {
97 limit = row_group_indices.size();
101 bool all_rows = row_group_indices.empty();
102 int num_non_partition_cols =
109 for (
int row_idx = 0; row_idx < limit; ++row_idx) {
113 for (
int row_idx = 0; row_idx < limit; ++row_idx) {
147 reinterpret_cast<const uint8_t*
>(
codec_name_.data()));
180 int64_t output_length;
185 reinterpret_cast<uint8_t*
>(&text[0]), &output_length, &output));
189 string head = header.
String();
198 for (
int i = 0; i < str_val->
len; ++i) {
208 int num_non_partition_cols =
211 for (
int j = 0; j < num_non_partition_cols; ++j) {
224 buf->
WriteBytes(null_val.size(), null_val.data());
227 if (j + 1 < num_non_partition_cols) {
247 const uint8_t* value_bytes;
248 int64_t value_length;
258 row_buf_.
WriteText(text.size(),
reinterpret_cast<const uint8_t*
>(&text.data()[0]));
264 reinterpret_cast<uint8_t*
>(&text[0]), &value_length, &tmp));
269 value_bytes =
reinterpret_cast<const uint8_t*
>(
row_buf_.
String().data());
272 int rec_len = value_length;
301 Write(reinterpret_cast<const uint8_t*>(out_str.data()), out_str.size()));
const std::string & null_column_value() const
int WriteBytes(int length, const uint8_t *buf)
Writes bytes to the buffer, returns the number of bytes written.
HdfsTableSink * parent_
Parent table sink object.
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
int WriteEmptyText()
Writes an empty string to the buffer (encoded as 1 byte)
void EncodeRow(TupleRow *row, WriteStream *buf)
RuntimeState * state_
Runtime state.
std::string sync_marker_
16 byte sync marker (a uuid)
static const char * VALUE_CLASS_NAME
Name of java class to use when reading the values.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
TupleRow * GetRow(int row_idx)
Status Write(const char *data, int32_t len)
Write to the current hdfs file.
MemTracker * mem_tracker()
const HdfsTableDescriptor * table_desc_
Table descriptor of table to be written.
int WriteInt(uint32_t val)
#define COUNTER_ADD(c, v)
HdfsSequenceTableWriter(HdfsTableSink *parent, RuntimeState *state, OutputPartition *output, const HdfsPartitionDescriptor *partition, const HdfsTableDescriptor *table_desc, const std::vector< ExprContext * > &output_exprs)
std::string DebugString() const
uint64_t unflushed_rows_
number of rows consumed since last flush
Status ConsumeRow(TupleRow *row)
processes a single row, delegates to Compress or NoCompress ConsumeRow().
char escape_char_
Escape character for text encoding.
static void PutInt(uint8_t *buf, uint16_t integer)
const TQueryOptions & query_options() const
int WriteVInt(int32_t val)
static Status GetHadoopCodecClassName(THdfsCompression::type, std::string *out_name)
Returns the java class name for the given compression type.
int WriteBoolean(bool val)
int num_clustering_cols() const
bool record_compression_
true if compression is applied on each record individually
uint64_t approx_block_size_
int WriteByte(uint8_t val)
std::string codec_name_
name of codec, only set if compress_flag_
std::vector< ExprContext * > output_expr_ctxs_
Expressions that materialize output values.
string GenerateUUIDString()
generates a 16 byte UUID
int WriteText(int32_t len, const uint8_t *buf)
Writes the length as a VLong followed by the byte string.
WriteStream row_buf_
Temporary Buffer for a single row.
Status WriteFileHeader()
writes the SEQ file header to HDFS
bool compress_flag_
true if compression is enabled
std::string neg1_sync_marker_
A -1 in front of the sync marker, used in decompressed formats.
virtual Status AppendRowBatch(RowBatch *rows, const std::vector< int32_t > &row_group_indices, bool *new_file)
RuntimeProfile::Counter * hdfs_write_timer()
virtual Status Init()
Do initialization of writer.
RuntimeProfile::Counter * rows_inserted_counter()
RuntimeProfile::Counter * encode_timer()
void Consume(int64_t bytes)
Increases consumption of this tracker and its ancestors by 'bytes'.
Metadata for a single partition inside an Hdfs table.
Status WriteCompressedBlock()
writes the contents of out_ as a single compressed block
std::string String()
returns the contents of this stream as a string
WriteStream out_
buffer which holds accumulated output
boost::scoped_ptr< Codec > compressor_
the codec for compressing, only set if compress_flag_
int WriteVLong(int64_t val)
char field_delim_
Character delimiting fields.
void WriteEscapedString(const StringValue *str_val, WriteStream *buf)
writes the str_val to the buffer, escaping special characters
RuntimeProfile::Counter * compress_timer()
static uint8_t SEQ6_CODE[4]
Magic characters used to identify the file type.
static int VLongRequiredBytes(int64_t val)
returns size of the encoded long value, not including the 1 byte for length
MemPool * mem_pool_
memory pool used by codec to allocate output buffer