Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
decompress-test.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <iostream>
18 #include <gtest/gtest.h>
19 #include "runtime/mem-tracker.h"
20 #include "runtime/mem-pool.h"
21 #include "util/decompress.h"
22 #include "util/compress.h"
23 #include "gen-cpp/Descriptors_types.h"
24 
25 #include "common/names.h"
26 
27 namespace impala {
28 
29 // Fixture for testing class Decompressor
31  protected:
33  uint8_t* ip = input_;
34  for (int i = 0; i < 1024; i++) {
35  for (uint8_t ch = 'a'; ch <= 'z'; ++ch) {
36  *ip++ = ch;
37  }
38  for (uint8_t ch = 'Z'; ch >= 'A'; --ch) {
39  *ip++ = ch;
40  }
41  }
42 
43  // The input for the streaming tests is a larger buffer which contains input_
44  // at the beginning and end and is null otherwise.
45  memset(&input_streaming_, 0, sizeof(input_streaming_));
46  memcpy(&input_streaming_, &input_, sizeof(input_));
47  memcpy(&input_streaming_[sizeof(input_streaming_) - sizeof(input_)],
48  &input_, sizeof(input_));
49  }
50 
53  }
54 
55  void RunTest(THdfsCompression::type format) {
56  scoped_ptr<Codec> compressor;
57  scoped_ptr<Codec> decompressor;
58 
59  EXPECT_TRUE(
60  Codec::CreateCompressor(&mem_pool_, true, format, &compressor).ok());
61  EXPECT_TRUE(
62  Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor).ok());
63 
64  // LZ4 is not implemented to work without an allocated output
65  if(format == THdfsCompression::LZ4) {
66  CompressAndDecompressNoOutputAllocated(compressor.get(), decompressor.get(),
67  sizeof(input_), input_);
68  CompressAndDecompressNoOutputAllocated(compressor.get(), decompressor.get(),
69  0, NULL);
70  } else {
71  CompressAndDecompress(compressor.get(), decompressor.get(), sizeof(input_),
72  input_);
73  if (format != THdfsCompression::BZIP2) {
74  CompressAndDecompress(compressor.get(), decompressor.get(), 0, NULL);
75  } else {
76  // bzip does not allow NULL input
77  CompressAndDecompress(compressor.get(), decompressor.get(), 0, input_);
78  }
79  }
80 
81  compressor->Close();
82  decompressor->Close();
83  }
84 
85  void RunTestStreaming(THdfsCompression::type format) {
86  scoped_ptr<Codec> compressor;
87  scoped_ptr<Codec> decompressor;
88  EXPECT_TRUE(
89  Codec::CreateCompressor(&mem_pool_, true, format, &compressor).ok());
90  EXPECT_TRUE(
91  Codec::CreateDecompressor(&mem_pool_, true, format, &decompressor).ok());
92 
93  CompressAndStreamingDecompress(compressor.get(), decompressor.get(),
95  CompressAndStreamingDecompress(compressor.get(), decompressor.get(),
96  0, NULL);
97 
98  compressor->Close();
99  decompressor->Close();
100  }
101 
102  void CompressAndDecompress(Codec* compressor, Codec* decompressor,
103  int64_t input_len, uint8_t* input) {
104  // Non-preallocated output buffers
105  uint8_t* compressed;
106  int64_t compressed_length;
107  EXPECT_TRUE(compressor->ProcessBlock(false, input_len,
108  input, &compressed_length, &compressed).ok());
109  uint8_t* output;
110  int64_t output_len;
111  EXPECT_TRUE(decompressor->ProcessBlock(false, compressed_length,
112  compressed, &output_len, &output).ok());
113 
114  EXPECT_EQ(output_len, input_len);
115  EXPECT_EQ(memcmp(input, output, input_len), 0);
116 
117  // Preallocated output buffers
118  int64_t max_compressed_length = compressor->MaxOutputLen(input_len, input);
119 
120  // Don't redo compression if compressor doesn't support MaxOutputLen()
121  if (max_compressed_length != -1) {
122  EXPECT_GE(max_compressed_length, 0);
123  uint8_t* compressed = mem_pool_.Allocate(max_compressed_length);
124  compressed_length = max_compressed_length;
125 
126 
127  EXPECT_TRUE(compressor->ProcessBlock(true, input_len, input, &compressed_length,
128  &compressed).ok());
129  }
130 
131  output_len = decompressor->MaxOutputLen(compressed_length, compressed);
132  if (output_len == -1) output_len = input_len;
133  output = mem_pool_.Allocate(output_len);
134 
135  EXPECT_TRUE(decompressor->ProcessBlock(true, compressed_length, compressed,
136  &output_len, &output).ok());
137 
138  EXPECT_EQ(output_len, input_len);
139  EXPECT_EQ(memcmp(input, output, input_len), 0);
140  }
141 
142  void CompressAndStreamingDecompress(Codec* compressor, Codec* decompressor,
143  int64_t input_len, uint8_t* input) {
144  uint8_t* compressed;
145  int64_t compressed_length;
146  EXPECT_TRUE(compressor->ProcessBlock(false, input_len,
147  input, &compressed_length, &compressed).ok());
148 
149  // Should take multiple calls to ProcessBlockStreaming() to decompress the buffer.
150  int64_t total_output_produced = 0;
151  int64_t compressed_bytes_remaining = compressed_length;
152  bool eos = false;
153  while (!eos) {
154  EXPECT_LE(total_output_produced, input_len);
155  uint8_t* output = NULL;
156  int64_t output_len = 0;
157  int64_t compressed_bytes_read = 0;
158  EXPECT_TRUE(decompressor->ProcessBlockStreaming(compressed_bytes_remaining,
159  compressed, &compressed_bytes_read, &output_len, &output, &eos).ok());
160  EXPECT_EQ(memcmp(input + total_output_produced, output, output_len), 0);
161  total_output_produced += output_len;
162  compressed = compressed + compressed_bytes_read;
163  compressed_bytes_remaining -= compressed_bytes_read;
164  }
165  EXPECT_EQ(0, compressed_bytes_remaining);
166  EXPECT_EQ(total_output_produced, input_len);
167  }
168 
169  // Only tests compressors and decompressors with allocated output.
171  Codec* decompressor, int64_t input_len, uint8_t* input) {
172  // Preallocated output buffers for compressor
173  int64_t max_compressed_length = compressor->MaxOutputLen(input_len, input);
174  uint8_t* compressed = mem_pool_.Allocate(max_compressed_length);
175  int64_t compressed_length = max_compressed_length;
176 
177  EXPECT_TRUE(compressor->ProcessBlock(true, input_len, input, &compressed_length,
178  &compressed).ok());
179 
180  int64_t output_len = decompressor->MaxOutputLen(compressed_length, compressed);
181  if (output_len == -1) output_len = input_len;
182  uint8_t* output = mem_pool_.Allocate(output_len);
183 
184  EXPECT_TRUE(decompressor->ProcessBlock(true, compressed_length, compressed,
185  &output_len, &output).ok());
186 
187  EXPECT_EQ(output_len, input_len);
188  EXPECT_EQ(memcmp(input, output, input_len), 0);
189  }
190 
191 
192  uint8_t input_[2 * 26 * 1024];
193 
194  // Buffer for testing ProcessBlockStreaming() which allocates 16mb output buffers. This
195  // is 2x + 1 the size of the output buffers to ensure that the decompressed output
196  // requires several calls and doesn't need to be nicely aligned (the last call gets a
197  // small amount of data).
198  uint8_t input_streaming_[32 * 1024 * 1024 + 1];
199 
202 };
203 
205  RunTest(THdfsCompression::DEFAULT);
206 }
207 
209  RunTest(THdfsCompression::SNAPPY);
210 }
211 
213  RunTest(THdfsCompression::LZ4);
214 }
215 
217  RunTest(THdfsCompression::GZIP);
218  RunTestStreaming(THdfsCompression::GZIP);
219 }
220 
222  RunTest(THdfsCompression::DEFLATE);
223  RunTestStreaming(THdfsCompression::GZIP);
224 }
225 
227  RunTest(THdfsCompression::BZIP2);
228 }
229 
230 TEST_F(DecompressorTest, SnappyBlocked) {
231  RunTest(THdfsCompression::SNAPPY_BLOCKED);
232 }
233 
234 TEST_F(DecompressorTest, Impala1506) {
235  // Regression test for IMPALA-1506
236  MemTracker trax;
237  MemPool pool(&trax);
238  scoped_ptr<Codec> compressor;
239  Codec::CreateCompressor(&pool, true, impala::THdfsCompression::GZIP, &compressor);
240 
241  int64_t input_len = 3;
242  const uint8_t input[3] = {1, 2, 3};
243  int64_t output_len = -1;
244  uint8_t* output = NULL;
245 
246  // call twice because the compressor will reallocate the first time
247  EXPECT_TRUE(
248  compressor->ProcessBlock(false, input_len, input, &output_len, &output).ok());
249  EXPECT_GE(output_len, 0);
250  output_len = -1;
251  EXPECT_TRUE(
252  compressor->ProcessBlock(false, input_len, input, &output_len, &output).ok());
253  EXPECT_GE(output_len, 0);
254 
255  pool.FreeAll();
256 }
257 
258 }
259 
260 int main(int argc, char **argv) {
261  ::testing::InitGoogleTest(&argc, argv);
262  return RUN_ALL_TESTS();
263 }
void RunTestStreaming(THdfsCompression::type format)
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
TEST_F(InstructionCounterTest, Count)
static Status CreateDecompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *decompressor)
void FreeAll()
Definition: mem-pool.cc:73
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)=0
Process a block of data, either compressing or decompressing it.
ObjectPool pool
uint8_t input_[2 *26 *1024]
virtual Status ProcessBlockStreaming(int64_t input_length, const uint8_t *input, int64_t *input_bytes_read, int64_t *output_length, uint8_t **output, bool *eos)
Definition: codec.h:117
This class is thread-safe.
Definition: mem-tracker.h:61
void RunTest(THdfsCompression::type format)
void CompressAndDecompress(Codec *compressor, Codec *decompressor, int64_t input_len, uint8_t *input)
uint64_t Test(T *ht, const ProbeTuple *input, uint64_t num_tuples)
virtual int64_t MaxOutputLen(int64_t input_len, const uint8_t *input=NULL)=0
int main(int argc, char **argv)
void CompressAndDecompressNoOutputAllocated(Codec *compressor, Codec *decompressor, int64_t input_len, uint8_t *input)
void CompressAndStreamingDecompress(Codec *compressor, Codec *decompressor, int64_t input_len, uint8_t *input)
bool ok() const
Definition: status.h:172
uint8_t input_streaming_[32 *1024 *1024+1]
uint8_t * Allocate(int size)
Definition: mem-pool.h:92