Using Impala with Iceberg Tables

Impala now supports Apache Iceberg, an open table format for huge analytic datasets. With this functionality, you can access any existing Iceberg tables using SQL and perform analytics over them. Using Impala you can create and write Iceberg tables in different Iceberg catalogs (e.g. HiveCatalog, HadoopCatalog). Impala also supports location-based tables (HadoopTables).

Currently only Iceberg V1 DML operations are allowed, i.e. INSERT INTO and INSERT OVERWRITE. Iceberg V2 operations such as row-level modifications (UPDATE, DELETE) are not supported yet.

For more information on Iceberg, see the Apache Iceberg site.

Overview of Iceberg features

  • ACID compliance: DML operations are atomic, queries always read a consistent snapshot.
  • Hidden partitioning: Iceberg produces partition values by taking a column value and optionally transforming it. Partition information is stored in the Iceberg metadata files. Iceberg is able to TRUNCATE column values or calculate a hash of them and use it for partitioning. Readers don't need to be aware of the partitioning of the table.
  • Partition layout evolution: When the data volume or the query patterns change you can update the layout of a table. Since hidden partitioning is used, you don't need to rewrite the data files during partition layout evolution.
  • Schema evolution: Iceberg supports adding, dropping, updating, and renaming schema elements without side effects.
  • Time travel: enables reproducible queries that use exactly the same table snapshot, or lets users easily examine changes.
  • Cloning Iceberg tables: create an empty Iceberg table based on the definition of another Iceberg table.

Creating Iceberg tables with Impala

When you have an existing Iceberg table that is not yet present in the Hive Metastore, you can use the CREATE EXTERNAL TABLE command in Impala to add the table to the Hive Metastore so that Impala can interact with it. Currently Impala supports HadoopTables, HadoopCatalog, and HiveCatalog. If you have an existing table in HiveCatalog and you are using the same Hive Metastore, no further action is needed.

  • HadoopTables. When the table already exists in a HadoopTable it means there is a location on the file system that contains your table. Use the following command to add this table to Impala's catalog:
    
    CREATE EXTERNAL TABLE ice_hadoop_tbl
    STORED AS ICEBERG
    LOCATION '/path/to/table'
    TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
              
  • HadoopCatalog. A table in HadoopCatalog means that there is a catalog location in the file system under which Iceberg tables are stored. Use the following command to add a table in a HadoopCatalog to Impala:
    
    CREATE EXTERNAL TABLE ice_hadoop_cat
    STORED AS ICEBERG
    TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
                  'iceberg.catalog_location'='/path/to/catalog',
                  'iceberg.table_identifier'='namespace.table');
              
  • Alternatively, you can use custom catalogs to access existing tables. This means you need to define your catalog in hive-site.xml. The advantage of this method is that other engines are more likely to be able to interact with the table. To globally register different catalogs, set the following Hadoop configurations:
    Config Key                                     Description
    iceberg.catalog.<catalog_name>.type            type of catalog: hive, hadoop, or left unset if using a custom catalog
    iceberg.catalog.<catalog_name>.catalog-impl    catalog implementation, must not be null if type is empty
    iceberg.catalog.<catalog_name>.<key>           any config key and value pairs for the catalog
    For example, to register a HadoopCatalog called 'hadoop', set the following properties in hive-site.xml:
    
    iceberg.catalog.hadoop.type=hadoop;
    iceberg.catalog.hadoop.warehouse=hdfs://example.com:8020/warehouse;
                
    Then in the CREATE TABLE statement you can just refer to the catalog name:
    
    CREATE EXTERNAL TABLE ice_catalogs STORED AS ICEBERG TBLPROPERTIES('iceberg.catalog'='<CATALOG-NAME>');
                
  • If the table already exists in HiveCatalog then Impala should be able to see it without any additional commands.
You can also create new Iceberg tables with Impala. You can use the same commands as above, just omit the EXTERNAL keyword. To create an Iceberg table in HiveCatalog the following CREATE TABLE statement can be used:

CREATE TABLE ice_t (i INT) STORED AS ICEBERG;
        

By default Impala assumes that the Iceberg table uses Parquet data files. ORC is also supported, but you must tell Impala by setting the table property 'write.format.default' to 'ORC'.
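
As a minimal sketch of the ORC case, the statement below registers an existing location-based Iceberg table whose data files are ORC. The table name ice_orc_tbl and the path are hypothetical placeholders, and the example assumes the table was written by another engine.

```sql
-- Hypothetical table name and location, for illustration only.
-- 'write.format.default' tells Impala that the data files are ORC
-- rather than the Parquet format it assumes by default.
CREATE EXTERNAL TABLE ice_orc_tbl
STORED AS ICEBERG
LOCATION '/path/to/orc_table'
TBLPROPERTIES('iceberg.catalog'='hadoop.tables',
              'write.format.default'='ORC');
```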

You can also use CREATE TABLE AS SELECT to create new Iceberg tables, e.g.:

CREATE TABLE ice_ctas STORED AS ICEBERG AS SELECT i, b FROM value_tbl;

CREATE TABLE ice_ctas_part PARTITIONED BY(d) STORED AS ICEBERG AS SELECT s, ts, d FROM value_tbl;

CREATE TABLE ice_ctas_part_spec PARTITIONED BY SPEC (truncate(3, s)) STORED AS ICEBERG AS SELECT cast(t as INT), s, d FROM value_tbl;
        

Dropping Iceberg tables

Use the DROP TABLE statement to remove an Iceberg table:

DROP TABLE ice_t;
        

When the external.table.purge table property is set to true, the DROP TABLE statement also deletes the data files. Impala sets this property to true when it creates the Iceberg table via CREATE TABLE. When CREATE EXTERNAL TABLE is used (the table already exists in some catalog), external.table.purge is set to false, i.e. DROP TABLE doesn't remove any files, only the table definition in HMS.
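
The following sketch shows one way the property could be used to keep the data files of a table that Impala created. The table name ice_keep_files is hypothetical, and the example assumes that your Impala version allows changing external.table.purge through ALTER TABLE ... SET TBLPROPERTIES.

```sql
-- Hypothetical table created by Impala; external.table.purge is true here.
CREATE TABLE ice_keep_files (i INT) STORED AS ICEBERG;

-- Assumption: the property can be changed after the table is created.
ALTER TABLE ice_keep_files SET TBLPROPERTIES('external.table.purge'='false');

-- With purging disabled, only the table definition in HMS is removed.
DROP TABLE ice_keep_files;
```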

Supported Data Types for Iceberg Columns

You can get information about the supported Iceberg data types in the Iceberg spec.

The Iceberg data types can be mapped to the following SQL types in Impala:
Iceberg type     SQL type in Impala
boolean          BOOLEAN
int              INTEGER
long             BIGINT
float            FLOAT
double           DOUBLE
decimal(P, S)    DECIMAL(P, S)
date             DATE
time             Not supported
timestamp        TIMESTAMP
timestamptz      Only read support via TIMESTAMP
string           STRING
uuid             Not supported
fixed(L)         Not supported
binary           Not supported
struct           STRUCT (read only)
list             ARRAY (read only)
map              MAP (read only)

Schema evolution of Iceberg tables

Iceberg assigns unique field ids to schema elements which means it is possible to reorder/delete/change columns and still be able to correctly read current and old data files. Impala supports the following statements to modify a table's schema:
  • ALTER TABLE ... RENAME TO ... (renames the table if the Iceberg catalog supports it)
  • ALTER TABLE ... CHANGE COLUMN ... (changes the name and type of a column, provided the new type is compatible with the old type)
  • ALTER TABLE ... ADD COLUMNS ... (adds columns to the end of the table)
  • ALTER TABLE ... DROP COLUMN ...
Valid type promotions are:
  • int to long
  • float to double
  • decimal(P, S) to decimal(P', S) if P' > P – widen the precision of decimal types.

See schema evolution for more details.
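
The statements listed above can be sketched as follows. The table ice_evolve and its columns are hypothetical and exist only for this illustration; the type changes use the int to long and float to double promotions.

```sql
-- Hypothetical table used only for this illustration.
CREATE TABLE ice_evolve (i INT, f FLOAT) STORED AS ICEBERG;

-- Add a column to the end of the table.
ALTER TABLE ice_evolve ADD COLUMNS (note STRING);

-- Rename column i to id and promote int to long (BIGINT).
ALTER TABLE ice_evolve CHANGE COLUMN i id BIGINT;

-- Promote float to double while keeping the column name.
ALTER TABLE ice_evolve CHANGE COLUMN f f DOUBLE;

-- Drop the column that was added above.
ALTER TABLE ice_evolve DROP COLUMN note;

-- Rename the table, if the Iceberg catalog supports it.
ALTER TABLE ice_evolve RENAME TO ice_evolved;
```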

Partitioning Iceberg tables

The Iceberg spec has information about partitioning Iceberg tables. With Iceberg, you are not limited to value-based partitioning; you can also partition tables via several partition transforms.

Partition transforms are IDENTITY, BUCKET, TRUNCATE, YEAR, MONTH, DAY, HOUR, and VOID. Impala supports all of these transforms. To create a partitioned Iceberg table, one needs to add a PARTITIONED BY SPEC clause to the CREATE TABLE statement, e.g.:

CREATE TABLE ice_p (i INT, d DATE, s STRING, t TIMESTAMP)
PARTITIONED BY SPEC (BUCKET(5, i), MONTH(d), TRUNCATE(3, s), HOUR(t))
STORED AS ICEBERG;
        
Iceberg also supports partition evolution which means that the partitioning of a table can be changed, even without the need of rewriting existing data files. You can change an existing table's partitioning via an ALTER TABLE SET PARTITION SPEC statement, e.g.:

ALTER TABLE ice_p SET PARTITION SPEC (VOID(i), VOID(d), TRUNCATE(3, s), HOUR(t), i);
        
Please keep in mind that for Iceberg V1 tables:
  • Do not reorder partition fields
  • Do not drop partition fields; instead replace the field’s transform with the void transform
  • Only add partition fields at the end of the previous partition spec
You can also use the legacy syntax to create identity-partitioned Iceberg tables:

CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY (p1 INT, p2 STRING) STORED AS ICEBERG;
        

You can inspect a table's partition spec with the SHOW PARTITIONS or SHOW CREATE TABLE commands.
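
For instance, a partitioned table could be inspected as sketched below. The table name ice_inspect is hypothetical, and the exact output columns depend on the Impala version, so no output is shown.

```sql
-- Hypothetical table with two partition transforms.
CREATE TABLE ice_inspect (i INT, d DATE)
PARTITIONED BY SPEC (BUCKET(5, i), MONTH(d))
STORED AS ICEBERG;

-- List the partitions of the table.
SHOW PARTITIONS ice_inspect;

-- Print the full table definition, including the PARTITIONED BY SPEC clause.
SHOW CREATE TABLE ice_inspect;
```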

Writing Iceberg tables

Impala is also able to insert new data into Iceberg tables. Currently the INSERT INTO and INSERT OVERWRITE DML statements are supported. You can also remove the contents of an Iceberg table via the TRUNCATE command.

Since Iceberg uses hidden partitioning, you don't need a partition clause in your INSERT statements. For example, insertion into a partitioned table looks like this:

CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY SPEC (bucket(17, i)) STORED AS ICEBERG;
INSERT INTO ice_p VALUES (1, 2);
        

INSERT OVERWRITE statements replace data in the table with the result of a query. For partitioned tables Impala does a dynamic overwrite: partitions that receive rows from the SELECT query are replaced, while partitions that receive no rows remain untouched. INSERT OVERWRITE is not allowed for tables that use the BUCKET partition transform because the dynamic overwrite behavior would be unpredictable in this case. To replace all contents of a table, you can still use TRUNCATE followed by INSERT INTO.
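
A small sketch of the dynamic overwrite behavior described above, using a hypothetical table ice_sales partitioned by month. The comments describe the expected effect according to the rules in this section; they are not captured output.

```sql
-- Hypothetical table, partitioned by the month of column d.
CREATE TABLE ice_sales (id INT, amount INT, d DATE)
PARTITIONED BY SPEC (MONTH(d))
STORED AS ICEBERG;

-- One row lands in the January partition, one in the February partition.
INSERT INTO ice_sales VALUES (1, 10, DATE '2022-01-15'), (2, 20, DATE '2022-02-15');

-- The query produces a row only for January, so only that partition
-- is replaced; the February partition should remain untouched.
INSERT OVERWRITE ice_sales SELECT 3, 30, DATE '2022-01-20';

-- To replace the whole table regardless of partitioning:
TRUNCATE TABLE ice_sales;
INSERT INTO ice_sales VALUES (4, 40, DATE '2022-03-01');
```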

Impala can only write Iceberg tables with Parquet data files.

Time travel for Iceberg tables

Iceberg stores the table states in a chain of snapshots. By default, Impala uses the current snapshot of the table. But for Iceberg tables, it is also possible to query an earlier state of the table.

We can use the FOR SYSTEM_TIME AS OF and FOR SYSTEM_VERSION AS OF clauses in SELECT queries, e.g.:

SELECT * FROM ice_t FOR SYSTEM_TIME AS OF '2022-01-04 10:00:00';
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF now() - interval 5 days;
SELECT * FROM ice_t FOR SYSTEM_VERSION AS OF 123456;
        
If one needs to check the available snapshots of a table they can use the DESCRIBE HISTORY statement with the following syntax:

DESCRIBE HISTORY [db_name.]table_name
  [FROM timestamp];

DESCRIBE HISTORY [db_name.]table_name
  [BETWEEN timestamp AND timestamp];
        
For example:

DESCRIBE HISTORY ice_t FROM '2022-01-04 10:00:00';
DESCRIBE HISTORY ice_t FROM now() - interval 5 days;
DESCRIBE HISTORY ice_t BETWEEN '2022-01-04 10:00:00' AND '2022-01-05 10:00:00';

Please note that during time travel, Impala uses the current table schema to query an older snapshot of the table which might have had a different schema in the past.

Cloning Iceberg tables (LIKE clause)

Use CREATE TABLE ... LIKE ... to create an empty Iceberg table based on the definition of another Iceberg table, including any column attributes in the original table:

CREATE TABLE new_ice_tbl LIKE orig_ice_tbl;
        

Because the data types of Iceberg and Impala do not correspond one to one, Impala can only clone between Iceberg tables.

Expiring snapshots

Iceberg snapshots accumulate until they are deleted by a user action. Snapshots can be deleted with the ALTER TABLE ... EXECUTE expire_snapshots(...) statement, which expires snapshots that are older than the specified timestamp. For example:

ALTER TABLE ice_tbl EXECUTE expire_snapshots('2022-01-04 10:00:00');
ALTER TABLE ice_tbl EXECUTE expire_snapshots(now() - interval 5 days);
        
Expire snapshots:
  • does not remove old metadata files by default.
  • does not remove orphaned data files.
  • respects the minimum number of snapshots to keep: history.expire.min-snapshots-to-keep table property.

Old metadata file cleanup can be configured with the write.metadata.delete-after-commit.enabled=true and write.metadata.previous-versions-max table properties. This allows automatic removal of metadata files after operations that modify metadata, such as expiring snapshots or inserting data.
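
As an illustration, the properties mentioned in this section could be set on a table before expiring snapshots. The numeric values below are arbitrary examples rather than recommendations, and ice_tbl is assumed to be an existing Iceberg table.

```sql
-- Keep at least 5 snapshots and let Iceberg delete old metadata files,
-- retaining at most 10 previous metadata versions (illustrative values).
ALTER TABLE ice_tbl SET TBLPROPERTIES(
  'history.expire.min-snapshots-to-keep'='5',
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='10');

-- Expire snapshots older than five days, subject to the minimum above.
ALTER TABLE ice_tbl EXECUTE expire_snapshots(now() - interval 5 days);
```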

Iceberg table properties

We can set the following table properties for Iceberg tables:
  • iceberg.catalog: controls which catalog is used for this Iceberg table. It can be 'hive.catalog' (default), 'hadoop.catalog', 'hadoop.tables', or a name that identifies a catalog defined in the Hadoop configurations, e.g. hive-site.xml
  • iceberg.catalog_location: Iceberg table catalog location when iceberg.catalog is 'hadoop.catalog'
  • iceberg.table_identifier: Iceberg table identifier. We use <database>.<table> instead if this property is not set
  • write.format.default: data file format of the table. Impala can read ORC and PARQUET data files in Iceberg tables, and can write PARQUET data files only.
  • write.parquet.compression-codec: Parquet compression codec. Supported values are: NONE, GZIP, SNAPPY (default value), LZ4, ZSTD. The table property will be ignored if COMPRESSION_CODEC query option is set.
  • write.parquet.compression-level: Parquet compression level. Used with ZSTD compression only. Supported range is [1, 22]. Default value is 3. The table property will be ignored if COMPRESSION_CODEC query option is set.
  • write.parquet.row-group-size-bytes: Parquet row group size in bytes. Supported range is [8388608, 2146435072] (8MB - 2047MB). The table property will be ignored if PARQUET_FILE_SIZE query option is set. If neither the table property nor the PARQUET_FILE_SIZE query option is set, the way Impala calculates row group size will remain unchanged.
  • write.parquet.page-size-bytes: Parquet page size in bytes. Used for PLAIN encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates page size will remain unchanged.
  • write.parquet.dict-size-bytes: Parquet dictionary page size in bytes. Used for dictionary encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates dictionary page size will remain unchanged.
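
As a sketch of how the write-related properties could be combined, the statement below creates a table with explicit Parquet settings. The table name ice_props is hypothetical, and the values are arbitrary picks from within the supported ranges listed above.

```sql
-- Hypothetical table; all values lie inside the documented ranges.
CREATE TABLE ice_props (i INT, s STRING)
STORED AS ICEBERG
TBLPROPERTIES('write.parquet.compression-codec'='ZSTD',
              'write.parquet.compression-level'='12',
              'write.parquet.row-group-size-bytes'='134217728',  -- 128MB
              'write.parquet.page-size-bytes'='1048576',         -- 1MB
              'write.parquet.dict-size-bytes'='1048576');        -- 1MB
```

Keep in mind that the COMPRESSION_CODEC and PARQUET_FILE_SIZE query options, when set, take precedence over the corresponding table properties.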