Using the Parquet File Format with Impala Tables
Impala allows you to create, manage, and query Parquet tables. Parquet is a
column-oriented binary file format intended to be highly efficient for the types of
large-scale queries that Impala is best at. Parquet is especially good for queries
scanning particular columns within a table, for example, to query "wide" tables with
many columns, or to perform aggregation operations such as SUM()
and
AVG()
that need to process most or all of the values from a column. Each
Parquet data file written by Impala contains the values for a set of rows (referred to as
the "row group"). Within a data file, the values from each column are organized so
that they are all adjacent, enabling good compression for the values from that column.
Queries against a Parquet table can retrieve and analyze these values from any column
quickly and with minimal I/O.
See How Impala Works with Hadoop File Formats for the summary of Parquet format support.
Creating Parquet Tables in Impala
To create a table named PARQUET_TABLE
that uses the Parquet format, you
would use a command like the following, substituting your own table name, column names,
and data types:
[impala-host:21000] > create table parquet_table_name (x INT, y STRING) STORED AS PARQUET;
Or, to clone the column names and data types of an existing table:
[impala-host:21000] > create table parquet_table_name LIKE other_table_name STORED AS PARQUET;
In Impala 1.4.0 and higher, you can derive column definitions from a raw Parquet data file, even without an existing Impala table. For example, you can create an external table pointing to an HDFS directory, and base the column definitions on one of the files in that directory:
CREATE EXTERNAL TABLE ingest_existing_files LIKE PARQUET '/user/etl/destination/datafile1.dat'
STORED AS PARQUET
LOCATION '/user/etl/destination';
Or, you can refer to an existing data file and create a new empty table with suitable
column definitions. Then you can use INSERT
to create new data files or
LOAD DATA
to transfer existing data files into the new table.
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
STORED AS PARQUET;
The default properties of the newly created table are the same as for any other
CREATE TABLE
statement. For example, the default file format is text;
if you want the new table to use the Parquet file format, include the STORED AS PARQUET clause as well.
In this example, the new table is partitioned by year, month, and day. These partition
key columns are not part of the data file, so you specify them in the CREATE
TABLE
statement:
CREATE TABLE columns_from_data_file LIKE PARQUET '/user/etl/destination/datafile1.dat'
PARTITIONED BY (year INT, month TINYINT, day TINYINT)
STORED AS PARQUET;
See CREATE TABLE Statement for more details about the
CREATE TABLE LIKE PARQUET
syntax.
Once you have created a table, to insert data into that table, use a command similar to the following, again with your own table names:
[impala-host:21000] > insert overwrite table parquet_table_name select * from other_table_name;
If the Parquet table has a different number of columns or different column names than
the other table, specify the names of columns from the other table rather than
*
in the SELECT
statement.
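For instance, here is a minimal sketch of naming the matching columns explicitly; the column names are hypothetical and stand in for whatever columns the two tables share:
-- Hypothetical columns: the Parquet table has only these 3 columns,
-- so name them instead of using SELECT *.
insert overwrite table parquet_table_name
  select id, name, amount from other_table_name;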
Loading Data into Parquet Tables
Choose from the following techniques for loading data into Parquet tables, depending on whether the original data is already in an Impala table, or exists as raw data files outside Impala.
If you already have data in an Impala or Hive table, perhaps in a different file format
or partitioning scheme, you can transfer the data to a Parquet table using the Impala
INSERT...SELECT
syntax. You can convert, filter, repartition, and do
other things to the data as part of this same INSERT
statement. See
Compressions for Parquet Data Files for some examples showing how to insert
data into Parquet tables.
When inserting into partitioned tables, especially using the Parquet file format, you can include a hint in the INSERT statement to fine-tune the overall performance of the operation and its resource usage. See Optimizer Hints for details about using hints in INSERT statements.
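For example, here is a sketch (with hypothetical table and column names) of the SHUFFLE hint, which redistributes rows so that each node writes data for fewer partitions at a time:
-- The hint goes between the PARTITION clause and the SELECT keyword.
insert into sales_parquet partition (year, month) /* +SHUFFLE */
  select amount, region, year, month from sales_staging;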
Any INSERT
statement for a Parquet table requires enough free space in
the HDFS filesystem to write one block. Because Parquet data files use a large block size (256 MB by default), an INSERT might fail (even for a very small amount of data) if your HDFS is running low on space.
Avoid the INSERT...VALUES
syntax for Parquet tables, because
INSERT...VALUES
produces a separate tiny data file for each
INSERT...VALUES
statement, and the strength of Parquet is in its
handling of data (compressing, parallelizing, and so on) in
large chunks.
If you have one or more Parquet data files produced outside of Impala, you can quickly make the data queryable through Impala by one of the following methods:
- The LOAD DATA statement moves a single data file or a directory full of data files into the data directory for an Impala table. It does no validation or conversion of the data. The original data files must be somewhere in HDFS, not the local filesystem.
- The CREATE TABLE statement with the LOCATION clause creates a table where the data continues to reside outside the Impala data directory. The original data files must be somewhere in HDFS, not the local filesystem. For extra safety, if the data is intended to be long-lived and reused by other applications, you can use the CREATE EXTERNAL TABLE syntax so that the data files are not deleted by an Impala DROP TABLE statement.
- If the Parquet table already exists, you can copy Parquet data files directly into it, then use the REFRESH statement to make Impala recognize the newly added data. Remember to preserve the block size of the Parquet data files by using the hadoop distcp -pb command rather than a -put or -cp operation on the Parquet files. See Example of Copying Parquet Data Files for an example of this kind of operation. (A short sketch of the LOAD DATA and REFRESH steps follows this list.)
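As a brief sketch of the LOAD DATA and REFRESH techniques above (the staging path and table name here are hypothetical):
-- Move existing Parquet files from an HDFS staging directory into the
-- table's data directory; no validation or conversion is performed.
load data inpath '/staging/parquet_files' into table parquet_table_name;

-- If files were instead copied directly into the table directory
-- (for example, with hadoop distcp -pb), tell Impala to pick them up:
refresh parquet_table_name;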
Currently, Impala always decodes the column data in Parquet files based on the ordinal position of the columns, not by looking up the position of each column based on its name. Parquet files produced outside of Impala must write column data in the same order as the columns are declared in the Impala table. Any optional columns that are omitted from the data files must be the rightmost columns in the Impala table definition.
If you created compressed Parquet files through some tool other than Impala, make sure
that any compression codecs are supported in Parquet by Impala. For example, Impala
does not currently support LZO compression in Parquet files. Also double-check that you
used any recommended compatibility settings in the other tool, such as
spark.sql.parquet.binaryAsString
when writing Parquet files through
Spark.
Recent versions of Sqoop can produce Parquet output files using the
--as-parquetfile
option.
If the data exists outside Impala and is in some other format, combine both of the
preceding techniques. First, use a LOAD DATA
or CREATE EXTERNAL
TABLE ... LOCATION
statement to bring the data into an Impala table that uses
the appropriate file format. Then, use an INSERT...SELECT
statement to
copy the data to the Parquet table, converting to Parquet format as part of the process.
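Here is a minimal sketch of that two-step approach, assuming hypothetical table names and a comma-delimited staging directory in HDFS:
-- Step 1: expose the raw files through a table in their original format.
create external table csv_staging (id int, name string, amount double)
  row format delimited fields terminated by ','
  location '/staging/csv_data';

-- Step 2: copy into Parquet, converting the file format along the way.
create table sales_parquet stored as parquet as select * from csv_staging;
-- Or, if the Parquet table already exists:
-- insert into sales_parquet select * from csv_staging;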
Loading data into Parquet tables is a memory-intensive operation, because the incoming data is buffered until it reaches one data block in size, then that chunk of data is organized and compressed in memory before being written out. The memory consumption can be larger when inserting data into partitioned Parquet tables, because a separate data file is written for each combination of partition key column values, potentially requiring several large chunks to be manipulated in memory at once.
When inserting into a partitioned Parquet table, Impala redistributes the data among the
nodes to reduce memory consumption. You might still need to temporarily increase the
memory dedicated to Impala during the insert operation, or break up the load operation
into several INSERT
statements, or both.
You can use CREATE TABLE AS SELECT or INSERT ... SELECT to reorder or rename columns, divide the data among multiple partitions, and so on. For example, to take a single comprehensive Parquet data file and load it into
a partitioned table, you would use an INSERT ... SELECT
statement with
dynamic partitioning to let Impala create separate data files with the appropriate
partition values; for an example, see INSERT Statement.
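For instance, a sketch of such a dynamically partitioned INSERT ... SELECT, with hypothetical table and column names; the partition key columns appear last in the SELECT list and are not given constant values, so Impala creates the needed partitions and data files:
insert into sales_by_month partition (year, month)
  select amount, region, year, month from sales_staging;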
Query Performance for Impala Parquet Tables
Query performance for Parquet tables depends on the number of columns needed to process
the SELECT
list and WHERE
clauses of the query, the
way data is divided into large data files with block size
equal to file size, the reduction in I/O by reading the data for each column in
compressed format, which data files can be skipped (for partitioned tables), and the CPU
overhead of decompressing the data for each column.
For example, the following is an efficient query for a Parquet table:
select avg(income) from census_data where state = 'CA';
The query processes only 2 columns out of a large number of total columns. If the table
is partitioned by the STATE
column, it is even more efficient because
the query only has to read and decode 1 column from each data file, and it can read only
the data files in the partition directory for the state 'CA'
, skipping
the data files for all the other states, which will be physically located in other
directories.
On the other hand, with a query such as the following:
select * from census_data;
Impala would have to read the entire contents of each
large data file, and decompress the contents of each
column for each row group, negating the I/O optimizations of the column-oriented format.
This query might still be faster for a Parquet table than a table with some other file
format, but it does not take advantage of the unique strengths of Parquet data files.
Impala can optimize queries on Parquet tables, especially join queries, better when
statistics are available for all the tables. Issue the COMPUTE STATS
statement for each table after substantial amounts of data are loaded into or appended
to it. See COMPUTE STATS Statement for details.
The runtime filtering feature, available in Impala 2.5 and higher, works best with Parquet tables. The per-row filtering aspect only applies to Parquet tables. See Runtime Filtering for Impala Queries (Impala 2.5 or higher only) for details.
In Impala 2.6 and higher, Impala queries are optimized for files
stored in Amazon S3. For Impala tables that use the file formats Parquet, ORC, RCFile,
SequenceFile, Avro, and uncompressed text, the setting
fs.s3a.block.size
in the core-site.xml
configuration file determines how Impala divides the I/O work of reading the data files.
This configuration setting is specified in bytes. By default, this value is 33554432 (32
MB), meaning that Impala parallelizes S3 read operations on the files as if they were
made up of 32 MB blocks. For example, if your S3 queries primarily access Parquet files
written by MapReduce or Hive, increase fs.s3a.block.size
to 134217728
(128 MB) to match the row group size of those files. If most S3 queries involve Parquet
files written by Impala, increase fs.s3a.block.size
to 268435456 (256
MB) to match the row group size produced by Impala.
Starting in Impala 3.4.0, use the query option
PARQUET_OBJECT_STORE_SPLIT_SIZE
to control the
Parquet split size for non-block stores (e.g. S3, ADLS, etc.). The
default value is 256 MB.
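For example, assuming the option value is specified in bytes, here is a sketch of lowering the split size to 128 MB before querying a hypothetical S3-backed table:
set PARQUET_OBJECT_STORE_SPLIT_SIZE=134217728;  -- 128 MB, expressed in bytes
select count(*) from s3_parquet_table;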
In Impala 2.9 and higher, Parquet files written by Impala include
embedded metadata specifying the minimum and maximum values for each column, within each
row group and each data page within the row group. Impala-written Parquet files
typically contain a single row group; a row group can contain many data pages. Impala
uses this information (currently, only the metadata for each row group) when reading
each Parquet data file during a query, to quickly determine whether each row group
within the file potentially includes any rows that match the conditions in the
WHERE
clause. For example, if the column X
within a
particular Parquet file has a minimum value of 1 and a maximum value of 100, then a
query including the clause WHERE x > 200
can quickly determine that
it is safe to skip that particular file, instead of scanning all the associated column
values. This optimization technique is especially effective for tables that use the
SORT BY
clause for the columns most frequently checked in
WHERE
clauses, because any INSERT
operation on such
tables produces Parquet data files with relatively narrow ranges of column values within
each file.
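For example, here is a minimal sketch (hypothetical table and column names) of creating a sorted Parquet table so that the row group statistics can prune data during a selective query:
-- Rows are sorted by id within each data file, producing narrow
-- min/max ranges per row group for that column.
create table events_sorted (id bigint, payload string)
  sort by (id)
  stored as parquet;

insert into events_sorted select id, payload from events_raw;

-- Row groups whose min/max range for id excludes the value are skipped.
select count(*) from events_sorted where id = 1000000;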
To prevent Impala from writing the Parquet page index when creating Parquet files, set the PARQUET_WRITE_PAGE_INDEX query option to FALSE.
Partitioning for Parquet Tables
As explained in Partitioning for Impala Tables, partitioning is an important performance technique for Impala generally. This section explains some of the performance considerations for partitioned Parquet tables.
The Parquet file format is ideal for tables containing many columns, where most
queries only refer to a small subset of the columns. As explained in
How Parquet Data Files Are Organized, the physical layout of Parquet data files lets
Impala read only a small fraction of the data for many queries. The performance
benefits of this approach are amplified when you use Parquet tables in combination
with partitioning. Impala can skip the data files for certain partitions entirely,
based on the comparisons in the WHERE
clause that refer to the
partition key columns. For example, queries on partitioned tables often analyze data
for time intervals based on columns such as YEAR
,
MONTH
, and/or DAY
, or for geographic regions.
Remember that Parquet data files use a large block
size, so when deciding how finely to partition the data, try to find a granularity
where each partition contains 256 MB or more of
data, rather than creating a large number of smaller files split among many
partitions.
Inserting into a partitioned Parquet table can be a resource-intensive operation, because each Impala node could potentially be writing a separate data file to HDFS for each combination of different values for the partition key columns. The large number of simultaneous open files could exceed the HDFS "transceivers" limit. To avoid exceeding this limit, consider the following techniques:
- Load different subsets of data using separate INSERT statements with specific values for the PARTITION clause, such as PARTITION (year=2010).
- Increase the "transceivers" value for HDFS, sometimes spelled "xcievers" (sic). The property value in the hdfs-site.xml configuration file is dfs.datanode.max.transfer.threads. For example, if you were loading 12 years of data partitioned by year, month, and day, even a value of 4096 might not be high enough. This blog post explores the considerations for setting this value higher or lower, using HBase examples for illustration.
- Use the COMPUTE STATS statement to collect column statistics on the source table from which data is being copied, so that the Impala query can estimate the number of different values in the partition key columns and distribute the work accordingly. (A brief sketch of the first and third techniques follows this list.)
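Here is a brief sketch of the first and third techniques, with hypothetical tables partitioned by year:
-- Load one partition at a time, with a constant partition key value.
insert into sales_by_year partition (year=2010)
  select amount, region from sales_staging where year = 2010;
insert into sales_by_year partition (year=2011)
  select amount, region from sales_staging where year = 2011;

-- Collect statistics on the source table so Impala can plan the
-- distribution of work for later INSERT ... SELECT operations.
compute stats sales_staging;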
Compressions for Parquet Data Files
When Impala writes Parquet data files using the INSERT
statement, the
underlying compression is controlled by the COMPRESSION_CODEC
query
option. (Prior to Impala 2.0, the query option name was
PARQUET_COMPRESSION_CODEC
.) The allowed values for this query option
are snappy
(the default), gzip
, zstd
,
lz4
, and none
. The option value is not case-sensitive.
If the option is set to an unrecognized value, all kinds of queries will fail due to
the invalid option setting, not just queries involving Parquet tables.
Example of Parquet Table with Snappy Compression
By default, the underlying data files for a Parquet table are compressed with Snappy.
The combination of fast compression and decompression makes it a good choice for many
data sets. To ensure Snappy compression is used, for example after experimenting with
other compression codecs, set the COMPRESSION_CODEC
query option to
snappy
before inserting the data:
[localhost:21000] > create database parquet_compression;
[localhost:21000] > use parquet_compression;
[localhost:21000] > create table parquet_snappy like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=snappy;
[localhost:21000] > insert into parquet_snappy select * from raw_text_data;
Inserted 1000000000 rows in 181.98s
Example of Parquet Table with GZip Compression
If you need more intensive compression (at the expense of more CPU cycles for
decompressing during queries), set the COMPRESSION_CODEC
query option
to gzip
before inserting the data:
[localhost:21000] > create table parquet_gzip like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=gzip;
[localhost:21000] > insert into parquet_gzip select * from raw_text_data;
Inserted 1000000000 rows in 1418.24s
Example of Uncompressed Parquet Table
If your data compresses very poorly, or you want to avoid the CPU overhead of
compression and decompression entirely, set the COMPRESSION_CODEC
query option to none
before inserting the data:
[localhost:21000] > create table parquet_none like raw_text_data;
[localhost:21000] > set COMPRESSION_CODEC=none;
[localhost:21000] > insert into parquet_none select * from raw_text_data;
Inserted 1000000000 rows in 146.90s
Examples of Sizes and Speeds for Compressed Parquet Tables
Here are some examples showing differences in data sizes and query speeds for 1 billion rows of synthetic data, compressed with each kind of codec. As always, run similar tests with realistic data sets of your own. The actual compression ratios, and relative insert and query speeds, will vary depending on the characteristics of the actual data.
In this case, switching from Snappy to GZip compression shrinks the data by an additional 40% or so, while switching from Snappy compression to no compression expands the data also by about 40%:
$ hdfs dfs -du -h /user/hive/warehouse/parquet_compression.db
23.1 G /user/hive/warehouse/parquet_compression.db/parquet_snappy
13.5 G /user/hive/warehouse/parquet_compression.db/parquet_gzip
32.8 G /user/hive/warehouse/parquet_compression.db/parquet_none
Because Parquet data files are typically large, each directory will have a different number of data files and the row groups will be arranged differently.
At the same time, the less aggressive the compression, the faster the data can be decompressed. In this case using a table with a billion rows, a query that evaluates all the values for a particular column runs faster with no compression than with Snappy compression, and faster with Snappy compression than with Gzip compression. Query performance depends on several other factors, so as always, run your own benchmarks with your own data to determine the ideal tradeoff between data size, CPU efficiency, and speed of insert and query operations.
[localhost:21000] > desc parquet_snappy;
Query finished, fetching results ...
+-----------+---------+---------+
| name | type | comment |
+-----------+---------+---------+
| id | int | |
| val | int | |
| zfill | string | |
| name | string | |
| assertion | boolean | |
+-----------+---------+---------+
Returned 5 row(s) in 0.14s
[localhost:21000] > select avg(val) from parquet_snappy;
Query finished, fetching results ...
+-----------------+
| _c0 |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 4.29s
[localhost:21000] > select avg(val) from parquet_gzip;
Query finished, fetching results ...
+-----------------+
| _c0 |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 6.97s
[localhost:21000] > select avg(val) from parquet_none;
Query finished, fetching results ...
+-----------------+
| _c0 |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 3.67s
Example of Copying Parquet Data Files
Here is a final example, to illustrate how the data files using the various
compression codecs are all compatible with each other for read operations. The
metadata about the compression format is written into each data file, and can be
decoded during queries regardless of the COMPRESSION_CODEC
setting in
effect at the time. In this example, we copy data files from the
PARQUET_SNAPPY
, PARQUET_GZIP
, and
PARQUET_NONE
tables used in the previous examples, each containing 1
billion rows, all to the data directory of a new table
PARQUET_EVERYTHING
. A couple of sample queries demonstrate that the
new table now contains 3 billion rows featuring a variety of compression codecs for
the data files.
First, we create the table in Impala so that there is a destination directory in HDFS to put the data files:
[localhost:21000] > create table parquet_everything like parquet_snappy;
Query: create table parquet_everything like parquet_snappy
Then in the shell, we copy the relevant data files into the data directory for this
new table. Rather than using hdfs dfs -cp
as with typical files, we
use hadoop distcp -pb
to ensure that the special
block size of the Parquet data files is preserved.
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_snappy \
/user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_gzip \
/user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
$ hadoop distcp -pb /user/hive/warehouse/parquet_compression.db/parquet_none \
/user/hive/warehouse/parquet_compression.db/parquet_everything
...MapReduce output...
Back in the impala-shell interpreter, we use the
REFRESH
statement to alert the Impala server to the new data files
for this table, then we can run queries demonstrating that the data files represent 3
billion rows, and the values for one of the numeric columns match what was in the
original smaller tables:
[localhost:21000] > refresh parquet_everything;
Query finished, fetching results ...
Returned 0 row(s) in 0.32s
[localhost:21000] > select count(*) from parquet_everything;
Query finished, fetching results ...
+------------+
| _c0 |
+------------+
| 3000000000 |
+------------+
Returned 1 row(s) in 8.18s
[localhost:21000] > select avg(val) from parquet_everything;
Query finished, fetching results ...
+-----------------+
| _c0 |
+-----------------+
| 250000.93577915 |
+-----------------+
Returned 1 row(s) in 13.35s
Parquet Tables for Impala Complex Types
In Impala 2.3 and higher, Impala supports the complex types
ARRAY
, STRUCT
, and MAP
. In
Impala 3.2 and higher, Impala also supports these
complex types in ORC. See
Complex Types (Impala 2.3 or higher only) for details.
These complex types are currently supported only for the Parquet and ORC file formats.
Because Impala has better performance on Parquet than ORC, if you plan to use complex
types, become familiar with the performance and storage aspects of Parquet first.
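For example, here is a sketch (with a hypothetical table) of declaring and querying complex types stored in Parquet; the ARRAY column is referenced in the FROM clause like a joined table:
create table customers_nested
(
  id bigint,
  name string,
  addresses array<struct<street:string,city:string,zip:string>>
)
stored as parquet;

-- Query the nested data by referencing the array as a join target;
-- ITEM refers to each element of the array.
select c.name, a.item.city
  from customers_nested c, c.addresses a
 where a.item.zip = '94305';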
Exchanging Parquet Data Files with Other Hadoop Components
You can read and write Parquet data files from other Hadoop components. See the documentation for your Apache Hadoop distribution for details.
Previously, it was not possible to create Parquet data through Impala and reuse that table within Hive. Now that Parquet support is available for Hive, reusing existing Impala Parquet data files in Hive requires updating the table metadata. Use the following command if you are already running Impala 1.1.1 or higher:
ALTER TABLE table_name SET FILEFORMAT PARQUET;
If you are running a level of Impala that is older than 1.1.1, do the metadata update through Hive:
ALTER TABLE table_name SET SERDE 'parquet.hive.serde.ParquetHiveSerDe';
ALTER TABLE table_name SET FILEFORMAT
INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat";
Impala 1.1.1 and higher can reuse Parquet data files created by Hive, without any action required.
Impala supports the scalar data types that you can encode in a Parquet data file, but not composite or nested types such as maps or arrays. In Impala 2.2 and higher, Impala can query Parquet data files that include composite or nested types, as long as the query only refers to columns with scalar types.
If you copy Parquet data files between nodes, or even between different directories on
the same node, make sure to preserve the block size by using the command hadoop
distcp -pb
. To verify that the block size was preserved, issue the command
hdfs fsck -blocks HDFS_path_of_impala_table_dir
and
check that the average block size is at or near 256 MB (or
whatever other size is defined by the PARQUET_FILE_SIZE
query
option). (The hadoop distcp operation typically leaves some directories behind, with names matching _distcp_logs_*, that you can delete from the destination directory afterward.) Run the hadoop distcp command with no arguments to see usage information for the distcp command syntax.
Impala can query Parquet files that use the PLAIN
,
PLAIN_DICTIONARY
, BIT_PACKED
, RLE
and RLE_DICTIONARY
encodings. RLE_DICTIONARY
is supported
only in Impala 4.0 and up.
When creating files outside of Impala for use by Impala, make sure to use one of the supported encodings. In particular, for MapReduce jobs, parquet.writer.version must not be defined (especially as PARQUET_2_0) in the configuration of Parquet MR jobs. Use the default version (or format). The default format, 1.0, includes some enhancements that are compatible with older versions. Data using the 2.0 format might not be consumable by Impala, due to use of the RLE_DICTIONARY encoding.
To examine the internal structure and data of Parquet files, you can use the parquet-tools command. Make sure this command is in your $PATH. (Typically, it is symlinked from /usr/bin; sometimes, depending on your installation setup, you might need to locate it under an alternative bin directory.) The arguments to this command let you perform operations such as:
- cat: Print a file's contents to standard output. In Impala 2.3 and higher, you can use the -j option to output JSON.
- head: Print the first few records of a file to standard output.
- schema: Print the Parquet schema for the file.
- meta: Print the file footer metadata, including key-value properties (like Avro schema), compression ratios, encodings, compression used, and row group information.
- dump: Print all data and metadata.
Use parquet-tools -h to see usage information for all the arguments.
Here are some examples showing parquet-tools usage:
$ # Be careful doing this for a big file! Use parquet-tools head to be safe.
$ parquet-tools cat sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0
year = 1992
month = 1
day = 3
...
$ parquet-tools head -n 2 sample.parq
year = 1992
month = 1
day = 2
dayofweek = 4
dep_time = 748
crs_dep_time = 750
arr_time = 851
crs_arr_time = 846
carrier = US
flight_num = 53
actual_elapsed_time = 63
crs_elapsed_time = 56
arrdelay = 5
depdelay = -2
origin = CMH
dest = IND
distance = 182
cancelled = 0
diverted = 0
year = 1992
month = 1
day = 3
...
$ parquet-tools schema sample.parq
message schema {
optional int32 year;
optional int32 month;
optional int32 day;
optional int32 dayofweek;
optional int32 dep_time;
optional int32 crs_dep_time;
optional int32 arr_time;
optional int32 crs_arr_time;
optional binary carrier;
optional int32 flight_num;
...
$ parquet-tools meta sample.parq
creator: impala version 2.2.0-...
file schema: schema
-------------------------------------------------------------------
year: OPTIONAL INT32 R:0 D:1
month: OPTIONAL INT32 R:0 D:1
day: OPTIONAL INT32 R:0 D:1
dayofweek: OPTIONAL INT32 R:0 D:1
dep_time: OPTIONAL INT32 R:0 D:1
crs_dep_time: OPTIONAL INT32 R:0 D:1
arr_time: OPTIONAL INT32 R:0 D:1
crs_arr_time: OPTIONAL INT32 R:0 D:1
carrier: OPTIONAL BINARY R:0 D:1
flight_num: OPTIONAL INT32 R:0 D:1
...
row group 1: RC:20636601 TS:265103674
-------------------------------------------------------------------
year: INT32 SNAPPY DO:4 FPO:35 SZ:10103/49723/4.92 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
month: INT32 SNAPPY DO:10147 FPO:10210 SZ:11380/35732/3.14 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
day: INT32 SNAPPY DO:21572 FPO:21714 SZ:3071658/9868452/3.21 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dayofweek: INT32 SNAPPY DO:3093276 FPO:3093319 SZ:2274375/5941876/2.61 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
dep_time: INT32 SNAPPY DO:5367705 FPO:5373967 SZ:28281281/28573175/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_dep_time: INT32 SNAPPY DO:33649039 FPO:33654262 SZ:10220839/11574964/1.13 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
arr_time: INT32 SNAPPY DO:43869935 FPO:43876489 SZ:28562410/28797767/1.01 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
crs_arr_time: INT32 SNAPPY DO:72432398 FPO:72438151 SZ:10908972/12164626/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
carrier: BINARY SNAPPY DO:83341427 FPO:83341558 SZ:114916/128611/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
flight_num: INT32 SNAPPY DO:83456393 FPO:83488603 SZ:10216514/11474301/1.12 VC:20636601 ENC:PLAIN_DICTIONARY,RLE,PLAIN
...
How Parquet Data Files Are Organized
Although Parquet is a column-oriented file format, do not expect to find one data file for each column. Parquet keeps all the data for a row within the same data file, to ensure that the columns for a row are always available on the same node for processing. What Parquet does is to set a large HDFS block size and a matching maximum data file size, to ensure that I/O and network transfer requests apply to large batches of data.
Within that data file, the data for a set of rows is rearranged so that all the values from the first column are organized in one contiguous block, then all the values from the second column, and so on. Putting the values from the same column next to each other lets Impala use effective compression techniques on the values in that column.
Impala INSERT
statements write Parquet data files using an HDFS block
size that matches the data file size, to ensure that
each data file is represented by a single HDFS block, and the entire file can be
processed on a single node without requiring any remote reads.
If you create Parquet data files outside of Impala, such as through a MapReduce or Pig
job, ensure that the HDFS block size is greater than or equal to the file size, so
that the "one file per block" relationship is maintained. Set the
dfs.block.size
or the dfs.blocksize
property large
enough that each file fits within a single HDFS block, even if that size is larger
than the normal HDFS block size.
If the block size is reset to a lower value during a file copy, you will see lower
performance for queries involving those files, and the PROFILE
statement will reveal that some I/O is being done suboptimally, through remote reads.
See Example of Copying Parquet Data Files for an example
showing how to preserve the block size when copying Parquet data files.
When Impala retrieves or tests the data for a particular column, it opens all the data
files, but only reads the portion of each file containing the values for that column.
The column values are stored consecutively, minimizing the I/O required to process the
values within a single column. If other columns are named in the SELECT
list or WHERE
clauses, the data for all columns in the same row is
available within that same data file.
If an INSERT
statement brings in less than
one Parquet block's worth of data, the resulting data
file is smaller than ideal. Thus, if you do split up an ETL job to use multiple
INSERT
statements, try to keep the volume of data for each
INSERT
statement to approximately 256 MB,
or a multiple of 256 MB.
RLE and Dictionary Encoding for Parquet Data Files
Parquet uses some automatic compression techniques, such as run-length encoding (RLE) and dictionary encoding, based on analysis of the actual data values. Once the data values are encoded in a compact form, the encoded data can optionally be further compressed using a compression algorithm. Parquet data files created by Impala can use Snappy, GZip, or no compression; the Parquet spec also allows LZO compression, but currently Impala does not support LZO-compressed Parquet files.
RLE and dictionary encoding are compression techniques that Impala applies automatically to groups of Parquet data values, in addition to any Snappy or GZip compression applied to the entire data files. These automatic optimizations can save you time and planning that are normally needed for a traditional data warehouse. For example, dictionary encoding reduces the need to create numeric IDs as abbreviations for longer string values.
Run-length encoding condenses sequences of repeated data values. For example, if many consecutive rows all contain the same value for a country code, those repeating values can be represented by the value followed by a count of how many times it appears consecutively.
Dictionary encoding takes the different values present in a column, and represents
each one in compact 2-byte form rather than the original value, which could be several
bytes. (Additional compression is applied to the compacted values, for extra space
savings.) This type of encoding applies when the number of different values for a
column is less than 2**16 (65,536). It does not apply to columns of data type
BOOLEAN
, which are already very short. TIMESTAMP
columns sometimes have a unique value for each row, in which case they can quickly
exceed the 2**16 limit on distinct values. The 2**16 limit on different values within
a column is reset for each data file, so if several different data files each
contained 10,000 different city names, the city name column in each data file could
still be condensed using dictionary encoding.
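As a rough way to anticipate whether a column can benefit from dictionary encoding, you can check its number of distinct values; the table and column here are hypothetical:
-- Columns whose distinct-value count stays below 2**16 within each
-- data file are candidates for dictionary encoding.
select count(distinct city_name) as distinct_cities from address_data;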
Compacting Data Files for Parquet Tables
If you reuse existing table structures or ETL processes for Parquet tables, you might encounter a "many small files" situation, which is suboptimal for query efficiency. For example, statements like these might produce inefficiently organized data files:
-- In an N-node cluster, each node produces a data file
-- for the INSERT operation. If you have less than
-- N GB of data to copy, some files are likely to be
-- much smaller than the default Parquet block size.
insert into parquet_table select * from text_table;
-- Even if this operation involves an overall large amount of data,
-- when split up by year/month/day, each partition might only
-- receive a small amount of data. Then the data files for
-- the partition might be divided between the N nodes in the cluster.
-- A multi-gigabyte copy operation might produce files of only
-- a few MB each.
insert into partitioned_parquet_table partition (year, month, day)
select year, month, day, url, referer, user_agent, http_code, response_time
from web_stats;
Here are techniques to help you produce large data files in Parquet
INSERT operations, and to compact existing too-small data files:
- When inserting into a partitioned Parquet table, use statically partitioned INSERT statements where the partition key values are specified as constant values. Ideally, use a separate INSERT statement for each partition.
- You might set the NUM_NODES option to 1 briefly, during INSERT or CREATE TABLE AS SELECT statements. Normally, those statements produce one or more data files per data node. If the write operation involves small amounts of data, a Parquet table, and/or a partitioned table, the default behavior could produce many small files when intuitively you might expect only a single output file. SET NUM_NODES=1 turns off the "distributed" aspect of the write operation, making it more likely to produce only one or a few data files.
- Be prepared to reduce the number of partition key columns from what you are used to with traditional analytic database systems.
- Do not expect Impala-written Parquet files to fill up the entire Parquet block size. Impala estimates on the conservative side when figuring out how much data to write to each Parquet file. Typically, the amount of uncompressed data in memory is substantially reduced on disk by the compression and encoding techniques in the Parquet file format. The final data file size varies depending on the compressibility of the data. Therefore, it is not an indication of a problem if 256 MB of text data is turned into 2 Parquet data files, each less than 256 MB.
- If you accidentally end up with a table with many small data files, consider using one or more of the preceding techniques and copying all the data into a new Parquet table, either through CREATE TABLE AS SELECT or INSERT ... SELECT statements (see the sketch after this list). To avoid rewriting queries to change table names, you can adopt a convention of always running important queries against a view. Changing the view definition immediately switches any subsequent queries to use the new underlying tables:
create view production_table as select * from table_with_many_small_files;
-- CTAS or INSERT...SELECT all the data into a more efficient layout...
alter view production_table as select * from table_with_few_big_files;
select * from production_table where c1 = 100 and c2 < 50 and ...;
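Here is a sketch of the compaction step referenced in the last item, with hypothetical table names:
-- Rewrite the fragmented table into a new Parquet table with
-- fewer, larger data files.
set NUM_NODES=1;
create table table_with_few_big_files stored as parquet
  as select * from table_with_many_small_files;
set NUM_NODES=0;  -- restore the default distributed behavior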
Schema Evolution for Parquet Tables
Schema evolution refers to using the statement ALTER TABLE ... REPLACE
COLUMNS
to change the names, data type, or number of columns in a table. You
can perform schema evolution for Parquet tables as follows:
- The Impala ALTER TABLE statement never changes any data files in the tables. From the Impala side, schema evolution involves interpreting the same data files in terms of a new table definition. Some types of schema changes make sense and are represented correctly. Other types of changes cannot be represented in a sensible way, and produce special result values or conversion errors during queries.
- The INSERT statement always creates data using the latest table definition. You might end up with data files with different numbers of columns or internal data representations if you do a sequence of INSERT and ALTER TABLE ... REPLACE COLUMNS statements.
- If you use ALTER TABLE ... REPLACE COLUMNS to define additional columns at the end, when the original data files are used in a query, these final columns are considered to be all NULL values.
- If you use ALTER TABLE ... REPLACE COLUMNS to define fewer columns than before, when the original data files are used in a query, the unused columns still present in the data file are ignored.
- Parquet represents the TINYINT, SMALLINT, and INT types the same internally, all stored in 32-bit integers.
  - That means it is easy to promote a TINYINT column to SMALLINT or INT, or a SMALLINT column to INT. The numbers are represented exactly the same in the data file, and the columns being promoted would not contain any out-of-range values. (See the sketch after this list.)
  - If you change any of these column types to a smaller type, any values that are out-of-range for the new type are returned incorrectly, typically as negative numbers.
  - You cannot change a TINYINT, SMALLINT, or INT column to BIGINT, or the other way around. Although the ALTER TABLE succeeds, any attempt to query those columns results in conversion errors.
  - Any other type conversion for columns produces a conversion error during queries. For example, INT to STRING, FLOAT to DOUBLE, TIMESTAMP to STRING, DECIMAL(9,0) to DECIMAL(5,2), and so on.
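For example, here is a sketch of a safe promotion, assuming a hypothetical table originally defined with a TINYINT column:
-- Original definition: create table metrics (id int, level tinyint) stored as parquet;
-- Promote the TINYINT column to INT; the existing data files remain valid.
alter table metrics replace columns (id int, level int);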
Suppose you have a Parquet data file with columns C1,C2,C3,C4, and now you want to reuse the same
Parquet file in a table with columns C4,C2
. By default, Impala expects
the columns in the data file to appear in the same order as the columns defined for the
table, making it impractical to do some kinds of file reuse or schema evolution. In
Impala 2.6 and higher, the query option
PARQUET_FALLBACK_SCHEMA_RESOLUTION=name
lets Impala resolve columns by
name, and therefore handle out-of-order or extra columns in the data file. For example:
create database schema_evolution;
use schema_evolution;
create table t1 (c1 int, c2 boolean, c3 string, c4 timestamp)
stored as parquet;
insert into t1 values
(1, true, 'yes', now()),
(2, false, 'no', now() + interval 1 day);
select * from t1;
+----+-------+-----+-------------------------------+
| c1 | c2 | c3 | c4 |
+----+-------+-----+-------------------------------+
| 1 | true | yes | 2016-06-28 14:53:26.554369000 |
| 2 | false | no | 2016-06-29 14:53:26.554369000 |
+----+-------+-----+-------------------------------+
desc formatted t1;
...
| Location: | /user/hive/warehouse/schema_evolution.db/t1 |
...
-- Assumed definition for T2: two of T1's columns, in a different
-- order (inferred from the query output shown below).
create table t2 (c4 timestamp, c2 boolean) stored as parquet;

-- Make T2 have the same data file as in T1, including 2
-- unused columns and column order different than T2 expects.
load data inpath '/user/hive/warehouse/schema_evolution.db/t1'
into table t2;
+----------------------------------------------------------+
| summary |
+----------------------------------------------------------+
| Loaded 1 file(s). Total files in destination location: 1 |
+----------------------------------------------------------+
-- 'position' is the default setting.
-- Impala cannot read the Parquet file if the column order does not match.
set PARQUET_FALLBACK_SCHEMA_RESOLUTION=position;
PARQUET_FALLBACK_SCHEMA_RESOLUTION set to position
select * from t2;
WARNINGS:
File 'schema_evolution.db/t2/45331705_data.0.parq'
has an incompatible Parquet schema for column 'schema_evolution.t2.c4'.
Column type: TIMESTAMP, Parquet schema: optional int32 c1 [i:0 d:1 r:0]
File 'schema_evolution.db/t2/45331705_data.0.parq'
has an incompatible Parquet schema for column 'schema_evolution.t2.c4'.
Column type: TIMESTAMP, Parquet schema: optional int32 c1 [i:0 d:1 r:0]
-- With the 'name' setting, Impala can read the Parquet data files
-- despite mismatching column order.
set PARQUET_FALLBACK_SCHEMA_RESOLUTION=name;
PARQUET_FALLBACK_SCHEMA_RESOLUTION set to name
select * from t2;
+-------------------------------+-------+
| c4 | c2 |
+-------------------------------+-------+
| 2016-06-28 14:53:26.554369000 | true |
| 2016-06-29 14:53:26.554369000 | false |
+-------------------------------+-------+
See
PARQUET_FALLBACK_SCHEMA_RESOLUTION Query Option (Impala 2.6 or higher only)
for more details.
Data Type Considerations for Parquet Tables
The Parquet format defines a set of data types whose names differ from the names of the corresponding Impala data types. If you are preparing Parquet files using other Hadoop components such as Pig or MapReduce, you might need to work with the type names defined by Parquet. The following tables list the Parquet-defined types and the equivalent types in Impala.
Primitive types
Parquet type | Impala type |
---|---|
BINARY | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
FLOAT | FLOAT |
INT32 | INT |
INT64 | BIGINT |
INT96 | TIMESTAMP |
Logical types
Parquet uses type annotations to extend the types that it can store, by specifying how the primitive types should be interpreted.
Parquet primitive type and annotation | Impala type |
---|---|
BINARY annotated with the UTF8 OriginalType | STRING |
BINARY annotated with the STRING LogicalType | STRING |
BINARY annotated with the ENUM OriginalType | STRING |
BINARY annotated with the DECIMAL OriginalType | DECIMAL |
INT64 annotated with the TIMESTAMP_MILLIS OriginalType | TIMESTAMP (in Impala 3.2 or higher) or BIGINT (for backward compatibility) |
INT64 annotated with the TIMESTAMP_MICROS OriginalType | TIMESTAMP (in Impala 3.2 or higher) or BIGINT (for backward compatibility) |
INT64 annotated with the TIMESTAMP LogicalType | TIMESTAMP (in Impala 3.2 or higher) or BIGINT (for backward compatibility) |
Complex types:
For the complex types (ARRAY, MAP, and STRUCT) available in Impala 2.3 and higher, Impala only supports queries against those types in Parquet tables.