Runtime filtering is a wide-ranging optimization feature available in Impala 2.5 and higher. When a query needs only a fraction of the data in a table, either because it reads a partitioned table or because it evaluates a join condition, Impala determines the appropriate conditions while the query is running and broadcasts that information to all the impalad nodes that are reading the table. Those nodes can then avoid unnecessary I/O to read partition data, and avoid unnecessary network transmission by sending only the subset of rows that match the join keys across the network.
This feature is primarily used to optimize queries against large partitioned tables (under the name dynamic partition pruning) and joins of large tables. The information in this section includes concepts, internals, and troubleshooting information for the entire runtime filtering feature. For specific tuning steps for partitioned tables, see Dynamic Partition Pruning.
When this feature made its debut in Impala 2.5, the default setting was RUNTIME_FILTER_MODE=LOCAL. Now the default is RUNTIME_FILTER_MODE=GLOBAL in Impala 2.6 and higher, which enables more wide-ranging and ambitious query optimization without requiring you to explicitly set any query options.
To understand how runtime filtering works at a detailed level, you must be familiar with some terminology from the field of distributed database technology:
What a plan fragment is. Impala decomposes each query into smaller units of work that are distributed across the cluster. Wherever possible, a data block is read, filtered, and aggregated by plan fragments executing on the same host. For some operations, such as joins and combining intermediate results into a final result set, data is transmitted across the network from one Impala daemon to another.
What SCAN and HASH JOIN plan nodes are, and their role in computing query results:
In the Impala query plan, a scan node performs the I/O to read from the underlying data files. Although this is an expensive operation from the traditional database perspective, Hadoop clusters and Impala are optimized to do this kind of I/O in a highly parallel fashion. The major potential cost savings come from using the columnar Parquet format (where Impala can avoid reading data for unneeded columns) and partitioned tables (where Impala can avoid reading data for unneeded partitions).
Most Impala joins use the hash join mechanism. (It is only fairly recently that Impala started using the nested-loop join technique, for certain kinds of non-equijoin queries.) In a hash join, when evaluating join conditions from two tables, Impala constructs a hash table in memory with all the different column values from the table on one side of the join. Then, for each row from the table on the other side of the join, Impala tests whether the relevant column values are in this hash table or not.
A hash join node constructs such an in-memory hash table, then performs the comparisons to identify which rows match the relevant join conditions and should be included in the result set (or at least sent on to the subsequent intermediate stage of query processing). Because some of the input for a hash join might be transmitted across the network from another host, it is especially important from a performance perspective to prune out ahead of time any data that is known to be irrelevant.
The more distinct values there are in the columns used as join keys, the larger the in-memory hash table and thus the more memory is required to process the query.
The difference between a broadcast join and a shuffle join. (The Hadoop notion of a shuffle join is sometimes referred to in Impala as a partitioned join.) In a broadcast join, the table from one side of the join (typically the smaller table) is sent in its entirety to all the hosts involved in the query. Then each host can compare its portion of the data from the other (larger) table against the full set of possible join keys. In a shuffle join, there is no obvious "smaller" table, and so the contents of both tables are divided up, and corresponding portions of the data are transmitted to each host involved in the query. See Optimizer Hints for information about how these different kinds of joins are processed.
The notion of the build phase and probe phase when Impala processes a join query. The build phase is where the rows containing the join key columns, typically for the smaller table, are transmitted across the network and built into an in-memory hash table data structure on one or more destination nodes. The probe phase is where data is read locally (typically from the larger table) and the join key columns are compared to the values in the in-memory hash table. The corresponding input sources (tables, subqueries, and so on) for these phases are referred to as the build side and the probe side.
How to set Impala query options: interactively within an impala-shell session through the SET command, for a JDBC or ODBC application through the SET statement, or globally for all impalad daemons through the default_query_options configuration setting.
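As a minimal illustration of the interactive approach, the statements below sketch how the runtime filtering mode might be changed and then inspected within an impala-shell session. The option name and its values come from this section; which value suits a given workload is not something this sketch decides.

```sql
-- Within impala-shell: only consume filters on the host that produced them.
SET RUNTIME_FILTER_MODE=LOCAL;

-- Return to the more aggressive strategy (the default in Impala 2.6 and higher).
SET RUNTIME_FILTER_MODE=GLOBAL;

-- With no arguments, SET lists the query options in effect for the session.
SET;
```

A value set this way lasts only for the current session, which makes it convenient for experimenting before changing default_query_options for the whole cluster.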
The filter that is transmitted between plan fragments is essentially a list of values for join key columns. When this list of values is transmitted in time to a scan node, Impala can filter out non-matching values immediately after reading them, rather than transmitting the raw data to another host to compare against the in-memory hash table on that host.
For HDFS-based tables, this data structure is implemented as a Bloom filter, which uses a probability-based algorithm to determine all possible matching values. (The probabilistic nature of the algorithm means that the filter might include some non-matching values, but if so, that does not cause any inaccuracy in the final results.)
Another kind of filter is the "min-max" filter. It currently only applies to Kudu tables. The filter is a data structure representing a minimum and maximum value. These filters are passed to Kudu to reduce the number of rows returned to Impala when scanning the probe side of the join.
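One way to picture the effect of these filters is as predicates that Impala derives at run time and applies during the scan. The statements below are only a conceptual analogy, assuming the yy and yy2 tables from the EXPLAIN example later in this section (where yy2 holds the years 1999, 2000, and 2001). Impala does not literally rewrite the query this way, and a Bloom filter can let through some extra values that the join later discards.

```sql
-- Original query: the relevant years are known only at run time.
SELECT s FROM yy WHERE year IN (SELECT year FROM yy2);

-- Rough analogy for a Bloom filter built from yy2.year:
-- the scan of yy behaves approximately as if the values were literals.
SELECT s FROM yy WHERE year IN (1999, 2000, 2001);

-- Rough analogy for a min-max filter, which applies only to Kudu tables.
-- (yy is not a Kudu table; it is reused here purely for illustration.)
-- The scan is narrowed to a range, and any values inside the range that
-- do not match are still removed by the join itself.
SELECT s FROM yy
WHERE year BETWEEN 1999 AND 2001
  AND year IN (SELECT year FROM yy2);
```

In both cases the join still runs afterward, so the filter affects only how much data reaches the join, not the final result.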
There are different kinds of filters to match the different kinds of joins (partitioned and broadcast). A broadcast filter reflects the complete list of relevant values and can be immediately evaluated by a scan node. A partitioned filter reflects only the values processed by one host in the cluster; all the partitioned filters must be combined into one (by the coordinator node) before the scan nodes can use the results to accurately filter the data as it is read from storage.
Broadcast filters are also classified as local or global. With a local broadcast filter, the information in the filter is used by a subsequent query fragment that is running on the same host that produced the filter. A non-local broadcast filter must be transmitted across the network to a query fragment that is running on a different host. Impala designates 3 hosts to each produce non-local broadcast filters, to guard against the possibility of a single slow host taking too long. Depending on the setting of the RUNTIME_FILTER_MODE query option (LOCAL or GLOBAL), Impala either uses a conservative optimization strategy where filters are only consumed on the same host that produced them, or a more aggressive strategy where filters are eligible to be transmitted across the network.
In Impala 2.6 and higher, the default for runtime filtering is the GLOBAL setting.
Parquet tables get the most benefit from the runtime filtering optimizations. Runtime filtering can speed up join queries against partitioned or unpartitioned Parquet tables, and single-table queries against partitioned Parquet tables. See Using the Parquet File Format with Impala Tables for information about using Parquet tables with Impala.
For other file formats (text, Avro, RCFile, and SequenceFile), runtime filtering speeds up queries against partitioned tables only. Because partitioned tables can use a mixture of formats, Impala produces the filters in all cases, even if they are not ultimately used to optimize the query.
Because it takes time to produce runtime filters, especially for partitioned filters that must be combined by the coordinator node, there is a time interval above which it is more efficient for the scan nodes to go ahead and construct their intermediate result sets, even if that intermediate data is larger than optimal. If it only takes a few seconds to produce the filters, it is worth the extra time if pruning the unnecessary data can save minutes in the overall query time. You can specify the maximum wait time in milliseconds using the RUNTIME_FILTER_WAIT_TIME_MS query option.
The time is counted from the start of executing the query; see the query option's doc page for details. If not all filters have arrived within the specified interval, the scan node proceeds, using whatever filters did arrive to help avoid reading unnecessary data. If a filter arrives after the scan node begins reading data, the scan node applies that filter to the data that is read after the filter arrives, but not to the data that was already read.
If the cluster is relatively busy and your workload contains many resource-intensive or long-running queries, consider increasing the wait time so that complicated queries do not miss opportunities for optimization. If the cluster is lightly loaded and your workload contains many small queries taking only a few seconds, consider decreasing the wait time to avoid the default 1-second delay for each query.
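The two adjustments described above might look like the following in an impala-shell session. This is a sketch only: the option name comes from this section, while the millisecond values are arbitrary placeholders chosen for illustration, not recommendations.

```sql
-- Busy cluster with long-running queries: give filters more time to arrive.
-- (10000 ms is an illustrative value.)
SET RUNTIME_FILTER_WAIT_TIME_MS=10000;

-- Lightly loaded cluster with many short queries: shorten the wait.
-- (200 ms is an illustrative value.)
SET RUNTIME_FILTER_WAIT_TIME_MS=200;
```

Because the best value depends on how long filters take to build relative to the scan, it is worth comparing query profiles before and after any change.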
See the following sections for information about the query options that control runtime filtering:
The first query option, RUNTIME_FILTER_MODE, adjusts the "sensitivity" of this feature. By default, it is set to the highest level (GLOBAL). (This default applies to Impala 2.6 and higher. In previous releases, the default was LOCAL.)
The other query options are tuning knobs that you typically only adjust after doing performance testing, and that you might want to change only for the duration of a single expensive query:
MAX_NUM_RUNTIME_FILTERS Query Option (Impala 2.5 or higher only)
DISABLE_ROW_RUNTIME_FILTERING Query Option (Impala 2.5 or higher only)
RUNTIME_FILTER_MAX_SIZE Query Option (Impala 2.6 or higher only)
RUNTIME_FILTER_MIN_SIZE Query Option (Impala 2.6 or higher only)
RUNTIME_BLOOM_FILTER_SIZE Query Option (Impala 2.5 or higher only); in Impala 2.6 and higher, this setting acts as a fallback when statistics are not available, rather than as a directive.
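Changing one of these options only for the duration of a single expensive query could be sketched as follows. The option names come from the list above; the values are illustrative only, the tables are the hypothetical huge_t1 and huge_t2 used in the examples in this section, and the UNSET command is assumed to be available in your impala-shell version.

```sql
-- Illustrative values only; measure before adopting any of them.
SET MAX_NUM_RUNTIME_FILTERS=20;
SET DISABLE_ROW_RUNTIME_FILTERING=TRUE;

-- The expensive query that the settings are meant for.
SELECT c1 FROM huge_t1 JOIN huge_t2 ON huge_t1.id = huge_t2.id;

-- Return both options to their defaults afterward.
-- Assumes an impala-shell version that provides the UNSET command.
UNSET MAX_NUM_RUNTIME_FILTERS;
UNSET DISABLE_ROW_RUNTIME_FILTERING;
```

Restoring the defaults right after the query keeps the experiment from affecting other statements in the same session.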
In the same way the query plan displayed by the EXPLAIN statement includes information about predicates used by each plan fragment, it also includes annotations showing whether a plan fragment produces or consumes a runtime filter. A plan fragment that produces a filter includes an annotation such as runtime filters: filter_id <- table.column, while a plan fragment that consumes a filter includes an annotation such as runtime filters: filter_id -> table.column. Setting the query option EXPLAIN_LEVEL=2 adds additional annotations showing the type of the filter, either filter_id[bloom] (for HDFS-based tables) or filter_id[min_max] (for Kudu tables).
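To see the filter type annotations, the EXPLAIN level can be raised for a single statement, as in this brief sketch. It assumes the yy and yy2 tables created in the example later in this section, and it omits the plan output because the exact text varies by release.

```sql
-- Ask for the more detailed plan, which annotates each filter with its type.
SET EXPLAIN_LEVEL=2;

-- For these HDFS-based tables, the runtime filters lines are expected to
-- carry a [bloom] annotation on the filter ID.
EXPLAIN SELECT s FROM yy WHERE year IN (SELECT year FROM yy2);

-- Go back to the standard level of detail.
SET EXPLAIN_LEVEL=1;
```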
The following example shows a query that uses a single runtime filter, labeled RF000, to prune the partitions based on evaluating the result set of a subquery at runtime:
CREATE TABLE yy (s STRING) PARTITIONED BY (year INT);
INSERT INTO yy PARTITION (year) VALUES ('1999', 1999), ('2000', 2000),
('2001', 2001), ('2010', 2010), ('2018', 2018);
COMPUTE STATS yy;
CREATE TABLE yy2 (s STRING, year INT);
INSERT INTO yy2 VALUES ('1999', 1999), ('2000', 2000), ('2001', 2001);
COMPUTE STATS yy2;
-- The following query reads an unknown number of partitions, whose key values
-- are only known at run time. The runtime filters line shows the
-- information used in query fragment 02 to decide which partitions to skip.
EXPLAIN SELECT s FROM yy WHERE year IN (SELECT year FROM yy2);
+--------------------------------------------------------------------------+
| PLAN-ROOT SINK                                                           |
| |                                                                        |
| 04:EXCHANGE [UNPARTITIONED]                                              |
| |                                                                        |
| 02:HASH JOIN [LEFT SEMI JOIN, BROADCAST]                                 |
| |  hash predicates: year = year                                          |
| |  runtime filters: RF000 <- year                                        |
| |                                                                        |
| |--03:EXCHANGE [BROADCAST]                                               |
| |  |                                                                     |
| |  01:SCAN HDFS [default.yy2]                                            |
| |     partitions=1/1 files=1 size=620B                                   |
| |                                                                        |
| 00:SCAN HDFS [default.yy]                                                |
|    partitions=5/5 files=5 size=1.71KB                                    |
|    runtime filters: RF000 -> year                                        |
+--------------------------------------------------------------------------+
SELECT s FROM yy WHERE year IN (SELECT year FROM yy2); -- Returns 3 rows from yy
PROFILE;
The query profile (displayed by the PROFILE command in impala-shell) contains both the EXPLAIN plan and more detailed information about the internal workings of the query. The profile output includes the Filter routing table section with information about each filter based on its ID.
In the following example, Impala would normally do extra work to interpret the columns C1, C2, C3, and ID for each row in HUGE_T1, before checking the ID value against the in-memory hash table constructed from all the TINY_T2.ID values. By producing a filter containing all the TINY_T2.ID values even before the query starts scanning the HUGE_T1 table, Impala can skip the unnecessary work to parse the column info as soon as it determines that an ID value does not match any of the values from the other table.
The example shows COMPUTE STATS statements for both tables (even though that is a one-time operation after loading data into those tables) because Impala relies on up-to-date statistics to determine which one has more distinct ID values than the other. That information lets Impala make effective decisions about which table to use to construct the in-memory hash table, and which table to read from disk and compare against the entries in the hash table.
COMPUTE STATS huge_t1;
COMPUTE STATS tiny_t2;
SELECT c1, c2, c3 FROM huge_t1 JOIN tiny_t2 WHERE huge_t1.id = tiny_t2.id;
In the following example, T1 is a table partitioned by year. The subquery on T2 produces multiple values, and transmits those values as a filter to the plan fragments that are reading from T1. Any non-matching partitions in T1 are skipped.
select c1 from t1 where year in (select distinct year from t2);
In the following example, the WHERE clause contains an additional test that does not apply to the partition key column. A filter on a column that is not a partition key is called a per-row filter. Because per-row filters only apply for Parquet, T1 must be a Parquet table.
The subqueries result in two filters being transmitted to the scan nodes that read from T1. The filter on YEAR helps the query eliminate entire partitions based on non-matching years. The filter on C2 lets Impala discard rows with non-matching C2 values immediately after reading them. Without runtime filtering, Impala would have to keep the non-matching values in memory, assemble C1, C2, and C3 into rows in the intermediate result set, and transmit all the intermediate rows back to the coordinator node, where they would be eliminated only at the very end of the query.
select c1, c2, c3 from t1
where year in (select distinct year from t2)
and c2 in (select other_column from t3);
The following example involves a broadcast join. The fact that the ON clause would return a small number of matching rows (because there are not very many rows in TINY_T2) means that the corresponding filter is very selective. Therefore, runtime filtering will probably be effective in optimizing this query.
select c1 from huge_t1 join [broadcast] tiny_t2
on huge_t1.id = tiny_t2.id
where huge_t1.year in (select distinct year from tiny_t2)
and c2 in (select other_column from t3);
The following example involves a shuffle or partitioned join. Assume that most rows in HUGE_T1 have a corresponding row in HUGE_T2. The fact that the ON clause could return a large number of matching rows means that the corresponding filter would not be very selective. Therefore, runtime filtering might be less effective in optimizing this query.
select c1 from huge_t1 join [shuffle] huge_t2
on huge_t1.id = huge_t2.id
where huge_t1.year in (select distinct year from huge_t2)
and c2 in (select other_column from t3);
These tuning and troubleshooting procedures apply to queries that are resource-intensive enough, long-running enough, and frequent enough that you can devote special attention to optimizing them individually.
Use the EXPLAIN statement and examine the runtime filters: lines to determine whether runtime filters are being applied to the WHERE predicates and join clauses that you expect. For example, runtime filtering does not apply to queries that use the nested loop join mechanism due to non-equijoin operators.
Make sure statistics are up-to-date for all tables involved in the queries. Use the COMPUTE STATS statement after loading data into non-partitioned tables, and COMPUTE INCREMENTAL STATS after adding new partitions to partitioned tables.
If join queries involving large tables use unique columns as the join keys, for example joining a primary key column with a foreign key column, the overhead of producing and transmitting the filter might outweigh the performance benefit because not much data could be pruned during the early stages of the query. For such queries, consider setting the query option RUNTIME_FILTER_MODE=OFF.
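The three checks above can be walked through in an impala-shell session roughly as follows. This is a sketch under stated assumptions: huge_t1, huge_t2, and tiny_t2 are the hypothetical tables from the earlier examples, huge_t1 is assumed to be partitioned by year, and the partition value 2018 is a placeholder.

```sql
-- 1. Look for "runtime filters:" lines in the plan for the query of interest.
EXPLAIN SELECT c1 FROM huge_t1 JOIN tiny_t2 ON huge_t1.id = tiny_t2.id;

-- 2. Refresh statistics and confirm that they are present.
COMPUTE STATS tiny_t2;                                    -- non-partitioned table
COMPUTE INCREMENTAL STATS huge_t1 PARTITION (year=2018);  -- newly added partition
SHOW TABLE STATS huge_t1;
SHOW COLUMN STATS tiny_t2;

-- 3. For a join on unique keys, try the query with runtime filtering off,
--    then restore the default mode for Impala 2.6 and higher.
SET RUNTIME_FILTER_MODE=OFF;
SELECT c1 FROM huge_t1 JOIN huge_t2 ON huge_t1.id = huge_t2.id;
SET RUNTIME_FILTER_MODE=GLOBAL;
```

Comparing the elapsed time of the query in step 3 against a run with the default mode indicates whether the filter overhead is worth paying for that particular join.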
The runtime filtering feature is most effective for the Parquet file format. For other file formats, filtering only applies for partitioned tables. See File Format Considerations for Runtime Filtering. For the ways in which runtime filtering works for Kudu tables, see Impala Query Performance for Kudu Tables.
When the spill-to-disk mechanism is activated on a particular host during a query, that host does not produce any filters while processing that query. This limitation does not affect the correctness of results; it only reduces the amount of optimization that can be applied to the query.