Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
decompress.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <boost/assign/list_of.hpp>
16 #include "util/decompress.h"
17 #include "exec/read-write-util.h"
18 #include "runtime/runtime-state.h"
19 #include "common/logging.h"
20 #include "gen-cpp/Descriptors_types.h"
21 
22 // Codec libraries
23 #include <zlib.h>
24 #include <bzlib.h>
25 #include <snappy.h>
26 #include <lz4.h>
27 
28 #include "common/names.h"
29 
30 using namespace impala;
31 
32 // Output buffer size for streaming gzip
33 const int64_t STREAM_GZIP_OUT_BUF_SIZE = 16 * 1024 * 1024;
34 
35 GzipDecompressor::GzipDecompressor(MemPool* mem_pool, bool reuse_buffer, bool is_deflate)
36  : Codec(mem_pool, reuse_buffer),
37  is_deflate_(is_deflate) {
38  bzero(&stream_, sizeof(stream_));
39 }
40 
42  (void)inflateEnd(&stream_);
43 }
44 
46  int ret;
47  // Initialize to run either deflate or zlib/gzip format
48  int window_bits = is_deflate_ ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC;
49  if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
50  return Status("zlib inflateInit failed: " + string(stream_.msg));
51  }
52 
53  return Status::OK;
54 }
55 
56 int64_t GzipDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
57  return -1;
58 }
59 
61  stringstream ss;
62  ss << "next_in=" << (void*)stream_.next_in;
63  ss << " avail_in=" << stream_.avail_in;
64  ss << " total_in=" << stream_.total_in;
65  ss << " next_out=" << (void*)stream_.next_out;
66  ss << " avail_out=" << stream_.avail_out;
67  ss << " total_out=" << stream_.total_out;
68  return ss.str();
69 }
70 
71 Status GzipDecompressor::ProcessBlockStreaming(int64_t input_length, const uint8_t* input,
72  int64_t* input_bytes_read, int64_t* output_length, uint8_t** output, bool* eos) {
73  if (!reuse_buffer_ || out_buffer_ == NULL) {
76  }
77  *output = out_buffer_;
78  *output_length = buffer_length_;
79  *input_bytes_read = 0;
80  *eos = false;
81 
82  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
83  stream_.avail_in = input_length;
84  stream_.next_out = reinterpret_cast<Bytef*>(*output);
85  stream_.avail_out = *output_length;
86  VLOG_ROW << "ProcessBlockStreaming() stream: " << DebugStreamState();
87 
88  int ret = inflate(&stream_, Z_SYNC_FLUSH);
89  if (ret != Z_OK && ret != Z_STREAM_END && ret != Z_BUF_ERROR) {
90  stringstream ss;
91  ss << "GzipDecompressor failed, ret=" << ret;
92  if (stream_.msg != NULL) ss << " msg=" << stream_.msg;
93  return Status(ss.str());
94  }
95 
96  // stream_.avail_out is the number of bytes *left* in the out buffer, but
97  // we're interested in the number of bytes used.
98  *output_length = *output_length - stream_.avail_out;
99  *input_bytes_read = input_length - stream_.avail_in;
100  VLOG_ROW << "inflate() ret=" << ret << " consumed=" << *input_bytes_read
101  << " produced=" << *output_length << " stream: " << DebugStreamState();
102 
103  if (ret == Z_BUF_ERROR) {
104  // Z_BUF_ERROR is returned if no progress was made. This should be very unlikely.
105  // The caller should check for this case (where 0 bytes were consumed, 0 bytes
106  // produced) and try again with more input.
107  DCHECK_EQ(0, *output_length);
108  DCHECK_EQ(0, *input_bytes_read);
109  } else if (ret == Z_STREAM_END) {
110  *eos = true;
111  if (inflateReset(&stream_) != Z_OK) {
112  return Status("zlib inflateReset failed: " + string(stream_.msg));
113  }
114  }
115  return Status::OK;
116 }
117 
118 Status GzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
119  const uint8_t* input, int64_t* output_length, uint8_t** output) {
120  if (output_preallocated && *output_length == 0) {
121  // The zlib library does not allow *output to be NULL, even when output_length is 0
122  // (inflate() will return Z_STREAM_ERROR). We don't consider this an error, so bail
123  // early if no output is expected. Note that we don't signal an error if the input
124  // actually contains compressed data.
125  return Status::OK;
126  }
127 
128  bool use_temp = false;
129  if (!output_preallocated) {
130  if (!reuse_buffer_ || out_buffer_ == NULL) {
131  // guess that we will need 2x the input length.
132  buffer_length_ = input_length * 2;
134  return Status("Decompressor: block size is too big");
135  }
137  }
138  use_temp = true;
139  *output = out_buffer_;
140  *output_length = buffer_length_;
141  }
142 
143  // Reset the stream for this block
144  if (inflateReset(&stream_) != Z_OK) {
145  return Status("zlib inflateReset failed: " + string(stream_.msg));
146  }
147 
148  int ret = 0;
149  // gzip can run in streaming mode or non-streaming mode. We only
150  // support the non-streaming use case where we present it the entire
151  // compressed input and a buffer big enough to contain the entire
152  // compressed output. In the case where we don't know the output,
153  // we just make a bigger buffer and try the non-streaming mode
154  // from the beginning again.
155  // TODO: support streaming, especially for compressed text.
156  while (ret != Z_STREAM_END) {
157  stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
158  stream_.avail_in = input_length;
159  stream_.next_out = reinterpret_cast<Bytef*>(*output);
160  stream_.avail_out = *output_length;
161 
162  if (use_temp) {
163  // We don't know the output size, so this might fail.
164  ret = inflate(&stream_, Z_PARTIAL_FLUSH);
165  } else {
166  // We know the output size. In this case, we can use Z_FINISH
167  // which is more efficient.
168  ret = inflate(&stream_, Z_FINISH);
169  }
170  if (ret == Z_STREAM_END || ret != Z_OK) break;
171 
172  // Not enough output space.
173  if (!use_temp) {
174  stringstream ss;
175  ss << "Too small a buffer passed to GzipDecompressor. InputLength="
176  << input_length << " OutputLength=" << *output_length;
177  return Status(ss.str());
178  }
179 
180  // User didn't supply the buffer, double the buffer and try again.
181  temp_memory_pool_->Clear();
182  buffer_length_ *= 2;
184  stringstream ss;
185  ss << "GzipDecompressor: block size is too big: " << buffer_length_;
186  return Status(ss.str());
187  }
188 
190  *output = out_buffer_;
191  *output_length = buffer_length_;
192  ret = inflateReset(&stream_);
193  }
194 
195  if (ret != Z_STREAM_END) {
196  stringstream ss;
197  ss << "GzipDecompressor failed: ";
198  if (stream_.msg != NULL) ss << stream_.msg;
199  return Status(ss.str());
200  }
201 
202  // stream_.avail_out is the number of bytes *left* in the out buffer, but
203  // we're interested in the number of bytes used.
204  *output_length = *output_length - stream_.avail_out;
206  return Status::OK;
207 }
208 
209 BzipDecompressor::BzipDecompressor(MemPool* mem_pool, bool reuse_buffer)
210  : Codec(mem_pool, reuse_buffer) {
211 }
212 
213 int64_t BzipDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
214  return -1;
215 }
216 
217 Status BzipDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
218  const uint8_t* input, int64_t* output_length, uint8_t** output) {
219  if (output_preallocated && *output_length == 0) {
220  // Same problem as zlib library, see comment in GzipDecompressor::ProcessBlock()
221  return Status::OK;
222  }
223 
224  bool use_temp = false;
225  if (output_preallocated) {
226  buffer_length_ = *output_length;
227  out_buffer_ = *output;
228  } else if (!reuse_buffer_ || out_buffer_ == NULL) {
229  // guess that we will need 2x the input length.
230  buffer_length_ = input_length * 2;
232  return Status("Decompressor: block size is too big");
233  }
235  use_temp = true;
236  }
237 
238  int ret = BZ_OUTBUFF_FULL;
239  unsigned int outlen;
240  while (ret == BZ_OUTBUFF_FULL) {
241  if (out_buffer_ == NULL) {
242  DCHECK(!output_preallocated);
243  temp_memory_pool_->Clear();
246  return Status("Decompressor: block size is too big");
247  }
249  }
250  outlen = static_cast<unsigned int>(buffer_length_);
251  if ((ret = BZ2_bzBuffToBuffDecompress(reinterpret_cast<char*>(out_buffer_), &outlen,
252  const_cast<char*>(reinterpret_cast<const char*>(input)),
253  static_cast<unsigned int>(input_length), 0, 0)) == BZ_OUTBUFF_FULL) {
254  if (output_preallocated) {
255  return Status("Too small a buffer passed to BzipDecompressor");
256  }
257  out_buffer_ = NULL;
258  }
259  }
260  if (ret != BZ_OK) {
261  stringstream ss;
262  ss << "bzlib BZ2_bzBuffToBuffDecompressor failed: " << ret;
263  return Status(ss.str());
264 
265  }
266 
267  *output = out_buffer_;
268  *output_length = outlen;
270  return Status::OK;
271 }
272 
273 
275  : Codec(mem_pool, reuse_buffer) {
276 }
277 
278 int64_t SnappyBlockDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
279  return -1;
280 }
281 
// Hadoop uses a block compression scheme on top of snappy. As per the Hadoop docs,
// the input is split into blocks. Each block "contains the uncompressed length for
// the block followed by one or more length-prefixed blocks of compressed data."
// This is essentially blocks of blocks.
// The outer block consists of:
// - 4-byte big-endian uncompressed_size
// < inner blocks >
// ... repeated until input_len is consumed ...
// The inner blocks have:
// - 4-byte big-endian compressed_size
// < snappy compressed block >
// - 4-byte big-endian compressed_size
// < snappy compressed block >
// ... repeated until uncompressed_size from outer block is consumed ...
296 
297 // Utility function to decompress snappy block compressed data. If size_only is true,
298 // this function does not decompress but only computes the output size and writes
299 // the result to *output_len.
300 // If size_only is false, output must be preallocated to output_len and this needs to
301 // be exactly big enough to hold the decompressed output.
302 // size_only is a O(1) operations (just reads a single varint for each snappy block).
303 static Status SnappyBlockDecompress(int64_t input_len, const uint8_t* input,
304  bool size_only, int64_t* output_len, char* output) {
305  int64_t uncompressed_total_len = 0;
306  while (input_len > 0) {
307  uint32_t uncompressed_block_len = ReadWriteUtil::GetInt<uint32_t>(input);
308  input += sizeof(uint32_t);
309  input_len -= sizeof(uint32_t);
310 
311  if (uncompressed_block_len > Codec::MAX_BLOCK_SIZE) {
312  if (uncompressed_total_len == 0) {
313  // TODO: is this check really robust?
314  return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_BLOCK_SIZE,
315  uncompressed_block_len);
316  }
317  break;
318  }
319 
320  if (!size_only) {
321  int64_t remaining_output_size = *output_len - uncompressed_total_len;
322  DCHECK_GE(remaining_output_size, uncompressed_block_len);
323  }
324 
325  while (uncompressed_block_len > 0) {
326  // Read the length of the next snappy compressed block.
327  size_t compressed_len = ReadWriteUtil::GetInt<uint32_t>(input);
328  input += sizeof(uint32_t);
329  input_len -= sizeof(uint32_t);
330 
331  if (compressed_len == 0 || compressed_len > input_len) {
332  if (uncompressed_total_len == 0) {
333  return Status(TErrorCode::SNAPPY_DECOMPRESS_INVALID_COMPRESSED_LENGTH);
334  }
335  input_len = 0;
336  break;
337  }
338 
339  // Read how big the output will be.
340  size_t uncompressed_len;
341  if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
342  input_len, &uncompressed_len)) {
343  if (uncompressed_total_len == 0) {
344  return Status(TErrorCode::SNAPPY_DECOMPRESS_UNCOMPRESSED_LENGTH_FAILED);
345  }
346  input_len = 0;
347  break;
348  }
349  DCHECK_GT(uncompressed_len, 0);
350 
351  if (!size_only) {
352  // Decompress this snappy block
353  if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
354  compressed_len, output)) {
355  return Status(TErrorCode::SNAPPY_DECOMPRESS_RAW_UNCOMPRESS_FAILED);
356  }
357  output += uncompressed_len;
358  }
359 
360  input += compressed_len;
361  input_len -= compressed_len;
362  uncompressed_block_len -= uncompressed_len;
363  uncompressed_total_len += uncompressed_len;
364  }
365  }
366 
367  if (size_only) {
368  *output_len = uncompressed_total_len;
369  } else if (*output_len != uncompressed_total_len) {
370  return Status(TErrorCode::SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT);
371  }
372  return Status::OK;
373 }
374 
375 Status SnappyBlockDecompressor::ProcessBlock(bool output_preallocated, int64_t input_len,
376  const uint8_t* input, int64_t* output_len, uint8_t** output) {
377  if (!output_preallocated) {
378  // If we don't know the size beforehand, compute it.
379  RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, true, output_len, NULL));
380 
381  if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < *output_len) {
382  // Need to allocate a new buffer
383  buffer_length_ = *output_len;
385  }
386  *output = out_buffer_;
387  }
388 
389  if (*output_len > MAX_BLOCK_SIZE) {
390  // TODO: is this check really robust?
391  stringstream ss;
392  ss << "Decompressor: block size is too big. Data is likely corrupt. "
393  << "Size: " << *output_len;
394  return Status(ss.str());
395  }
396 
397  char* out_ptr = reinterpret_cast<char*>(*output);
398  RETURN_IF_ERROR(SnappyBlockDecompress(input_len, input, false, output_len, out_ptr));
399  return Status::OK;
400 }
401 
402 SnappyDecompressor::SnappyDecompressor(MemPool* mem_pool, bool reuse_buffer)
403  : Codec(mem_pool, reuse_buffer) {
404 }
405 
406 int64_t SnappyDecompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
407  DCHECK(input != NULL);
408  size_t result;
409  if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input),
410  input_len, &result)) {
411  return -1;
412  }
413  return result;
414 }
415 
416 Status SnappyDecompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
417  const uint8_t* input, int64_t* output_length, uint8_t** output) {
418  if (!output_preallocated) {
419  int64_t uncompressed_length = MaxOutputLen(input_length, input);
420  if (uncompressed_length < 0) return Status("Snappy: GetUncompressedLength failed");
421  if (!reuse_buffer_ || out_buffer_ == NULL || buffer_length_ < uncompressed_length) {
422  buffer_length_ = uncompressed_length;
424  return Status("Decompressor: block size is too big");
425  }
427  }
428  *output = out_buffer_;
429  *output_length = uncompressed_length;
430  }
431 
432  if (!snappy::RawUncompress(reinterpret_cast<const char*>(input),
433  static_cast<size_t>(input_length), reinterpret_cast<char*>(*output))) {
434  return Status("Snappy: RawUncompress failed");
435  }
436 
437  return Status::OK;
438 }
439 
440 Lz4Decompressor::Lz4Decompressor(MemPool* mem_pool, bool reuse_buffer)
441  : Codec(mem_pool, reuse_buffer) {
442 }
443 
444 int64_t Lz4Decompressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
445  DCHECK(input != NULL) << "Passed null input to Lz4 Decompressor";
446  return -1;
447 }
448 
449 Status Lz4Decompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
450  const uint8_t* input, int64_t* output_length, uint8_t** output) {
451  DCHECK(output_preallocated) << "Lz4 Codec implementation must have allocated output";
452  // LZ4_uncompress will cause a segmentation fault if passed a NULL output.
453  if(*output_length == 0) return Status::OK;
454  if (LZ4_uncompress(reinterpret_cast<const char*>(input),
455  reinterpret_cast<char*>(*output), *output_length) != input_length) {
456  return Status("Lz4: uncompress failed");
457  }
458 
459  return Status::OK;
460 }
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: decompress.cc:444
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: decompress.cc:118
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: decompress.cc:449
SnappyBlockDecompressor(MemPool *mem_pool, bool reuse_buffer)
Definition: decompress.cc:274
BzipDecompressor(MemPool *mem_pool, bool reuse_buffer)
Definition: decompress.cc:209
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: decompress.cc:217
void AcquireData(MemPool *src, bool keep_current)
Definition: mem-pool.cc:161
static Status SnappyBlockDecompress(int64_t input_len, const uint8_t *input, bool size_only, int64_t *output_len, char *output)
Definition: decompress.cc:303
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: decompress.cc:213
bool reuse_buffer_
Can we reuse the output buffer or do we need to allocate on each call?
Definition: codec.h:161
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: decompress.cc:278
Lz4Decompressor(MemPool *mem_pool=NULL, bool reuse_buffer=false)
Definition: decompress.cc:440
static const int WINDOW_BITS
These are magic numbers from zlib.h. Not clear why they are not defined there.
Definition: decompress.h:51
std::string DebugStreamState() const
Definition: decompress.cc:60
static const int DETECT_CODEC
Definition: decompress.h:52
static const int MAX_BLOCK_SIZE
Definition: codec.h:140
#define VLOG_ROW
Definition: logging.h:59
virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t *input, int64_t *input_bytes_read, int64_t *output_length, uint8_t **output, bool *eos)
Definition: decompress.cc:71
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: decompress.cc:375
uint8_t * out_buffer_
Definition: codec.h:165
int64_t buffer_length_
Length of the output buffer.
Definition: codec.h:168
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: decompress.cc:406
static const Status OK
Definition: status.h:87
boost::scoped_ptr< MemPool > temp_memory_pool_
Definition: codec.h:158
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)
Process a block of data, either compressing or decompressing it.
Definition: decompress.cc:416
bool is_deflate_
If set assume deflate format, otherwise zlib or gzip.
Definition: decompress.h:46
const int64_t STREAM_GZIP_OUT_BUF_SIZE
Definition: decompress.cc:33
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)
Definition: decompress.cc:56
GzipDecompressor(MemPool *mem_pool=NULL, bool reuse_buffer=false, bool is_deflate=false)
Definition: decompress.cc:35
virtual Status Init()
Initialize the codec. This should only be called once.
Definition: decompress.cc:45
MemPool * memory_pool_
Pool to allocate the buffer to hold transformed data.
Definition: codec.h:154
uint8_t * Allocate(int size)
Definition: mem-pool.h:92
SnappyDecompressor(MemPool *mem_pool=NULL, bool reuse_buffer=false)
Definition: decompress.cc:402