Using Impala with Iceberg Tables

Impala now supports Apache Iceberg, an open table format for huge analytic datasets. With this functionality, you can access existing Iceberg tables using SQL and perform analytics over them. Using Impala you can create and write Iceberg tables in different Iceberg catalogs (e.g. HiveCatalog, HadoopCatalog). It also supports location-based tables (HadoopTables).

Currently only Iceberg V1 DML operations are allowed, i.e. INSERT INTO and INSERT OVERWRITE. Iceberg V2 operations such as row-level modifications (UPDATE, DELETE) are not supported yet.

For more information on Iceberg, see the Apache Iceberg site.

Overview of Iceberg features

Creating Iceberg tables with Impala

When you have an existing Iceberg table that is not yet present in the Hive Metastore, you can use the CREATE EXTERNAL TABLE command in Impala to add the table to the Hive Metastore, which enables Impala to interact with it. Currently Impala supports HadoopTables, HadoopCatalog, and HiveCatalog. If you have an existing table in HiveCatalog, and you are using the same Hive Metastore, no further action is needed.
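For example, a location-based table (HadoopTables) and a HadoopCatalog table could be registered as follows. Note that the paths, catalog location, and table identifier below are placeholders, not real locations:

```sql
-- Register an existing location-based table (HadoopTables).
-- The LOCATION path is a placeholder for your table's directory.
CREATE EXTERNAL TABLE ice_hadoop_tbl
STORED AS ICEBERG
LOCATION '/path/to/iceberg_table'
TBLPROPERTIES ('iceberg.catalog' = 'hadoop.tables');

-- Register an existing table from a HadoopCatalog.
CREATE EXTERNAL TABLE ice_hadoop_cat
STORED AS ICEBERG
TBLPROPERTIES ('iceberg.catalog' = 'hadoop.catalog',
               'iceberg.catalog_location' = '/path/to/catalog',
               'iceberg.table_identifier' = 'namespace.table');
```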

You can also create new Iceberg tables with Impala. You can use the same commands as above, just omit the EXTERNAL keyword. To create an Iceberg table in HiveCatalog, the following CREATE TABLE statement can be used:

CREATE TABLE ice_t (i INT) STORED AS ICEBERG;
        

By default, Impala assumes that the Iceberg table uses Parquet data files. ORC is also supported, but you need to tell Impala by setting the table property 'write.format.default' to 'ORC'.
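For example, a table whose data files are ORC could be created like this (keep in mind that, as noted below, Impala can read but not write ORC data files):

```sql
-- Iceberg table using ORC data files; readable, but not writable, by Impala.
CREATE TABLE ice_orc (i INT)
STORED AS ICEBERG
TBLPROPERTIES ('write.format.default' = 'ORC');
```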

You can also use CREATE TABLE AS SELECT to create new Iceberg tables, e.g.:

CREATE TABLE ice_ctas STORED AS ICEBERG AS SELECT i, b FROM value_tbl;

CREATE TABLE ice_ctas_part PARTITIONED BY(d) STORED AS ICEBERG AS SELECT s, ts, d FROM value_tbl;

CREATE TABLE ice_ctas_part_spec PARTITIONED BY SPEC (truncate(3, s)) STORED AS ICEBERG AS SELECT cast(t as INT), s, d FROM value_tbl;
        

Dropping Iceberg tables

One can use the DROP TABLE statement to remove an Iceberg table:

DROP TABLE ice_t;
        

When the external.table.purge table property is set to true, the DROP TABLE statement also deletes the data files. This property is set to true when Impala creates the Iceberg table via CREATE TABLE. When CREATE EXTERNAL TABLE is used (the table already exists in some catalog), external.table.purge is set to false, i.e. DROP TABLE doesn't remove any data files, only the table definition in HMS.
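A sketch of the two behaviors; the second part assumes external.table.purge can be toggled with the usual ALTER TABLE ... SET TBLPROPERTIES syntax:

```sql
-- Created by Impala: external.table.purge defaults to 'true',
-- so DROP TABLE removes both the HMS metadata and the data files.
CREATE TABLE ice_managed (i INT) STORED AS ICEBERG;
DROP TABLE ice_managed;

-- For a table registered with CREATE EXTERNAL TABLE, DROP TABLE only
-- removes the HMS entry unless the property is set explicitly first:
ALTER TABLE ice_ext SET TBLPROPERTIES ('external.table.purge' = 'true');
DROP TABLE ice_ext;
```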

Supported Data Types for Iceberg Columns

You can get information about the supported Iceberg data types in the Iceberg spec.

The Iceberg data types can be mapped to the following SQL types in Impala:

  Iceberg type    SQL type in Impala
  -------------   --------------------------------
  boolean         BOOLEAN
  int             INTEGER
  long            BIGINT
  float           FLOAT
  double          DOUBLE
  decimal(P, S)   DECIMAL(P, S)
  date            DATE
  time            Not supported
  timestamp       TIMESTAMP
  timestamptz     Only read support via TIMESTAMP
  string          STRING
  uuid            Not supported
  fixed(L)        Not supported
  binary          Not supported
  struct          STRUCT (read only)
  list            ARRAY (read only)
  map             MAP (read only)
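For instance, a table covering the writable scalar types from the mapping above could be declared as:

```sql
CREATE TABLE ice_types (
  b  BOOLEAN,
  i  INT,             -- Iceberg 'int'
  l  BIGINT,          -- Iceberg 'long'
  f  FLOAT,
  d  DOUBLE,
  dc DECIMAL(10, 2),  -- Iceberg 'decimal(10, 2)'
  dt DATE,
  ts TIMESTAMP,
  s  STRING
) STORED AS ICEBERG;
```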

Schema evolution of Iceberg tables

Iceberg assigns unique field ids to schema elements which means it is possible to reorder/delete/change columns and still be able to correctly read current and old data files. Impala supports the following statements to modify a table's schema:
  • ALTER TABLE ... RENAME TO ... (renames the table if the Iceberg catalog supports it)
  • ALTER TABLE ... CHANGE COLUMN ... (changes the name and type of a column, provided the new type is compatible with the old type)
  • ALTER TABLE ... ADD COLUMNS ... (adds columns to the end of the table)
  • ALTER TABLE ... DROP COLUMN ...
Valid type promotions are:
  • int to long
  • float to double
  • decimal(P, S) to decimal(P', S) if P' > P – widen the precision of decimal types.

See schema evolution for more details.
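The statements above could look like this in practice (the table and column names are illustrative):

```sql
-- Add new columns to the end of the table:
ALTER TABLE ice_t ADD COLUMNS (j BIGINT, s STRING);
-- Widen column i from INT to BIGINT (a valid type promotion):
ALTER TABLE ice_t CHANGE COLUMN i i BIGINT;
-- Drop a column:
ALTER TABLE ice_t DROP COLUMN s;
-- Rename the table (if the Iceberg catalog supports it):
ALTER TABLE ice_t RENAME TO ice_t_renamed;
```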

Partitioning Iceberg tables

The Iceberg spec has information about partitioning Iceberg tables. With Iceberg, you are not limited to value-based partitioning: you can also partition your tables via several partition transforms.

Partition transforms are IDENTITY, BUCKET, TRUNCATE, YEAR, MONTH, DAY, HOUR, and VOID. Impala supports all of these transforms. To create a partitioned Iceberg table, one needs to add a PARTITIONED BY SPEC clause to the CREATE TABLE statement, e.g.:

CREATE TABLE ice_p (i INT, d DATE, s STRING, t TIMESTAMP)
PARTITIONED BY SPEC (BUCKET(5, i), MONTH(d), TRUNCATE(3, s), HOUR(t))
STORED AS ICEBERG;
        
Iceberg also supports partition evolution which means that the partitioning of a table can be changed, even without the need of rewriting existing data files. You can change an existing table's partitioning via an ALTER TABLE SET PARTITION SPEC statement, e.g.:

ALTER TABLE ice_p SET PARTITION SPEC (VOID(i), VOID(d), TRUNCATE(3, s), HOUR(t), i);
        
For Iceberg V1 tables, please keep the following in mind:
  • Do not reorder partition fields
  • Do not drop partition fields; instead replace the field’s transform with the void transform
  • Only add partition fields at the end of the previous partition spec
You can also use the legacy syntax to create identity-partitioned Iceberg tables:

CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY (p1 INT, p2 STRING) STORED AS ICEBERG;
        

One can inspect a table's partition spec with the SHOW PARTITIONS or SHOW CREATE TABLE statements.
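For example, either of the following shows how the table ice_p is partitioned (the exact output format depends on your Impala version):

```sql
SHOW PARTITIONS ice_p;
SHOW CREATE TABLE ice_p;
```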

Writing Iceberg tables

Impala is also able to insert new data into Iceberg tables. Currently the INSERT INTO and INSERT OVERWRITE DML statements are supported. One can also remove the contents of an Iceberg table via the TRUNCATE statement.

Since Iceberg uses hidden partitioning, you don't need a partition clause in your INSERT statements. E.g. insertion into a partitioned table looks like:

CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY SPEC (bucket(17, i)) STORED AS ICEBERG;
INSERT INTO ice_p VALUES (1, 2);
        

INSERT OVERWRITE statements can replace data in the table with the result of a query. For partitioned tables Impala does a dynamic overwrite: partitions that have rows produced by the SELECT query are replaced, while partitions that have no rows produced by the SELECT query remain untouched. INSERT OVERWRITE is not allowed for tables that use the BUCKET partition transform, because dynamic overwrite behavior would be too random in this case. If one needs to replace all contents of a table, they can still use TRUNCATE and INSERT INTO.
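A sketch of dynamic overwrite on an identity-partitioned table (table, column names, and values below are illustrative):

```sql
-- Identity-partitioned on column p; a BUCKET-partitioned table
-- would reject INSERT OVERWRITE.
CREATE TABLE ice_ovw (i INT, p STRING)
PARTITIONED BY SPEC (p) STORED AS ICEBERG;

INSERT INTO ice_ovw VALUES (1, 'a'), (2, 'b');

-- Only the partition p='a' is rewritten; p='b' remains untouched.
INSERT OVERWRITE ice_ovw SELECT 10, 'a';
```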

Impala can only write Iceberg tables with Parquet data files.

Time travel for Iceberg tables

Iceberg stores the table states in a chain of snapshots. By default, Impala uses the current snapshot of the table. But for Iceberg tables, it is also possible to query an earlier state of the table.

We can use the FOR SYSTEM_TIME AS OF and FOR SYSTEM_VERSION AS OF clauses in SELECT queries, e.g.:

SELECT * FROM ice_t FOR SYSTEM_TIME AS OF '2022-01-04 10:00:00';
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF now() - interval 5 days;
SELECT * FROM ice_t FOR SYSTEM_VERSION AS OF 123456;
        
If one needs to check the available snapshots of a table they can use the DESCRIBE HISTORY statement with the following syntax:

DESCRIBE HISTORY [db_name.]table_name
  [FROM timestamp];

DESCRIBE HISTORY [db_name.]table_name
  [BETWEEN timestamp AND timestamp]
        
For example:

DESCRIBE HISTORY ice_t FROM '2022-01-04 10:00:00';
DESCRIBE HISTORY ice_t FROM now() - interval 5 days;
DESCRIBE HISTORY ice_t BETWEEN '2022-01-04 10:00:00' AND '2022-01-05 10:00:00';

Please note that during time travel, Impala uses the current table schema to query an older snapshot of the table which might have had a different schema in the past.

Iceberg table properties

We can set the following table properties for Iceberg tables:
  • iceberg.catalog: controls which catalog is used for this Iceberg table. It can be 'hive.catalog' (default), 'hadoop.catalog', 'hadoop.tables', or a name that identifies a catalog defined in the Hadoop configurations, e.g. hive-site.xml
  • iceberg.catalog_location: Iceberg table catalog location when iceberg.catalog is 'hadoop.catalog'
  • iceberg.table_identifier: Iceberg table identifier. We use <database>.<table> instead if this property is not set
  • write.format.default: data file format of the table. Impala can read ORC and PARQUET data files in Iceberg tables, and can write PARQUET data files only.
  • write.parquet.compression-codec: Parquet compression codec. Supported values are: NONE, GZIP, SNAPPY (default value), LZ4, ZSTD. The table property will be ignored if COMPRESSION_CODEC query option is set.
  • write.parquet.compression-level: Parquet compression level. Used with ZSTD compression only. Supported range is [1, 22]. Default value is 3. The table property will be ignored if COMPRESSION_CODEC query option is set.
  • write.parquet.row-group-size-bytes: Parquet row group size in bytes. Supported range is [8388608, 2146435072] (8MB - 2047MB). The table property will be ignored if PARQUET_FILE_SIZE query option is set. If neither the table property nor the PARQUET_FILE_SIZE query option is set, the way Impala calculates row group size will remain unchanged.
  • write.parquet.page-size-bytes: Parquet page size in bytes. Used for PLAIN encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates page size will remain unchanged.
  • write.parquet.dict-size-bytes: Parquet dictionary page size in bytes. Used for dictionary encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates dictionary page size will remain unchanged.
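For example, the Parquet write properties can be adjusted with ALTER TABLE ... SET TBLPROPERTIES; the values below are illustrative, chosen within the documented ranges:

```sql
ALTER TABLE ice_t SET TBLPROPERTIES (
  'write.parquet.compression-codec'    = 'ZSTD',
  'write.parquet.compression-level'    = '12',         -- within [1, 22]
  'write.parquet.row-group-size-bytes' = '134217728'   -- 128 MB
);
```

Remember that the COMPRESSION_CODEC and PARQUET_FILE_SIZE query options take precedence over the corresponding table properties.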