Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
codec.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "util/codec.h"
16 
17 #include <boost/assign/list_of.hpp>
18 #include <limits> // for std::numeric_limits
19 #include <gutil/strings/substitute.h>
20 
21 #include "util/compress.h"
22 #include "util/decompress.h"
23 
24 #include "gen-cpp/CatalogObjects_types.h"
25 #include "gen-cpp/CatalogObjects_constants.h"
26 
27 #include "common/names.h"
28 
29 using boost::assign::map_list_of;
30 using namespace impala;
31 using namespace strings;
32 
33 const char* const Codec::DEFAULT_COMPRESSION =
34  "org.apache.hadoop.io.compress.DefaultCodec";
35 const char* const Codec::GZIP_COMPRESSION = "org.apache.hadoop.io.compress.GzipCodec";
36 const char* const Codec::BZIP2_COMPRESSION = "org.apache.hadoop.io.compress.BZip2Codec";
37 const char* const Codec::SNAPPY_COMPRESSION = "org.apache.hadoop.io.compress.SnappyCodec";
38 const char* const Codec::UNKNOWN_CODEC_ERROR =
39  "This compression codec is currently unsupported: ";
40 const char* const NO_LZO_MSG = "LZO codecs may not be created via the Codec interface. "
41  "Instead the LZO library is directly invoked.";
42 
43 const Codec::CodecMap Codec::CODEC_MAP = map_list_of
44  ("", THdfsCompression::NONE)
45  (DEFAULT_COMPRESSION, THdfsCompression::DEFAULT)
46  (GZIP_COMPRESSION, THdfsCompression::GZIP)
47  (BZIP2_COMPRESSION, THdfsCompression::BZIP2)
48  (SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED);
49 
50 string Codec::GetCodecName(THdfsCompression::type type) {
51  BOOST_FOREACH(const CodecMap::value_type& codec,
52  g_CatalogObjects_constants.COMPRESSION_MAP) {
53  if (codec.second == type) return codec.first;
54  }
55  DCHECK(false) << "Missing codec in COMPRESSION_MAP: " << type;
56  return "INVALID";
57 }
58 
59 Status Codec::GetHadoopCodecClassName(THdfsCompression::type type, string* out_name) {
60  BOOST_FOREACH(const CodecMap::value_type& codec, CODEC_MAP) {
61  if (codec.second == type) {
62  out_name->assign(codec.first);
63  return Status::OK;
64  }
65  }
66  return Status(Substitute("Unsupported codec for given file type: $0",
67  _THdfsCompression_VALUES_TO_NAMES.find(type)->second));
68 }
69 
70 Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse, const string& codec,
71  scoped_ptr<Codec>* compressor) {
72  CodecMap::const_iterator type = CODEC_MAP.find(codec);
73  if (type == CODEC_MAP.end()) {
74  return Status(Substitute("$0$1", UNKNOWN_CODEC_ERROR, codec));
75  }
76 
78  CreateCompressor(mem_pool, reuse, type->second, compressor));
79  return Status::OK;
80 }
81 
82 Status Codec::CreateCompressor(MemPool* mem_pool, bool reuse,
83  THdfsCompression::type format, scoped_ptr<Codec>* compressor) {
84  switch (format) {
85  case THdfsCompression::NONE:
86  compressor->reset(NULL);
87  return Status::OK;
88  case THdfsCompression::GZIP:
89  compressor->reset(new GzipCompressor(GzipCompressor::GZIP, mem_pool, reuse));
90  break;
91  case THdfsCompression::DEFAULT:
92  compressor->reset(new GzipCompressor(GzipCompressor::ZLIB, mem_pool, reuse));
93  break;
94  case THdfsCompression::DEFLATE:
95  compressor->reset(new GzipCompressor(GzipCompressor::DEFLATE, mem_pool, reuse));
96  break;
97  case THdfsCompression::BZIP2:
98  compressor->reset(new BzipCompressor(mem_pool, reuse));
99  break;
100  case THdfsCompression::SNAPPY_BLOCKED:
101  compressor->reset(new SnappyBlockCompressor(mem_pool, reuse));
102  break;
103  case THdfsCompression::SNAPPY:
104  compressor->reset(new SnappyCompressor(mem_pool, reuse));
105  break;
106  case THdfsCompression::LZ4:
107  compressor->reset(new Lz4Compressor(mem_pool, reuse));
108  break;
109  default: {
110  if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
111  return Status(Substitute("Unsupported codec: $0", format));
112  }
113  }
114 
115  return (*compressor)->Init();
116 }
117 
118 Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse, const string& codec,
119  scoped_ptr<Codec>* decompressor) {
120  CodecMap::const_iterator type = CODEC_MAP.find(codec);
121  if (type == CODEC_MAP.end()) {
122  return Status(Substitute("$0$1", UNKNOWN_CODEC_ERROR, codec));
123  }
124 
126  CreateDecompressor(mem_pool, reuse, type->second, decompressor));
127  return Status::OK;
128 }
129 
130 Status Codec::CreateDecompressor(MemPool* mem_pool, bool reuse,
131  THdfsCompression::type format, scoped_ptr<Codec>* decompressor) {
132  switch (format) {
133  case THdfsCompression::NONE:
134  decompressor->reset(NULL);
135  return Status::OK;
136  case THdfsCompression::DEFAULT:
137  case THdfsCompression::GZIP:
138  decompressor->reset(new GzipDecompressor(mem_pool, reuse, false));
139  break;
140  case THdfsCompression::DEFLATE:
141  decompressor->reset(new GzipDecompressor(mem_pool, reuse, true));
142  break;
143  case THdfsCompression::BZIP2:
144  decompressor->reset(new BzipDecompressor(mem_pool, reuse));
145  break;
146  case THdfsCompression::SNAPPY_BLOCKED:
147  decompressor->reset(new SnappyBlockDecompressor(mem_pool, reuse));
148  break;
149  case THdfsCompression::SNAPPY:
150  decompressor->reset(new SnappyDecompressor(mem_pool, reuse));
151  break;
152  case THdfsCompression::LZ4:
153  decompressor->reset(new Lz4Decompressor(mem_pool, reuse));
154  break;
155  default: {
156  if (format == THdfsCompression::LZO) return Status(NO_LZO_MSG);
157  return Substitute("Unsupported codec: $0", format);
158  }
159  }
160 
161  return (*decompressor)->Init();
162 }
163 
164 Codec::Codec(MemPool* mem_pool, bool reuse_buffer)
165  : memory_pool_(mem_pool),
166  reuse_buffer_(reuse_buffer),
167  out_buffer_(NULL),
168  buffer_length_(0) {
169  if (memory_pool_ != NULL) {
171  }
172 }
173 
174 void Codec::Close() {
175  if (temp_memory_pool_.get() != NULL) {
176  DCHECK(memory_pool_ != NULL);
178  }
179 }
180 
181 Status Codec::ProcessBlock32(bool output_preallocated, int input_length,
182  const uint8_t* input, int* output_length, uint8_t** output) {
183  int64_t input_len64 = input_length;
184  int64_t output_len64 = *output_length;
185  RETURN_IF_ERROR(ProcessBlock(output_preallocated, input_len64, input, &output_len64,
186  output));
187  // Check whether we are going to have an overflow if we are going to cast from int64_t
188  // to int.
189  // TODO: Is there a faster way to do this check?
190  if (UNLIKELY(output_len64 > numeric_limits<int>::max())) {
191  return Status(Substitute("Arithmetic overflow in codec function. Output length is $0",
192  output_len64));;
193  }
194  *output_length = static_cast<int32_t>(output_len64);
195  return Status::OK;
196 }
static const CodecMap CODEC_MAP
Definition: codec.h:52
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
static const char *const BZIP2_COMPRESSION
Definition: codec.h:46
Codec(MemPool *mem_pool, bool reuse_buffer)
Definition: codec.cc:164
static Status CreateDecompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *decompressor)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
void AcquireData(MemPool *src, bool keep_current)
Definition: mem-pool.cc:161
static const char *const DEFAULT_COMPRESSION
These are the codec string representations used in Hadoop.
Definition: codec.h:44
static std::string GetCodecName(THdfsCompression::type)
Return the name of a compression algorithm.
Definition: codec.cc:50
static const char *const GZIP_COMPRESSION
Definition: codec.h:45
MemTracker * mem_tracker()
Definition: mem-pool.h:151
static Status GetHadoopCodecClassName(THdfsCompression::type, std::string *out_name)
Returns the java class name for the given compression type.
Definition: codec.cc:59
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)=0
Process a block of data, either compressing or decompressing it.
static const char *const SNAPPY_COMPRESSION
Definition: codec.h:47
static const char *const UNKNOWN_CODEC_ERROR
Definition: codec.h:48
Status ProcessBlock32(bool output_preallocated, int input_length, const uint8_t *input, int *output_length, uint8_t **output)
Definition: codec.cc:181
#define UNLIKELY(expr)
Definition: compiler-util.h:33
static const Status OK
Definition: status.h:87
boost::scoped_ptr< MemPool > temp_memory_pool_
Definition: codec.h:158
std::map< const std::string, const THdfsCompression::type > CodecMap
Map from codec string to compression format.
Definition: codec.h:51
const char *const NO_LZO_MSG
Definition: codec.cc:40
virtual void Close()
Must be called on codec before destructor for final cleanup.
Definition: codec.cc:174
MemPool * memory_pool_
Pool to allocate the buffer to hold transformed data.
Definition: codec.h:154