Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
compress.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "util/compress.h"
16 #include "exec/read-write-util.h"
17 #include "runtime/runtime-state.h"
18 
19 // Codec libraries
20 #include <zlib.h>
21 #include <bzlib.h>
22 #include <snappy.h>
23 #include <lz4.h>
24 
25 #include <boost/crc.hpp>
26 #include <gutil/strings/substitute.h>
27 
28 #include "common/names.h"
29 
30 using boost::crc_32_type;
31 using namespace impala;
32 using namespace strings;
33 
34 GzipCompressor::GzipCompressor(Format format, MemPool* mem_pool, bool reuse_buffer)
35  : Codec(mem_pool, reuse_buffer),
36  format_(format) {
37  bzero(&stream_, sizeof(stream_));
38 }
39 
41  (void)deflateEnd(&stream_);
42 }
43 
45  int ret;
46  // Initialize to run specified format
47  int window_bits = WINDOW_BITS;
48  if (format_ == DEFLATE) {
49  window_bits = -window_bits;
50  } else if (format_ == GZIP) {
51  window_bits += GZIP_CODEC;
52  }
53  if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
54  window_bits, 9, Z_DEFAULT_STRATEGY )) != Z_OK) {
55  return Status("zlib deflateInit failed: " + string(stream_.msg));
56  }
57 
58  return Status::OK;
59 }
60 
61 int64_t GzipCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
62 #if !defined ZLIB_VERNUM || ZLIB_VERNUM <= 0x1230
63  if (UNLIKELY(input_len == 0 && format_ == GZIP)) {
64  // zlib version 1.2.3 has a bug in deflateBound() that causes it to return too low a
65  // bound for this case. Hardcode the value returned in zlib version 1.2.3.1+.
66  return 23;
67  }
68  // There is a known issue that zlib 1.2.3 does not include the size of the
69  // gzip wrapper. This is has been fixed in zlib 1.2.3.1:
70  // http://www.zlib.net/ChangeLog.txt
71  // "Take into account wrapper variations in deflateBound()"
72  //
73  // Mark, maintainer of zlib, has stated that 12 needs to be added to result for gzip
74  // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
75  // To have a safe upper bound for "wrapper variations", we add 32 to estimate
76  return deflateBound(&stream_, input_len) + 32;
77 #else
78  return deflateBound(&stream_, input_len);
79 #endif
80 }
81 
82 Status GzipCompressor::Compress(int64_t input_length, const uint8_t* input,
83  int64_t* output_length, uint8_t* output) {
84  DCHECK_GE(*output_length, MaxOutputLen(input_length));
85  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
86  stream_.avail_in = input_length;
87  stream_.next_out = reinterpret_cast<Bytef*>(output);
88  stream_.avail_out = *output_length;
89 
90  int64_t ret = 0;
91  if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
92  if (ret == Z_OK) {
93  // will return Z_OK (and stream_.msg NOT set) if stream_.avail_out is too small
94  return Status(Substitute("zlib deflate failed: output buffer ($0) is too small.",
95  output_length).c_str());
96  }
97  stringstream ss;
98  ss << "zlib deflate failed: " << stream_.msg;
99  return Status(ss.str());
100  }
101 
102  *output_length = *output_length - stream_.avail_out;
103 
104  if (deflateReset(&stream_) != Z_OK) {
105  return Status("zlib deflateReset failed: " + string(stream_.msg));
106  }
107  return Status::OK;
108 }
109 
110 Status GzipCompressor::ProcessBlock(bool output_preallocated,
111  int64_t input_length, const uint8_t* input, int64_t* output_length,
112  uint8_t** output) {
113  DCHECK(!output_preallocated || (output_preallocated && *output_length > 0));
114  int64_t max_compressed_len = MaxOutputLen(input_length);
115  if (!output_preallocated) {
116  if (!reuse_buffer_ || buffer_length_ < max_compressed_len || out_buffer_ == NULL) {
117  DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool";
118  buffer_length_ = max_compressed_len;
120  }
121  *output = out_buffer_;
122  *output_length = buffer_length_;
123  } else if (*output_length < max_compressed_len) {
124  return Status("GzipCompressor::ProcessBlock: output length too small");
125  }
126 
127  RETURN_IF_ERROR(Compress(input_length, input, output_length, *output));
128  return Status::OK;
129 }
130 
131 BzipCompressor::BzipCompressor(MemPool* mem_pool, bool reuse_buffer)
132  : Codec(mem_pool, reuse_buffer) {
133 }
134 
135 int64_t BzipCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
136  // TODO: is it possible to get a bound with bzip.
137  return -1;
138 }
139 
140 Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
141  const uint8_t* input, int64_t *output_length, uint8_t** output) {
142  // The bz2 library does not allow input to be NULL, even when input_length is 0. This
143  // should be OK because we do not write any file formats that support bzip compression.
144  DCHECK(input != NULL);
145 
146  if (output_preallocated) {
147  buffer_length_ = *output_length;
148  out_buffer_ = *output;
149  } else if (!reuse_buffer_ || out_buffer_ == NULL) {
150  // guess that we will need no more the input length.
151  buffer_length_ = input_length;
153  }
154 
155  unsigned int outlen;
156  int ret = BZ_OUTBUFF_FULL;
157  while (ret == BZ_OUTBUFF_FULL) {
158  if (out_buffer_ == NULL) {
159  DCHECK(!output_preallocated);
160  temp_memory_pool_->Clear();
163  }
164  outlen = static_cast<unsigned int>(buffer_length_);
165  if ((ret = BZ2_bzBuffToBuffCompress(reinterpret_cast<char*>(out_buffer_), &outlen,
166  const_cast<char*>(reinterpret_cast<const char*>(input)),
167  static_cast<unsigned int>(input_length), 5, 2, 0)) == BZ_OUTBUFF_FULL) {
168  if (output_preallocated) {
169  return Status("Too small buffer passed to BzipCompressor");
170  }
171  out_buffer_ = NULL;
172  }
173  }
174  if (ret != BZ_OK) {
175  stringstream ss;
176  ss << "bzlib BZ2_bzBuffToBuffCompressor failed: " << ret;
177  return Status(ss.str());
178 
179  }
180 
181  *output = out_buffer_;
182  *output_length = outlen;
184  return Status::OK;
185 }
186 
187 // Currently this is only used for testing the decompressor.
189  : Codec(mem_pool, reuse_buffer) {
190 }
191 
192 int64_t SnappyBlockCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
193  // TODO: is it possible to get a bound on this?
194  return -1;
195 }
196 
198  int64_t input_length, const uint8_t* input, int64_t *output_length,
199  uint8_t** output) {
200  // Hadoop uses a block compression scheme on top of snappy. First there is
201  // an integer which is the size of the decompressed data followed by a
202  // sequence of compressed blocks each preceded with an integer size.
203  // For testing purposes we are going to generate two blocks.
204  int64_t block_size = input_length / 2;
205  size_t length = snappy::MaxCompressedLength(block_size) * 2;
206  length += 3 * sizeof (int32_t);
207  DCHECK(!output_preallocated || length <= *output_length);
208 
209  if (output_preallocated) {
210  buffer_length_ = *output_length;
211  out_buffer_ = *output;
212  } else if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < length) {
213  buffer_length_ = length;
215  }
216 
217  uint8_t* outp = out_buffer_;
218  uint8_t* sizep;
219  ReadWriteUtil::PutInt(outp, static_cast<uint32_t>(input_length));
220  outp += sizeof (int32_t);
221  while (input_length > 0) {
222  // TODO: should this be a while or a do-while loop? Check what Hadoop does.
223  // Point at the spot to store the compressed size.
224  sizep = outp;
225  outp += sizeof (int32_t);
226  size_t size;
227  snappy::RawCompress(reinterpret_cast<const char*>(input),
228  static_cast<size_t>(block_size), reinterpret_cast<char*>(outp), &size);
229 
230  ReadWriteUtil::PutInt(sizep, static_cast<uint32_t>(size));
231  input += block_size;
232  input_length -= block_size;
233  outp += size;
234  }
235 
236  *output = out_buffer_;
237  *output_length = outp - out_buffer_;
238  return Status::OK;
239 }
240 
241 SnappyCompressor::SnappyCompressor(MemPool* mem_pool, bool reuse_buffer)
242  : Codec(mem_pool, reuse_buffer) {
243 }
244 
245 int64_t SnappyCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
246  return snappy::MaxCompressedLength(input_len);
247 }
248 
249 Status SnappyCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
250  const uint8_t* input, int64_t* output_length, uint8_t** output) {
251  int64_t max_compressed_len = MaxOutputLen(input_length);
252  if (output_preallocated && *output_length < max_compressed_len) {
253  return Status("SnappyCompressor::ProcessBlock: output length too small");
254  }
255 
256  if (!output_preallocated) {
257  if ((!reuse_buffer_ || buffer_length_ < max_compressed_len)) {
258  DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool";
259  buffer_length_ = max_compressed_len;
261  }
262  *output = out_buffer_;
263  }
264 
265  size_t out_len;
266  snappy::RawCompress(reinterpret_cast<const char*>(input),
267  static_cast<size_t>(input_length),
268  reinterpret_cast<char*>(*output), &out_len);
269  *output_length = out_len;
270  return Status::OK;
271 }
272 
273 uint32_t SnappyCompressor::ComputeChecksum(int64_t input_len, const uint8_t* input) {
274  crc_32_type crc;
275  crc.process_bytes(reinterpret_cast<const char*>(input), input_len);
276  uint32_t chk = crc.checksum();
277  // Snappy requires the checksum to be masked.
278  return ((chk >> 15) | (chk << 17)) + 0xa282ead8;
279 }
280 
281 Lz4Compressor::Lz4Compressor(MemPool* mem_pool, bool reuse_buffer)
282  : Codec(mem_pool, reuse_buffer) {
283 }
284 
285 int64_t Lz4Compressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
286  return LZ4_compressBound(input_len);
287 }
288 
289 Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_length,
290  const uint8_t* input, int64_t* output_length, uint8_t** output) {
291  CHECK(output_preallocated) << "Output was not allocated for Lz4 Codec";
292  if (input_length == 0) return Status::OK;
293  *output_length = LZ4_compress(reinterpret_cast<const char*>(input),
294  reinterpret_cast<char*>(*output), input_length);
295  return Status::OK;
296 }
virtual ~GzipCompressor()
Definition: compress.cc:40
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: compress.cc:192
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: compress.cc:249
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: compress.cc:285
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
static const int GZIP_CODEC
Definition: compress.h:60
SnappyCompressor(MemPool *mem_pool=NULL, bool reuse_buffer=false)
Definition: compress.cc:241
void AcquireData(MemPool *src, bool keep_current)
Definition: mem-pool.cc:161
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: compress.cc:61
virtual Status Init()
Initialize the codec. This should only be called once.
Definition: compress.cc:44
bool reuse_buffer_
Can we reuse the output buffer or do we need to allocate on each call?
Definition: codec.h:161
BzipCompressor(MemPool *mem_pool, bool reuse_buffer)
Definition: compress.cc:131
static void PutInt(uint8_t *buf, uint16_t integer)
z_stream stream_
Structure used to communicate with the library.
Definition: compress.h:56
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: compress.cc:135
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: compress.cc:110
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: compress.cc:140
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: compress.cc:197
GzipCompressor(Format format, MemPool *mem_pool=NULL, bool reuse_buffer=false)
Definition: compress.cc:34
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: compress.cc:289
Status Compress(int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t *output)
Definition: compress.cc:82
#define UNLIKELY(expr)
Definition: compiler-util.h:33
uint8_t * out_buffer_
Definition: codec.h:165
int64_t buffer_length_
Length of the output buffer.
Definition: codec.h:168
static const int WINDOW_BITS
These are magic numbers from zlib.h. Not clear why they are not defined there.
Definition: compress.h:59
static const Status OK
Definition: status.h:87
boost::scoped_ptr< MemPool > temp_memory_pool_
Definition: codec.h:158
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: compress.cc:245
Format
Compression formats supported by the zlib library.
Definition: compress.h:35
SnappyBlockCompressor(MemPool *mem_pool, bool reuse_buffer)
Definition: compress.cc:188
Lz4Compressor(MemPool *mem_pool=NULL, bool reuse_buffer=false)
Definition: compress.cc:281
static uint32_t ComputeChecksum(int64_t input_len, const uint8_t *input)
Definition: compress.cc:273
MemPool * memory_pool_
Pool to allocate the buffer to hold transformed data.
Definition: codec.h:154
uint8_t * Allocate(int size)
Definition: mem-pool.h:92