17 #include <boost/assign/list_of.hpp>
19 #include <gutil/strings/substitute.h>
24 #include "gen-cpp/CatalogObjects_types.h"
25 #include "gen-cpp/CatalogObjects_constants.h"
29 using boost::assign::map_list_of;
30 using namespace impala;
31 using namespace strings;
34 "org.apache.hadoop.io.compress.DefaultCodec";
39 "This compression codec is currently unsupported: ";
// Message text stating that LZO codecs may not be created via the Codec
// interface and that the LZO library is invoked directly instead.
// The two adjacent string literals below are concatenated by the compiler into
// one string; both the pointer and the characters it points to are const.
40 const char*
const NO_LZO_MSG =
"LZO codecs may not be created via the Codec interface. "
41 "Instead the LZO library is directly invoked.";
44 (
"", THdfsCompression::NONE)
45 (DEFAULT_COMPRESSION, THdfsCompression::DEFAULT)
46 (GZIP_COMPRESSION, THdfsCompression::GZIP)
47 (BZIP2_COMPRESSION, THdfsCompression::BZIP2)
48 (SNAPPY_COMPRESSION, THdfsCompression::SNAPPY_BLOCKED);
51 BOOST_FOREACH(
const CodecMap::value_type& codec,
52 g_CatalogObjects_constants.COMPRESSION_MAP) {
53 if (codec.second == type)
return codec.first;
55 DCHECK(
false) <<
"Missing codec in COMPRESSION_MAP: " << type;
60 BOOST_FOREACH(
const CodecMap::value_type& codec, CODEC_MAP) {
61 if (codec.second == type) {
62 out_name->assign(codec.first);
66 return Status(Substitute(
"Unsupported codec for given file type: $0",
67 _THdfsCompression_VALUES_TO_NAMES.find(type)->second));
71 scoped_ptr<Codec>* compressor) {
72 CodecMap::const_iterator type = CODEC_MAP.find(codec);
73 if (type == CODEC_MAP.end()) {
74 return Status(Substitute(
"$0$1", UNKNOWN_CODEC_ERROR, codec));
78 CreateCompressor(mem_pool, reuse, type->second, compressor));
83 THdfsCompression::type format, scoped_ptr<Codec>* compressor) {
85 case THdfsCompression::NONE:
86 compressor->reset(NULL);
88 case THdfsCompression::GZIP:
91 case THdfsCompression::DEFAULT:
94 case THdfsCompression::DEFLATE:
97 case THdfsCompression::BZIP2:
100 case THdfsCompression::SNAPPY_BLOCKED:
103 case THdfsCompression::SNAPPY:
106 case THdfsCompression::LZ4:
111 return Status(Substitute(
"Unsupported codec: $0", format));
115 return (*compressor)->Init();
119 scoped_ptr<Codec>* decompressor) {
120 CodecMap::const_iterator type = CODEC_MAP.find(codec);
121 if (type == CODEC_MAP.end()) {
122 return Status(Substitute(
"$0$1", UNKNOWN_CODEC_ERROR, codec));
126 CreateDecompressor(mem_pool, reuse, type->second, decompressor));
131 THdfsCompression::type format, scoped_ptr<Codec>* decompressor) {
133 case THdfsCompression::NONE:
134 decompressor->reset(NULL);
136 case THdfsCompression::DEFAULT:
137 case THdfsCompression::GZIP:
140 case THdfsCompression::DEFLATE:
143 case THdfsCompression::BZIP2:
146 case THdfsCompression::SNAPPY_BLOCKED:
149 case THdfsCompression::SNAPPY:
152 case THdfsCompression::LZ4:
157 return Substitute(
"Unsupported codec: $0", format);
161 return (*decompressor)->Init();
165 : memory_pool_(mem_pool),
166 reuse_buffer_(reuse_buffer),
182 const uint8_t* input,
int* output_length, uint8_t** output) {
183 int64_t input_len64 = input_length;
184 int64_t output_len64 = *output_length;
190 if (
UNLIKELY(output_len64 > numeric_limits<int>::max())) {
191 return Status(Substitute(
"Arithmetic overflow in codec function. Output length is $0",
194 *output_length =
static_cast<int32_t
>(output_len64);
static const CodecMap CODEC_MAP
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
static const char *const BZIP2_COMPRESSION
Codec(MemPool *mem_pool, bool reuse_buffer)
static Status CreateDecompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *decompressor)
#define RETURN_IF_ERROR(stmt)
Some generally useful macros.
void AcquireData(MemPool *src, bool keep_current)
static const char *const DEFAULT_COMPRESSION
These are the codec string representations used in Hadoop.
static std::string GetCodecName(THdfsCompression::type)
Return the name of a compression algorithm.
static const char *const GZIP_COMPRESSION
MemTracker * mem_tracker()
static Status GetHadoopCodecClassName(THdfsCompression::type, std::string *out_name)
Returns the Java class name for the given compression type.
virtual Status ProcessBlock(bool output_preallocated, int64_t input_length, const uint8_t *input, int64_t *output_length, uint8_t **output)=0
Process a block of data, either compressing or decompressing it.
static const char *const SNAPPY_COMPRESSION
static const char *const UNKNOWN_CODEC_ERROR
Status ProcessBlock32(bool output_preallocated, int input_length, const uint8_t *input, int *output_length, uint8_t **output)
boost::scoped_ptr< MemPool > temp_memory_pool_
std::map< const std::string, const THdfsCompression::type > CodecMap
Map from codec string to compression format.
const char *const NO_LZO_MSG
virtual void Close()
Must be called on codec before destructor for final cleanup.
MemPool * memory_pool_
Pool to allocate the buffer to hold transformed data.