#include <gflags/gflags.h>

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "gen-cpp/parquet_types.h"
// TCompactProtocol requires these #defines to be set before it is included.
#define SIGNED_RIGHT_SHIFT_IS 1
#define ARITHMETIC_RIGHT_SHIFT 1
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wstring-plus-int"
#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/TApplicationException.h>
#include <thrift/transport/TBufferTransports.h>
#pragma clang diagnostic pop
// Parquet files begin and end with this 4-byte magic number.
const uint8_t PARQUET_VERSION_NUMBER[] = {'P', 'A', 'R', '1'};

DEFINE_string(file, "", "File to read");
DEFINE_bool(output_page_header, false, "If true, output page headers to stderr.");
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace apache::thrift;
using namespace parquet;
using namespace std;
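// Example invocation (binary name and path are illustrative only, not from the
// original source):
//   ./parquet-reader --file=/path/to/data.parquet --output_page_header=true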
// Returns a Thrift protocol object that deserializes from 'mem', using the
// compact protocol if 'compact' is true and the binary protocol otherwise.
shared_ptr<TProtocol> CreateDeserializeProtocol(
    shared_ptr<TMemoryBuffer> mem, bool compact) {
  if (compact) {
    TCompactProtocolFactoryT<TMemoryBuffer> tproto_factory;
    return tproto_factory.getProtocol(mem);
  } else {
    TBinaryProtocolFactoryT<TMemoryBuffer> tproto_factory;
    return tproto_factory.getProtocol(mem);
  }
}
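// The Parquet format serializes its metadata structures (FileMetaData,
// PageHeader, etc.) with the Thrift compact protocol, so the calls below pass
// compact=true; the binary-protocol branch covers the non-compact case.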
// Deserializes a Thrift message of type T from buf/len. On return, *len is set
// to the number of bytes actually consumed by the message.
template <class T>
bool DeserializeThriftMsg(uint8_t* buf, uint32_t* len, bool compact,
    T* deserialized_msg) {
  // Wrap the raw bytes in a memory transport and read the message through it.
  shared_ptr<TMemoryBuffer> tmem_transport(new TMemoryBuffer(buf, *len));
  shared_ptr<TProtocol> tproto = CreateDeserializeProtocol(tmem_transport, compact);
  try {
    deserialized_msg->read(tproto.get());
  } catch (apache::thrift::protocol::TProtocolException& e) {
    cerr << "couldn't deserialize thrift msg:\n" << e.what() << endl;
    return false;
  }
  uint32_t bytes_left = tmem_transport->available_read();
  *len = *len - bytes_left;
  return true;
}
    case Type::BYTE_ARRAY: return "BYTE_ARRAY";
// Recursively appends a text rendering of the schema element at *idx (and its
// children) to 'ss', indenting by 'level'.
void AppendSchema(const vector<SchemaElement>& schema, int level,
    int* idx, stringstream* ss) {
  for (int i = 0; i < level; ++i) {
    (*ss) << "  ";
  }
  (*ss) << schema[*idx].name;
  if (schema[*idx].__isset.type) (*ss) << " " << TypeMapping(schema[*idx].type);
  (*ss) << endl;

  int num_children = schema[*idx].num_children;
  ++*idx;
  for (int i = 0; i < num_children; ++i) {
    AppendSchema(schema, level + 1, idx, ss);
  }
}
// Returns a human-readable rendering of the file's schema tree.
string GetSchema(const FileMetaData& md) {
  const vector<SchemaElement>& schema = md.schema;
  if (schema.empty()) return "Invalid schema";
  int num_root_elements = schema[0].num_children;
  stringstream ss;
  int idx = 1;
  for (int i = 0; i < num_root_elements; ++i) {
    AppendSchema(schema, 1, &idx, &ss);
  }
  return ss.str();
}
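// Example: for a flat schema with an INT32 column "id" and a BYTE_ARRAY column
// "name", GetSchema() would produce roughly (names and types illustrative):
//   id INT32
//   name BYTE_ARRAY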
int main(int argc, char** argv) {
  google::ParseCommandLineFlags(&argc, &argv, true);
  if (FLAGS_file.size() == 0) {
    cout << "Must specify input file." << endl;
    return -1;
  }
  FILE* file = fopen(FLAGS_file.c_str(), "r");
  assert(file != NULL);
  fseek(file, 0L, SEEK_END);
  size_t file_len = ftell(file);
  fseek(file, 0L, SEEK_SET);

  cerr << "File Length: " << file_len << endl;
  uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(file_len));
  size_t bytes_read = fread(buffer, 1, file_len, file);
  assert(bytes_read == file_len);
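  // Parquet footer layout: [file metadata (Thrift compact)]
  // [4-byte little-endian metadata length] ["PAR1" magic], so the metadata is
  // found by walking backwards from the end of the buffer.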
  uint8_t* metadata_len_ptr =
      buffer + file_len - sizeof(PARQUET_VERSION_NUMBER) - sizeof(uint32_t);
  uint32_t metadata_len = *reinterpret_cast<uint32_t*>(metadata_len_ptr);
  cerr << "Metadata len: " << metadata_len << endl;

  uint8_t* metadata = metadata_len_ptr - metadata_len;
  FileMetaData file_metadata;
  bool status = DeserializeThriftMsg(metadata, &metadata_len, true, &file_metadata);
  assert(status);
  cerr << ThriftDebugString(file_metadata) << endl;
  cerr << "Schema: " << endl << GetSchema(file_metadata) << endl;
  int pages_skipped = 0;
  int pages_read = 0;
  int num_rows = 0;
  int total_page_header_size = 0;
  int total_compressed_data_size = 0;
  int total_uncompressed_data_size = 0;
  vector<int> column_sizes;

  // Scratch buffer used when decompressing pages.
  vector<char> decompression_buffer;
  for (int i = 0; i < file_metadata.row_groups.size(); ++i) {
    cerr << "Reading row group " << i << endl;
    RowGroup& rg = file_metadata.row_groups[i];
    column_sizes.resize(rg.columns.size());
    for (int c = 0; c < rg.columns.size(); ++c) {
      cerr << " Reading column " << c << endl;
      ColumnChunk& col = rg.columns[c];

      // The column chunk starts at the earlier of the first data page and the
      // dictionary page, if one is present.
      int first_page_offset = col.meta_data.data_page_offset;
      if (col.meta_data.__isset.dictionary_page_offset) {
        first_page_offset = ::min(first_page_offset,
            (int)col.meta_data.dictionary_page_offset);
      }
      uint8_t* data = buffer + first_page_offset;
      uint8_t* col_end = data + col.meta_data.total_compressed_size;
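      // Each page is a Thrift-compact PageHeader immediately followed by
      // compressed_page_size bytes of page data, so the loop below advances by
      // header_size + compressed_page_size on every iteration.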
      while (data < col_end) {
        // header_size starts as an upper bound; DeserializeThriftMsg shrinks it.
        uint32_t header_size = file_len - col.file_offset;
        PageHeader header;
        bool status = DeserializeThriftMsg(data, &header_size, true, &header);
        assert(status);
        if (FLAGS_output_page_header) {
          cerr << ThriftDebugString(header) << endl;
        }

        data += header_size;
        total_page_header_size += header_size;
        column_sizes[c] += header.compressed_page_size;
        total_compressed_data_size += header.compressed_page_size;
        total_uncompressed_data_size += header.uncompressed_page_size;
        data += header.compressed_page_size;
      }
      assert(data == col_end);
    }
  }
  double compression_ratio =
      (double)total_uncompressed_data_size / total_compressed_data_size;

  stringstream ss;
  ss << " Rows: " << num_rows << endl
     << " Read pages: " << pages_read << endl
     << " Skipped pages: " << pages_skipped << endl
     << " Metadata size: " << metadata_len
     << "(" << (metadata_len / (double)file_len) << ")" << endl
     << " Total page header size: " << total_page_header_size
     << "(" << (total_page_header_size / (double)file_len) << ")" << endl;
  ss << " Column compressed size: " << total_compressed_data_size
     << "(" << (total_compressed_data_size / (double)file_len) << ")" << endl;
  ss << " Column uncompressed size: " << total_uncompressed_data_size
     << "(" << compression_ratio << ")" << endl;
  for (int i = 0; i < column_sizes.size(); ++i) {
    ss << " " << "Col " << i << ": " << column_sizes[i]
       << "(" << (column_sizes[i] / (double)file_len) << ")" << endl;
  }
  cerr << ss.str() << endl;
  return 0;
}
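// Build note (assumption, not from the original source): this tool needs the
// Thrift-generated sources behind "gen-cpp/parquet_types.h" plus libthrift and
// gflags at link time; exact flags depend on the surrounding build system.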