Impala now supports Apache Iceberg, which is an open table format for huge analytic datasets. With this functionality, you can access any existing Iceberg table using SQL and perform analytics over it. Using Impala, you can create and write Iceberg tables in different Iceberg catalogs (e.g. HiveCatalog, HadoopCatalog). It also supports location-based tables (HadoopTables).
For more information on Iceberg, see the Apache Iceberg site.
When you have an existing Iceberg table that is not yet present in the Hive Metastore,
you can use the CREATE EXTERNAL TABLE
command in Impala to add the table to the Hive
Metastore and make Impala able to interact with this table. Currently Impala supports
HadoopTables, HadoopCatalog, and HiveCatalog. If you have an existing table in HiveCatalog,
and you are using the same Hive Metastore, no further action is needed.
CREATE EXTERNAL TABLE ice_hadoop_tbl
STORED AS ICEBERG
LOCATION '/path/to/table'
TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
CREATE EXTERNAL TABLE ice_hadoop_cat
STORED AS ICEBERG
TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
'iceberg.catalog_location'='/path/to/catalog',
'iceberg.table_identifier'='namespace.table');
Catalogs can also be registered globally in the Hadoop configuration (e.g. hive-site.xml) using the following config keys:

Config Key | Description |
---|---|
iceberg.catalog.<catalog_name>.type | type of catalog: hive, hadoop, or left unset if using a custom catalog |
iceberg.catalog.<catalog_name>.catalog-impl | catalog implementation, must not be null if type is empty |
iceberg.catalog.<catalog_name>.<key> | any config key and value pairs for the catalog |
For example:
iceberg.catalog.hadoop.type=hadoop;
iceberg.catalog.hadoop.warehouse=hdfs://example.com:8020/warehouse;
An external table can then be created with the registered catalog's name:
CREATE EXTERNAL TABLE ice_catalogs STORED AS ICEBERG TBLPROPERTIES('iceberg.catalog'='<CATALOG-NAME>');
Impala can also create managed Iceberg tables, i.e. without the EXTERNAL
keyword. To create an Iceberg table in HiveCatalog, the following
CREATE TABLE statement can be used:
CREATE TABLE ice_t (i INT) STORED AS ICEBERG;
By default, Impala assumes that the Iceberg table uses Parquet data files. ORC and Avro are also supported, but you need to tell Impala by setting the table property 'write.format.default', e.g. to 'ORC'.
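For example, a minimal sketch of a table that defaults to ORC data files (ice_orc_t is an illustrative name):
CREATE TABLE ice_orc_t (i INT)
STORED AS ICEBERG
TBLPROPERTIES('write.format.default'='orc');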
You can also use CREATE TABLE AS SELECT
to create new Iceberg tables, e.g.:
CREATE TABLE ice_ctas STORED AS ICEBERG AS SELECT i, b FROM value_tbl;
CREATE TABLE ice_ctas_part PARTITIONED BY(d) STORED AS ICEBERG AS SELECT s, ts, d FROM value_tbl;
CREATE TABLE ice_ctas_part_spec PARTITIONED BY SPEC (truncate(3, s)) STORED AS ICEBERG AS SELECT cast(t as INT), s, d FROM value_tbl;
Iceberg V2 tables can be created with the regular CREATE TABLE
statement; you just need to specify
the 'format-version' table property:
CREATE TABLE ice_v2 (i int) STORED BY ICEBERG TBLPROPERTIES('format-version'='2');
Existing Iceberg V1 tables can be upgraded to V2; use the ALTER TABLE
statement to do so:
ALTER TABLE ice_v1_to_v2 SET TBLPROPERTIES('format-version'='2');
Use the DROP TABLE
statement to remove an Iceberg table:
DROP TABLE ice_t;
When the external.table.purge table property is set to true, the DROP TABLE
statement also deletes the data files. This property is set to true when Impala
creates the Iceberg table via CREATE TABLE. When CREATE EXTERNAL TABLE is used
(the table already exists in some catalog), external.table.purge is set to false,
i.e. DROP TABLE doesn't remove any files, only the table definition in HMS.
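As an illustration, if you do want DROP TABLE to purge the data files of an externally created table, you could set the property first (ice_ext is a hypothetical table name):
ALTER TABLE ice_ext SET TBLPROPERTIES('external.table.purge'='true');
DROP TABLE ice_ext;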
You can get information about the supported Iceberg data types in the Iceberg spec.
Iceberg type | SQL type in Impala |
---|---|
boolean | BOOLEAN |
int | INTEGER |
long | BIGINT |
float | FLOAT |
double | DOUBLE |
decimal(P, S) | DECIMAL(P, S) |
date | DATE |
time | Not supported |
timestamp | TIMESTAMP |
timestamptz | Only read support via TIMESTAMP |
string | STRING |
uuid | Not supported |
fixed(L) | Not supported |
binary | Not supported |
struct | STRUCT (read only) |
list | ARRAY (read only) |
map | MAP (read only) |
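As a quick illustration of this mapping, the following hypothetical table uses several of the supported types:
CREATE TABLE ice_types_demo (
  b BOOLEAN,
  i INT,
  l BIGINT,
  f FLOAT,
  d DOUBLE,
  dec DECIMAL(10, 2),
  dt DATE,
  ts TIMESTAMP,
  s STRING
) STORED AS ICEBERG;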
Impala supports the following statements for schema evolution of Iceberg tables (see the examples after this list):
- ALTER TABLE ... RENAME TO ... (renames the table if the Iceberg catalog supports it)
- ALTER TABLE ... CHANGE COLUMN ... (changes the name and type of a column if the new type is compatible with the old type)
- ALTER TABLE ... ADD COLUMNS ... (adds columns to the end of the table)
- ALTER TABLE ... DROP COLUMN ...
Impala currently does not support schema evolution for tables with AVRO file format.
See schema evolution for more details.
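A minimal sketch of these schema evolution statements against the ice_t table created earlier (the column and table names are illustrative):
ALTER TABLE ice_t ADD COLUMNS (j BIGINT);
ALTER TABLE ice_t CHANGE COLUMN j j_renamed BIGINT;
ALTER TABLE ice_t DROP COLUMN j_renamed;
ALTER TABLE ice_t RENAME TO ice_t_new;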
The Iceberg spec has information about partitioning Iceberg tables. With Iceberg, we are not limited to value-based partitioning; we can also partition tables via several partition transforms. To create a partitioned Iceberg table, add a PARTITIONED BY SPEC
clause to the CREATE TABLE statement, e.g.:
CREATE TABLE ice_p (i INT, d DATE, s STRING, t TIMESTAMP)
PARTITIONED BY SPEC (BUCKET(5, i), MONTH(d), TRUNCATE(3, s), HOUR(t))
STORED AS ICEBERG;
The partition spec can be changed later via the ALTER TABLE SET PARTITION SPEC
statement, e.g.:
ALTER TABLE ice_p SET PARTITION SPEC (VOID(i), VOID(d), TRUNCATE(3, s), HOUR(t), i);
Impala also accepts the legacy PARTITIONED BY clause; the listed columns become identity-partitioned columns of the Iceberg table:
CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY (p1 INT, p2 STRING) STORED AS ICEBERG;
One can inspect a table's partition spec with the SHOW PARTITIONS
or SHOW CREATE TABLE statements.
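For example:
SHOW PARTITIONS ice_p;
SHOW CREATE TABLE ice_p;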
Impala is also able to insert new data into Iceberg tables. Currently the INSERT INTO
and INSERT OVERWRITE
DML statements are supported. One can also remove the
contents of an Iceberg table via the TRUNCATE
statement.
CREATE TABLE ice_p (i INT, b INT) PARTITIONED BY SPEC (bucket(17, i)) STORED AS ICEBERG;
INSERT INTO ice_p VALUES (1, 2);
INSERT OVERWRITE
statements can replace data in the table with the result of a query.
For partitioned tables Impala does a dynamic overwrite, which means that partitions that have
rows produced by the SELECT query are replaced, while partitions that have no rows produced
by the SELECT query remain untouched. INSERT OVERWRITE is not allowed for tables that use the
BUCKET partition transform, because dynamic overwrite behavior would be too random in this case.
If one needs to replace all contents of such a table, they can still use TRUNCATE
and INSERT INTO.
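A sketch of this behavior, assuming a partitioned Iceberg table ice_part and a hypothetical staging table staged_data with matching columns:
-- Replaces only the partitions that receive rows from the SELECT; other partitions stay untouched.
INSERT OVERWRITE ice_part SELECT * FROM staged_data;
-- For tables partitioned with the BUCKET transform, replace everything instead:
TRUNCATE TABLE ice_p;
INSERT INTO ice_p SELECT i, b FROM staged_data;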
Impala can only write Iceberg tables with Parquet data files.
Impala is able to run DELETE
statements against
Iceberg V2 tables. E.g.:
DELETE FROM ice_t where i = 3;
More information about the DELETE
statement can be found at DELETE Statement (Impala 2.8 or higher only).
Impala is also able to run UPDATE
statements against
Iceberg V2 tables. E.g.:
UPDATE ice_t SET val = val + 1;
UPDATE ice_t SET k = 4 WHERE i = 5;
UPDATE ice_t SET ice_t.k = o.k, ice_t.j = o.j FROM ice_t, other_table o WHERE ice_t.id = o.id;
The UPDATE FROM statement can be used to update a target Iceberg table based on a source table (or view) that doesn't need to be an Iceberg table. If there are multiple matches on the JOIN condition, Impala will raise an error.
More information about the UPDATE
statement can be found at UPDATE Statement (Impala 2.8 or higher only).
The LOAD DATA
statement can be used to load a single file or directory into
an existing Iceberg table. This operation is executed differently compared to HMS tables:
the data is inserted into the table via sequentially executed statements, which has
some limitations:
- The PARTITION
clause is not supported, but the partition transformations
are respected.
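A minimal sketch of loading a directory of data files into a hypothetical Iceberg table ice_tbl (the path is illustrative):
LOAD DATA INPATH '/tmp/ice_load_dir' INTO TABLE ice_tbl;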
Iceberg tables can be compacted with the OPTIMIZE TABLE statement:
OPTIMIZE TABLE [db_name.]table_name;
The OPTIMIZE TABLE
statement rewrites the entire table, compacting small data files and applying any delete files
in the process. The table's write.format.default
has to be parquet. When a table is optimized, a new snapshot is created. The old table state is still accessible by time travel to previous snapshots, because the rewritten data and delete files are not removed physically.
Note that the current implementation of OPTIMIZE TABLE
rewrites the entire table, so this operation can take a long time to complete
depending on the size of the table.
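For example:
OPTIMIZE TABLE ice_t;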
Iceberg stores the table states in a chain of snapshots. By default, Impala uses the current snapshot of the table. But for Iceberg tables, it is also possible to query an earlier state of the table.
Time travel queries use the FOR SYSTEM_TIME AS OF
and FOR SYSTEM_VERSION AS OF
clauses in SELECT
queries, e.g.:
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF '2022-01-04 10:00:00';
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF now() - interval 5 days;
SELECT * FROM ice_t FOR SYSTEM_VERSION AS OF 123456;
The snapshot history of a table can be inspected with the DESCRIBE HISTORY
statement with the following syntax:
DESCRIBE HISTORY [db_name.]table_name
[FROM timestamp];
DESCRIBE HISTORY [db_name.]table_name
[BETWEEN timestamp AND timestamp]
For example:
DESCRIBE HISTORY ice_t FROM '2022-01-04 10:00:00';
DESCRIBE HISTORY ice_t FROM now() - interval 5 days;
DESCRIBE HISTORY ice_t BETWEEN '2022-01-04 10:00:00' AND '2022-01-05 10:00:00';
The output of the DESCRIBE HISTORY
statement is formed of the following columns:
- creation_time: the snapshot's creation timestamp.
- snapshot_id: the snapshot's ID or null.
- parent_id: the snapshot's parent ID or null.
- is_current_ancestor: TRUE if the snapshot is a current ancestor of the table.

Please note that time travel queries are executed using the old schema of the table from the point specified by the time travel parameters. Prior to Impala 4.3.0 the current table schema is used to query an older snapshot of the table, which might have had a different schema in the past.
Iceberg table modifications cause new table snapshots to be created;
these snapshots represent earlier versions of the table.
The ALTER TABLE [db_name.]table_name EXECUTE ROLLBACK
statement can be used to roll back the table to a previous snapshot.
For example, to roll the table back to the snapshot with ID 123456,
use:
ALTER TABLE ice_tbl EXECUTE ROLLBACK(123456);
To roll the table back to the most recent (newest) snapshot
that has a creation timestamp that is older than the timestamp '2022-01-04 10:00:00' use:
ALTER TABLE ice_tbl EXECUTE ROLLBACK('2022-01-04 10:00:00');
The timestamp is evaluated using the timezone of the current session.
It is only possible to roll back to a snapshot that is a current ancestor of the table.
When a table is rolled back to a snapshot, a new snapshot is created with the same snapshot id, but with a new creation timestamp.
Old snapshots can be removed with the ALTER TABLE ... EXECUTE expire_snapshots(...)
statement, which will expire snapshots that are older than the specified
timestamp. For example:
ALTER TABLE ice_tbl EXECUTE expire_snapshots('2022-01-04 10:00:00');
ALTER TABLE ice_tbl EXECUTE expire_snapshots(now() - interval 5 days);
The expire_snapshots operation always retains at least the number of snapshots specified by the history.expire.min-snapshots-to-keep
table property.
Old metadata file cleanup can be configured with the
write.metadata.delete-after-commit.enabled=true
and
write.metadata.previous-versions-max
table properties. This
allows automatic metadata file removal after operations that modify metadata
such as expiring snapshots or inserting data.
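As an illustration, both properties could be set on an existing table like this (the value 50 is arbitrary):
ALTER TABLE ice_tbl SET TBLPROPERTIES(
  'write.metadata.delete-after-commit.enabled'='true',
  'write.metadata.previous-versions-max'='50');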
Iceberg stores extensive metadata for each table (e.g. snapshots, manifests, data and delete files etc.), which is accessible in Impala in the form of virtual tables called metadata tables.
Metadata tables can be queried just like regular tables, including filtering, aggregation and joining with other metadata and regular tables. On the other hand, they are read-only, so it is not possible to change, add or remove records from them, they cannot be dropped and new metadata tables cannot be created. Metadata changes made in other ways (not through metadata tables) are reflected in the tables.
The available metadata tables can be listed with the SHOW
METADATA TABLES
command:
SHOW METADATA TABLES IN [db.]tbl [[LIKE] "pattern"]
It is possible to filter the result using pattern
. All Iceberg
tables have the same metadata tables, so this command is mostly for convenience.
Using SHOW METADATA TABLES
on a non-Iceberg table results in an
error.
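For example, using the table from the metadata table examples below (the LIKE pattern is illustrative):
SHOW METADATA TABLES IN functional_parquet.iceberg_alltypes_part;
SHOW METADATA TABLES IN functional_parquet.iceberg_alltypes_part LIKE '*snap*';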
Just like regular tables, metadata tables have schemas that can be queried with
the DESCRIBE
command. Note, however, that DESCRIBE
FORMATTED|EXTENDED
are not available for metadata tables.
DESCRIBE functional_parquet.iceberg_alltypes_part.history;
To retrieve information from metadata tables, use the usual
SELECT
statement. You can select any subset of the columns or all
of them using ‘*’. Note that in contrast to regular tables, SELECT
*
on metadata tables always includes complex-typed columns in the result.
Therefore, the query option EXPAND_COMPLEX_TYPES
only applies to
regular tables. This holds also in queries that mix metadata tables and regular
tables: for SELECT *
expressions from metadata tables, complex
types will always be included, and for SELECT *
expressions from
regular tables, complex types will be included if and only if
EXPAND_COMPLEX_TYPES
is true.
Note that unnesting collections from metadata tables is not supported.
SELECT
s.operation,
h.is_current_ancestor,
s.summary
FROM functional_parquet.iceberg_alltypes_part.history h
JOIN functional_parquet.iceberg_alltypes_part.snapshots s
ON h.snapshot_id = s.snapshot_id
WHERE s.operation = 'append'
ORDER BY made_current_at;
You can use CREATE TABLE ... LIKE ...
to create an empty Iceberg table
based on the definition of another Iceberg table, including any column attributes in
the original table:
CREATE TABLE new_ice_tbl LIKE orig_ice_tbl;
Because the data types of Iceberg and Impala do not correspond one to one, Impala can only clone between Iceberg tables.
The following table properties can be set on Iceberg tables (see the sketch after this list):
- iceberg.catalog: controls which catalog is used for this Iceberg table. It can be 'hive.catalog' (default), 'hadoop.catalog', 'hadoop.tables', or a name that identifies a catalog defined in the Hadoop configurations, e.g. hive-site.xml
- iceberg.catalog_location: Iceberg table catalog location when iceberg.catalog is 'hadoop.catalog'
- iceberg.table_identifier: Iceberg table identifier. We use <database>.<table> instead if this property is not set
- write.format.default: data file format of the table. Impala can read AVRO, ORC and PARQUET data files in Iceberg tables, and can write PARQUET data files only.
- write.parquet.compression-codec: Parquet compression codec. Supported values are: NONE, GZIP, SNAPPY (default value), LZ4, ZSTD. The table property will be ignored if the COMPRESSION_CODEC query option is set.
- write.parquet.compression-level: Parquet compression level. Used with ZSTD compression only. Supported range is [1, 22]. Default value is 3. The table property will be ignored if the COMPRESSION_CODEC query option is set.
- write.parquet.row-group-size-bytes: Parquet row group size in bytes. Supported range is [8388608, 2146435072] (8MB - 2047MB). The table property will be ignored if the PARQUET_FILE_SIZE query option is set. If neither the table property nor the PARQUET_FILE_SIZE query option is set, the way Impala calculates row group size will remain unchanged.
- write.parquet.page-size-bytes: Parquet page size in bytes. Used for PLAIN encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates page size will remain unchanged.
- write.parquet.dict-size-bytes: Parquet dictionary page size in bytes. Used for dictionary encoding. Supported range is [65536, 1073741824] (64KB - 1GB). If the table property is unset, the way Impala calculates dictionary page size will remain unchanged.
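A minimal sketch of setting a couple of these write properties at table creation (ice_zstd is a hypothetical table name):
CREATE TABLE ice_zstd (i INT)
STORED AS ICEBERG
TBLPROPERTIES('write.parquet.compression-codec'='ZSTD',
              'write.parquet.compression-level'='7');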
Iceberg manifest caching can be configured with the following properties, e.g.:
iceberg.io-impl=org.apache.iceberg.hadoop.HadoopFileIO;
iceberg.io.manifest.cache-enabled=true;
iceberg.io.manifest.cache.max-total-bytes=104857600;
iceberg.io.manifest.cache.expiration-interval-ms=3600000;
iceberg.io.manifest.cache.max-content-length=8388608;
- iceberg.io-impl: custom FileIO implementation to use in a catalog. Must be set to enable manifest caching. Impala defaults to HadoopFileIO. It is recommended not to change this to anything other than HadoopFileIO.
- iceberg.io.manifest.cache-enabled: enables/disables the manifest caching feature.
- iceberg.io.manifest.cache.max-total-bytes: maximum total amount of bytes to cache in the manifest cache. Must be a positive value.
- iceberg.io.manifest.cache.expiration-interval-ms: maximum duration for which an entry stays in the manifest cache. Must be a non-negative value. Setting zero means cache entries expire only when they are evicted due to memory pressure from iceberg.io.manifest.cache.max-total-bytes.
- iceberg.io.manifest.cache.max-content-length: maximum length in bytes of a manifest file to be considered for caching. Manifest files with a length exceeding this property value will not be cached. Must be set to a positive value lower than iceberg.io.manifest.cache.max-total-bytes.
Manifest caching only works for tables that are loaded via HadoopCatalog or HiveCatalog.
Individual HadoopCatalogs and HiveCatalogs have separate manifest caches with the same
configuration. By default, only 8 catalogs can have their manifest cache active in memory.
This number can be raised by setting a higher value in the Java system property
iceberg.io.manifest.cache.fileio-max.