15 #include <boost/assign/list_of.hpp>
20 #include "gen-cpp/Descriptors_types.h"
30 using namespace impala;
36 :
Codec(mem_pool, reuse_buffer),
37 is_deflate_(is_deflate) {
49 if ((ret = inflateInit2(&
stream_, window_bits)) != Z_OK) {
50 return Status(
"zlib inflateInit failed: " +
string(
stream_.msg));
62 ss <<
"next_in=" << (
void*)
stream_.next_in;
63 ss <<
" avail_in=" <<
stream_.avail_in;
64 ss <<
" total_in=" <<
stream_.total_in;
65 ss <<
" next_out=" << (
void*)
stream_.next_out;
66 ss <<
" avail_out=" <<
stream_.avail_out;
67 ss <<
" total_out=" <<
stream_.total_out;
72 int64_t* input_bytes_read, int64_t* output_length, uint8_t** output,
bool* eos) {
79 *input_bytes_read = 0;
82 stream_.next_in =
const_cast<Bytef*
>(
reinterpret_cast<const Bytef*
>(input));
83 stream_.avail_in = input_length;
84 stream_.next_out =
reinterpret_cast<Bytef*
>(*output);
85 stream_.avail_out = *output_length;
88 int ret = inflate(&
stream_, Z_SYNC_FLUSH);
89 if (ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR) {
91 ss <<
"GzipDecompressor failed, ret=" << ret;
98 *output_length = *output_length -
stream_.avail_out;
99 *input_bytes_read = input_length -
stream_.avail_in;
100 VLOG_ROW <<
"inflate() ret=" << ret <<
" consumed=" << *input_bytes_read
103 if (ret == Z_BUF_ERROR) {
107 DCHECK_EQ(0, *output_length);
108 DCHECK_EQ(0, *input_bytes_read);
109 }
else if (ret == Z_STREAM_END) {
111 if (inflateReset(&
stream_) != Z_OK) {
112 return Status(
"zlib inflateReset failed: " +
string(
stream_.msg));
119 const uint8_t* input, int64_t* output_length, uint8_t** output) {
120 if (output_preallocated && *output_length == 0) {
128 bool use_temp =
false;
129 if (!output_preallocated) {
134 return Status(
"Decompressor: block size is too big");
144 if (inflateReset(&
stream_) != Z_OK) {
145 return Status(
"zlib inflateReset failed: " +
string(
stream_.msg));
156 while (ret != Z_STREAM_END) {
157 stream_.next_in =
const_cast<Bytef*
>(
reinterpret_cast<const Bytef*
>(input));
158 stream_.avail_in = input_length;
159 stream_.next_out =
reinterpret_cast<Bytef*
>(*output);
160 stream_.avail_out = *output_length;
164 ret = inflate(&
stream_, Z_PARTIAL_FLUSH);
168 ret = inflate(&
stream_, Z_FINISH);
170 if (ret == Z_STREAM_END || ret != Z_OK)
break;
175 ss <<
"Too small a buffer passed to GzipDecompressor. InputLength="
176 << input_length <<
" OutputLength=" << *output_length;
185 ss <<
"GzipDecompressor: block size is too big: " <<
buffer_length_;
195 if (ret != Z_STREAM_END) {
197 ss <<
"GzipDecompressor failed: ";
204 *output_length = *output_length -
stream_.avail_out;
210 :
Codec(mem_pool, reuse_buffer) {
218 const uint8_t* input, int64_t* output_length, uint8_t** output) {
219 if (output_preallocated && *output_length == 0) {
224 bool use_temp =
false;
225 if (output_preallocated) {
232 return Status(
"Decompressor: block size is too big");
238 int ret = BZ_OUTBUFF_FULL;
240 while (ret == BZ_OUTBUFF_FULL) {
242 DCHECK(!output_preallocated);
246 return Status(
"Decompressor: block size is too big");
251 if ((ret = BZ2_bzBuffToBuffDecompress(reinterpret_cast<char*>(
out_buffer_), &outlen,
252 const_cast<char*>(reinterpret_cast<const char*>(input)),
253 static_cast<unsigned int>(input_length), 0, 0)) == BZ_OUTBUFF_FULL) {
254 if (output_preallocated) {
255 return Status(
"Too small a buffer passed to BzipDecompressor");
262 ss <<
"bzlib BZ2_bzBuffToBuffDecompressor failed: " << ret;
268 *output_length = outlen;
275 :
Codec(mem_pool, reuse_buffer) {
304 bool size_only, int64_t* output_len,
char* output) {
305 int64_t uncompressed_total_len = 0;
306 while (input_len > 0) {
307 uint32_t uncompressed_block_len = ReadWriteUtil::GetInt<uint32_t>(input);
308 input +=
sizeof(uint32_t);
309 input_len -=
sizeof(uint32_t);
312 if (uncompressed_total_len == 0) {
314 return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_BLOCK_SIZE,
315 uncompressed_block_len);
321 int64_t remaining_output_size = *output_len - uncompressed_total_len;
322 DCHECK_GE(remaining_output_size, uncompressed_block_len);
325 while (uncompressed_block_len > 0) {
327 size_t compressed_len = ReadWriteUtil::GetInt<uint32_t>(input);
328 input +=
sizeof(uint32_t);
329 input_len -=
sizeof(uint32_t);
331 if (compressed_len == 0 || compressed_len > input_len) {
332 if (uncompressed_total_len == 0) {
333 return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
340 size_t uncompressed_len;
341 if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
342 input_len, &uncompressed_len)) {
343 if (uncompressed_total_len == 0) {
344 return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
349 DCHECK_GT(uncompressed_len, 0);
353 if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
354 compressed_len, output)) {
355 return Status(TErrorCode::SNAPPY_DECOMPRESS_RAW_UNCOMPRESS_FAILED);
357 output += uncompressed_len;
360 input += compressed_len;
361 input_len -= compressed_len;
362 uncompressed_block_len -= uncompressed_len;
363 uncompressed_total_len += uncompressed_len;
368 *output_len = uncompressed_total_len;
369 }
else if (*output_len != uncompressed_total_len) {
370 return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
376 const uint8_t* input, int64_t* output_len, uint8_t** output) {
377 if (!output_preallocated) {
392 ss <<
"Decompressor: block size is too big. Data is likely corrupt. "
393 <<
"Size: " << *output_len;
397 char* out_ptr =
reinterpret_cast<char*
>(*output);
403 :
Codec(mem_pool, reuse_buffer) {
407 DCHECK(input != NULL);
409 if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
410 input_len, &result)) {
417 const uint8_t* input, int64_t* output_length, uint8_t** output) {
418 if (!output_preallocated) {
419 int64_t uncompressed_length =
MaxOutputLen(input_length, input);
420 if (uncompressed_length < 0)
return Status(
"Snappy: GetUncompressedLength failed");
424 return Status(
"Decompressor: block size is too big");
429 *output_length = uncompressed_length;
432 if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
433 static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
434 return Status(
"Snappy: RawUncompress failed");
441 :
Codec(mem_pool, reuse_buffer) {
445 DCHECK(input != NULL) <<
"Passed null input to Lz4 Decompressor";
450 const uint8_t* input, int64_t* output_length, uint8_t** output) {
451 DCHECK(output_preallocated) <<
"Lz4 Codec implementation must have allocated output";
454 if (LZ4_uncompress(reinterpret_cast<const char*>(input),
455 reinterpret_cast<char*>(*output), *output_length) != input_length) {
456 return Status(
"Lz4: uncompress failed");
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
SnappyBlockDecompressor(MemPool *mem_pool, bool reuse_buffer)
BzipDecompressor(MemPool *mem_pool, bool reuse_buffer)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
virtual ~GzipDecompressor()
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
void AcquireData(MemPool *src, bool keep_current)
static Status SnappyBlockDecompress(int64_t input_len, const uint8_t *input, bool size_only, int64_t *output_len, char *output)
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
bool reuse_buffer_
Can we reuse the output buffer or do we need to allocate on each call?
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Lz4Decompressor(MemPool *mem_pool=NULL, bool reuse_buffer=false)
static const int WINDOW_BITS
These are magic numbers from zlib.h. Not clear why they are not defined there.
std::string DebugStreamState() const
static const int DETECT_CODEC
static const int MAX_BLOCK_SIZE
virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t *input, int64_t *input_bytes_read, int64_t *output_length, uint8_t **output, bool *eos)
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
int64_t buffer_length_
Length of the output buffer.
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
boost::scoped_ptr< MemPool > temp_memory_pool_
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
bool is_deflate_
If set assume deflate format, otherwise zlib or gzip.
const int64_t STREAM_GZIP_OUT_BUF_SIZE
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
GzipDecompressor(MemPool *mem_pool=NULL, bool reuse_buffer=false, bool is_deflate=false)
virtual Status Init()
Initialize the codec. This should only be called once.
MemPool * memory_pool_
Pool to allocate the buffer to hold transformed data.
uint8_t * Allocate(int size)
SnappyDecompressor(MemPool *mem_pool=NULL, bool reuse_buffer=false)