Runtime Filtering for Impala Queries (Impala 2.5 or higher only)
Runtime filtering is a wide-ranging optimization feature available in Impala 2.5 and higher. When only a fraction of the data in a table is needed for a query against a partitioned table or to evaluate a join condition, Impala determines the appropriate conditions while the query is running, and broadcasts that information to all the impalad nodes that are reading the table so that they can avoid unnecessary I/O to read partition data, and avoid unnecessary network transmission by sending only the subset of rows that match the join keys across the network.
This feature is primarily used to optimize queries against large partitioned tables (under the name dynamic partition pruning) and joins of large tables. The information in this section includes concepts, internals, and troubleshooting information for the entire runtime filtering feature. For specific tuning steps for partitioned tables, see Dynamic Partition Pruning.
When this feature made its debut in Impala 2.5,
the default setting was
Now the default is
RUNTIME_FILTER_MODE=GLOBAL in Impala 2.6 and higher,
which enables more wide-ranging and ambitious query optimization without requiring you to
explicitly set any query options.
Background Information for Runtime Filtering
To understand how runtime filtering works at a detailed level, you must be familiar with some terminology from the field of distributed database technology:
What a plan fragment is. Impala decomposes each query into smaller units of work that are distributed across the cluster. Wherever possible, a data block is read, filtered, and aggregated by plan fragments executing on the same host. For some operations, such as joins and combining intermediate results into a final result set, data is transmitted across the network from one DataNode to another.
HASH JOINplan nodes are, and their role in computing query results:
In the Impala query plan, a scan node performs the I/O to read from the underlying data files. Although this is an expensive operation from the traditional database perspective, Hadoop clusters and Impala are optimized to do this kind of I/O in a highly parallel fashion. The major potential cost savings come from using the columnar Parquet format (where Impala can avoid reading data for unneeded columns) and partitioned tables (where Impala can avoid reading data for unneeded partitions).
Most Impala joins use the hash join mechanism. (It is only fairly recently that Impala started using the nested-loop join technique, for certain kinds of non-equijoin queries.) In a hash join, when evaluating join conditions from two tables, Impala constructs a hash table in memory with all the different column values from the table on one side of the join. Then, for each row from the table on the other side of the join, Impala tests whether the relevant column values are in this hash table or not.
A hash join node constructs such an in-memory hash table, then performs the comparisons to identify which rows match the relevant join conditions and should be included in the result set (or at least sent on to the subsequent intermediate stage of query processing). Because some of the input for a hash join might be transmitted across the network from another host, it is especially important from a performance perspective to prune out ahead of time any data that is known to be irrelevant.
The more distinct values are in the columns used as join keys, the larger the in-memory hash table and thus the more memory required to process the query.
The difference between a broadcast join and a shuffle join. (The Hadoop notion of a shuffle join is sometimes referred to in Impala as a partitioned join.) In a broadcast join, the table from one side of the join (typically the smaller table) is sent in its entirety to all the hosts involved in the query. Then each host can compare its portion of the data from the other (larger) table against the full set of possible join keys. In a shuffle join, there is no obvious "smaller" table, and so the contents of both tables are divided up, and corresponding portions of the data are transmitted to each host involved in the query. See Optimizer Hints for information about how these different kinds of joins are processed.
The notion of the build phase and probe phase when Impala processes a join query. The build phase is where the rows containing the join key columns, typically for the smaller table, are transmitted across the network and built into an in-memory hash table data structure on one or more destination nodes. The probe phase is where data is read locally (typically from the larger table) and the join key columns are compared to the values in the in-memory hash table. The corresponding input sources (tables, subqueries, and so on) for these phases are referred to as the build side and the probe side.
How to set Impala query options: interactively within an impala-shell session through the
SETcommand, for a JDBC or ODBC application through the
SETstatement, or globally for all impalad daemons through the
Runtime Filtering Internals
The filter that is transmitted between plan fragments is essentially a list of values for join key columns. When this list is values is transmitted in time to a scan node, Impala can filter out non-matching values immediately after reading them, rather than transmitting the raw data to another host to compare against the in-memory hash table on that host.
For HDFS-based tables, this data structure is implemented as a Bloom filter, which uses a probability-based algorithm to determine all possible matching values. (The probability-based aspects means that the filter might include some non-matching values, but if so, that does not cause any inaccuracy in the final results.)
Another kind of filter is the "min-max" filter. It currently only applies to Kudu tables. The filter is a data structure representing a minimum and maximum value. These filters are passed to Kudu to reduce the number of rows returned to Impala when scanning the probe side of the join.
There are different kinds of filters to match the different kinds of joins (partitioned and broadcast). A broadcast filter reflects the complete list of relevant values and can be immediately evaluated by a scan node. A partitioned filter reflects only the values processed by one host in the cluster; all the partitioned filters must be combined into one (by the coordinator node) before the scan nodes can use the results to accurately filter the data as it is read from storage.
Broadcast filters are also classified as local or global. With a local broadcast filter, the information
in the filter is used by a subsequent query fragment that is running on the same host that produced the filter.
A non-local broadcast filter must be transmitted across the network to a query fragment that is running on a
different host. Impala designates 3 hosts to each produce non-local broadcast filters, to guard against the
possibility of a single slow host taking too long. Depending on the setting of the
RUNTIME_FILTER_MODE query option
GLOBAL), Impala either uses a conservative optimization
strategy where filters are only consumed on the same host that produced them, or a more aggressive strategy
where filters are eligible to be transmitted across the network.
File Format Considerations for Runtime Filtering
Parquet tables get the most benefit from the runtime filtering optimizations. Runtime filtering can speed up join queries against partitioned or unpartitioned Parquet tables, and single-table queries against partitioned Parquet tables. See Using the Parquet File Format with Impala Tables for information about using Parquet tables with Impala.
For other file formats (text, Avro, RCFile, and SequenceFile), runtime filtering speeds up queries against partitioned tables only. Because partitioned tables can use a mixture of formats, Impala produces the filters in all cases, even if they are not ultimately used to optimize the query.
Wait Intervals for Runtime Filters
Because it takes time to produce runtime filters, especially for
partitioned filters that must be combined by the coordinator node,
there is a time interval above which it is more efficient for
the scan nodes to go ahead and construct their intermediate result sets,
even if that intermediate data is larger than optimal. If it only takes
a few seconds to produce the filters, it is worth the extra time if pruning
the unnecessary data can save minutes in the overall query time.
You can specify the maximum wait time in milliseconds using the
RUNTIME_FILTER_WAIT_TIME_MS query option.
By default, each scan node waits for up to 1 second (1000 milliseconds) for filters to arrive. If all filters have not arrived within the specified interval, the scan node proceeds, using whatever filters did arrive to help avoid reading unnecessary data. If a filter arrives after the scan node begins reading data, the scan node applies that filter to the data that is read after the filter arrives, but not to the data that was already read.
If the cluster is relatively busy and your workload contains many resource-intensive or long-running queries, consider increasing the wait time so that complicated queries do not miss opportunities for optimization. If the cluster is lightly loaded and your workload contains many small queries taking only a few seconds, consider decreasing the wait time to avoid the 1 second delay for each query.
Query Options for Runtime Filtering
See the following sections for information about the query options that control runtime filtering:
The first query option adjusts the "sensitivity" of this feature. By default, it is set to the highest level (
GLOBAL). (This default applies to Impala 2.6 and higher. In previous releases, the default was
The other query options are tuning knobs that you typically only adjust after doing performance testing, and that you might want to change only for the duration of a single expensive query:
RUNTIME_BLOOM_FILTER_SIZE Query Option (Impala 2.5 or higher only); in Impala 2.6 and higher, this setting acts as a fallback when statistics are not available, rather than as a directive.
Runtime Filtering and Query Plans
In the same way the query plan displayed by the
EXPLAIN statement includes information
about predicates used by each plan fragment, it also
includes annotations showing whether a plan fragment
produces or consumes a runtime filter.
A plan fragment that produces a filter includes an
annotation such as
runtime filters: filter_id <- table.column,
while a plan fragment that consumes a filter includes an annotation such as
runtime filters: filter_id -> table.column.
Setting the query option
EXPLAIN_LEVEL=2 adds additional
annotations showing the type of the filter, either
(for HDFS-based tables) or
filter_id[min_max] (for Kudu tables).
The following example shows a query that uses a single runtime filter (labelled
to prune the partitions that are scanned in one stage of the query, based on evaluating the
result set of a subquery:
create table yy (s string) partitioned by (year int) stored as parquet; insert into yy partition (year) values ('1999', 1999), ('2000', 2000), ('2001', 2001), ('2010',2010); compute stats yy; create table yy2 (s string) partitioned by (year int) stored as parquet; insert into yy2 partition (year) values ('1999', 1999), ('2000', 2000), ('2001', 2001); compute stats yy2; -- The query reads an unknown number of partitions, whose key values are only -- known at run time. The 'runtime filters' lines show how the information about -- the partitions is calculated in query fragment 02, and then used in query -- fragment 00 to decide which partitions to skip. explain select s from yy2 where year in (select year from yy where year between 2000 and 2005); +----------------------------------------------------------+ | Explain String | +----------------------------------------------------------+ | Estimated Per-Host Requirements: Memory=16.00MB VCores=2 | | | | 04:EXCHANGE [UNPARTITIONED] | | | | | 02:HASH JOIN [LEFT SEMI JOIN, BROADCAST] | | | hash predicates: year = year | | | runtime filters: RF000 <- year | | | | | |--03:EXCHANGE [BROADCAST] | | | | | | | 01:SCAN HDFS [dpp.yy] | | | partitions=2/4 files=2 size=468B | | | | | 00:SCAN HDFS [dpp.yy2] | | partitions=2/3 files=2 size=468B | | runtime filters: RF000 -> year | +----------------------------------------------------------+
The query profile (displayed by the
PROFILE command in impala-shell)
contains both the
EXPLAIN plan and more detailed information about the internal
workings of the query. The profile output includes a section labelled the "filter routing table",
with information about each filter based on its ID.
Examples of Queries that Benefit from Runtime Filtering
In this example, Impala would normally do extra work to interpret the columns
for each row in
HUGE_T1, before checking the
value against the in-memory hash table constructed from all the
values. By producing a filter containing all the
even before the query starts scanning the
HUGE_T1 table, Impala
can skip the unnecessary work to parse the column info as soon as it determines
ID value does not match any of the values from the other table.
The example shows
COMPUTE STATS statements for both the tables (even
though that is a one-time operation after loading data into those tables) because
Impala relies on up-to-date statistics to
determine which one has more distinct
ID values than the other.
That information lets Impala make effective decisions about which table to use to
construct the in-memory hash table, and which table to read from disk and
compare against the entries in the hash table.
COMPUTE STATS huge_t1; COMPUTE STATS tiny_t2; SELECT c1, c2, c3 FROM huge_t1 JOIN tiny_t2 WHERE huge_t1.id = tiny_t2.id;
In this example,
T1 is a table partitioned by year. The subquery
T2 produces multiple values, and transmits those values as a filter to the plan
fragments that are reading from
T1. Any non-matching partitions in
select c1 from t1 where year in (select distinct year from t2);
WHERE clause contains an additional test that does not apply to
the partition key column.
A filter on a column that is not a partition key is called a per-row filter.
Because per-row filters only apply for Parquet,
T1 must be a Parquet table.
The subqueries result in two filters being transmitted to
the scan nodes that read from
T1. The filter on
YEAR helps the query eliminate
entire partitions based on non-matching years. The filter on
C2 lets Impala discard
rows with non-matching
C2 values immediately after reading them. Without runtime filtering,
Impala would have to keep the non-matching values in memory, assemble
C3 into rows in the intermediate result set, and transmit all the intermediate rows
back to the coordinator node, where they would be eliminated only at the very end of the query.
select c1, c2, c3 from t1 where year in (select distinct year from t2) and c2 in (select other_column from t3);
This example involves a broadcast join.
The fact that the
ON clause would
return a small number of matching rows (because there
are not very many rows in
means that the corresponding filter is very selective.
Therefore, runtime filtering will probably be effective
in optimizing this query.
select c1 from huge_t1 join [broadcast] tiny_t2 on huge_t1.id = tiny_t2.id where huge_t1.year in (select distinct year from tiny_t2) and c2 in (select other_column from t3);
This example involves a shuffle or partitioned join.
Assume that most rows in
have a corresponding row in
The fact that the
ON clause could
return a large number of matching rows means that
the corresponding filter would not be very selective.
Therefore, runtime filtering might be less effective
in optimizing this query.
select c1 from huge_t1 join [shuffle] huge_t2 on huge_t1.id = huge_t2.id where huge_t1.year in (select distinct year from huge_t2) and c2 in (select other_column from t3);
Tuning and Troubleshooting Queries that Use Runtime Filtering
These tuning and troubleshooting procedures apply to queries that are resource-intensive enough, long-running enough, and frequent enough that you can devote special attention to optimizing them individually.
EXPLAIN statement and examine the
lines to determine whether runtime filters are being applied to the
and join clauses that you expect. For example, runtime filtering does not apply to queries that use
the nested loop join mechanism due to non-equijoin operators.
Make sure statistics are up-to-date for all tables involved in the queries.
COMPUTE STATS statement after loading data into non-partitioned tables,
COMPUTE INCREMENTAL STATS after adding new partitions to partitioned tables.
If join queries involving large tables use unique columns as the join keys,
for example joining a primary key column with a foreign key column, the overhead of
producing and transmitting the filter might outweigh the performance benefit because
not much data could be pruned during the early stages of the query.
For such queries, consider setting the query option
Limitations and Restrictions for Runtime Filtering
The runtime filtering feature is most effective for the Parquet file formats. For other file formats, filtering only applies for partitioned tables. See File Format Considerations for Runtime Filtering. For the ways in which runtime filtering works for Kudu tables, see Impala Query Performance for Kudu Tables.
When the spill-to-disk mechanism is activated on a particular host during a query, that host does not produce any filters while processing that query. This limitation does not affect the correctness of results; it only reduces the amount of optimization that can be applied to the query.