Impala
Impala is the open source, native analytic database for Apache Hadoop.
parquet-reader.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Standard headers for assert/fopen/malloc/memcmp, which are used below and
// should not be relied on to arrive transitively.
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <vector>
#include <gflags/gflags.h>
#include <snappy.h>
#include "gen-cpp/parquet_types.h"

// TCompactProtocol requires some #defines to work right.
// TODO: is there a better include to use?
#define SIGNED_RIGHT_SHIFT_IS 1
#define ARITHMETIC_RIGHT_SHIFT 1
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wstring-plus-int"
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/TApplicationException.h>
#include <thrift/transport/TBufferTransports.h>
#pragma clang diagnostic pop

#include "util/rle-encoding.h"

#include "common/names.h"

DEFINE_string(file, "", "File to read");
DEFINE_bool(output_page_header, false, "If true, output page headers to stderr.");

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift;
using namespace parquet;
using std::min;

// Some code is replicated to make this more stand-alone.
const uint8_t PARQUET_VERSION_NUMBER[] = {'P', 'A', 'R', '1'};
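
// For orientation, a sketch of the on-disk layout this tool relies on (fixed
// by the Parquet format spec):
//
//   +--------------------------------------+
//   | "PAR1" magic (4 bytes)               |
//   | row groups: column chunks of pages   |
//   | FileMetaData (thrift compact)        |
//   | metadata length (4-byte LE uint32)   |
//   | "PAR1" magic (4 bytes)               |
//   +--------------------------------------+
//
// main() locates the trailing magic and length first, then deserializes the
// FileMetaData that immediately precedes them.
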
shared_ptr<TProtocol> CreateDeserializeProtocol(
    shared_ptr<TMemoryBuffer> mem, bool compact) {
  if (compact) {
    TCompactProtocolFactoryT<TMemoryBuffer> tproto_factory;
    return tproto_factory.getProtocol(mem);
  } else {
    TBinaryProtocolFactoryT<TMemoryBuffer> tproto_factory;
    return tproto_factory.getProtocol(mem);
  }
}
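
// Note: the Parquet format serializes FileMetaData and page headers with the
// thrift compact protocol, so every caller in this tool passes compact ==
// true; the binary-protocol branch is unused here.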

// Deserialize a thrift message from buf/len. buf/len must at least contain
// all the bytes needed to store the thrift message. On return, len will be
// set to the actual serialized length of the message.
template <class T>
bool DeserializeThriftMsg(uint8_t* buf, uint32_t* len, bool compact,
    T* deserialized_msg) {
  // Deserialize msg bytes into c++ thrift msg using memory transport.
  shared_ptr<TMemoryBuffer> tmem_transport(new TMemoryBuffer(buf, *len));
  shared_ptr<TProtocol> tproto = CreateDeserializeProtocol(tmem_transport, compact);
  try {
    deserialized_msg->read(tproto.get());
  } catch (apache::thrift::protocol::TProtocolException& e) {
    cerr << "couldn't deserialize thrift msg:\n" << e.what() << endl;
    return false;
  }
  uint32_t bytes_left = tmem_transport->available_read();
  *len = *len - bytes_left;
  return true;
}
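
// Usage sketch (illustrative; 'bytes' and 'avail' are hypothetical locals):
//
//   parquet::PageHeader header;
//   uint32_t len = avail;  // in: bytes available starting at 'bytes'
//   if (DeserializeThriftMsg(bytes, &len, /* compact */ true, &header)) {
//     // 'len' now holds the exact serialized header size, so the page
//     // payload starts at bytes + len.
//   }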

string TypeMapping(Type::type t) {
  switch (t) {
    case Type::BOOLEAN:
      return "BOOLEAN";
    case Type::INT32:
      return "INT32";
    case Type::INT64:
      return "INT64";
    case Type::FLOAT:
      return "FLOAT";
    case Type::DOUBLE:
      return "DOUBLE";
    case Type::BYTE_ARRAY:
      return "BYTE_ARRAY";
    default:
      return "UNKNOWN";
  }
}
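
// Physical types this tool does not name (e.g. INT96 and FIXED_LEN_BYTE_ARRAY
// in the Parquet format) fall through to "UNKNOWN" rather than aborting, so
// schemas that use them still print.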

void AppendSchema(const vector<SchemaElement>& schema, int level,
    int* idx, stringstream* ss) {
  for (int i = 0; i < level; ++i) {
    (*ss) << " ";
  }
  (*ss) << schema[*idx].name;
  if (schema[*idx].__isset.type) (*ss) << " " << TypeMapping(schema[*idx].type);
  (*ss) << endl;

  int num_children = schema[*idx].num_children;
  ++*idx;
  for (int i = 0; i < num_children; ++i) {
    AppendSchema(schema, level + 1, idx, ss);
  }
}

string GetSchema(const FileMetaData& md) {
  const vector<SchemaElement>& schema = md.schema;
  if (schema.empty()) return "Invalid schema";
  int num_root_elements = schema[0].num_children;
  stringstream ss;
  int idx = 1;
  for (int i = 0; i < num_root_elements; ++i) {
    AppendSchema(schema, 0, &idx, &ss);
  }
  return ss.str();
}
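
// For a hypothetical flat table with columns 'id' (INT64) and 'name'
// (BYTE_ARRAY), GetSchema() would return one element per line, children
// indented one space per nesting level:
//
//   id INT64
//   name BYTE_ARRAY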

// Simple utility to read parquet files on local disk. It validates that the
// file is correctly formed and can output values from each data page. The
// entire file is buffered in memory, so it is not suitable for very large
// files. We expect a table to be split into multiple parquet files, each
// roughly the size of an HDFS block, in which case this utility can handle
// them.
// cerr is used to output diagnostics of the file.
// cout is used to output converted data (in csv).
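// Example invocation (path hypothetical):
//
//   parquet-reader --file=/tmp/customers.parq --output_page_header 2> info.txt
//
// Per the convention above, file diagnostics land in info.txt while converted
// csv data goes to stdout.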
int main(int argc, char** argv) {
  google::ParseCommandLineFlags(&argc, &argv, true);

  if (FLAGS_file.size() == 0) {
    // Diagnostic, so it goes to cerr per the convention above.
    cerr << "Must specify input file." << endl;
    return -1;
  }

  // Open in binary mode; "r" would translate line endings on some platforms.
  FILE* file = fopen(FLAGS_file.c_str(), "rb");
  assert(file != NULL);

  fseek(file, 0L, SEEK_END);
  size_t file_len = ftell(file);
  fseek(file, 0L, SEEK_SET);

  cerr << "File Length: " << file_len << endl;

  uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(file_len));
  size_t bytes_read = fread(buffer, 1, file_len, file);
  assert(bytes_read == file_len);

  // Check that the file starts and ends with the magic bytes.
  assert(
      memcmp(buffer, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);
  assert(memcmp(buffer + file_len - sizeof(PARQUET_VERSION_NUMBER),
      PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);

  // Get metadata
  uint8_t* metadata_len_ptr =
      buffer + file_len - sizeof(PARQUET_VERSION_NUMBER) - sizeof(uint32_t);
  // The footer length is a little-endian uint32; this direct cast assumes a
  // little-endian host.
  uint32_t metadata_len = *reinterpret_cast<uint32_t*>(metadata_len_ptr);
  cerr << "Metadata len: " << metadata_len << endl;

  uint8_t* metadata = metadata_len_ptr - metadata_len;

  FileMetaData file_metadata;
  bool status = DeserializeThriftMsg(metadata, &metadata_len, true, &file_metadata);
  assert(status);
  cerr << ThriftDebugString(file_metadata) << endl;
  cerr << "Schema: " << endl << GetSchema(file_metadata) << endl;

  int pages_skipped = 0;
  int pages_read = 0;
  int num_rows = 0;
  int total_page_header_size = 0;
  int total_compressed_data_size = 0;
  int total_uncompressed_data_size = 0;
  vector<int> column_sizes;

  // Buffer to decompress data into. Reused across pages.
  vector<char> decompression_buffer;

  for (int i = 0; i < file_metadata.row_groups.size(); ++i) {
    cerr << "Reading row group " << i << endl;
    RowGroup& rg = file_metadata.row_groups[i];
    column_sizes.resize(rg.columns.size());

    for (int c = 0; c < rg.columns.size(); ++c) {
      cerr << "  Reading column " << c << endl;
      ColumnChunk& col = rg.columns[c];

      // The chunk's first page is whichever of the dictionary page (if any)
      // and the first data page occurs earlier in the file.
      int first_page_offset = col.meta_data.data_page_offset;
      if (col.meta_data.__isset.dictionary_page_offset) {
        first_page_offset = ::min(first_page_offset,
            (int)col.meta_data.dictionary_page_offset);
      }
      uint8_t* data = buffer + first_page_offset;
      uint8_t* col_end = data + col.meta_data.total_compressed_size;

      // Loop through the entire column chunk. This lets us walk all the pages.
      while (data < col_end) {
        // Upper bound on the header size: the header cannot extend past the
        // end of the buffered file.
        uint32_t header_size = file_len - (data - buffer);
        PageHeader header;
        status = DeserializeThriftMsg(data, &header_size, true, &header);
        assert(status);
        if (FLAGS_output_page_header) {
          cerr << ThriftDebugString(header) << endl;
        }

        // Skip over the header and the (compressed) page payload to reach the
        // next page header.
        data += header_size;
        total_page_header_size += header_size;
        column_sizes[c] += header.compressed_page_size;
        total_compressed_data_size += header.compressed_page_size;
        total_uncompressed_data_size += header.uncompressed_page_size;
        data += header.compressed_page_size;
        ++pages_read;
      }
      // Check that we ended exactly where we should have.
      assert(data == col_end);
    }
  }

  double compression_ratio =
      (double)total_uncompressed_data_size / total_compressed_data_size;
  stringstream ss;
  ss << "\nSummary:\n"
     << "  Rows: " << num_rows << endl
     << "  Read pages: " << pages_read << endl
     << "  Skipped pages: " << pages_skipped << endl
     << "  Metadata size: " << metadata_len
     << " (" << (metadata_len / (double)file_len) << ")" << endl
     << "  Total page header size: " << total_page_header_size
     << " (" << (total_page_header_size / (double)file_len) << ")" << endl;
  ss << "  Column compressed size: " << total_compressed_data_size
     << " (" << (total_compressed_data_size / (double)file_len) << ")" << endl;
  ss << "  Column uncompressed size: " << total_uncompressed_data_size
     << " (" << compression_ratio << ")" << endl;
  for (int i = 0; i < column_sizes.size(); ++i) {
    ss << "    Col " << i << ": " << column_sizes[i]
       << " (" << (column_sizes[i] / (double)file_len) << ")" << endl;
  }
  cerr << ss.str() << endl;

  return 0;
}