Using Impala to Query Kudu Tables

You can use Impala to query tables stored by Apache Kudu. This capability allows convenient access to a storage system that is tuned for different kinds of workloads than the HDFS storage that Impala uses by default.

By default, Impala tables are stored on HDFS using data files with various file formats. HDFS files are ideal for bulk loads (append operations) and queries using full-table scans, but do not support in-place updates or deletes. Kudu is an alternative storage engine used by Impala which can do both in-place updates (for mixed read/write workloads) and fast scans (for data-warehouse/analytic operations). Using Kudu tables with Impala can simplify the ETL pipeline by avoiding extra steps to segregate and reorganize newly arrived data.

Certain Impala SQL statements and clauses, such as DELETE, UPDATE, UPSERT, and PRIMARY KEY, work only with Kudu tables. Other statements and clauses, such as LOAD DATA, TRUNCATE TABLE, and INSERT OVERWRITE, are not applicable to Kudu tables.

Benefits of Using Kudu Tables with Impala

The combination of Kudu and Impala works best for tables where scan performance is important, but data arrives continuously, in small batches, or needs to be updated without being completely replaced. HDFS-backed tables can require substantial overhead to replace or reorganize data files as new data arrives. Impala can perform efficient lookups and scans within Kudu tables, and Impala can also perform update or delete operations efficiently. You can also use the Kudu Java, C++, and Python APIs to do ingestion or transformation operations outside of Impala, and Impala can query the current data at any time.

Configuring Impala for Use with Kudu

For CREATE TABLE ... STORED AS KUDU statements to connect to the appropriate Kudu server, the -kudu_master_hosts configuration property must be set correctly for the impalad daemon. Typically, the required value for this setting is kudu_host:7051. In a high-availability Kudu deployment, specify the names of multiple Kudu master hosts separated by commas.

If the -kudu_master_hosts configuration property is not set, you can still associate the appropriate value for each table by specifying a TBLPROPERTIES('kudu.master_addresses') clause in the CREATE TABLE statement or changing the TBLPROPERTIES('kudu.master_addresses') value with an ALTER TABLE statement.
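The following sketch shows both approaches. The table name kudu_with_master and all host names are hypothetical placeholders, and the comma-separated host list assumes the same format as the -kudu_master_hosts setting.


CREATE TABLE kudu_with_master
(
  id BIGINT PRIMARY KEY,
  s STRING
)
PARTITION BY HASH(id) PARTITIONS 2
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses' = 'kudu-master-1.example.com:7051,kudu-master-2.example.com:7051,kudu-master-3.example.com:7051');

-- Later, point the table at a different (hypothetical) Kudu master.
ALTER TABLE kudu_with_master
  SET TBLPROPERTIES ('kudu.master_addresses' = 'new-kudu-master.example.com:7051');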

Cluster Topology for Kudu Tables

With HDFS-backed tables, you are typically concerned with the number of DataNodes in the cluster, how many and how large HDFS data files are read during a query, and therefore the amount of work performed by each DataNode and the network communication to combine intermediate results and produce the final result set.

With Kudu tables, the topology considerations are different, because:

  • The underlying storage is managed and organized by Kudu, not represented as HDFS data files.

  • Kudu handles some of the underlying mechanics of partitioning the data. You can specify the partitioning scheme with combinations of hash and range partitioning, so that you can decide how much effort to expend to manage the partitions as new data arrives. For example, you can construct partitions that apply to date ranges rather than a separate partition for each day or each hour.

  • Data is physically divided based on units of storage called tablets. Tablets are stored by tablet servers. Each tablet server can store multiple tablets, and each tablet is replicated across multiple tablet servers, managed automatically by Kudu. Where practical, co-locate the tablet servers on the same hosts as the Impala daemons, although that is not required.

Kudu Replication Factor

By default, Kudu tables created through Impala use a tablet replication factor of 3. To change the replication factor for a Kudu table, specify the replication factor using TBLPROPERTIES ('kudu.num_tablet_replicas' = 'n') in the CREATE TABLE statement.

The number of replicas for a Kudu table must be odd.

Altering the kudu.num_tablet_replicas property after table creation currently has no effect.
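As a sketch, the following statement creates a table whose tablets each have a single replica. That setting is occasionally useful on a small test cluster but provides no redundancy. The table name single_replica is hypothetical. Because changing kudu.num_tablet_replicas later has no effect, choose the value when you create the table.


CREATE TABLE single_replica
(
  id BIGINT PRIMARY KEY,
  s STRING
)
PARTITION BY HASH(id) PARTITIONS 2
STORED AS KUDU
-- The replica count must be odd; 1 is odd but offers no fault tolerance.
TBLPROPERTIES ('kudu.num_tablet_replicas' = '1');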

Impala DDL Enhancements for Kudu Tables (CREATE TABLE and ALTER TABLE)

You can use the Impala CREATE TABLE and ALTER TABLE statements to create and fine-tune the characteristics of Kudu tables. Because Kudu tables have features and properties that do not apply to other kinds of Impala tables, familiarize yourself with Kudu-related concepts and syntax first. For the general syntax of the CREATE TABLE statement for Kudu tables, see CREATE TABLE Statement.

Non-unique Primary Keys for Kudu Tables

Kudu allows you to define a non-unique primary key when you create a table. To guarantee that every row still has a unique key, Kudu appends a system-generated, auto-incrementing column to the non-unique primary key columns. This column is named auto_incrementing_id and has the BIGINT type; it is always generated by the system and cannot be created explicitly by the user. Its values are unique only within a single partition (tablet): in each tablet, the values start from one and increase monotonically. Kudu assigns the value automatically when a row is inserted.

Create a Kudu Table with a non-unique PRIMARY KEY

The following example shows creating a table with a non-unique PRIMARY KEY.


CREATE TABLE kudu_tbl1
(
 id INT NON UNIQUE PRIMARY KEY,
 name STRING
)
PARTITION BY HASH (id) PARTITIONS 3 STORED as KUDU;

The effective PRIMARY KEY in the above case is {id, auto_incrementing_id}.

Note: The auto_incrementing_id column cannot be added, removed, or renamed with ALTER TABLE statements.

Verify the PRIMARY KEY is non-unique

To confirm that the primary key is non-unique, run the following DESCRIBE statement. The key_unique property shows whether the primary key is unique. The output also lists the system-generated auto_incrementing_id column as part of the primary key.


  describe kudu_tbl1;
  +----------------------+--------+---------+-------------+------------+----------+---------------+---------------+---------------------+------------+
  | name                 | type   | comment | primary_key | key_unique | nullable | default_value | encoding      | compression         | block_size |
  +----------------------+--------+---------+-------------+------------+----------+---------------+---------------+---------------------+------------+
  | id                   | int    |         | true        | false      | false    |               | AUTO_ENCODING | DEFAULT_COMPRESSION | 0          |
  | auto_incrementing_id | bigint |         | true        | false      | false    |               | AUTO_ENCODING | DEFAULT_COMPRESSION | 0          |
  | name                 | string |         | false       |            | true     |               | AUTO_ENCODING | DEFAULT_COMPRESSION | 0          |
  +----------------------+--------+---------+-------------+------------+----------+---------------+---------------+---------------------+------------+
  Fetched 3 row(s) in 4.72s

Query Auto Incrementing Column

A SELECT statement does not return the system-generated auto_incrementing_id column unless the column is explicitly named in the select list.
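For example, with the kudu_tbl1 table defined above, the first query below returns only the id and name columns, while the second also returns the values that Kudu assigned to auto_incrementing_id. The assigned values depend on the data and are not shown here.


-- The system-generated column is not part of the SELECT * output.
SELECT * FROM kudu_tbl1;

-- Name the column explicitly to see the values Kudu assigned.
SELECT id, name, auto_incrementing_id FROM kudu_tbl1;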

Create a Kudu table without a PRIMARY KEY attribute

The PRIMARY KEY and PARTITION BY clauses are each optional, but you cannot omit both of them when you create a Kudu table. If you omit the primary key, the partition key columns are promoted to a non-unique primary key. This is possible only if the partition key columns are the leading columns of the table.

In the following example, 'a' and 'b' are promoted to a non-unique primary key, and Kudu adds the 'auto_incrementing_id' column. Together, 'a', 'b', and 'auto_incrementing_id' form the effective unique composite primary key.


CREATE TABLE auto_table
(
 a BIGINT,
 b STRING
)
PARTITION BY HASH(a, b) PARTITIONS 2 STORED AS KUDU;

The effective primary key in this case would be {a, b, auto_incrementing_id}

Limitations

  • UPSERT operations are not supported for Kudu tables with a non-unique primary key. An UPSERT statement against such a table fails with an error.
  • Because the auto-generated key for each row is assigned only after the row's data is generated and the row lands in the tablet, you cannot use the auto_incrementing_id column in the partition key.

Primary Key Columns for Kudu Tables

Kudu tables introduce the notion of primary keys to Impala for the first time. The primary key is made up of one or more columns, whose values are combined and used as a lookup key during queries. The primary key can be non-unique. The uniqueness of the primary key is guaranteed by appending a system-generated auto-incrementing column to the non-unique primary key columns. The tuple represented by these columns cannot contain any NULL values, and can never be updated once inserted. For a Kudu table, all the partition key columns must come from the set of primary key columns.

The primary key has both physical and logical aspects:

  • On the physical side, it is used to map the data values to particular tablets for fast retrieval. Because the tuples formed by the primary key values are unique, the primary key columns are typically highly selective.

  • On the logical side, the primary key determines how duplicate rows are handled. If the primary key is non-unique, INSERT statements with duplicate key values do not fail; Kudu makes each stored row unique through the system-generated auto_incrementing_id column. As a consequence, if an INSERT operation into such a table fails partway through, all rows except those with write errors are added to the table, and running the same INSERT again adds duplicate copies of the rows that were already written. Each duplicate row is stored with a different auto_incrementing_id value.
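The following sketch illustrates this behavior with the kudu_tbl1 table defined earlier, which has a non-unique primary key on id. The sample values are hypothetical. Both rows in the INSERT are stored, and the grouping query is one way to find key and value combinations that occur more than once.


-- Neither row is rejected: each one receives its own auto_incrementing_id.
INSERT INTO kudu_tbl1 (id, name) VALUES (1, 'alpha'), (1, 'alpha');

-- List visible column combinations that are stored more than once.
SELECT id, name, COUNT(*) AS copies
FROM kudu_tbl1
GROUP BY id, name
HAVING COUNT(*) > 1;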

Note:

Impala only allows PRIMARY KEY clauses and NOT NULL constraints on columns for Kudu tables. These constraints are enforced on the Kudu side.

Kudu-Specific Column Attributes for CREATE TABLE

For the general syntax of the CREATE TABLE statement for Kudu tables, see CREATE TABLE Statement. The following sections provide more detail for some of the Kudu-specific keywords you can use in column definitions.

The column list in a CREATE TABLE statement can include the following attributes, which only apply to Kudu tables:


[NON UNIQUE] PRIMARY KEY
| [NOT] NULL
| ENCODING codec
| COMPRESSION algorithm
| DEFAULT constant_expression
| BLOCK_SIZE number

See the following sections for details about each column attribute.

PRIMARY KEY Attribute

The primary key for a Kudu table is a column, or set of columns, that uniquely identifies every row. (For a non-unique primary key, the system-generated auto_incrementing_id column completes the unique identifier.) The primary key value also is used as the natural sort order for the values from the table. The primary key value for each row is based on the combination of values for the columns.

Because all of the primary key columns must have non-null values, specifying a column in the PRIMARY KEY or NON UNIQUE PRIMARY KEY clause implicitly adds the NOT NULL attribute to that column.

The primary key columns must be the first ones specified in the CREATE TABLE statement. For a single-column primary key, you can include a PRIMARY KEY attribute inline with the column definition. For a multi-column primary key, you include a PRIMARY KEY (c1, c2, ...) clause as a separate entry at the end of the column list.

You can specify the PRIMARY KEY attribute either inline in a single column definition, or as a separate clause at the end of the column list:


CREATE TABLE pk_inline
(
  col1 BIGINT PRIMARY KEY,
  col2 STRING,
  col3 BOOLEAN
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;

CREATE TABLE pk_at_end
(
  col1 BIGINT,
  col2 STRING,
  col3 BOOLEAN,
  PRIMARY KEY (col1)
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;

-- The same two forms with a non-unique primary key.
CREATE TABLE pk_inline_non_unique
(
  col1 BIGINT NON UNIQUE PRIMARY KEY,
  col2 STRING,
  col3 BOOLEAN
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;

CREATE TABLE pk_at_end_non_unique
(
  col1 BIGINT,
  col2 STRING,
  col3 BOOLEAN,
  NON UNIQUE PRIMARY KEY (col1)
) PARTITION BY HASH(col1) PARTITIONS 2 STORED AS KUDU;

When the primary key is a single column, these two forms are equivalent. If the primary key consists of more than one column, you must specify the primary key using a separate entry in the column list:


CREATE TABLE pk_multiple_columns
(
  col1 BIGINT,
  col2 STRING,
  col3 BOOLEAN,
  PRIMARY KEY (col1, col2)
) PARTITION BY HASH(col2) PARTITIONS 2 STORED AS KUDU;

The SHOW CREATE TABLE statement always represents the PRIMARY KEY specification as a separate item in the column list:


CREATE TABLE inline_pk_rewritten (id BIGINT PRIMARY KEY, s STRING)
  PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;

SHOW CREATE TABLE inline_pk_rewritten;
+------------------------------------------------------------------------------+
| result                                                                       |
+------------------------------------------------------------------------------+
| CREATE TABLE user.inline_pk_rewritten (                                      |
|   id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, |
|   s STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,      |
|   PRIMARY KEY (id)                                                           |
| )                                                                            |
| PARTITION BY HASH (id) PARTITIONS 2                                          |
| STORED AS KUDU                                                               |
| TBLPROPERTIES ('kudu.master_addresses'='host.example.com')                   |
+------------------------------------------------------------------------------+

The notion of primary key only applies to Kudu tables. Every Kudu table requires a primary key, which consists of one or more columns. Either list the primary key columns first in the column list, or, if you omit the primary key, make the partition key columns the leading columns of the table so that they can be promoted to a non-unique primary key.

The contents of the primary key columns cannot be changed by an UPDATE or UPSERT statement. Including too many columns in the primary key (more than 5 or 6) can also reduce the performance of write operations. Therefore, pick the most selective and most frequently tested non-null columns for the primary key specification. If a column must always have a value, but that value might change later, leave it out of the primary key and use a NOT NULL clause for that column instead. If an existing row has an incorrect or outdated key column value, delete the old row and insert an entirely new row with the correct primary key.
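For example, to correct the col2 key value of a row in the pk_multiple_columns table shown earlier, you might delete the old row and insert a replacement. The key values 'old-key' and 'new-key' are hypothetical. The two statements are separate operations rather than one atomic change.


-- Remove the row that has the outdated key value.
DELETE FROM pk_multiple_columns WHERE col1 = 1 AND col2 = 'old-key';

-- Insert a replacement row that carries the corrected key value.
INSERT INTO pk_multiple_columns VALUES (1, 'new-key', true);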

NULL | NOT NULL Attribute

For Kudu tables, you can specify which columns can contain nulls or not. This constraint offers an extra level of consistency enforcement for Kudu tables. If an application requires a field to always be specified, include a NOT NULL clause in the corresponding column definition, and Kudu prevents rows from being inserted with a NULL in that column.

For example, a table containing geographic information might require the latitude and longitude coordinates to always be specified. Other attributes might be allowed to be NULL. For example, a location might not have a designated place name, its altitude might be unimportant, and its population might be initially unknown, to be filled in later.

Because all of the primary key columns must have non-null values, specifying a column in the PRIMARY KEY clause implicitly adds the NOT NULL attribute to that column.

For non-Kudu tables, Impala allows any column to contain NULL values, because it is not practical to enforce a "not null" constraint on HDFS data files that could be prepared using external tools and ETL processes.


CREATE TABLE required_columns
(
  id BIGINT PRIMARY KEY,
  latitude DOUBLE NOT NULL,
  longitude DOUBLE NOT NULL,
  place_name STRING,
  altitude DOUBLE,
  population BIGINT
) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;
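As a sketch of how the constraint behaves with the required_columns table above, the first INSERT supplies every NOT NULL column and leaves the nullable columns as NULL, while the second tries to store NULL in latitude, so Kudu does not accept that row. The sample values are hypothetical, and the exact error or warning text depends on the Impala version, so it is not shown.


-- Accepted: place_name, altitude, and population are left as NULL.
INSERT INTO required_columns (id, latitude, longitude) VALUES (1, 37.77, -122.42);

-- Not accepted: latitude is declared NOT NULL.
INSERT INTO required_columns (id, latitude, longitude) VALUES (2, NULL, -122.42);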

During performance optimization, Kudu can use the knowledge that nulls are not allowed to skip certain checks on each input row, speeding up queries and join operations. Therefore, specify NOT NULL constraints when appropriate.

The NULL clause is the default condition for all columns that are not part of the primary key. You can omit it, or specify it to clarify that you have made a conscious design decision to allow nulls in a column.

Because primary key columns cannot contain any NULL values, the NOT NULL clause is not required for the primary key columns, but you might still specify it to make your code self-describing.

DEFAULT Attribute

You can specify a default value for columns in Kudu tables. The default value can be any constant expression, for example, a combination of literal values, arithmetic and string operations. It cannot contain references to columns or non-deterministic function calls.

The following example shows different kinds of expressions for the DEFAULT clause. The requirement to use a constant expression means that you can fill in a placeholder value such as NULL, an empty string, 0, -1, 'N/A', and so on, or apply a deterministic function to a literal (as with upper() in the example), but you cannot reference column names or call non-deterministic functions. Therefore, you cannot use DEFAULT to do things such as automatically making an uppercase copy of another string column, storing Boolean values based on tests of other columns, or adding or subtracting one from another column representing a sequence number.


CREATE TABLE default_vals
(
  id BIGINT PRIMARY KEY,
  name STRING NOT NULL DEFAULT 'unknown',
  address STRING DEFAULT upper('no fixed address'),
  age INT DEFAULT -1,
  earthling BOOLEAN DEFAULT TRUE,
  planet_of_origin STRING DEFAULT 'Earth',
  optional_col STRING DEFAULT NULL
) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;

Note:

When designing an entirely new schema, prefer to use NULL as the placeholder for any unknown or missing values, because that is the universal convention among database systems. Null values can be stored efficiently, and easily checked with the IS NULL or IS NOT NULL operators. The DEFAULT attribute is appropriate when ingesting data that already has an established convention for representing unknown or missing values, or where the vast majority of rows have some common non-null value.
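To see the DEFAULT clauses from the default_vals table take effect, you might insert a row that supplies only the primary key and then query it. The expected values in the comment follow directly from the DEFAULT clauses in the CREATE TABLE statement above.


-- Supply only the primary key; every other column takes its DEFAULT value.
INSERT INTO default_vals (id) VALUES (1);

-- Expected: name 'unknown', address 'NO FIXED ADDRESS', age -1,
-- earthling true, planet_of_origin 'Earth', optional_col NULL.
SELECT * FROM default_vals WHERE id = 1;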

ENCODING Attribute

Each column in a Kudu table can optionally use an encoding, a low-overhead form of compression that reduces the size on disk but requires additional CPU cycles to reconstruct the original values during queries. Typically, highly compressible data benefits from the reduced I/O to read the data back from disk.

The encoding keywords that Impala recognizes are:
  • AUTO_ENCODING: use the default encoding based on the column type, which is bitshuffle for numeric columns and dictionary encoding for string columns.

  • PLAIN_ENCODING: leave the value in its original binary format.

  • RLE: compress repeated values (when sorted in primary key order) by including a count.

  • DICT_ENCODING: when the number of different string values is low, replace the original string with a numeric ID.

  • BIT_SHUFFLE: rearrange the bits of the values to efficiently compress sequences of values that are identical or vary only slightly based on primary key order. The resulting encoded data is also compressed with LZ4.

  • PREFIX_ENCODING: compress common prefixes in string values; mainly for use internally within Kudu.

The following example shows the Impala keywords representing the encoding types. (The Impala keywords match the symbolic names used within Kudu.) For usage guidelines on the different kinds of encoding, see the Kudu documentation. The DESCRIBE output shows how the encoding is reported after the table is created, and that omitting the encoding (in this case, for the ID column) is the same as specifying AUTO_ENCODING.


CREATE TABLE various_encodings
(
  id BIGINT PRIMARY KEY,
  c1 BIGINT ENCODING PLAIN_ENCODING,
  c2 BIGINT ENCODING AUTO_ENCODING,
  c3 TINYINT ENCODING BIT_SHUFFLE,
  c4 DOUBLE ENCODING BIT_SHUFFLE,
  c5 BOOLEAN ENCODING RLE,
  c6 STRING ENCODING DICT_ENCODING,
  c7 STRING ENCODING PREFIX_ENCODING
) PARTITION BY HASH(id) PARTITIONS 2 STORED AS KUDU;

-- Some columns are omitted from the output for readability.
describe various_encodings;
+------+---------+-------------+----------+-----------------+
| name | type    | primary_key | nullable | encoding        |
+------+---------+-------------+----------+-----------------+
| id   | bigint  | true        | false    | AUTO_ENCODING   |
| c1   | bigint  | false       | true     | PLAIN_ENCODING  |
| c2   | bigint  | false       | true     | AUTO_ENCODING   |
| c3   | tinyint | false       | true     | BIT_SHUFFLE     |
| c4   | double  | false       | true     | BIT_SHUFFLE     |
| c5   | boolean | false       | true     | RLE             |
| c6   | string  | false       | true     | DICT_ENCODING   |
| c7   | string  | false       | true     | PREFIX_ENCODING |
+------+---------+-------------+----------+-----------------+

COMPRESSION Attribute

You can specify a compression algorithm to use for each column in a Kudu table. This attribute imposes more CPU overhead when retrieving the values than the ENCODING attribute does. Therefore, use it primarily for columns with long strings that do not benefit much from the less-expensive ENCODING attribute.

The choices for COMPRESSION are LZ4, SNAPPY, and ZLIB.

Note:

Columns that use the BIT_SHUFFLE encoding are already compressed using LZ4, and so typically do not need any additional COMPRESSION attribute.

The following example shows design considerations for several columns with different distribution characteristics, leading to choices for both the ENCODING and COMPRESSION attributes. The user_id values come from a limited set of strings that repeat across many posts, therefore this column is a good candidate for dictionary encoding. The post_id column contains an ascending sequence of integers, where several leading bits are likely to be all zeroes, therefore this column is a good candidate for bitshuffle encoding. The body column and the corresponding columns for translated versions tend to be long unique strings that are not practical to use with any of the encoding schemes, therefore they employ the COMPRESSION attribute instead. The ideal compression codec in each case would require some experimentation to determine how much space savings it provided and how much CPU overhead it added, based on real-world data.


CREATE TABLE blog_posts
(
  user_id STRING ENCODING DICT_ENCODING,
  post_id BIGINT ENCODING BIT_SHUFFLE,
  subject STRING ENCODING PLAIN_ENCODING,
  body STRING COMPRESSION LZ4,
  spanish_translation STRING COMPRESSION SNAPPY,
  esperanto_translation STRING COMPRESSION ZLIB,
  PRIMARY KEY (user_id, post_id)
) PARTITION BY HASH(user_id, post_id) PARTITIONS 2 STORED AS KUDU;

BLOCK_SIZE Attribute

Although Kudu does not use HDFS files internally, and thus is not affected by the HDFS block size, it does have an underlying unit of I/O called the block size. The BLOCK_SIZE attribute lets you set the block size for any column.

The block size attribute is a relatively advanced feature. Refer to the Kudu documentation for usage details.
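As an illustration only, the following statement sets a block size on one column. The table name block_size_example is hypothetical, the value is assumed to be a size in bytes, and 65536 is an arbitrary example rather than a recommendation; check the Kudu documentation before tuning this setting. The DESCRIBE output for the table reports the setting in its block_size column, as in the DESCRIBE kudu_tbl1 output earlier in this section.


CREATE TABLE block_size_example
(
  id BIGINT PRIMARY KEY,
  -- Assumed unit: bytes. The value is illustrative, not a tuning recommendation.
  payload STRING BLOCK_SIZE 65536
)
PARTITION BY HASH(id) PARTITIONS 2
STORED AS KUDU;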

Partitioning for Kudu Tables

Kudu tables use special mechanisms to distribute data among the underlying tablet servers. Although we refer to such tables as partitioned tables, they are distinguished from traditional Impala partitioned tables by use of different clauses on the CREATE TABLE statement. Kudu tables use PARTITION BY, HASH, RANGE, and range specification clauses rather than the PARTITIONED BY clause for HDFS-backed tables, which specifies only a column name and creates a new partition for each different value.

For background information and architectural details about the Kudu partitioning mechanism, see the Kudu white paper, section 3.2.

Note:

The Impala DDL syntax for Kudu tables is different than in early Kudu versions, which used an experimental fork of the Impala code. For example, the DISTRIBUTE BY clause is now PARTITION BY, the INTO n BUCKETS clause is now PARTITIONS n and the range partitioning syntax is reworked to replace the SPLIT ROWS clause with more expressive syntax involving comparison operators.

Hash Partitioning

Hash partitioning is the simplest type of partitioning for Kudu tables. For hash-partitioned Kudu tables, inserted rows are divided up between a fixed number of "buckets" by applying a hash function to the values of the columns specified in the HASH clause. Hashing ensures that rows with similar values are evenly distributed, instead of clumping together all in the same bucket. Spreading new rows across the buckets this way lets insertion operations work in parallel across multiple tablet servers. Separating the hashed values can impose additional overhead on queries, where queries with range-based predicates might have to read multiple tablets to retrieve all the relevant values.


-- 1M rows with 50 hash partitions = approximately 20,000 rows per partition.
-- The values in each partition are not sequential, but rather based on a hash function.
-- Rows 1, 99999, and 123456 might be in the same partition.
CREATE TABLE million_rows (id string primary key, s string)
  PARTITION BY HASH(id) PARTITIONS 50
  STORED AS KUDU;

-- Because the ID values are unique, we expect the rows to be roughly
-- evenly distributed between the buckets in the destination table.
INSERT INTO million_rows SELECT * FROM billion_rows ORDER BY id LIMIT 1000000;

Note:

The largest number of buckets that you can create with a PARTITIONS clause varies depending on the number of tablet servers in the cluster, while the smallest is 2. For simplicity, some of the simple CREATE TABLE statements throughout this section use PARTITIONS 2 to illustrate the minimum requirements for a Kudu table. For large tables, prefer to use roughly 10 partitions per server in the cluster.

Range Partitioning

Range partitioning lets you specify partitioning precisely, based on single values or ranges of values within one or more columns. You add one or more RANGE clauses to the CREATE TABLE statement, following the PARTITION BY clause.

Range-partitioned Kudu tables use one or more range clauses, which include a combination of constant expressions, VALUE or VALUES keywords, and comparison operators. (This syntax replaces the SPLIT ROWS clause used with early Kudu versions.) For the full syntax, see CREATE TABLE Statement.


-- 50 buckets, all for IDs beginning with a lowercase letter.
-- Having only a single range enforces the allowed range of values
-- but does not add any extra parallelism.
create table million_rows_one_range (id string primary key, s string)
  partition by hash(id) partitions 50,
  range (partition 'a' <= values < '{')
  stored as kudu;

-- 50 buckets for IDs beginning with a lowercase letter
-- plus 50 buckets for IDs beginning with an uppercase letter.
-- Total number of buckets = number in the PARTITIONS clause x number of ranges.
-- We are still enforcing constraints on the primary key values
-- allowed in the table, and the 2 ranges provide better parallelism
-- as rows are inserted or the table is scanned.
create table million_rows_two_ranges (id string primary key, s string)
  partition by hash(id) partitions 50,
  range (partition 'a' <= values < '{', partition 'A' <= values < '[')
  stored as kudu;

-- Same as previous table, with an extra range covering the single key value '00000'.
create table million_rows_three_ranges (id string primary key, s string)
  partition by hash(id) partitions 50,
  range (partition 'a' <= values < '{', partition 'A' <= values < '[', partition value = '00000')
  stored as kudu;

-- The range partitioning can be displayed with a SHOW command in impala-shell.
show range partitions million_rows_three_ranges;
+---------------------+
| RANGE (id)          |
+---------------------+
| VALUE = "00000"     |
| "A" <= VALUES < "[" |
| "a" <= VALUES < "{" |
+---------------------+

Note:

When defining ranges, be careful to avoid "fencepost errors" where values at the extreme ends might be included or omitted by accident. For example, in the tables defined in the preceding code listings, the range "a" <= VALUES < "{" ensures that any values starting with z, such as za or zzz or zzz-ZZZ, are all included, by using a less-than operator with "{", the character that immediately follows z in ASCII order.

For range-partitioned Kudu tables, an appropriate range must exist before a data value can be created in the table. Any INSERT, UPDATE, or UPSERT statements fail if they try to create column values that fall outside the specified ranges. The error checking for ranges is performed on the Kudu side; Impala passes the specified range information to Kudu, and passes back any error or warning if the ranges are not valid. (A nonsensical range specification causes an error for a DDL statement, but only a warning for a DML statement.)
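The following sketch shows this requirement with the million_rows_one_range table defined above, whose only range covers IDs from 'a' up to but not including '{'. The sample rows are hypothetical. The second INSERT is not accepted because no range covers IDs that start with an uppercase letter; after ALTER TABLE ... ADD RANGE PARTITION (described later in this section) adds such a range, the same row can be inserted.


-- Inside the existing range: accepted.
INSERT INTO million_rows_one_range VALUES ('apple', 'first row');

-- No range covers IDs starting with 'A', so this row is not accepted.
INSERT INTO million_rows_one_range VALUES ('Apple', 'second row');

-- Add a range for IDs beginning with an uppercase letter, then retry.
ALTER TABLE million_rows_one_range ADD RANGE PARTITION 'A' <= VALUES < '[';
INSERT INTO million_rows_one_range VALUES ('Apple', 'second row');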

Ranges can be non-contiguous:


partition by range (year) (partition 1885 <= values <= 1889, partition 1893 <= values <= 1897)

partition by range (letter_grade) (partition value = 'A', partition value = 'B',
  partition value = 'C', partition value = 'D', partition value = 'F')

The ALTER TABLE statement with the ADD RANGE PARTITION or DROP RANGE PARTITION clauses can be used to add or remove ranges from an existing Kudu table.


ALTER TABLE foo ADD RANGE PARTITION 30 <= VALUES < 50;
ALTER TABLE foo DROP RANGE PARTITION 1 <= VALUES < 5;

When a range is added, the new range must not overlap with any of the previous ranges; that is, it can only fill in gaps within the previous ranges.


alter table test_scores add range partition value = 'E';

alter table year_ranges add range partition 1890 <= values < 1893;

When a range is removed, all the associated rows in the table are deleted. (This is true whether the table is internal or external.)


alter table test_scores drop range partition value = 'E';

alter table year_ranges drop range partition 1890 <= values < 1893;

Kudu tables can also use a combination of hash and range partitioning.


partition by hash (school) partitions 10,
  range (letter_grade) (partition value = 'A', partition value = 'B',
    partition value = 'C', partition value = 'D', partition value = 'F')
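A complete statement using this combination might look like the following sketch. The table name scores_by_school and the student_id and score columns are hypothetical. Both the hash column and the range column belong to the primary key, and the primary key columns come first in the column list. With 10 hash buckets and 5 ranges, the table has 10 x 5 = 50 tablets.


CREATE TABLE scores_by_school
(
  school STRING,
  letter_grade STRING,
  student_id BIGINT,
  score INT,
  PRIMARY KEY (school, letter_grade, student_id)
)
-- Hash on one key column, range on another: 10 buckets x 5 ranges = 50 tablets.
PARTITION BY HASH (school) PARTITIONS 10,
  RANGE (letter_grade) (PARTITION VALUE = 'A', PARTITION VALUE = 'B',
    PARTITION VALUE = 'C', PARTITION VALUE = 'D', PARTITION VALUE = 'F')
STORED AS KUDU;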

Working with Partitioning in Kudu Tables

To see the current partitioning scheme for a Kudu table, you can use the SHOW CREATE TABLE statement or the SHOW PARTITIONS statement. The CREATE TABLE syntax displayed by SHOW CREATE TABLE includes all the hash, range, or both clauses that reflect the original table structure plus any subsequent ALTER TABLE statements that changed the table structure.

To see the underlying buckets and partitions for a Kudu table, use the SHOW TABLE STATS or SHOW PARTITIONS statement.
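For example, the following statements inspect the million_rows_three_ranges table defined earlier in this section. The output depends on the cluster and is omitted here.


-- Full DDL, including the current hash and range clauses.
SHOW CREATE TABLE million_rows_three_ranges;

-- Only the range definitions.
SHOW RANGE PARTITIONS million_rows_three_ranges;

-- The underlying buckets and partitions.
SHOW PARTITIONS million_rows_three_ranges;
SHOW TABLE STATS million_rows_three_ranges;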

Handling Date, Time, or Timestamp Data with Kudu

In Impala 2.9 and higher, you can include TIMESTAMP columns in Kudu tables, instead of representing the date and time as a BIGINT value. The behavior of TIMESTAMP for Kudu tables has some special considerations:
  • Any nanoseconds in the original 96-bit value produced by Impala are not stored, because Kudu represents date/time columns using 64-bit values. The nanosecond portion of the value is rounded, not truncated. Therefore, a TIMESTAMP value that you store in a Kudu table might not be bit-for-bit identical to the value returned by a query.

  • The conversion between the Impala 96-bit representation and the Kudu 64-bit representation introduces some performance overhead when reading or writing TIMESTAMP columns. You can minimize the overhead during writes by performing inserts through the Kudu API. Because the overhead during reads applies to each query, you might continue to use a BIGINT column to represent date/time values in performance-critical applications.

  • The Impala TIMESTAMP type has a narrower range for years than the underlying Kudu data type. Impala can represent years 1400-9999. If year values outside this range are written to a Kudu table by a non-Impala client, Impala returns NULL by default when reading those TIMESTAMP values during a query. Or, if the ABORT_ON_ERROR query option is enabled, the query fails when it encounters a value with an out-of-range year.

-- Make a table representing a date/time value as TIMESTAMP.
-- The strings representing the partition bounds are automatically
-- cast to TIMESTAMP values.
create table native_timestamp(id bigint, when_exactly timestamp, event string, primary key (id, when_exactly))
  partition by hash (id) partitions 20,
  range (when_exactly)
  (
    partition '2015-01-01' <= values < '2016-01-01',
    partition '2016-01-01' <= values < '2017-01-01',
    partition '2017-01-01' <= values < '2018-01-01'
  )
  stored as kudu;

insert into native_timestamp values (12345, now(), 'Working on doc examples');

select * from native_timestamp;
+-------+-------------------------------+-------------------------+
| id    | when_exactly                  | event                   |
+-------+-------------------------------+-------------------------+
| 12345 | 2017-05-31 16:27:42.667542000 | Working on doc examples |
+-------+-------------------------------+-------------------------+
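The trailing zeros in the fractional seconds above reflect microsecond storage. The Python sketch below models the two TIMESTAMP considerations listed earlier: rounding nanoseconds to microseconds, and reading out-of-range years. It assumes round-half-up rounding, which the text does not specify. The helper names are hypothetical.

```python
from typing import Optional

MIN_YEAR, MAX_YEAR = 1400, 9999  # years that an Impala TIMESTAMP can represent


def round_nanos_to_micros(nanos: int) -> int:
    """Round a nanosecond fraction of a second to whole microseconds.

    Rounds half up; this rounding mode is an assumption for illustration.
    A result of 1_000_000 would mean the value carries into the next second.
    """
    return (nanos + 500) // 1000


def read_year(year: int, abort_on_error: bool = False) -> Optional[int]:
    """Model how Impala reads a year that a non-Impala client wrote to Kudu."""
    if MIN_YEAR <= year <= MAX_YEAR:
        return year
    if abort_on_error:
        # With ABORT_ON_ERROR enabled, the query fails instead.
        raise ValueError(f"year {year} is outside {MIN_YEAR}-{MAX_YEAR}")
    return None  # default behavior: the value reads back as NULL


written = 123456789                              # nanosecond fraction produced by Impala
stored = round_nanos_to_micros(written)          # 123457 microseconds kept by Kudu
print(stored * 1000, stored * 1000 == written)   # 123457000 False
print(round_nanos_to_micros(667542000))          # 667542
print(read_year(2017), read_year(1300))          # 2017 None
```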

Because Kudu tables have some performance overhead to convert TIMESTAMP columns to the Impala 96-bit internal representation, for performance-critical applications you might store date/time information as the number of seconds, milliseconds, or microseconds since the Unix epoch date of January 1, 1970. Specify the column as BIGINT in the Impala CREATE TABLE statement, corresponding to an 8-byte integer (an int64) in the underlying Kudu table. Then use Impala date/time conversion functions as necessary to produce a numeric, TIMESTAMP, or STRING value depending on the context.

For example, the unix_timestamp() function returns an integer result representing the number of seconds past the epoch. The now() function produces a TIMESTAMP representing the current date and time, which can be passed as an argument to unix_timestamp(). And string literals representing dates and date/times can be cast to TIMESTAMP, and from there converted to numeric values. The following examples show how you might store a date/time column as BIGINT in a Kudu table, but still use string literals and TIMESTAMP values for convenience.


-- now() returns a TIMESTAMP and shows the format for string literals you can cast to TIMESTAMP.
select now();
+-------------------------------+
| now()                         |
+-------------------------------+
| 2017-01-25 23:50:10.132385000 |
+-------------------------------+

-- unix_timestamp() accepts either a TIMESTAMP or an equivalent string literal.
select unix_timestamp(now());
+------------------+
| unix_timestamp() |
+------------------+
| 1485386670       |
+------------------+

select unix_timestamp('2017-01-01');
+------------------------------+
| unix_timestamp('2017-01-01') |
+------------------------------+
| 1483228800                   |
+------------------------------+

-- Make a table representing a date/time value as BIGINT.
-- Construct 1 range partition and 20 associated hash partitions for each year.
-- Use date/time conversion functions to express the ranges as human-readable dates.
create table time_series(id bigint, when_exactly bigint, event string, primary key (id, when_exactly))
  partition by hash (id) partitions 20,
  range (when_exactly)
  (
    partition unix_timestamp('2015-01-01') <= values < unix_timestamp('2016-01-01'),
    partition unix_timestamp('2016-01-01') <= values < unix_timestamp('2017-01-01'),
    partition unix_timestamp('2017-01-01') <= values < unix_timestamp('2018-01-01')
  )
  stored as kudu;

-- On insert, we can transform a human-readable date/time into a numeric value.
insert into time_series values (12345, unix_timestamp('2017-01-25 23:24:56'), 'Working on doc examples');

-- On retrieval, we can examine the numeric date/time value or turn it back into a string for readability.
select id, when_exactly, from_unixtime(when_exactly) as 'human-readable date/time', event
  from time_series order by when_exactly limit 100;
+-------+--------------+--------------------------+-------------------------+
| id    | when_exactly | human-readable date/time | event                   |
+-------+--------------+--------------------------+-------------------------+
| 12345 | 1485386696   | 2017-01-25 23:24:56      | Working on doc examples |
+-------+--------------+--------------------------+-------------------------+
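The epoch arithmetic behind the time_series example can be checked outside Impala. The Python sketch below assumes that date strings are interpreted as UTC, which matches the 1483228800 result shown for '2017-01-01'. The `unix_timestamp` and `from_unixtime` functions here are Python stand-ins that borrow the Impala function names; they are not the Impala functions themselves.

```python
from datetime import datetime, timezone


def unix_timestamp(text: str) -> int:
    """Seconds since 1970-01-01 00:00:00, treating the literal as UTC (assumption)."""
    fmt = "%Y-%m-%d %H:%M:%S" if " " in text else "%Y-%m-%d"
    dt = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def from_unixtime(seconds: int) -> str:
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


# Yearly bounds, like the range PARTITION clauses of the time_series table.
bounds = [unix_timestamp(f"{y}-01-01") for y in (2015, 2016, 2017, 2018)]
yearly_ranges = list(zip(bounds, bounds[1:]))

value = unix_timestamp("2017-01-25 23:24:56")
print(value)                                            # 1485386696
print(from_unixtime(value))                             # 2017-01-25 23:24:56
print([lo <= value < hi for lo, hi in yearly_ranges])   # [False, False, True]
```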

Note:

If you do high-precision arithmetic involving numeric date/time values, when dividing millisecond values by 1000, or microsecond values by 1 million, always cast the integer numerator to a DECIMAL with sufficient precision and scale to avoid any rounding or loss of precision.


-- 1 million and 1 microseconds = 1.000001 seconds.
select microseconds,
  cast (microseconds as decimal(20,7)) / 1e6 as fractional_seconds
  from table_with_microsecond_column;
+--------------+----------------------+
| microseconds | fractional_seconds   |
+--------------+----------------------+
| 1000001      | 1.000001000000000000 |
+--------------+----------------------+
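The same precision concern appears in any language. In the Python sketch below, `Decimal` plays the role of the SQL DECIMAL cast. It contrasts exact decimal division with integer division, which drops the fraction, and with floating-point division, which cannot represent most decimal fractions exactly. The 16-digit microsecond value is made-up sample data.

```python
from decimal import Decimal

MICROS_PER_SECOND = 10**6


def micros_to_seconds(micros: int) -> Decimal:
    # Exact decimal division, analogous to casting the numerator to DECIMAL first.
    return Decimal(micros) / Decimal(MICROS_PER_SECOND)


print(micros_to_seconds(1000001))       # 1.000001
print(1000001 // MICROS_PER_SECOND)     # 1 (integer division drops the fraction)

big = 1485386696123457                  # a microsecond timestamp with 16 significant digits
print(micros_to_seconds(big))           # 1485386696.123457
print(Decimal(big / 1e6) == micros_to_seconds(big))  # False: the float result is inexact
```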

How Impala Handles Kudu Metadata

Note: This section applies only to Kudu services that are not integrated with the Hive Metastore (HMS).

By default, much of the metadata for Kudu tables is handled by the underlying storage layer. Kudu tables have less reliance on the Metastore database, and require less metadata caching on the Impala side. For example, information about partitions in Kudu tables is managed by Kudu, and Impala does not cache any block locality metadata for Kudu tables. If the Kudu service is not integrated with the Hive Metastore, Impala will manage Kudu table metadata in the Hive Metastore.

The REFRESH and INVALIDATE METADATA statements are needed less frequently for Kudu tables than for HDFS-backed tables. Neither statement is needed when data is added to, removed from, or updated in a Kudu table, even if the changes are made directly to Kudu through a client program using the Kudu API. Run REFRESH table_name or INVALIDATE METADATA table_name for a Kudu table only after making a change to the Kudu table schema, such as adding or dropping a column.

Because Kudu manages the metadata for its own tables separately from the metastore database, each Kudu table has two names: a table name stored in the metastore database for Impala to use, and a table name on the Kudu side. These names can be modified independently through ALTER TABLE statements.

To avoid potential name conflicts, the prefix impala:: and the Impala database name are encoded into the underlying Kudu table name:


create database some_database;
use some_database;

create table table_name_demo (x int primary key, y int)
  partition by hash (x) partitions 2 stored as kudu;

describe formatted table_name_demo;
...
kudu.table_name  | impala::some_database.table_name_demo
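The naming rule in the DESCRIBE FORMATTED output can be expressed as a pair of hypothetical Python helpers. This is a minimal sketch: it covers only the default name Impala assigns when it creates the table. It ignores case handling and any names changed later with ALTER TABLE.

```python
from typing import Optional, Tuple

PREFIX = "impala::"


def kudu_table_name(database: str, table: str) -> str:
    """Kudu-side name that Impala gives a table it creates (non-HMS mode)."""
    return f"{PREFIX}{database}.{table}"


def split_kudu_table_name(kudu_name: str) -> Optional[Tuple[str, str]]:
    """Recover (database, table) from such a name; None if the prefix is absent."""
    if not kudu_name.startswith(PREFIX):
        return None  # for example, a table created directly through the Kudu API
    database, _, table = kudu_name[len(PREFIX):].partition(".")
    return database, table


name = kudu_table_name("some_database", "table_name_demo")
print(name)                          # impala::some_database.table_name_demo
print(split_kudu_table_name(name))   # ('some_database', 'table_name_demo')
```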

See Overview of Impala Tables for examples of how to change the name of the Impala table in the metastore database, the name of the underlying Kudu table, or both.

Working with Kudu Integrated with Hive Metastore

Starting from Kudu 1.10 and Impala 3.3, Impala supports Kudu services integrated with the Hive Metastore (HMS). See the HMS integration documentation for more details on Kudu’s Hive Metastore integration.

The following are some of the changes you need to consider when working with Kudu services integrated with the HMS.
  • When Kudu is integrated with the Hive Metastore, Impala must be configured to use the same HMS as Kudu.
  • Since there may be no one-to-one mapping between Kudu tables and external tables, only internal tables are automatically synchronized.
  • When you create a table in Kudu, Kudu will create an HMS entry for that table with the internal table type.
  • When the Kudu service is integrated with the HMS, internal table entries are created automatically in the HMS when tables are created in Kudu without Impala. To access these tables through Impala, run the INVALIDATE METADATA statement so that Impala picks up the latest metadata.

Loading Data into Kudu Tables

Kudu tables are well-suited to use cases where data arrives continuously, in small or moderate volumes. To bring data into Kudu tables, use the Impala INSERT and UPSERT statements. The LOAD DATA statement does not apply to Kudu tables.

Because Kudu manages its own storage layer that is optimized for smaller block sizes than HDFS, and performs its own housekeeping to keep data evenly distributed, it is not subject to the "many small files" issue and does not need explicit reorganization and compaction as the data grows over time. The partitions within a Kudu table can be specified to cover a variety of possible data distributions, instead of hardcoding a new partition for each new day, hour, and so on, which can lead to inefficient, hard-to-scale, and hard-to-manage partition schemes with HDFS tables.

Your strategy for performing ETL or bulk updates on Kudu tables should take into account the limitations on consistency for DML operations.

Make INSERT, UPDATE, and UPSERT operations idempotent: that is, able to be applied multiple times and still produce an identical result.

If a bulk operation is in danger of exceeding capacity limits due to timeouts or high memory usage, split it into a series of smaller operations.

Avoid running concurrent ETL operations where the end results depend on precise ordering. In particular, do not rely on an INSERT ... SELECT statement that selects from the same table into which it is inserting, unless you include extra conditions in the WHERE clause to avoid reading the newly inserted rows within the same statement.

Because relationships between tables cannot be enforced by Impala and Kudu, and cannot be committed or rolled back together, do not expect transactional semantics for multi-table operations.
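The first two guidelines, idempotent writes and smaller batches, can be sketched in Python with a dict standing in for a Kudu table keyed by primary key. The sketch assumes the load is expressed as UPSERT-style writes. That is one way, though not the only way, to make a job safe to re-run after a partial failure. The function names and the batch size are arbitrary illustrations.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[int, str]  # (primary key, value)


def batches(rows: List[Row], size: int) -> Iterator[List[Row]]:
    """Split a bulk load into a series of smaller operations of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def apply_upsert(table: Dict[int, str], batch: Iterable[Row]) -> None:
    # Upsert semantics: writing the same rows again leaves the same final state.
    for key, value in batch:
        table[key] = value


source = [(k, f"v{k}") for k in range(10)]
target: Dict[int, str] = {}
for batch in batches(source, size=4):
    apply_upsert(target, batch)

snapshot = dict(target)
# Simulate re-running the whole job, for example after a timeout partway through.
for batch in batches(source, size=4):
    apply_upsert(target, batch)
print(target == snapshot)  # True: re-running produces an identical result
```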

Impala DML Support for Kudu Tables (INSERT, UPDATE, DELETE, UPSERT)

Impala supports certain DML statements for Kudu tables only. The UPDATE and DELETE statements let you modify data within Kudu tables without rewriting substantial amounts of table data. The UPSERT statement acts as a combination of INSERT and UPDATE, inserting rows where the primary key does not already exist, and updating the non-primary key columns where the primary key does already exist in the table.

The INSERT statement for Kudu tables honors the unique and NOT NULL requirements for the primary key columns.
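A toy Python model contrasts the two statements, using a dict keyed by primary key. It assumes multi-row transactions are not enabled. In that mode, a row with a duplicate or NULL primary key is rejected while the other rows still go in. The table contents and helper names are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Table = Dict[int, Tuple[str, int]]      # primary key -> (name, score)
Row = Tuple[Optional[int], str, int]    # (primary key, name, score)


def insert(table: Table, rows: List[Row]) -> int:
    """INSERT without multi-row transactions: bad rows are skipped, the rest go in."""
    inserted = 0
    for key, name, score in rows:
        if key is None or key in table:
            continue  # NULL or duplicate primary key: only this row is rejected
        table[key] = (name, score)
        inserted += 1
    return inserted


def upsert(table: Table, rows: List[Row]) -> None:
    """UPSERT: insert new keys, overwrite the non-key columns of existing keys."""
    for key, name, score in rows:
        if key is None:
            continue  # a NULL primary key is still rejected
        table[key] = (name, score)


scores: Table = {1: ("alice", 80)}
print(insert(scores, [(1, "alice", 95), (2, "bob", 70), (None, "nobody", 0)]))  # 1
print(scores[1])              # ('alice', 80): the duplicate row was not applied
upsert(scores, [(1, "alice", 95), (3, "carol", 88)])
print(scores[1], scores[3])   # ('alice', 95) ('carol', 88)
```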

Unless multi-row transactions are enabled (see Multi-row Transactions for Kudu Tables, which covers only INSERT and CREATE TABLE AS SELECT), Impala DML statements on Kudu tables are not transactional, so the effects of any INSERT, UPDATE, or DELETE statement are immediately visible. For example, you cannot do a sequence of UPDATE statements and make the changes visible only after all the statements are finished. Also, if a DML statement fails partway through, any rows that were already inserted, deleted, or changed remain in the table; there is no rollback mechanism to undo the changes.

In particular, an INSERT ... SELECT statement that refers to the table being inserted into might insert more rows than expected, because the SELECT part of the statement sees some of the new rows being inserted and processes them again.

Note:

The LOAD DATA statement, which involves manipulation of HDFS data files, does not apply to Kudu tables.

Starting from Impala 2.9, INSERT or UPSERT operations into Kudu tables automatically add an exchange node and a sort node to the plan, which partition and sort the rows according to the partitioning/primary key scheme of the target table (unless the number of rows to be inserted is small enough to trigger single-node execution). Because Kudu partitions and sorts rows on write, pre-partitioning and sorting takes some of the load off Kudu and helps large INSERT operations complete without timing out. However, this default behavior may slow down the end-to-end performance of the INSERT or UPSERT operations. Starting from Impala 2.10, you can use the /* +NOCLUSTERED */ and /* +NOSHUFFLE */ hints together to disable partitioning and sorting before the rows are sent to Kudu. Additionally, because sorting may consume a large amount of memory, consider setting the MEM_LIMIT query option for those queries.
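Conceptually, the exchange step groups rows by target partition and the sort step orders each group by primary key. The Python sketch below models those two steps only. It assumes CRC-32 as a stand-in for Kudu's hash function and is not how the Impala planner is implemented. Using the NOCLUSTERED and NOSHUFFLE hints together corresponds to skipping both steps and sending rows as they arrive.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, str]  # (primary key, payload)


def bucket_of(key: int, num_buckets: int) -> int:
    # Stand-in hash (CRC-32 of the key bytes); Kudu's real hash differs.
    return zlib.crc32(key.to_bytes(8, "big", signed=True)) % num_buckets


def cluster_for_write(rows: List[Row], num_buckets: int) -> Dict[int, List[Row]]:
    """Mimic the exchange (group by partition) and sort (by primary key) steps."""
    groups: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        groups[bucket_of(row[0], num_buckets)].append(row)
    return {bucket: sorted(group) for bucket, group in groups.items()}


rows = [(k, f"r{k}") for k in (9, 3, 7, 1, 8, 2)]
for bucket, group in sorted(cluster_for_write(rows, num_buckets=3).items()):
    print(bucket, [key for key, _ in group])
```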

Multi-row Transactions for Kudu Tables

When you use Impala to query Kudu tables, you can insert multiple rows into a Kudu table in a single transaction. This broader transactional support between Kudu and Impala is available to you at a query level and at a session level.

Using Multi-row Transaction Capability

You can control the multi-row transaction feature with the following query option, which you can set for a single query or for a whole session. When the option is enabled for a session, Impala opens one Kudu transaction for each INSERT or CTAS statement.

set ENABLE_KUDU_TRANSACTION=true;

The following example shows how to insert three rows into a table in a single transaction.

Example:

  1. Create table kudu_test_tbl_1. (Unquoted Impala identifiers cannot contain hyphens, so the name uses underscores.)
    create table kudu_test_tbl_1 (a int primary key, b string) partition by hash(a) partitions 8 stored as kudu;
  2. Enable the multi-row transaction feature at the query level.
    set ENABLE_KUDU_TRANSACTION=true;
  3. Insert three rows into the newly created table in a single transaction.
    insert into kudu_test_tbl_1 values (0, 'a'), (1, 'b'), (2, 'c');
  4. Verify the number of rows in the table.
    select count(*) from kudu_test_tbl_1;

Note:

If you insert multiple rows with duplicate keys into a table, the transaction is aborted. To ignore conflicts with duplicate keys during the transaction, start the Impala daemons with the flag --kudu_ignore_conflicts_in_transaction=true. This flag is false by default, and it takes effect only if the flag --kudu_ignore_conflicts is true, which is its default value.

When you enable the option ENABLE_KUDU_TRANSACTION, each Impala statement is executed in a newly opened transaction. If the statement executes successfully, the Impala coordinator commits the transaction. If Kudu returns an error, Impala aborts the transaction.

This applies to the following statements:

  • INSERT
  • CREATE TABLE AS SELECT
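The commit-or-abort behavior described above can be modeled in a few lines of Python. In this toy sketch, rows are staged on a copy of the table and published only if no conflict occurs. The `ignore_conflicts` parameter loosely mirrors the --kudu_ignore_conflicts_in_transaction flag. The names `transactional_insert` and `TransactionAborted` are hypothetical, and Kudu's actual transaction protocol is not modeled.

```python
from typing import Dict, List, Tuple


class TransactionAborted(Exception):
    pass


def transactional_insert(table: Dict[int, str], rows: List[Tuple[int, str]],
                         ignore_conflicts: bool = False) -> int:
    """All-or-nothing INSERT, modeled on ENABLE_KUDU_TRANSACTION=true."""
    staged = dict(table)  # work on a copy; nothing is visible until commit
    inserted = 0
    for key, value in rows:
        if key in staged:
            if ignore_conflicts:
                continue  # skip the duplicate row, keep the transaction alive
            raise TransactionAborted(f"duplicate key {key}")  # nothing is committed
        staged[key] = value
        inserted += 1
    table.clear()
    table.update(staged)  # commit: all new rows become visible together
    return inserted


table = {0: "a"}
try:
    transactional_insert(table, [(1, "b"), (0, "x")])
except TransactionAborted as exc:
    print("aborted:", exc, table)  # aborted: duplicate key 0 {0: 'a'}
print(transactional_insert(table, [(1, "b"), (0, "x")], ignore_conflicts=True))  # 1
print(table)  # {0: 'a', 1: 'b'}
```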

Advantages of Using This Capability

You can now easily build and manage Kudu applications, especially when Impala is used to interact with the data in the Kudu table. With multi-row transactions, you can atomically ingest a large number of rows into a Kudu table with an INSERT ... SELECT or CTAS statement.

Limitation

INSERT and CTAS statements are supported for Kudu tables in the context of a multi-row transaction, but UPDATE, UPSERT, and DELETE statements are currently not supported in multi-row transactions.

Consistency Considerations for Kudu Tables

Kudu tables have consistency characteristics such as uniqueness, controlled by the primary key columns, and non-nullable columns. The emphasis for consistency is on preventing duplicate or incomplete data from being stored in a table.

Currently, Kudu does not enforce strong consistency for order of operations, or for data that is read while a write operation is in progress. If multi-row transactions are enabled, all the rows inserted by a single INSERT or CTAS statement are applied atomically: either the whole statement succeeds or nothing is inserted. If multi-row transactions are not enabled, changes are applied atomically to each row, not as a single unit to all rows affected by a multi-row DML statement.

When multi-row transactions are not enabled and some rows are rejected during a DML operation because of duplicate primary key values, NOT NULL constraint violations, and so on, the statement succeeds with a warning. Impala still inserts, deletes, or updates the other rows that are not affected by the constraint violation.

Consequently, the number of rows affected by a DML operation on a Kudu table might be different than you expect.

Because there is no strong consistency guarantee for data that separate statements insert into, delete from, or update across multiple tables at the same time, consider denormalizing the data where practical. That is, if you run separate INSERT statements to insert related rows into two different tables, one INSERT might fail while the other succeeds, leaving the data in an inconsistent state. Even if both inserts succeed, a join query might run during the interval between the completion of the first and second statements, and the query would encounter incomplete or inconsistent data. Denormalizing the data into a single wide table can reduce the possibility of inconsistency due to multi-table operations.

Information about the number of rows affected by a DML operation is reported in impala-shell output, and in the PROFILE output, but is not currently reported to HiveServer2 clients such as JDBC or ODBC applications.

Security Considerations for Kudu Tables

Security for Kudu tables involves:

Impala Query Performance for Kudu Tables

For queries involving Kudu tables, Impala can delegate much of the work of filtering the result set to Kudu, avoiding some of the I/O involved in full table scans of tables containing HDFS data files. This type of optimization is especially effective for partitioned Kudu tables, where the Impala query WHERE clause refers to one or more primary key columns that are also used as partition key columns. For example, if a partitioned Kudu table uses a HASH clause for col1 and a RANGE clause for col2, a query using a clause such as WHERE col1 IN (1,2,3) AND col2 > 100 can determine exactly which tablet servers contain relevant data, and therefore parallelize the query very efficiently.
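A rough Python model of the pruning in this example shows which tablets a query could skip. It assumes 4 hash buckets on col1, three hypothetical integer ranges on col2, and CRC-32 as a stand-in for Kudu's hash function. Only buckets reachable from the IN list, and ranges that can hold values greater than 100, need to be scanned.

```python
import zlib
from typing import Iterable, List, Set, Tuple

NUM_BUCKETS = 4
# Hypothetical range partitions on integer col2, each meaning lower <= col2 < upper.
RANGES: List[Tuple[int, int]] = [(0, 100), (100, 200), (200, 300)]


def bucket(col1: int) -> int:
    # Stand-in hash; not Kudu's actual hash function.
    return zlib.crc32(str(col1).encode("utf-8")) % NUM_BUCKETS


def tablets_to_scan(col1_values: Iterable[int], col2_greater_than: int) -> Set[Tuple[int, int]]:
    """(bucket, range index) pairs that can hold rows matching the WHERE clause."""
    buckets = {bucket(v) for v in col1_values}
    # An integer range [lo, hi) can contain col2 > bound only if hi - 1 > bound.
    ranges = [i for i, (lo, hi) in enumerate(RANGES) if hi - 1 > col2_greater_than]
    return {(b, r) for b in buckets for r in ranges}


scan = tablets_to_scan([1, 2, 3], col2_greater_than=100)
total = NUM_BUCKETS * len(RANGES)
print(f"scan {len(scan)} of {total} tablets")
```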

In Impala 2.11 and higher, Impala can push down additional information to optimize join queries involving Kudu tables. If the join clause contains predicates of the form column = expression, then after Impala constructs a hash table of possible matching values for the join columns from the build side of the join (typically the smaller table, which can be either an HDFS table or a Kudu table), Impala can "push down" the minimum and maximum matching column values to Kudu. Kudu can then locate matching rows more efficiently in the Kudu table on the other side of the join. These min/max filters are affected by the RUNTIME_FILTER_MODE, RUNTIME_FILTER_WAIT_TIME_MS, and DISABLE_ROW_RUNTIME_FILTERING query options; the min/max filters are not affected by the RUNTIME_BLOOM_FILTER_SIZE, RUNTIME_FILTER_MIN_SIZE, RUNTIME_FILTER_MAX_SIZE, and MAX_NUM_RUNTIME_FILTERS query options.
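The following Python sketch shows why a min/max filter reduces the rows a Kudu scan returns, and why it is only an approximation. The bounds come from the join keys that fed the hash table. They are applied to the keys read from the Kudu table, and the join still checks exact matches afterward. The key values are made up, and the helpers are illustrations rather than Impala internals.

```python
from typing import Iterable, List, Tuple


def min_max_filter(build_keys: Iterable[int]) -> Tuple[int, int]:
    """Bounds of the join keys that fed the hash table (assumes at least one key)."""
    keys = list(build_keys)
    return min(keys), max(keys)


def scan_with_filter(probe_keys: Iterable[int], bounds: Tuple[int, int]) -> List[int]:
    """Keys the Kudu scan still returns once the min/max filter is pushed down."""
    lo, hi = bounds
    return [k for k in probe_keys if lo <= k <= hi]


build = [42, 17, 30]               # join keys on the hash-table side
probe = list(range(0, 100, 5))     # join keys stored in the scanned Kudu table
bounds = min_max_filter(build)
survivors = scan_with_filter(probe, bounds)
print(bounds)      # (17, 42)
print(survivors)   # [20, 25, 30, 35, 40]
# The filter is coarse: the hash join still discards keys without an exact match.
print([k for k in survivors if k in set(build)])  # [30]
```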

See EXPLAIN Statement for examples of evaluating the effectiveness of the predicate pushdown for a specific query against a Kudu table.

The TABLESAMPLE clause of the SELECT statement does not apply to a table reference derived from a view, a subquery, or anything other than a real base table. This clause only works for tables backed by HDFS or HDFS-like data files, therefore it does not apply to Kudu or HBase tables.