Using Impala with Iceberg Tables
Impala now supports Apache Iceberg, which is an open table format for huge analytic datasets. With this functionality, you can access any existing Iceberg tables using SQL and perform analytics over them. Using Impala you can create and write Iceberg tables in different Iceberg catalogs (e.g. HiveCatalog, HadoopCatalog). It also supports location-based tables (HadoopTables).
Currently only Iceberg V1 DML operations are allowed, i.e. INSERT INTO and INSERT OVERWRITE. Iceberg V2 operations such as row-level modifications (UPDATE, DELETE) are not supported yet.
For more information on Iceberg, see the Apache Iceberg site.
Overview of Iceberg features
- ACID compliance: DML operations are atomic, queries always read a consistent snapshot.
- Hidden partitioning: Iceberg produces partition values by taking a column value and optionally transforming it. Partition information is stored in the Iceberg metadata files. Iceberg is able to TRUNCATE column values or calculate a hash of them and use it for partitioning. Readers don't need to be aware of the partitioning of the table.
- Partition layout evolution: When the data volume or the query patterns change you can update the layout of a table. Since hidden partitioning is used, you don't need to rewrite the data files during partition layout evolution.
- Schema evolution: columns can be added, dropped, updated, or renamed with no side-effects.
- Time travel: enables reproducible queries that use exactly the same table snapshot, or lets users easily examine changes.
- Cloning Iceberg tables: create an empty Iceberg table based on the definition of another Iceberg table.
Creating Iceberg tables with Impala
When you have an existing Iceberg table that is not yet present in the Hive Metastore, you can use the CREATE EXTERNAL TABLE command in Impala to add the table to the Hive Metastore and make Impala able to interact with this table. Currently Impala supports HadoopTables, HadoopCatalog, and HiveCatalog. If you have an existing table in HiveCatalog, and you are using the same Hive Metastore, no further action is needed.
- HadoopTables. When the table already exists as a HadoopTable, it means there is a location on the file system that contains your table. Use the following command to add this table to Impala's catalog:
CREATE EXTERNAL TABLE ice_hadoop_tbl STORED AS ICEBERG LOCATION '/path/to/table' TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
- HadoopCatalog. A table in HadoopCatalog means that there is a catalog location in the file system under which Iceberg tables are stored. Use the following command to add a table in a HadoopCatalog to Impala:
CREATE EXTERNAL TABLE ice_hadoop_cat STORED AS ICEBERG TBLPROPERTIES('iceberg.catalog'='hadoop.catalog', 'iceberg.catalog_location'='/path/to/catalog', 'iceberg.table_identifier'='namespace.table');
- Alternatively, you can also use custom catalogs to use existing tables. This means you need to define your catalog in hive-site.xml. The advantage of this method is that other engines are more likely to be able to interact with this table. To globally register different catalogs, set the following Hadoop configurations:
Config Key | Description |
---|---|
iceberg.catalog.<catalog_name>.type | type of catalog: hive, hadoop, or left unset if using a custom catalog |
iceberg.catalog.<catalog_name>.catalog-impl | catalog implementation, must not be null if type is empty |
iceberg.catalog.<catalog_name>.<key> | any config key and value pairs for the catalog |
For example, to register a HadoopCatalog called 'hadoop', set the following properties in hive-site.xml:
iceberg.catalog.hadoop.type=hadoop;
iceberg.catalog.hadoop.warehouse=hdfs://example.com:8020/warehouse;
Then in the CREATE TABLE statement you can just refer to the catalog name:
CREATE EXTERNAL TABLE ice_catalogs STORED AS ICEBERG TBLPROPERTIES('iceberg.catalog'='<CATALOG-NAME>');
- If the table already exists in HiveCatalog then Impala should be able to see it without any additional commands.
You can also create brand-new Iceberg tables with Impala by using the CREATE TABLE statement without the EXTERNAL keyword. To create an Iceberg table in HiveCatalog the following CREATE TABLE statement can be used:
CREATE TABLE ice_t (i INT) STORED AS ICEBERG;
By default Impala assumes that the Iceberg table uses Parquet data files. ORC is also supported, but you need to tell Impala by setting the table property 'write.format.default' to 'ORC'.
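For illustration, a minimal sketch of creating an ORC-backed Iceberg table by setting this property at creation time (the table name ice_orc is just an example; note that Impala itself can only write Parquet, so the ORC data files would typically be produced by another engine):
CREATE TABLE ice_orc (i INT)
STORED AS ICEBERG
TBLPROPERTIES('write.format.default'='ORC');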
You can also use CREATE TABLE AS SELECT to create new Iceberg tables, e.g.:
CREATE TABLE ice_ctas STORED AS ICEBERG AS SELECT i, b FROM value_tbl;
CREATE TABLE ice_ctas_part PARTITIONED BY(d) STORED AS ICEBERG AS SELECT s, ts, d FROM value_tbl;
CREATE TABLE ice_ctas_part_spec PARTITIONED BY SPEC (truncate(3, s)) STORED AS ICEBERG AS SELECT cast(t as INT), s, d FROM value_tbl;
Dropping Iceberg tables
You can use the DROP TABLE statement to remove an Iceberg table:
DROP TABLE ice_t;
When the external.table.purge table property is set to true, the DROP TABLE statement also deletes the data files. This property is set to true when Impala creates the Iceberg table via CREATE TABLE. When CREATE EXTERNAL TABLE is used (the table already exists in some catalog), external.table.purge is set to false, i.e. DROP TABLE doesn't remove any files, only the table definition in HMS.
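As a sketch of how this behaves (assuming external.table.purge can be toggled with ALTER TABLE ... SET TBLPROPERTIES; table names are illustrative):
-- Created by Impala: external.table.purge defaults to true,
-- so dropping the table also removes its data files.
CREATE TABLE ice_managed (i INT) STORED AS ICEBERG;
DROP TABLE ice_managed;
-- Registered existing table: external.table.purge defaults to false,
-- so DROP TABLE only removes the HMS definition. To purge data as well,
-- set the property before dropping.
CREATE EXTERNAL TABLE ice_registered STORED AS ICEBERG LOCATION '/path/to/table' TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
ALTER TABLE ice_registered SET TBLPROPERTIES('external.table.purge'='true');
DROP TABLE ice_registered;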
Supported Data Types for Iceberg Columns
You can get information about the supported Iceberg data types in the Iceberg spec.
Iceberg type | SQL type in Impala |
---|---|
boolean | BOOLEAN |
int | INTEGER |
long | BIGINT |
float | FLOAT |
double | DOUBLE |
decimal(P, S) | DECIMAL(P, S) |
date | DATE |
time | Not supported |
timestamp | TIMESTAMP |
timestamptz | Only read support via TIMESTAMP |
string | STRING |
uuid | Not supported |
fixed(L) | Not supported |
binary | Not supported |
struct | STRUCT (read only) |
list | ARRAY (read only) |
map | MAP (read only) |
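As a quick illustration of the mapping above, a sketch of a table declaration using several of the supported Impala types (table and column names are made up):
CREATE TABLE ice_types (
  flag BOOLEAN,
  id BIGINT,            -- stored as Iceberg 'long'
  price DECIMAL(10,2),
  created DATE,
  updated TIMESTAMP,
  name STRING
) STORED AS ICEBERG;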
Schema evolution of Iceberg tables
Impala supports the following statements to modify the schema of an Iceberg table:
- ALTER TABLE ... RENAME TO ... (renames the table if the Iceberg catalog supports it)
- ALTER TABLE ... CHANGE COLUMN ... (changes the name and type of a column, only if the new type is compatible with the old type)
- ALTER TABLE ... ADD COLUMNS ... (adds columns to the end of the table)
- ALTER TABLE ... DROP COLUMN ...
Valid type promotions are:
- int to long
- float to double
- decimal(P, S) to decimal(P', S) if P' > P (widens the precision of the decimal type)
See schema evolution for more details.
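For illustration, a few schema-change statements of the kinds listed above, run against the ice_t table created earlier (the added column names are made up):
ALTER TABLE ice_t ADD COLUMNS (comment STRING, price DECIMAL(10,2));
ALTER TABLE ice_t CHANGE COLUMN i i BIGINT;   -- int to long is a valid promotion
ALTER TABLE ice_t DROP COLUMN comment;
ALTER TABLE ice_t RENAME TO ice_t_renamed;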
Partitioning Iceberg tables
The Iceberg spec has information about partitioning Iceberg tables. With Iceberg, you are not limited to value-based partitioning; you can also partition tables via several partition transforms.
You can create a partitioned Iceberg table by adding the PARTITIONED BY SPEC clause to the CREATE TABLE statement, e.g.:
CREATE TABLE ice_p (i INT, d DATE, s STRING, t TIMESTAMP)
PARTITIONED BY SPEC (BUCKET(5, i), MONTH(d), TRUNCATE(3, s), HOUR(t))
STORED AS ICEBERG;
You can change the partition spec of an existing table via the ALTER TABLE SET PARTITION SPEC statement, e.g.:
ALTER TABLE ice_p SET PARTITION SPEC (VOID(i), VOID(d), TRUNCATE(3, s), HOUR(t), i);
When changing the partition spec of an Iceberg table, keep the following in mind:
- Do not reorder partition fields
- Do not drop partition fields; instead replace the field’s transform with the void transform
- Only add partition fields at the end of the previous partition spec
Iceberg tables can also be created with the legacy identity-based partitioning syntax, e.g.:
CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY (p1 INT, p2 STRING) STORED AS ICEBERG;
One can inspect a table's partition spec with the SHOW PARTITIONS or SHOW CREATE TABLE statements.
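For example, against the ice_p table created above:
SHOW PARTITIONS ice_p;
SHOW CREATE TABLE ice_p;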
Writing Iceberg tables
Impala is also able to insert new data into Iceberg tables. Currently the INSERT INTO and INSERT OVERWRITE DML statements are supported. One can also remove the contents of an Iceberg table via the TRUNCATE command.
CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY SPEC (bucket(17, i)) STORED AS ICEBERG;
INSERT INTO ice_p VALUES (1, 2);
INSERT OVERWRITE statements can replace data in the table with the result of a query. For partitioned tables Impala does a dynamic overwrite, which means that partitions with rows produced by the SELECT query are replaced, while partitions with no rows produced by the SELECT query remain untouched. INSERT OVERWRITE is not allowed for tables that use the BUCKET partition transform because dynamic overwrite behavior would be too random in this case. If one needs to replace all contents of a table, they can still use TRUNCATE and INSERT INTO.
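To illustrate dynamic overwrite, a sketch with a table partitioned by month (the staging table new_data is hypothetical):
CREATE TABLE ice_sales (id BIGINT, amount DECIMAL(10,2), sale_date DATE)
PARTITIONED BY SPEC (MONTH(sale_date))
STORED AS ICEBERG;
-- Only the months that appear in new_data are replaced;
-- all other months of ice_sales remain untouched.
INSERT OVERWRITE ice_sales SELECT id, amount, sale_date FROM new_data;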
Impala can only write Iceberg tables with Parquet data files.
Time travel for Iceberg tables
Iceberg stores the table states in a chain of snapshots. By default, Impala uses the current snapshot of the table. But for Iceberg tables, it is also possible to query an earlier state of the table.
Time travel queries use the FOR SYSTEM_TIME AS OF and FOR SYSTEM_VERSION AS OF clauses in SELECT queries, e.g.:
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF '2022-01-04 10:00:00';
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF now() - interval 5 days;
SELECT * FROM ice_t FOR SYSTEM_VERSION AS OF 123456;
You can inspect the available snapshots of a table with the DESCRIBE HISTORY statement, which has the following syntax:
DESCRIBE HISTORY [db_name.]table_name
[FROM timestamp];
DESCRIBE HISTORY [db_name.]table_name
[BETWEEN timestamp AND timestamp]
For example:
DESCRIBE HISTORY ice_t FROM '2022-01-04 10:00:00';
DESCRIBE HISTORY ice_t FROM now() - interval 5 days;
DESCRIBE HISTORY ice_t BETWEEN '2022-01-04 10:00:00' AND '2022-01-05 10:00:00';
Please note that during time travel, Impala uses the current table schema to query an older snapshot of the table which might have had a different schema in the past.
Cloning Iceberg tables (LIKE clause)
You can use CREATE TABLE ... LIKE ... to create an empty Iceberg table based on the definition of another Iceberg table, including any column attributes in the original table:
CREATE TABLE new_ice_tbl LIKE orig_ice_tbl;
Because the data types of Iceberg and Impala do not correspond one to one, Impala can only clone between Iceberg tables.
Expiring snapshots
You can expire snapshots of an Iceberg table via the ALTER TABLE ... EXECUTE expire_snapshots(...) statement, which expires snapshots that are older than the specified timestamp. For example:
ALTER TABLE ice_tbl EXECUTE expire_snapshots('2022-01-04 10:00:00');
ALTER TABLE ice_tbl EXECUTE expire_snapshots(now() - interval 5 days);
Note that expire_snapshots:
- does not remove old metadata files by default.
- does not remove orphaned data files.
- respects the minimum number of snapshots to keep, as set by the history.expire.min-snapshots-to-keep table property.
Old metadata file clean up can be configured with
write.metadata.delete-after-commit.enabled=true
and
write.metadata.previous-versions-max
table properties. This
allows automatic metadata file removal after operations that modify metadata
such as expiring snapshots or inserting data.
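A sketch of enabling this clean-up on an existing table (assuming these properties can be set via ALTER TABLE ... SET TBLPROPERTIES; the value 5 is just an example):
ALTER TABLE ice_tbl SET TBLPROPERTIES(
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='5'
);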
Iceberg table properties
- iceberg.catalog: controls which catalog is used for this Iceberg table. It can be 'hive.catalog' (default), 'hadoop.catalog', 'hadoop.tables', or a name that identifies a catalog defined in the Hadoop configurations, e.g. hive-site.xml
- iceberg.catalog_location: Iceberg table catalog location when iceberg.catalog is 'hadoop.catalog'
- iceberg.table_identifier: Iceberg table identifier. We use <database>.<table> instead if this property is not set
- write.format.default: data file format of the table. Impala can read ORC and PARQUET data files in Iceberg tables, and can write PARQUET data files only.
- write.parquet.compression-codec: Parquet compression codec. Supported values are: NONE, GZIP, SNAPPY (default value), LZ4, ZSTD. The table property will be ignored if the COMPRESSION_CODEC query option is set.
- write.parquet.compression-level: Parquet compression level. Used with ZSTD compression only. Supported range is [1, 22]. Default value is 3. The table property will be ignored if the COMPRESSION_CODEC query option is set.
- write.parquet.row-group-size-bytes: Parquet row group size in bytes. Supported range is [8388608, 2146435072] (8MB - 2047MB). The table property will be ignored if the PARQUET_FILE_SIZE query option is set. If neither the table property nor the PARQUET_FILE_SIZE query option is set, the way Impala calculates row group size will remain unchanged.
- write.parquet.page-size-bytes: Parquet page size in bytes. Used for PLAIN encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates page size will remain unchanged.
- write.parquet.dict-size-bytes: Parquet dictionary page size in bytes. Used for dictionary encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates dictionary page size will remain unchanged.
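For instance, a sketch of setting a couple of these write properties at table creation time (the table name, codec, and level values are illustrative):
CREATE TABLE ice_zstd (i INT, s STRING)
STORED AS ICEBERG
TBLPROPERTIES(
  'write.parquet.compression-codec'='ZSTD',
  'write.parquet.compression-level'='12'
);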