19 #include <boost/scoped_ptr.hpp>
22 #include <gutil/strings/substitute.h>
38 using namespace strings;
39 using namespace impala;
41 const uint8_t
OBJ1[4] = {
'O',
'b',
'j', 1};
52 const vector<ExprContext*>& output_exprs) :
53 HdfsTableWriter(parent, state, output, partition, table_desc, output_exprs),
60 int num_non_partition_cols =
62 for (
int j = 0; j < num_non_partition_cols; ++j) {
115 #if __BYTE_ORDER == __LITTLE_ENDIAN
147 case THdfsCompression::SNAPPY:
150 case THdfsCompression::DEFLATE:
153 case THdfsCompression::NONE:
157 const char*
name = _THdfsCompression_VALUES_TO_NAMES.find(codec)->second;
159 "Avro only supports NONE, DEFLATE, and SNAPPY codecs; unsupported codec $0",
169 const vector<int32_t>& row_group_indices,
bool* new_file) {
171 bool all_rows = row_group_indices.empty();
175 limit = row_group_indices.size();
181 for (
int row_idx = 0; row_idx < limit; ++row_idx) {
183 batch->
GetRow(row_idx) : batch->
GetRow(row_group_indices[row_idx]);
233 const uint8_t* output;
234 int64_t output_length;
243 reinterpret_cast<const uint8_t*
>(text.data()), &output_length, &temp));
247 text.size(),
reinterpret_cast<const uint8_t*
>(text.data()));
250 output =
reinterpret_cast<const uint8_t*
>(text.data());
262 const string& head = header.
String();
267 Write(reinterpret_cast<const uint8_t*>(head.data()), head.size()));
const int DEFAULT_AVRO_BLOCK_SIZE
int WriteZLong(int64_t val)
std::string sync_marker_
16-byte sync marker (a UUID)
int WriteBytes(int length, const uint8_t *buf)
Writes bytes to the buffer, returns the number of bytes written.
HdfsTableSink * parent_
Parent table sink object.
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
const char * AVRO_CODEC_STR
RuntimeState * state_
Runtime state.
int precision
Only set if type == TYPE_DECIMAL.
#define RETURN_IF_ERROR(stmt)
Some generally useful macros
static int GetDecimalByteSize(int precision)
TupleRow * GetRow(int row_idx)
WriteStream out_
Buffer which holds accumulated output.
Status Write(const char *data, int32_t len)
Write to the current hdfs file.
MemTracker * mem_tracker()
const HdfsTableDescriptor * table_desc_
Table descriptor of table to be written.
#define COUNTER_ADD(c, v)
static int64_t ByteSwap(int64_t value)
Swaps the byte order (i.e. endianness)
virtual Status AppendRowBatch(RowBatch *rows, const std::vector< int32_t > &row_group_indices, bool *new_file)
boost::scoped_ptr< MemPool > mem_pool_
const TQueryOptions & query_options() const
int WriteZInt(int32_t val)
Writes a zig-zag encoded integer.
const char * AVRO_SCHEMA_STR
int num_clustering_cols() const
int WriteByte(uint8_t val)
string GenerateUUIDString()
Generates a 16-byte UUID
std::vector< ExprContext * > output_expr_ctxs_
Expressions that materialize output values.
const THdfsCompression::type AVRO_DEFAULT_CODEC
std::string codec_name_
Name of codec, only set if codec_type_ != NONE.
THdfsCompression::type codec_type_
Type of the codec, will be NONE if no compression is used.
Status WriteFileHeader()
Writes the Avro file header to HDFS.
RuntimeProfile::Counter * hdfs_write_timer()
RuntimeProfile::Counter * rows_inserted_counter()
RuntimeProfile::Counter * encode_timer()
Metadata for a single partition inside an Hdfs table.
virtual Status Init()
Do initialization of writer.
std::string String()
Returns the contents of this stream as a string
boost::scoped_ptr< Codec > compressor_
The codec for compressing, only set if codec_type_ != NONE.
uint64_t unflushed_rows_
Number of rows consumed since last flush.
RuntimeProfile::Counter * compress_timer()
static uint32_t ComputeChecksum(int64_t input_len, const uint8_t *input)
void AppendField(const ColumnType &type, const void *value)
Adds an encoded field to out_.
const std::string & avro_schema() const
void ConsumeRow(TupleRow *row)
Processes a single row, appending to out_.