Here are performance guidelines and best practices that you can use during planning, experimentation, and performance tuning for an Impala-enabled cluster. All of this information is also available in more detail elsewhere in the Impala documentation; it is gathered together here to serve as a cookbook and to emphasize which performance techniques typically provide the highest return on investment.
Typically, for large volumes of data (multiple gigabytes per table or partition), the Parquet file format performs best because of its combination of columnar storage layout, large I/O request size, and compression and encoding. See How Impala Works with Hadoop File Formats for comparisons of all file formats supported by Impala, and Using the Parquet File Format with Impala Tables for details about the Parquet file format.
When producing data files outside of Impala, prefer either text format or Avro, where you can build up the files row by row. Once the data is in Impala, you can convert it to the more efficient Parquet format and split it into multiple data files using a single INSERT ... SELECT statement. Or, if you have the infrastructure to produce multi-megabyte Parquet files as part of your data preparation process, do that and skip the conversion step inside Impala.
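As a minimal sketch of the conversion step described above: the table names, column names, and the comma delimiter are hypothetical, chosen only for illustration.

```sql
-- Hypothetical staging table over text files produced outside Impala.
CREATE TABLE logs_text (
  event_time TIMESTAMP,
  user_id    BIGINT,
  url        STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

-- Parquet table with the same column layout as the staging table.
CREATE TABLE logs_parquet LIKE logs_text STORED AS PARQUET;

-- A single statement converts the rows and writes them as Parquet data files.
INSERT INTO logs_parquet SELECT * FROM logs_text;
```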
Always use INSERT ... SELECT to copy significant volumes of data from table to table within Impala. Avoid INSERT ... VALUES for any substantial volume of data or performance-critical tables, because each such statement produces a separate tiny data file. See INSERT Statement for examples of the INSERT ... SELECT syntax.
For example, if you have thousands of partitions in a Parquet table, each with less than 256 MB of data, consider partitioning in a less granular way, such as by year / month rather than year / month / day. If an inefficient data ingestion process produces thousands of data files in the same table or partition, consider compacting the data by performing an INSERT ... SELECT to copy all the data to a different table; this process reorganizes the data into a smaller number of larger files.
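The compaction described above might look like the following sketch. Both table names are hypothetical, and the source table is assumed to be unpartitioned; a partitioned table would also need a PARTITION clause on the INSERT.

```sql
-- events_small_files (hypothetical) holds thousands of tiny data files.
CREATE TABLE events_compacted LIKE events_small_files STORED AS PARQUET;

-- Copying through INSERT ... SELECT rewrites the rows into fewer, larger files.
INSERT INTO events_compacted SELECT * FROM events_small_files;
```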
Partitioning is a technique that physically divides the data based on values of one or more columns, such as by year, month, day, region, city, section of a web site, and so on. When you issue queries that request a specific value or range of values for the partition key columns, Impala can avoid reading the irrelevant data, potentially yielding a huge savings in disk I/O.
When deciding which column(s) to use for partitioning, choose the right level of granularity. For example, should you partition by year, month, and day, or only by year and month? Choose a partitioning strategy that puts at least 256 MB of data in each partition, to take advantage of HDFS bulk I/O and Impala distributed queries.
Over-partitioning can also cause query planning to take longer than necessary, as Impala prunes the unnecessary partitions. Ideally, keep the number of partitions in the table under 30 thousand.
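To make the granularity trade-off concrete, here is a sketch of a table partitioned by year and month only. The table and column names are hypothetical. SHOW PARTITIONS reports the size of each partition, which is one way to check whether partitions reach the 256 MB guideline.

```sql
-- Hypothetical table partitioned by year and month rather than by day.
CREATE TABLE web_logs (
  event_time TIMESTAMP,
  url        STRING,
  bytes_sent BIGINT
)
PARTITIONED BY (year SMALLINT, month TINYINT)
STORED AS PARQUET;

-- A predicate on the partition key columns lets Impala skip all other partitions.
SELECT COUNT(*) FROM web_logs WHERE year = 2023 AND month = 6;

-- Lists each partition with its file count and total size.
SHOW PARTITIONS web_logs;
```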
When preparing data files to go in a partition directory, create several large files rather than many small ones. If you receive data in the form of many small files and have no control over the input format, consider using the INSERT ... SELECT syntax to copy data from one table or partition to another, which compacts the files into a relatively small number (based on the number of nodes in the cluster).
If you need to reduce the overall number of partitions and increase the amount of data in each partition, first look for partition key columns that are rarely referenced or are referenced in non-critical queries (not subject to an SLA). For example, your web site log data might be partitioned by year, month, day, and hour, but if most queries roll up the results by day, perhaps you only need to partition by year, month, and day.
If you need to reduce the granularity even more, consider creating "buckets", computed values corresponding to different sets of partition key values. For example, you can use the TRUNC() function with a TIMESTAMP column to group date and time values based on intervals such as week or quarter. See Impala Date and Time Functions for details.
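As an illustration of computing such buckets, the query below groups rows by calendar quarter. The table and column names are hypothetical; 'Q' is the TRUNC() unit for quarter.

```sql
-- Each distinct quarter_start value is one candidate bucket.
SELECT TRUNC(event_time, 'Q') AS quarter_start,
       COUNT(*)               AS events
FROM web_logs
GROUP BY TRUNC(event_time, 'Q')
ORDER BY quarter_start;
```

Previewing the row count per bucket this way helps judge whether a bucket would hold enough data to be worth a partition.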
See Partitioning for Impala Tables for full details and performance considerations for partitioning.
Although it is tempting to use strings for partition key columns, since those values are turned into HDFS directory names anyway, you can minimize memory usage by using numeric values for common partition key fields such as YEAR, MONTH, and DAY. Use the smallest integer type that holds the appropriate range of values, typically TINYINT for MONTH and DAY, and SMALLINT for YEAR.
Use the EXTRACT() function to pull out individual date and time fields from a TIMESTAMP value, and CAST() the return value to the appropriate integer type.
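A sketch of how EXTRACT() and CAST() can feed small integer partition keys during a dynamic-partition insert. The table names (events_raw, events_by_day) and columns are hypothetical.

```sql
-- Destination table with compact integer partition key columns.
CREATE TABLE events_by_day (
  event_time TIMESTAMP,
  payload    STRING
)
PARTITIONED BY (year SMALLINT, month TINYINT, day TINYINT)
STORED AS PARQUET;

-- The last three expressions in the SELECT list supply the partition key values.
INSERT INTO events_by_day PARTITION (year, month, day)
SELECT event_time,
       payload,
       CAST(EXTRACT(YEAR  FROM event_time) AS SMALLINT),
       CAST(EXTRACT(MONTH FROM event_time) AS TINYINT),
       CAST(EXTRACT(DAY   FROM event_time) AS TINYINT)
FROM events_raw;
```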
By default, the Impala INSERT ... SELECT statement creates Parquet files with a 256 MB block size. (This default was changed in Impala 2.0. Formerly, the limit was 1 GB, but Impala made conservative estimates about compression, resulting in files that were smaller than 1 GB.)
Each Parquet file written by Impala is a single block, allowing the whole file to be processed as a unit by a single host.
As you copy Parquet files into HDFS or between HDFS filesystems, use hadoop distcp -pb to preserve the original block size.
If there are only one or a few data blocks in your Parquet table, or in a partition that is the only one accessed by a query, then you might experience a slowdown for a different reason: not enough data to take advantage of Impala's parallel distributed queries. Each data block is processed by a single core on one of the DataNodes. In a 100-node cluster of 16-core machines, you could potentially process thousands of data files simultaneously. You want to find a sweet spot between "many tiny files" and "single giant file" that balances bulk I/O and parallel processing. You can set the PARQUET_FILE_SIZE query option before doing an INSERT ... SELECT statement to reduce the size of each generated Parquet file. (Specify the file size as an absolute number of bytes, or in Impala 2.0 and later, in units ending with m for megabytes or g for gigabytes.) Run benchmarks with different file sizes to find the right balance point for your particular data volume.
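For instance, a session in impala-shell might lower the file size before rewriting a table, as sketched below. The 128m value and both table names are illustrative assumptions, not recommendations.

```sql
-- Applies to subsequent INSERT statements in this session (Impala 2.0+ unit syntax).
SET PARQUET_FILE_SIZE=128m;

-- Rewrites the data as Parquet files of roughly the requested size.
INSERT OVERWRITE TABLE sales_parquet SELECT * FROM sales_staging;
```

Repeating the rewrite with a few different sizes and timing a representative query against each result is one way to run the benchmark suggested above.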
Gather the statistics with the COMPUTE STATS statement. See Performance Considerations for Join Queries for details.
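A short sketch of gathering and then inspecting statistics; the table name is hypothetical.

```sql
-- Collects table-level and column-level statistics in one pass.
COMPUTE STATS sales_parquet;

-- Confirm that row counts and column statistics are now populated.
SHOW TABLE STATS sales_parquet;
SHOW COLUMN STATS sales_parquet;
```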
Use techniques such as:
Call aggregate functions such as COUNT(), SUM(), and MAX() in the query rather than sending the result set to an application and doing those computations there. Remember that the size of an unaggregated result set could be huge, requiring substantial time to transmit across the network.
Use the WHERE clause of a query to eliminate rows that are not relevant, rather than producing a big result set and filtering it using application logic.
LIMIT clause. If you only need to see a few sample values from a result set, or the top or bottom values from a query using ORDER BY, include the LIMIT clause to reduce the size of the result set rather than asking for the full result set and then throwing most of the rows away.
Use impala-shell options such as -B and --output_delimiter to produce results without special formatting, and redirect output to a file rather than printing to the screen. Consider using INSERT ... SELECT to write the results directly to new files in HDFS. See impala-shell Configuration Options for details about the impala-shell command-line options.
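The query-side techniques in the list above can be sketched as follows. All table and column names are hypothetical, and url_totals is assumed to exist already with matching columns.

```sql
-- Aggregate and filter inside Impala so only one row crosses the network.
SELECT COUNT(*), SUM(bytes_sent), MAX(bytes_sent)
FROM web_logs
WHERE year = 2023;

-- Return only the top 10 rows instead of the full sorted result set.
SELECT url, bytes_sent
FROM web_logs
WHERE year = 2023
ORDER BY bytes_sent DESC
LIMIT 10;

-- Write a large result directly to files in HDFS rather than to the client.
INSERT INTO url_totals
SELECT url, SUM(bytes_sent)
FROM web_logs
WHERE year = 2023
GROUP BY url;
```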
Examine the EXPLAIN plan for a query before actually running it. See EXPLAIN Statement and Using the EXPLAIN Plan for Performance Tuning for details.
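For example, prefixing a query with EXPLAIN prints the plan without executing the query. The tables and join columns here are hypothetical.

```sql
-- Shows the planned scan, join, and aggregation steps; no data is read.
EXPLAIN
SELECT c.region, SUM(s.amount)
FROM sales s
JOIN customers c ON s.customer_id = c.id
WHERE s.year = 2023
GROUP BY c.region;
```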
Verify that the low-level aspects of I/O, memory usage, network bandwidth, CPU utilization, and so on are within expected ranges by examining the query profile for a query after running it. See Using the Query Profile for Performance Tuning for details.
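In impala-shell, one way to view this information is to run the SUMMARY and PROFILE commands immediately after the query finishes, as in this sketch. The query and table name are hypothetical, and both commands belong to impala-shell rather than to SQL.

```sql
-- Run the query to be examined.
SELECT COUNT(*) FROM web_logs WHERE year = 2023;

-- Condensed per-operator timings and row counts for the most recent query.
SUMMARY;

-- Full low-level profile for the most recent query.
PROFILE;
```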
See the documentation for your Apache Hadoop distribution for recommendations about operating system settings that you can change to influence Impala performance. In particular, you might find that changing the vm.swappiness Linux kernel setting to a non-zero value improves overall performance.
In the context of Impala, a hotspot is defined as “an Impala daemon that for a single query or a workload is spending a far greater amount of time processing data relative to its neighbours”.
Before discussing the options to tackle this issue, some background is required to understand how this problem can occur.
By default, the scheduling of scan-based plan fragments is deterministic. This means that for multiple queries needing to read the same block of data, the same node will be picked to host the scan. The default scheduling logic does not take into account node workload from prior queries. The complexity of materializing a tuple depends on a few factors, namely decoding and decompression. If the tuples are densely packed into data pages due to good encoding/compression ratios, more work is required to reconstruct the data. Each compression codec offers different performance tradeoffs, which should be considered before writing the data. Due to the deterministic nature of the scheduler, single nodes can become bottlenecks for highly concurrent queries that use the same tables.
If, for example, a Parquet based dataset is tiny, e.g. a small dimension table, such that it fits into a single HDFS block (Impala by default will create 256 MB blocks when Parquet is used, each containing a single row group) then there are a number of options that can be considered to resolve the potential scheduling hotspots when querying this data:
Adjust how the scheduler picks replicas with REPLICA_PREFERENCE and RANDOM_REPLICA. For a detailed description of each of these modes, see IMPALA-2696.
Reduce the Parquet file size with the PARQUET_FILE_SIZE query option when writing the table data. With this approach, the data spans more nodes. However, dropping the size below 32 MB is not recommended.
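The two options above might be applied as in the sketch below. It assumes the random-replica behaviour is exposed as the SCHEDULE_RANDOM_REPLICA query option and that REMOTE is an accepted REPLICA_PREFERENCE value; verify both against the query option reference for your Impala version. The table names and the 64m size are hypothetical.

```sql
-- Assumption: query option names and values as in recent Impala releases.
SET REPLICA_PREFERENCE=REMOTE;
SET SCHEDULE_RANDOM_REPLICA=TRUE;

-- Alternatively, rewrite the small dimension table into smaller files
-- (kept above the 32 MB floor) so that its blocks spread across more nodes.
SET PARQUET_FILE_SIZE=64m;
INSERT OVERWRITE TABLE dim_region SELECT * FROM dim_region_staging;
```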