Using Impala with Iceberg Tables

Impala supports Apache Iceberg, an open table format for huge analytic datasets. With this functionality, you can access existing Iceberg tables using SQL and run analytics over them. With Impala you can create and write Iceberg tables in different Iceberg catalogs (e.g. HiveCatalog, HadoopCatalog). Impala also supports location-based tables (HadoopTables).

For more information on Iceberg, see the Apache Iceberg site.

Overview of Iceberg features

Creating Iceberg tables with Impala

When you have an existing Iceberg table that is not yet present in the Hive Metastore, you can use the CREATE EXTERNAL TABLE command in Impala to add the table to the Hive Metastore so that Impala can interact with it. Currently Impala supports HadoopTables, HadoopCatalog, and HiveCatalog. If you have an existing table in HiveCatalog and you are using the same Hive Metastore, no further action is needed.
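
As a minimal sketch of the CREATE EXTERNAL TABLE step described above, the statements below register existing tables from HadoopTables and from a HadoopCatalog. The table names, paths, and table identifier are hypothetical placeholders; the iceberg.catalog, iceberg.catalog_location, and iceberg.table_identifier properties are the ones listed under Iceberg table properties.

```sql
-- Hypothetical names and paths; adjust them to your environment.
-- Location-based table (HadoopTables):
CREATE EXTERNAL TABLE ice_hadoop_tbl
STORED AS ICEBERG
LOCATION '/path/to/table'
TBLPROPERTIES('iceberg.catalog'='hadoop.tables');

-- Table managed by a HadoopCatalog:
CREATE EXTERNAL TABLE ice_hadoop_cat
STORED AS ICEBERG
TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
              'iceberg.catalog_location'='/path/to/catalog',
              'iceberg.table_identifier'='namespace.table');
```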

You can also create new Iceberg tables with Impala. You can use the same commands as above, just omit the EXTERNAL keyword. To create an Iceberg table in HiveCatalog the following CREATE TABLE statement can be used:

CREATE TABLE ice_t (i INT) STORED AS ICEBERG;
        

By default, Impala assumes that the Iceberg table uses Parquet data files. ORC and AVRO are also supported, but you need to tell Impala by setting the table property 'write.format.default' to e.g. 'ORC'.
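
For instance, an existing ORC-based Iceberg table might be registered as follows. This is only a sketch: the table name and location are hypothetical, and given the write limitations described elsewhere in this document (Impala writes Parquet only), such a table would typically be read from Impala rather than written.

```sql
-- Hypothetical table name and location.
CREATE EXTERNAL TABLE ice_orc_tbl
STORED AS ICEBERG
LOCATION '/path/to/orc_table'
TBLPROPERTIES('iceberg.catalog'='hadoop.tables',
              'write.format.default'='ORC');
```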

You can also use CREATE TABLE AS SELECT to create new Iceberg tables, e.g.:

CREATE TABLE ice_ctas STORED AS ICEBERG AS SELECT i, b FROM value_tbl;

CREATE TABLE ice_ctas_part PARTITIONED BY(d) STORED AS ICEBERG AS SELECT s, ts, d FROM value_tbl;

CREATE TABLE ice_ctas_part_spec PARTITIONED BY SPEC (truncate(3, s)) STORED AS ICEBERG AS SELECT cast(t as INT), s, d FROM value_tbl;
        

Iceberg V2 tables

Iceberg V2 tables support row-level modifications (DELETE, UPDATE) via "merge-on-read": instead of rewriting existing data files, separate delete files are written that store information about the deleted records. There are two kinds of delete files in Iceberg:
  • position deletes
  • equality deletes
Impala only supports position delete files. These files contain the file path and file position of the deleted rows.
One can create Iceberg V2 tables via the CREATE TABLE statement by specifying the 'format-version' table property:

CREATE TABLE ice_v2 (i int) STORED BY ICEBERG TBLPROPERTIES('format-version'='2');
        
It is also possible to upgrade existing Iceberg V1 tables to Iceberg V2 tables. One can use the following ALTER TABLE statement to do so:

ALTER TABLE ice_v1_to_v2 SET TBLPROPERTIES('format-version'='2');
        

Dropping Iceberg tables

One can use the DROP TABLE statement to remove an Iceberg table:

DROP TABLE ice_t;
        

When the external.table.purge table property is set to true, the DROP TABLE statement also deletes the data files. This property is set to true when Impala creates the Iceberg table via CREATE TABLE. When CREATE EXTERNAL TABLE is used (the table already exists in some catalog), external.table.purge is set to false, i.e. DROP TABLE doesn't remove any files, only the table definition in HMS.
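
The following sketch shows one way to inspect and change this behavior before dropping a table. The table name is hypothetical, and it is an assumption here that external.table.purge can be changed with ALTER TABLE ... SET TBLPROPERTIES like other table properties.

```sql
-- ice_keep_files is a hypothetical table created by Impala,
-- so external.table.purge starts out as true.
CREATE TABLE ice_keep_files (i INT) STORED AS ICEBERG;

-- Inspect the table properties, including external.table.purge.
DESCRIBE FORMATTED ice_keep_files;

-- Assumption: with the property set to false, DROP TABLE removes only
-- the table definition and leaves the data files in place.
ALTER TABLE ice_keep_files SET TBLPROPERTIES('external.table.purge'='false');
DROP TABLE ice_keep_files;
```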

Supported Data Types for Iceberg Columns

You can get information about the supported Iceberg data types in the Iceberg spec.

The Iceberg data types can be mapped to the following SQL types in Impala:

Iceberg type    SQL type in Impala
boolean         BOOLEAN
int             INTEGER
long            BIGINT
float           FLOAT
double          DOUBLE
decimal(P, S)   DECIMAL(P, S)
date            DATE
time            Not supported
timestamp       TIMESTAMP
timestamptz     Only read support via TIMESTAMP
string          STRING
uuid            Not supported
fixed(L)        Not supported
binary          Not supported
struct          STRUCT (read only)
list            ARRAY (read only)
map             MAP (read only)

Schema evolution of Iceberg tables

Iceberg assigns unique field ids to schema elements which means it is possible to reorder/delete/change columns and still be able to correctly read current and old data files. Impala supports the following statements to modify a table's schema:
  • ALTER TABLE ... RENAME TO ... (renames the table if the Iceberg catalog supports it)
  • ALTER TABLE ... CHANGE COLUMN ... (changes the name and type of a column if and only if the new type is compatible with the old type)
  • ALTER TABLE ... ADD COLUMNS ... (adds columns to the end of the table)
  • ALTER TABLE ... DROP COLUMN ...
Valid type promotions are:
  • int to long
  • float to double
  • decimal(P, S) to decimal(P', S) if P' > P – widen the precision of decimal types.

Impala currently does not support schema evolution for tables with AVRO file format.
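
The statements and type promotions listed above could be exercised as in the following sketch. The table and column names are hypothetical and serve only as an illustration.

```sql
-- Hypothetical table used only for illustration.
CREATE TABLE ice_evo (i INT, f FLOAT, d DECIMAL(9, 2)) STORED AS ICEBERG;

-- Add a column to the end of the table.
ALTER TABLE ice_evo ADD COLUMNS (note STRING);

-- Valid type promotions: int to long, float to double, wider decimal precision.
ALTER TABLE ice_evo CHANGE COLUMN i i BIGINT;
ALTER TABLE ice_evo CHANGE COLUMN f f DOUBLE;
ALTER TABLE ice_evo CHANGE COLUMN d d DECIMAL(12, 2);

-- Rename a column while keeping its type, then drop it.
ALTER TABLE ice_evo CHANGE COLUMN note comment_text STRING;
ALTER TABLE ice_evo DROP COLUMN comment_text;

-- Rename the table (only if the Iceberg catalog supports it).
ALTER TABLE ice_evo RENAME TO ice_evo_renamed;
```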

See schema evolution for more details.

Partitioning Iceberg tables

The Iceberg spec has information about partitioning Iceberg tables. With Iceberg, we are not limited to value-based partitioning; we can also partition our tables via several partition transforms.

Partition transforms are IDENTITY, BUCKET, TRUNCATE, YEAR, MONTH, DAY, HOUR, and VOID. Impala supports all of these transforms. To create a partitioned Iceberg table, one needs to add a PARTITIONED BY SPEC clause to the CREATE TABLE statement, e.g.:

CREATE TABLE ice_p (i INT, d DATE, s STRING, t TIMESTAMP)
PARTITIONED BY SPEC (BUCKET(5, i), MONTH(d), TRUNCATE(3, s), HOUR(t))
STORED AS ICEBERG;
        
Iceberg also supports partition evolution, which means that the partitioning of a table can be changed without rewriting existing data files. You can change an existing table's partitioning via an ALTER TABLE SET PARTITION SPEC statement, e.g.:

ALTER TABLE ice_p SET PARTITION SPEC (VOID(i), VOID(d), TRUNCATE(3, s), HOUR(t), i);
        
Please keep in mind that for Iceberg V1 tables:
  • Do not reorder partition fields
  • Do not drop partition fields; instead replace the field’s transform with the void transform
  • Only add partition fields at the end of the previous partition spec
You can also use the legacy syntax to create identity-partitioned Iceberg tables:

CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY (p1 INT, p2 STRING) STORED AS ICEBERG;
        

One can inspect a table's partition spec with the SHOW PARTITIONS or SHOW CREATE TABLE commands.
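
A short sketch of inspecting a partition spec, using a hypothetical table name:

```sql
-- Hypothetical table used only for illustration.
CREATE TABLE ice_spec_demo (i INT, d DATE)
PARTITIONED BY SPEC (BUCKET(5, i), MONTH(d))
STORED AS ICEBERG;

-- Both statements expose the table's partitioning.
SHOW PARTITIONS ice_spec_demo;
SHOW CREATE TABLE ice_spec_demo;
```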

Inserting data into Iceberg tables

Impala is also able to insert new data to Iceberg tables. Currently the INSERT INTO and INSERT OVERWRITE DML statements are supported. One can also remove the contents of an Iceberg table via the TRUNCATE command.

Because Iceberg uses hidden partitioning, you don't need a partition clause in your INSERT statements. For example, insertion into a partitioned table looks like:

CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY SPEC (bucket(17, i)) STORED AS ICEBERG;
INSERT INTO ice_p VALUES (1, 2);
        

INSERT OVERWRITE statements can replace data in the table with the result of a query. For partitioned tables Impala does a dynamic overwrite: partitions that have rows produced by the SELECT query are replaced, while partitions that have no rows produced by the SELECT query remain untouched. INSERT OVERWRITE is not allowed for tables that use the BUCKET partition transform, because the set of replaced partitions would be unpredictable in this case. If one needs to replace all contents of a table, they can still use TRUNCATE and INSERT INTO.
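
The dynamic overwrite behavior described above can be sketched as follows. All table names (ice_sales, ice_bucketed, staging_tbl) are hypothetical and the values are made up for illustration.

```sql
-- Hypothetical table partitioned by month.
CREATE TABLE ice_sales (id INT, amount INT, d DATE)
PARTITIONED BY SPEC (MONTH(d))
STORED AS ICEBERG;

INSERT INTO ice_sales VALUES
  (1, 10, DATE '2022-01-04'),
  (2, 20, DATE '2022-02-10');

-- The query produces rows only for January 2022, so only that partition
-- is replaced; the February 2022 partition remains untouched.
INSERT OVERWRITE ice_sales SELECT 3, 30, DATE '2022-01-15';

-- For a table partitioned with BUCKET, replace the full contents instead
-- (ice_bucketed and staging_tbl are hypothetical).
TRUNCATE TABLE ice_bucketed;
INSERT INTO ice_bucketed SELECT i, b FROM staging_tbl;
```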

Impala can only write Iceberg tables with Parquet data files.

Delete data from Iceberg tables

Since Impala 4.3 Impala is able to run DELETE statements against Iceberg V2 tables. E.g.:

DELETE FROM ice_t where i = 3;
        

More information about the DELETE statement can be found at DELETE Statement (Impala 2.8 or higher only).

Updating data in Iceberg tables

Since Impala 4.4 Impala is able to run UPDATE statements against Iceberg V2 tables. E.g.:

UPDATE ice_t SET val = val + 1;
UPDATE ice_t SET k = 4 WHERE i = 5;
UPDATE ice_t SET ice_t.k = o.k, ice_t.j = o.j FROM ice_t, other_table o WHERE ice_t.id = o.id;
        

The UPDATE FROM statement can be used to update a target Iceberg table based on a source table (or view) that doesn't need to be an Iceberg table. If there are multiple matches on the JOIN condition, Impala will raise an error.

Limitations:
  • Only the merge-on-read update mode is supported.
  • Only writes position delete files, i.e. no support for writing equality deletes.
  • Cannot update tables with complex types.
  • Can only write data and delete files in Parquet format. This means if table properties 'write.format.default' and 'write.delete.format.default' are set, their values must be PARQUET.
  • Updating a partitioning column with a non-constant expression via the UPDATE FROM statement is not allowed. The upcoming MERGE statement will not have this limitation.

More information about the UPDATE statement can be found at UPDATE Statement (Impala 2.8 or higher only).

Loading data into Iceberg tables

The LOAD DATA statement can be used to load a single file or directory into an existing Iceberg table. This operation is executed differently than for HMS tables: the data is inserted into the table via sequentially executed statements, which has some limitations:
  • Only Parquet or ORC files can be loaded.
  • PARTITION clause is not supported, but the partition transformations are respected.
  • The loaded files will be re-written as Parquet files.
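
A minimal sketch of loading a directory into an Iceberg table, assuming a hypothetical staging path that contains Parquet or ORC files whose schema matches the target table:

```sql
-- '/tmp/staging/new_rows' is a hypothetical directory.
-- No PARTITION clause is given; the table's partition transforms are applied.
LOAD DATA INPATH '/tmp/staging/new_rows' INTO TABLE ice_t;
```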

Optimizing (Compacting) Iceberg tables

Frequent updates and row-level modifications on Iceberg tables can write many small data files and delete files, which have to be merged-on-read. This causes read performance to degrade over time. The following statement can be used to compact the table and optimize it for reading.

OPTIMIZE TABLE [db_name.]table_name;
        
The current implementation of the OPTIMIZE TABLE statement rewrites the entire table, executing the following tasks:
  • compact small files
  • merge delete and update deltas
  • rewrite all files, converting them to the latest table schema
  • rewrite all partitions according to the latest partition spec
To execute table optimization:
  • The user needs ALL privileges on the table.
  • The table can contain any file format that Impala can read, but write.format.default has to be parquet.
  • The table cannot contain complex types.

When a table is optimized, a new snapshot is created. The old table state is still accessible by time travel to previous snapshots, because the rewritten data and delete files are not removed physically.

Note that the current implementation of OPTIMIZE TABLE rewrites the entire table, therefore this operation can take a long time to complete depending on the size of the table.
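
As a sketch, a compaction run on a hypothetical table named ice_t could look like this. Setting write.format.default beforehand is only needed if the property currently holds another value.

```sql
-- Make sure new files are written as Parquet, as OPTIMIZE TABLE requires.
ALTER TABLE ice_t SET TBLPROPERTIES('write.format.default'='parquet');

-- Rewrite the entire table; this can take a long time on large tables.
OPTIMIZE TABLE ice_t;

-- The rewrite shows up as a new snapshot in the table history.
DESCRIBE HISTORY ice_t;
```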

Time travel for Iceberg tables

Iceberg stores the table states in a chain of snapshots. By default, Impala uses the current snapshot of the table. But for Iceberg tables, it is also possible to query an earlier state of the table.

We can use the FOR SYSTEM_TIME AS OF and FOR SYSTEM_VERSION AS OF clauses in SELECT queries, e.g.:

SELECT * FROM ice_t FOR SYSTEM_TIME AS OF '2022-01-04 10:00:00';
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF now() - interval 5 days;
SELECT * FROM ice_t FOR SYSTEM_VERSION AS OF 123456;
        
If one needs to check the available snapshots of a table they can use the DESCRIBE HISTORY statement with the following syntax:

DESCRIBE HISTORY [db_name.]table_name
  [FROM timestamp];

DESCRIBE HISTORY [db_name.]table_name
  [BETWEEN timestamp AND timestamp];
        
For example:

DESCRIBE HISTORY ice_t FROM '2022-01-04 10:00:00';
DESCRIBE HISTORY ice_t FROM now() - interval 5 days;
DESCRIBE HISTORY ice_t BETWEEN '2022-01-04 10:00:00' AND '2022-01-05 10:00:00';

The output of the DESCRIBE HISTORY statement consists of the following columns:
  • creation_time: the snapshot's creation timestamp.
  • snapshot_id: the snapshot's ID or null.
  • parent_id: the snapshot's parent ID or null.
  • is_current_ancestor: TRUE if the snapshot is a current ancestor of the table.

Please note that time travel queries are executed using the old schema of the table from the point specified by the time travel parameters. Prior to Impala 4.3.0, the current table schema was used to query an older snapshot of the table, which might have had a different schema in the past.
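
Putting the two statements together, a typical workflow might look like the sketch below. The snapshot id 123456 is a placeholder for a snapshot_id value taken from the DESCRIBE HISTORY output.

```sql
-- List snapshots created in the last 5 days to find a snapshot_id.
DESCRIBE HISTORY ice_t FROM now() - interval 5 days;

-- Compare the row count of an earlier snapshot with the current state.
-- 123456 stands in for a snapshot_id from the output above.
SELECT count(*) FROM ice_t FOR SYSTEM_VERSION AS OF 123456;
SELECT count(*) FROM ice_t;
```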

Rolling Iceberg tables back to a previous state

Iceberg table modifications cause new table snapshots to be created; these snapshots represent an earlier version of the table. The ALTER TABLE [db_name.]table_name EXECUTE ROLLBACK statement can be used to roll back the table to a previous snapshot.

For example, to roll the table back to the snapshot id 123456 use:

ALTER TABLE ice_tbl EXECUTE ROLLBACK(123456);
        
To roll the table back to the most recent snapshot whose creation timestamp is older than '2022-01-04 10:00:00' use:

ALTER TABLE ice_tbl EXECUTE ROLLBACK('2022-01-04 10:00:00');
        
The timestamp is evaluated using the time zone of the current session.

It is only possible to roll back to a snapshot that is a current ancestor of the table.

When a table is rolled back to a snapshot, a new snapshot is created with the same snapshot id, but with a new creation timestamp.

Expiring snapshots

Iceberg snapshots accumulate until they are deleted by a user action. Snapshots can be deleted with the ALTER TABLE ... EXECUTE expire_snapshots(...) statement, which expires snapshots that are older than the specified timestamp. For example:

ALTER TABLE ice_tbl EXECUTE expire_snapshots('2022-01-04 10:00:00');
ALTER TABLE ice_tbl EXECUTE expire_snapshots(now() - interval 5 days);
        
Expire snapshots:
  • does not remove old metadata files by default.
  • does not remove orphaned data files.
  • respects the minimum number of snapshots to keep: history.expire.min-snapshots-to-keep table property.

Old metadata file clean up can be configured with write.metadata.delete-after-commit.enabled=true and write.metadata.previous-versions-max table properties. This allows automatic metadata file removal after operations that modify metadata such as expiring snapshots or inserting data.
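
A sketch of configuring these properties on a hypothetical table ice_tbl. The numeric values are arbitrary examples, not recommendations.

```sql
ALTER TABLE ice_tbl SET TBLPROPERTIES(
  -- Remove old metadata files automatically after each commit.
  'write.metadata.delete-after-commit.enabled'='true',
  -- Example value: number of previous metadata files to keep.
  'write.metadata.previous-versions-max'='10',
  -- Example value: expire_snapshots always keeps at least this many snapshots.
  'history.expire.min-snapshots-to-keep'='5');
```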

Iceberg metadata tables

Iceberg stores extensive metadata for each table (e.g. snapshots, manifests, data and delete files etc.), which is accessible in Impala in the form of virtual tables called metadata tables.

Metadata tables can be queried just like regular tables, including filtering, aggregation and joining with other metadata and regular tables. However, they are read-only: it is not possible to change, add or remove records in them, they cannot be dropped, and new metadata tables cannot be created. Metadata changes made in other ways (not through metadata tables) are reflected in the tables.

To list the metadata tables available for an Iceberg table, use the SHOW METADATA TABLES command:

SHOW METADATA TABLES IN [db.]tbl [[LIKE] "pattern"]
        
It is possible to filter the result using pattern. All Iceberg tables have the same metadata tables, so this command is mostly for convenience. Using SHOW METADATA TABLES on a non-Iceberg table results in an error.
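
For example, using the same table as the other examples in this section. The pattern is an assumption that follows the wildcard style of other Impala SHOW statements, where * matches any sequence of characters.

```sql
-- List all metadata tables of the Iceberg table.
SHOW METADATA TABLES IN functional_parquet.iceberg_alltypes_part;

-- List only the metadata tables whose names start with "s".
SHOW METADATA TABLES IN functional_parquet.iceberg_alltypes_part LIKE 's*';
```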

Just like regular tables, metadata tables have schemas that can be queried with the DESCRIBE command. Note, however, that DESCRIBE FORMATTED|EXTENDED are not available for metadata tables.

Example:

DESCRIBE functional_parquet.iceberg_alltypes_part.history;
        

To retrieve information from metadata tables, use the usual SELECT statement. You can select any subset of the columns or all of them using ‘*’. Note that in contrast to regular tables, SELECT * on metadata tables always includes complex-typed columns in the result. Therefore, the query option EXPAND_COMPLEX_TYPES only applies to regular tables. This holds also in queries that mix metadata tables and regular tables: for SELECT * expressions from metadata tables, complex types will always be included, and for SELECT * expressions from regular tables, complex types will be included if and only if EXPAND_COMPLEX_TYPES is true.

Note that unnesting collections from metadata tables is not supported.

Example:

SELECT
    s.operation,
    h.is_current_ancestor,
    s.summary
FROM functional_parquet.iceberg_alltypes_part.history h
JOIN functional_parquet.iceberg_alltypes_part.snapshots s
  ON h.snapshot_id = s.snapshot_id
WHERE s.operation = 'append'
ORDER BY made_current_at;
        

Cloning Iceberg tables (LIKE clause)

Use CREATE TABLE ... LIKE ... to create an empty Iceberg table based on the definition of another Iceberg table, including any column attributes in the original table:

CREATE TABLE new_ice_tbl LIKE orig_ice_tbl;
        

Because the data types of Iceberg and Impala do not correspond one to one, Impala can only clone between Iceberg tables.

Iceberg table properties

We can set the following table properties for Iceberg tables:
  • iceberg.catalog: controls which catalog is used for this Iceberg table. It can be 'hive.catalog' (default), 'hadoop.catalog', 'hadoop.tables', or a name that identifies a catalog defined in the Hadoop configurations, e.g. hive-site.xml
  • iceberg.catalog_location: Iceberg table catalog location when iceberg.catalog is 'hadoop.catalog'
  • iceberg.table_identifier: Iceberg table identifier. We use <database>.<table> instead if this property is not set
  • write.format.default: data file format of the table. Impala can read AVRO, ORC and PARQUET data files in Iceberg tables, and can write PARQUET data files only.
  • write.parquet.compression-codec: Parquet compression codec. Supported values are: NONE, GZIP, SNAPPY (default value), LZ4, ZSTD. The table property will be ignored if COMPRESSION_CODEC query option is set.
  • write.parquet.compression-level: Parquet compression level. Used with ZSTD compression only. Supported range is [1, 22]. Default value is 3. The table property will be ignored if COMPRESSION_CODEC query option is set.
  • write.parquet.row-group-size-bytes: Parquet row group size in bytes. Supported range is [8388608, 2146435072] (8MB - 2047MB). The table property will be ignored if PARQUET_FILE_SIZE query option is set. If neither the table property nor the PARQUET_FILE_SIZE query option is set, the way Impala calculates row group size will remain unchanged.
  • write.parquet.page-size-bytes: Parquet page size in bytes. Used for PLAIN encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates page size will remain unchanged.
  • write.parquet.dict-size-bytes: Parquet dictionary page size in bytes. Used for dictionary encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates dictionary page size will remain unchanged.
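
As an illustration, the Parquet write properties above could be set on a hypothetical table ice_t as follows. The values are arbitrary examples chosen from within the supported ranges.

```sql
ALTER TABLE ice_t SET TBLPROPERTIES(
  'write.parquet.compression-codec'='ZSTD',
  -- Compression level applies to ZSTD only; supported range is [1, 22].
  'write.parquet.compression-level'='12',
  -- 128MB row groups; supported range is 8MB to 2047MB.
  'write.parquet.row-group-size-bytes'='134217728',
  -- 1MB pages and dictionary pages; supported range is 64KB to 1GB.
  'write.parquet.page-size-bytes'='1048576',
  'write.parquet.dict-size-bytes'='1048576');
```

Keep in mind that the compression properties are ignored when the COMPRESSION_CODEC query option is set, and the row group size is ignored when PARQUET_FILE_SIZE is set.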

Iceberg manifest caching

Starting from version 1.1.0, Apache Iceberg provides a mechanism to cache the contents of Iceberg manifest files in memory. This manifest caching feature helps to reduce repeated reads of small Iceberg manifest files from remote storage by Coordinators and Catalogd. This feature can be enabled for Impala Coordinators and Catalogd by setting properties in Hadoop's core-site.xml as in the following:

iceberg.io-impl=org.apache.iceberg.hadoop.HadoopFileIO;
iceberg.io.manifest.cache-enabled=true;
iceberg.io.manifest.cache.max-total-bytes=104857600;
iceberg.io.manifest.cache.expiration-interval-ms=3600000;
iceberg.io.manifest.cache.max-content-length=8388608;
        
The description of each property is as follows:
  • iceberg.io-impl: custom FileIO implementation to use in a catalog. Must be set to enable manifest caching. Impala defaults to HadoopFileIO. It is recommended not to change this to anything other than HadoopFileIO.
  • iceberg.io.manifest.cache-enabled: enable/disable the manifest caching feature.
  • iceberg.io.manifest.cache.max-total-bytes: maximum total amount of bytes to cache in the manifest cache. Must be a positive value.
  • iceberg.io.manifest.cache.expiration-interval-ms: maximum duration for which an entry stays in the manifest cache. Must be a non-negative value. Setting it to zero means cache entries expire only when they are evicted due to memory pressure from iceberg.io.manifest.cache.max-total-bytes.
  • iceberg.io.manifest.cache.max-content-length: maximum length of a manifest file to be considered for caching in bytes. Manifest files with a length exceeding this property value will not be cached. Must be set with a positive value and lower than iceberg.io.manifest.cache.max-total-bytes.

Manifest caching only works for tables that are loaded with either HadoopCatalog or HiveCatalog. Each individual HadoopCatalog and HiveCatalog has a separate manifest cache with the same configuration. By default, only 8 catalogs can have their manifest cache active in memory. This number can be raised by setting a higher value in the Java system property iceberg.io.manifest.cache.fileio-max.