Using Text Data Files with Impala Tables

Impala supports using text files as the storage format for input and output. Text files are a convenient format to use for interchange with other applications or scripts that produce or read delimited text files, such as CSV or TSV with commas or tabs for delimiters.

Text files are also very flexible in their column definitions. For example, a text file could have more fields than the Impala table, and those extra fields are ignored during queries; or it could have fewer fields than the Impala table, and those missing fields are treated as NULL values in queries. You could have fields that were treated as numbers or timestamps in a table, then use ALTER TABLE ... REPLACE COLUMNS to switch them to strings, or the reverse.

Table 1. Text Format Support in Impala
File Type Format Compression Codecs Impala Can CREATE? Impala Can INSERT?
Text Unstructured bzip2, deflate, gzip, LZO, Snappy, zstd Yes. For CREATE TABLE with no STORED AS clause, the default file format is uncompressed text, with values separated by ASCII 0x01 characters (typically represented as Ctrl-A). Yes if uncompressed.

No if compressed.

If LZO compression is used, you must create the table and load data in Hive.

If other kinds of compression are used, you must load data through LOAD DATA, Hive, or manually in HDFS.

Query Performance for Impala Text Tables

Data stored in text format is relatively bulky, and not as efficient to query as binary formats such as Parquet. You typically use text tables with Impala if that is the format you receive the data and you do not have control over that process, or if you are a relatively new Hadoop user and not familiar with techniques to generate files in other formats. (Because the default format for CREATE TABLE is text, you might create your first Impala tables as text without giving performance much thought.) Either way, look for opportunities to use more efficient file formats for the tables used in your most performance-critical queries.

For frequently queried data, you might load the original text data files into one Impala table, then use an INSERT statement to transfer the data to another table that uses the Parquet file format; the data is converted automatically as it is stored in the destination table.

For more compact data, consider using LZO compression for the text files. LZO is the only compression codec that Impala supports for text data, because the "splittable" nature of LZO data files lets different nodes work on different parts of the same file in parallel. See Using LZO-Compressed Text Files for details.

You can also use text data compressed in the bzip2, deflate, gzip, Snappy, or zstd formats. Because these compressed formats are not "splittable" in the way that LZO is, there is less opportunity for Impala to parallelize queries on them. Therefore, use these types of compressed data only for convenience if that is the format in which you receive the data. Prefer to use LZO compression for text data if you have the choice, or convert the data to Parquet using an INSERT ... SELECT statement to copy the original data into a Parquet table.

Note:

Impala supports bzip files created by the bzip2 command, but not bzip files with multiple streams created by the pbzip2 command. Impala decodes only the data from the first part of such files, leading to incomplete results.

The maximum size that Impala can accommodate for an individual bzip file is 1 GB (after uncompression).

Impala supports zstd files created by the zstd command line tool.

In Impala 2.6 and higher, Impala queries are optimized for files stored in Amazon S3. For Impala tables that use the file formats Parquet, ORC, RCFile, SequenceFile, Avro, and uncompressed text, the setting fs.s3a.block.size in the core-site.xml configuration file determines how Impala divides the I/O work of reading the data files. This configuration setting is specified in bytes. By default, this value is 33554432 (32 MB), meaning that Impala parallelizes S3 read operations on the files as if they were made up of 32 MB blocks. For example, if your S3 queries primarily access Parquet files written by MapReduce or Hive, increase fs.s3a.block.size to 134217728 (128 MB) to match the row group size of those files. If most S3 queries involve Parquet files written by Impala, increase fs.s3a.block.size to 268435456 (256 MB) to match the row group size produced by Impala.

Creating Text Tables

To create a table using text data files:

If the exact format of the text data files (such as the delimiter character) is not significant, use the CREATE TABLE statement with no extra clauses at the end to create a text-format table. For example:

create table my_table(id int, s string, n int, t timestamp, b boolean);

The data files created by any INSERT statements will use the Ctrl-A character (hex 01) as a separator between each column value.

A common use case is to import existing text files into an Impala table. The syntax is more verbose; the significant part is the FIELDS TERMINATED BY clause, which must be preceded by the ROW FORMAT DELIMITED clause. The statement can end with a STORED AS TEXTFILE clause, but that clause is optional because text format tables are the default. For example:

create table csv(id int, s string, n int, t timestamp, b boolean)
  row format delimited
  fields terminated by ',';

create table tsv(id int, s string, n int, t timestamp, b boolean)
  row format delimited
  fields terminated by '\t';

create table pipe_separated(id int, s string, n int, t timestamp, b boolean)
  row format delimited
  fields terminated by '|'
  stored as textfile;

You can create tables with specific separator characters to import text files in familiar formats such as CSV, TSV, or pipe-separated. You can also use these tables to produce output data files, by copying data into them through the INSERT ... SELECT syntax and then extracting the data files from the Impala data directory.

In Impala 1.3.1 and higher, you can specify a delimiter character '\0' to use the ASCII 0 (nul) character for text tables:

create table nul_separated(id int, s string, n int, t timestamp, b boolean)
  row format delimited
  fields terminated by '\0'
  stored as textfile;
Note:

Do not surround string values with quotation marks in text data files that you construct. If you need to include the separator character inside a field value, for example to put a string value with a comma inside a CSV-format data file, specify an escape character on the CREATE TABLE statement with the ESCAPED BY clause, and insert that character immediately before any separator characters that need escaping.

Issue a DESCRIBE FORMATTED table_name statement to see the details of how each table is represented internally in Impala.

Complex type considerations: Although you can create tables in this file format using the complex types (ARRAY, STRUCT, and MAP) available in Impala 2.3 and higher, currently, Impala can query these types only in Parquet tables. The one exception to the preceding rule is COUNT(*) queries on RCFile tables that include complex types. Such queries are allowed in Impala 2.6 and higher.

Data Files for Text Tables

When Impala queries a table with data in text format, it consults all the data files in the data directory for that table, with some exceptions:

File names for data produced through Impala INSERT statements are given unique names to avoid file name conflicts.

An INSERT ... SELECT statement produces one data file from each node that processes the SELECT part of the statement. An INSERT ... VALUES statement produces a separate data file for each statement; because Impala is more efficient querying a small number of huge files than a large number of tiny files, the INSERT ... VALUES syntax is not recommended for loading a substantial volume of data. If you find yourself with a table that is inefficient due to too many small data files, reorganize the data into a few large files by doing INSERT ... SELECT to transfer the data to a new table.

Special values within text data files:

Loading Data into Impala Text Tables

To load an existing text file into an Impala text table, use the LOAD DATA statement and specify the path of the file in HDFS. That file is moved into the appropriate Impala data directory.

To load multiple existing text files into an Impala text table, use the LOAD DATA statement and specify the HDFS path of the directory containing the files. All non-hidden files are moved into the appropriate Impala data directory.

To convert data to text from any other file format supported by Impala, use a SQL statement such as:

-- Text table with default delimiter, the hex 01 character.
CREATE TABLE text_table AS SELECT * FROM other_file_format_table;

-- Text table with user-specified delimiter. Currently, you cannot specify
-- the delimiter as part of CREATE TABLE LIKE or CREATE TABLE AS SELECT.
-- But you can change an existing text table to have a different delimiter.
CREATE TABLE csv LIKE other_file_format_table;
ALTER TABLE csv SET SERDEPROPERTIES ('serialization.format'=',', 'field.delim'=',');
INSERT INTO csv SELECT * FROM other_file_format_table;

This can be a useful technique to see how Impala represents special values within a text-format data file. Use the DESCRIBE FORMATTED statement to see the HDFS directory where the data files are stored, then use Linux commands such as hdfs dfs -ls hdfs_directory and hdfs dfs -cat hdfs_file to display the contents of an Impala-created text file.

To create a few rows in a text table for test purposes, you can use the INSERT ... VALUES syntax:

INSERT INTO text_table VALUES ('string_literal',100,hex('hello world'));
Note: Because Impala and the HDFS infrastructure are optimized for multi-megabyte files, avoid the INSERT ... VALUES notation when you are inserting many rows. Each INSERT ... VALUES statement produces a new tiny file, leading to fragmentation and reduced performance. When creating any substantial volume of new data, use one of the bulk loading techniques such as LOAD DATA or INSERT ... SELECT. Or, use an HBase table for single-row INSERT operations, because HBase tables are not subject to the same fragmentation issues as tables stored on HDFS.

When you create a text file for use with an Impala text table, specify \N to represent a NULL value. For the differences between NULL and empty strings, see NULL.

If a text file has fewer fields than the columns in the corresponding Impala table, all the corresponding columns are set to NULL when the data in that file is read by an Impala query.

If a text file has more fields than the columns in the corresponding Impala table, the extra fields are ignored when the data in that file is read by an Impala query.

You can also use manual HDFS operations such as hdfs dfs -put or hdfs dfs -cp to put data files in the data directory for an Impala table. When you copy or move new data files into the HDFS directory for the Impala table, issue a REFRESH table_name statement in impala-shell before issuing the next query against that table, to make Impala recognize the newly added files.

Using LZO-Compressed Text Files

Impala supports using text data files that employ LZO compression. Where practical, apply compression to text data files. Impala queries are usually I/O-bound; reducing the amount of data read from disk typically speeds up a query, despite the extra CPU work to uncompress the data in memory.

Impala can work with LZO-compressed text files are preferable to files compressed by other codecs, because LZO-compressed files are "splittable", meaning that different portions of a file can be uncompressed and processed independently by different nodes.

Impala does not currently support writing LZO-compressed text files.

Because Impala can query LZO-compressed files but currently cannot write them, you use Hive to do the initial CREATE TABLE and load the data, then switch back to Impala to run queries. For instructions on setting up LZO compression for Hive CREATE TABLE and INSERT statements, see the LZO page on the Hive wiki. Once you have created an LZO text table, you can also manually add LZO-compressed text files to it, produced by the lzop command or similar method.

Preparing to Use LZO-Compressed Text Files

Before using LZO-compressed tables in Impala, do the following one-time setup for each machine in the cluster. Install the necessary packages using either the public repository, a private repository you establish, or by using packages. You must do these steps manually, whether or not you are using cluster management software.

  1. Prepare your systems to work with LZO by downloading and installing the appropriate libraries:

    Download and install the appropriate file to each machine on which you intend to use LZO with Impala.

  2. Configure Impala to use LZO:

    Use one of the following sets of commands to refresh your package management system's repository information, install the base LZO support for Hadoop, and install the LZO support for Impala.

    Note:

    The name of the Hadoop LZO package changed in the distant past. Currently, the package name is hadoop-lzo.

    For RHEL/CentOS systems:

    $ sudo yum update
    $ sudo yum install hadoop-lzo
    $ sudo yum install impala-lzo

    For SUSE systems:

    $ sudo apt-get update
    $ sudo zypper install hadoop-lzo
    $ sudo zypper install impala-lzo

    For Debian/Ubuntu systems:

    $ sudo zypper update
    $ sudo apt-get install hadoop-lzo
    $ sudo apt-get install impala-lzo
    Note:

    The level of the impala-lzo package is closely tied to the version of Impala you use. Any time you upgrade Impala, re-do the installation command for impala-lzo on each applicable machine to make sure you have the appropriate version of that package.

  3. For core-site.xml on the client and server (that is, in the configuration directories for both Impala and Hadoop), append com.hadoop.compression.lzo.LzopCodec to the comma-separated list of codecs. For example:
    <property>
      <name>io.compression.codecs</name>
      <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,
            org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,
            org.apache.hadoop.io.compress.SnappyCodec,com.hadoop.compression.lzo.LzopCodec</value>
    </property>
    Note:

    If this is the first time you have edited the Hadoop core-site.xml file, note that the /etc/hadoop/conf directory is typically a symbolic link, so the canonical core-site.xml might reside in a different directory:

    $ ls -l /etc/hadoop
    total 8
    lrwxrwxrwx. 1 root root   29 Feb 26  2013 conf -> /etc/alternatives/hadoop-conf
    lrwxrwxrwx. 1 root root   10 Feb 26  2013 conf.dist -> conf.empty
    drwxr-xr-x. 2 root root 4096 Feb 26  2013 conf.empty
    drwxr-xr-x. 2 root root 4096 Oct 28 15:46 conf.pseudo

    If the io.compression.codecs property is missing from core-site.xml, only add com.hadoop.compression.lzo.LzopCodec to the new property value, not all the names from the preceding example.

  4. Restart the MapReduce and Impala services.

Creating LZO Compressed Text Tables

A table containing LZO-compressed text files must be created in Hive with the following storage clause:

STORED AS
    INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

Also, certain Hive settings need to be in effect. For example:

hive> SET mapreduce.output.fileoutputformat.compress=true;
hive> SET hive.exec.compress.output=true;
hive> SET mapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec;
hive> CREATE TABLE lzo_t (s string) STORED AS
  > INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
  > OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
hive> INSERT INTO TABLE lzo_t SELECT col1, col2 FROM uncompressed_text_table;

Once you have created LZO-compressed text tables, you can convert data stored in other tables (regardless of file format) by using the INSERT ... SELECT statement in Hive.

Files in an LZO-compressed table must use the .lzo extension. Examine the files in the HDFS data directory after doing the INSERT in Hive, to make sure the files have the right extension. If the required settings are not in place, you end up with regular uncompressed files, and Impala cannot access the table because it finds data files with the wrong (uncompressed) format.

After loading data into an LZO-compressed text table, index the files so that they can be split. You index the files by running a Java class, com.hadoop.compression.lzo.DistributedLzoIndexer, through the Linux command line. This Java class is included in the hadoop-lzo package.

Run the indexer using a command like the following:

$ hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-version-gplextras.jar
  com.hadoop.compression.lzo.DistributedLzoIndexer /hdfs_location_of_table/
Note: If the path of the JAR file in the preceding example is not recognized, do a find command to locate hadoop-lzo-*-gplextras.jar and use that path.

Indexed files have the same name as the file they index, with the .index extension. If the data files are not indexed, Impala queries still work, but the queries read the data from remote DataNodes, which is very inefficient.

Once the LZO-compressed tables are created, and data is loaded and indexed, you can query them through Impala. As always, the first time you start impala-shell after creating a table in Hive, issue an INVALIDATE METADATA statement so that Impala recognizes the new table. (In Impala 1.2 and higher, you only have to run INVALIDATE METADATA on one node, rather than on all the Impala nodes.)

Using bzip2, deflate, gzip, Snappy, or zstd Text Files

Impala supports using text data files that employ bzip2, deflate, gzip, Snappy, or zstd compression. These compression types are primarily for convenience within an existing ETL pipeline rather than maximum performance. Although it requires less I/O to read compressed text than the equivalent uncompressed text, files compressed by these codecs are not "splittable" and therefore cannot take full advantage of the Impala parallel query capability. Impala can read compressed text files written by Hive.

As each Snappy-compressed file is processed, the node doing the work reads the entire file into memory and then decompresses it. Therefore, the node must have enough memory to hold both the compressed and uncompressed data from the text file. The memory required to hold the uncompressed data is difficult to estimate in advance, potentially causing problems on systems with low memory limits or with resource management enabled. This memory overhead is reduced for bzip2-, deflate-, gzip-, and zstd-compressed text files. The compressed data is decompressed as it is read, rather than all at once.

To create a table to hold compressed text, create a text table with no special compression options. Specify the delimiter and escape character if required, using the ROW FORMAT clause.

Because Impala can query compressed text files but currently cannot write them, produce the compressed text files outside Impala and use the LOAD DATA statement, manual HDFS commands to move them to the appropriate Impala data directory. (Or, you can use CREATE EXTERNAL TABLE and point the LOCATION attribute at a directory containing existing compressed text files.)

The following example shows how you can create a regular text table, put different kinds of compressed and uncompressed files into it, and Impala automatically recognizes and decompresses each one based on their file extensions:

create table csv_compressed (a string, b string, c string)
  row format delimited fields terminated by ",";

insert into csv_compressed values
  ('one - uncompressed', 'two - uncompressed', 'three - uncompressed'),
  ('abc - uncompressed', 'xyz - uncompressed', '123 - uncompressed');
...make equivalent .bz2, .gz, .snappy, and .zst files and load them into same table directory...

select * from csv_compressed;
+--------------------+--------------------+----------------------+
| a                  | b                  | c                    |
+--------------------+--------------------+----------------------+
| one - snappy       | two - snappy       | three - snappy       |
| one - uncompressed | two - uncompressed | three - uncompressed |
| abc - uncompressed | xyz - uncompressed | 123 - uncompressed   |
| one - bz2          | two - bz2          | three - bz2          |
| abc - bz2          | xyz - bz2          | 123 - bz2            |
| one - gzip         | two - gzip         | three - gzip         |
| abc - gzip         | xyz - gzip         | 123 - gzip           |
| one - zstd         | two - zstd         | three - zstd         |
| abc - zstd         | xyz - zstd         | 123 - zstd           |
| one - deflate      | two - deflate      | three - deflate      |
| abc - deflate      | xyz - deflate      | 123 - deflate        |
+--------------------+--------------------+----------------------+

$ hdfs dfs -ls 'hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/';
...truncated for readability...
75 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed.snappy
79 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed_bz2.csv.bz2
80 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed_gzip.csv.gz
58 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed_zstd.csv.zst
48 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/csv_compressed_deflate.csv.deflate
116 hdfs://127.0.0.1:8020/user/hive/warehouse/file_formats.db/csv_compressed/dd414df64d67d49b_data.0.