25 #include <boost/crc.hpp>
26 #include <gutil/strings/substitute.h>
30 using boost::crc_32_type;
31 using namespace impala;
32 using namespace strings;
35 :
Codec(mem_pool, reuse_buffer),
49 window_bits = -window_bits;
53 if ((ret = deflateInit2(&
stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
54 window_bits, 9, Z_DEFAULT_STRATEGY )) != Z_OK) {
55 return Status(
"zlib deflateInit failed: " +
string(
stream_.msg));
62 #if !defined ZLIB_VERNUM || ZLIB_VERNUM <= 0x1230
76 return deflateBound(&
stream_, input_len) + 32;
78 return deflateBound(&
stream_, input_len);
83 int64_t* output_length, uint8_t* output) {
85 stream_.next_in =
const_cast<Bytef*
>(
reinterpret_cast<const Bytef*
>(input));
86 stream_.avail_in = input_length;
87 stream_.next_out =
reinterpret_cast<Bytef*
>(output);
88 stream_.avail_out = *output_length;
91 if ((ret = deflate(&
stream_, Z_FINISH)) != Z_STREAM_END) {
// Report the caller's buffer size. output_length is an int64_t*, so passing it
// un-dereferenced would substitute the pointer itself into the message.
94 return Status(Substitute(
"zlib deflate failed: output buffer ($0) is too small.",
95 *output_length).c_str());
98 ss <<
"zlib deflate failed: " <<
stream_.msg;
102 *output_length = *output_length -
stream_.avail_out;
104 if (deflateReset(&
stream_) != Z_OK) {
105 return Status(
"zlib deflateReset failed: " +
string(
stream_.msg));
111 int64_t input_length,
const uint8_t* input, int64_t* output_length,
113 DCHECK(!output_preallocated || (output_preallocated && *output_length > 0));
114 int64_t max_compressed_len =
MaxOutputLen(input_length);
115 if (!output_preallocated) {
117 DCHECK(
memory_pool_ != NULL) <<
"Can't allocate without passing in a mem pool";
123 }
else if (*output_length < max_compressed_len) {
124 return Status(
"GzipCompressor::ProcessBlock: output length too small");
132 :
Codec(mem_pool, reuse_buffer) {
141 const uint8_t* input, int64_t *output_length, uint8_t** output) {
144 DCHECK(input != NULL);
146 if (output_preallocated) {
156 int ret = BZ_OUTBUFF_FULL;
157 while (ret == BZ_OUTBUFF_FULL) {
159 DCHECK(!output_preallocated);
165 if ((ret = BZ2_bzBuffToBuffCompress(reinterpret_cast<char*>(
out_buffer_), &outlen,
166 const_cast<char*>(reinterpret_cast<const char*>(input)),
167 static_cast<unsigned int>(input_length), 5, 2, 0)) == BZ_OUTBUFF_FULL) {
168 if (output_preallocated) {
169 return Status(
"Too small buffer passed to BzipCompressor");
176 ss <<
"bzlib BZ2_bzBuffToBuffCompressor failed: " << ret;
182 *output_length = outlen;
189 :
Codec(mem_pool, reuse_buffer) {
198 int64_t input_length,
const uint8_t* input, int64_t *output_length,
204 int64_t block_size = input_length / 2;
205 size_t length = snappy::MaxCompressedLength(block_size) * 2;
206 length += 3 *
sizeof (int32_t);
207 DCHECK(!output_preallocated || length <= *output_length);
209 if (output_preallocated) {
220 outp +=
sizeof (int32_t);
221 while (input_length > 0) {
225 outp +=
sizeof (int32_t);
227 snappy::RawCompress(reinterpret_cast<const char*>(input),
228 static_cast<size_t>(block_size), reinterpret_cast<char*>(outp), &size);
232 input_length -= block_size;
242 :
Codec(mem_pool, reuse_buffer) {
246 return snappy::MaxCompressedLength(input_len);
250 const uint8_t* input, int64_t* output_length, uint8_t** output) {
251 int64_t max_compressed_len =
MaxOutputLen(input_length);
252 if (output_preallocated && *output_length < max_compressed_len) {
253 return Status(
"SnappyCompressor::ProcessBlock: output length too small");
256 if (!output_preallocated) {
258 DCHECK(
memory_pool_ != NULL) <<
"Can't allocate without passing in a mem pool";
266 snappy::RawCompress(reinterpret_cast<const char*>(input),
267 static_cast<size_t>(input_length),
268 reinterpret_cast<char*>(*output), &out_len);
269 *output_length = out_len;
275 crc.process_bytes(reinterpret_cast<const char*>(input), input_len);
276 uint32_t chk = crc.checksum();
278 return ((chk >> 15) | (chk << 17)) + 0xa282ead8;
282 :
Codec(mem_pool, reuse_buffer) {
286 return LZ4_compressBound(input_len);
290 const uint8_t* input, int64_t* output_length, uint8_t** output) {
291 CHECK(output_preallocated) <<
"Output was not allocated for Lz4 Codec";
293 *output_length = LZ4_compress(reinterpret_cast<const char*>(input),
294 reinterpret_cast<char*>(*output), input_length);
virtual ~GzipCompressor()
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
static const int GZIP_CODEC
SnappyCompressor(MemPool *mem_pool=NULL, bool reuse_buffer=false)
void AcquireData(MemPool *src, bool keep_current)
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
virtual Status Init()
Initialize the codec. This should only be called once.
bool reuse_buffer_
Can we reuse the output buffer or do we need to allocate on each call?
BzipCompressor(MemPool *mem_pool, bool reuse_buffer)
static void PutInt(uint8_t *buf, uint16_t integer)
z_stream stream_
Structure used to communicate with the library.
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
GzipCompressor(Format format, MemPool *mem_pool=NULL, bool reuse_buffer=false)
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Status Compress(int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t *output)
int64_t buffer_length_
Length of the output buffer.
static const int WINDOW_BITS
These are magic numbers from zlib.h. Not clear why they are not defined there.
boost::scoped_ptr< MemPool > temp_memory_pool_
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Format
Compression formats supported by the zlib library.
SnappyBlockCompressor(MemPool *mem_pool, bool reuse_buffer)
Lz4Compressor(MemPool *mem_pool=NULL, bool reuse_buffer=false)
static uint32_t ComputeChecksum(int64_t input_len, const uint8_t *input)
MemPool * memory_pool_
Pool to allocate the buffer to hold transformed data.
uint8_t * Allocate(int size)