Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
delimited-text-parser.inline.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXEC_DELIMITED_TEXT_PARSER_INLINE_H
17 #define IMPALA_EXEC_DELIMITED_TEXT_PARSER_INLINE_H
18 
19 #include "delimited-text-parser.h"
20 #include "util/cpu-info.h"
21 #include "util/sse-util.h"
22 
23 namespace impala {
24 
28 inline void ProcessEscapeMask(uint16_t escape_mask, bool* last_char_is_escape,
29  uint16_t* delim_mask) {
30  // Escape characters can escape escape characters.
31  bool first_char_is_escape = *last_char_is_escape;
32  bool escape_next = first_char_is_escape;
33  for (int i = 0; i < SSEUtil::CHARS_PER_128_BIT_REGISTER; ++i) {
34  if (escape_next) {
35  escape_mask &= ~SSEUtil::SSE_BITMASK[i];
36  }
37  escape_next = escape_mask & SSEUtil::SSE_BITMASK[i];
38  }
39 
40  // Remember last character for the next iteration
41  *last_char_is_escape = escape_mask &
42  SSEUtil::SSE_BITMASK[SSEUtil::CHARS_PER_128_BIT_REGISTER - 1];
43 
44  // Shift escape mask up one so they match at the same bit index as the tuple and
45  // field mask (instead of being the character before) and set the correct first bit
46  escape_mask = escape_mask << 1 | (first_char_is_escape ? 1 : 0);
47 
48  // If escape_mask[n] is true, then tuple/field_mask[n] is escaped
49  *delim_mask &= ~escape_mask;
50 }
51 
52 template <bool process_escapes>
53 inline void DelimitedTextParser::AddColumn(int len, char** next_column_start,
54  int* num_fields, FieldLocation* field_locations) {
55  if (ReturnCurrentColumn()) {
56  // Found a column that needs to be parsed, write the start/len to 'field_locations'
57  field_locations[*num_fields].start = *next_column_start;
58  if (process_escapes && current_column_has_escape_) {
59  field_locations[*num_fields].len = -len;
60  } else {
61  field_locations[*num_fields].len = len;
62  }
63  ++(*num_fields);
64  }
65  if (process_escapes) current_column_has_escape_ = false;
66  *next_column_start += len + 1;
67  ++column_idx_;
68 }
69 
70 template <bool process_escapes>
71 void inline DelimitedTextParser:: FillColumns(int len, char** last_column,
72  int* num_fields, FieldLocation* field_locations) {
73  // Fill in any columns missing from the end of the tuple.
74  char* dummy = NULL;
75  if (last_column == NULL) last_column = &dummy;
76  while (column_idx_ < num_cols_) {
77  AddColumn<process_escapes>(len, last_column, num_fields, field_locations);
78  // The rest of the columns will be null.
79  last_column = &dummy;
80  len = 0;
81  }
82 }
83 
87 //
97 template <bool process_escapes>
98 inline void DelimitedTextParser::ParseSse(int max_tuples,
99  int64_t* remaining_len, char** byte_buffer_ptr,
100  char** row_end_locations,
101  FieldLocation* field_locations,
102  int* num_tuples, int* num_fields, char** next_column_start) {
104 
105  // To parse using SSE, we:
106  // 1. Load into different sse registers the different characters we need to search for
107  // tuple breaks, field breaks, escape characters
108  // 2. Load 16 characters at a time into the sse register
109  // 3. Use the SSE instruction to do strchr on those 16 chars, the result is a bitmask
110  // 4. Compute the bitmask for tuple breaks, field breaks and escape characters.
111  // 5. If there are escape characters, fix up the matching masked bits in the
112  // field/tuple mask
113  // 6. Go through the mask bit by bit and write the parsed data.
114 
115  // xmm registers:
116  // - xmm_buffer: the register holding the current (16 chars) we're working on from the
117  // file
118  // - xmm_delim_search_: the delim search register. Contains field delimiter,
119  // collection_item delim_char and tuple delimiter
120  // - xmm_escape_search_: the escape search register. Only contains escape char
121  // - xmm_delim_mask: the result of doing strchr for the delimiters
122  // - xmm_escape_mask: the result of doing strchr for the escape char
123  __m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;
124 
125  while (LIKELY(*remaining_len >= SSEUtil::CHARS_PER_128_BIT_REGISTER)) {
126  // Load the next 16 bytes into the xmm register
127  xmm_buffer = _mm_loadu_si128(reinterpret_cast<__m128i*>(*byte_buffer_ptr));
128 
129  // Do the strchr for tuple and field breaks
130  // The strchr sse instruction returns the result in the lower bits of the sse
131  // register. Since we only process 16 characters at a time, only the lower 16 bits
132  // can contain non-zero values.
133  // _mm_extract_epi16 will extract 16 bits out of the xmm register. The second
134  // parameter specifies which 16 bits to extract (0 for the lowest 16 bits).
135  xmm_delim_mask = SSE4_cmpestrm(xmm_delim_search_, num_delims_, xmm_buffer,
137  uint16_t delim_mask = _mm_extract_epi16(xmm_delim_mask, 0);
138 
139  uint16_t escape_mask = 0;
140  // If the table does not use escape characters, skip processing for it.
141  if (process_escapes) {
142  DCHECK(escape_char_ != '\0');
143  xmm_escape_mask = SSE4_cmpestrm(xmm_escape_search_, 1,
145  escape_mask = _mm_extract_epi16(xmm_escape_mask, 0);
146  ProcessEscapeMask(escape_mask, &last_char_is_escape_, &delim_mask);
147  }
148 
149  char* last_char = *byte_buffer_ptr + 15;
150  bool last_char_is_unescaped_delim = delim_mask >> 15;
151  unfinished_tuple_ = !(last_char_is_unescaped_delim &&
152  (*last_char == tuple_delim_ || (tuple_delim_ == '\n' && *last_char == '\r')));
153 
154  int last_col_idx = 0;
155  // Process all non-zero bits in the delim_mask from lsb->msb. If a bit
156  // is set, the character in that spot is either a field or tuple delimiter.
157  while (delim_mask != 0) {
158  // ffs is a libc function that returns the index of the first set bit (1-indexed)
159  int n = ffs(delim_mask) - 1;
160  DCHECK_GE(n, 0);
161  DCHECK_LT(n, 16);
162  // clear current bit
163  delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
164 
165  if (process_escapes) {
166  // Determine if there was an escape character between [last_col_idx, n]
167  bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
168  current_column_has_escape_ |= escaped;
169  last_col_idx = n;
170  }
171 
172  char* delim_ptr = *byte_buffer_ptr + n;
173 
174  if (*delim_ptr == field_delim_ || *delim_ptr == collection_item_delim_) {
175  AddColumn<process_escapes>(delim_ptr - *next_column_start,
176  next_column_start, num_fields, field_locations);
177  continue;
178  }
179 
180  if (*delim_ptr == tuple_delim_ || (tuple_delim_ == '\n' && *delim_ptr == '\r')) {
181  if (UNLIKELY(
182  last_row_delim_offset_ == *remaining_len - n && *delim_ptr == '\n')) {
183  // If the row ended in \r\n then move the next start past the \n
184  ++*next_column_start;
186  continue;
187  }
188  AddColumn<process_escapes>(delim_ptr - *next_column_start,
189  next_column_start, num_fields, field_locations);
190  FillColumns<false>(0, NULL, num_fields, field_locations);
192  row_end_locations[*num_tuples] = delim_ptr;
193  ++(*num_tuples);
194  // Remember where we saw the last \r.
195  last_row_delim_offset_ = *delim_ptr == '\r' ? *remaining_len - n - 1 : -1;
196  if (UNLIKELY(*num_tuples == max_tuples)) {
197  (*byte_buffer_ptr) += (n + 1);
198  if (process_escapes) last_char_is_escape_ = false;
199  *remaining_len -= (n + 1);
200  // If the last character we processed was \r then set the offset to 0
201  // so that we will use it at the beginning of the next batch.
202  if (last_row_delim_offset_ == *remaining_len) last_row_delim_offset_ = 0;
203  return;
204  }
205  }
206  }
207 
208  if (process_escapes) {
209  // Determine if there was an escape character between (last_col_idx, 15)
210  bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
211  current_column_has_escape_ |= unprocessed_escape;
212  }
213 
214  *remaining_len -= SSEUtil::CHARS_PER_128_BIT_REGISTER;
215  *byte_buffer_ptr += SSEUtil::CHARS_PER_128_BIT_REGISTER;
216  }
217 }
218 
220 template <bool process_escapes>
221 inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* buffer,
222  FieldLocation* field_locations, int* num_fields) {
223  char* next_column_start = buffer;
224  __m128i xmm_buffer, xmm_delim_mask, xmm_escape_mask;
225 
229  while (LIKELY(remaining_len >= SSEUtil::CHARS_PER_128_BIT_REGISTER)) {
230  // Load the next 16 bytes into the xmm register
231  xmm_buffer = _mm_loadu_si128(reinterpret_cast<__m128i*>(buffer));
232 
233  xmm_delim_mask = SSE4_cmpestrm(xmm_delim_search_, num_delims_, xmm_buffer,
235  uint16_t delim_mask = _mm_extract_epi16(xmm_delim_mask, 0);
236 
237  uint16_t escape_mask = 0;
238  // If the table does not use escape characters, skip processing for it.
239  if (process_escapes) {
240  DCHECK(escape_char_ != '\0');
241  xmm_escape_mask = SSE4_cmpestrm(xmm_escape_search_, 1, xmm_buffer,
243  escape_mask = _mm_extract_epi16(xmm_escape_mask, 0);
244  ProcessEscapeMask(escape_mask, &last_char_is_escape_, &delim_mask);
245  }
246 
247  int last_col_idx = 0;
248  // Process all non-zero bits in the delim_mask from lsb->msb. If a bit
249  // is set, the character in that spot is a field.
250  while (delim_mask != 0) {
251  // ffs is a libc function that returns the index of the first set bit (1-indexed)
252  int n = ffs(delim_mask) - 1;
253  DCHECK_GE(n, 0);
254  DCHECK_LT(n, 16);
255 
256  if (process_escapes) {
257  // Determine if there was an escape character between [last_col_idx, n]
258  bool escaped = (escape_mask & low_mask_[last_col_idx] & high_mask_[n]) != 0;
259  current_column_has_escape_ |= escaped;
260  last_col_idx = n;
261  }
262 
263  // clear current bit
264  delim_mask &= ~(SSEUtil::SSE_BITMASK[n]);
265 
266  AddColumn<process_escapes>(buffer + n - next_column_start,
267  &next_column_start, num_fields, field_locations);
268  }
269 
270  if (process_escapes) {
271  // Determine if there was an escape character between (last_col_idx, 15)
272  bool unprocessed_escape = escape_mask & low_mask_[last_col_idx] & high_mask_[15];
273  current_column_has_escape_ |= unprocessed_escape;
274  }
275 
276  remaining_len -= SSEUtil::CHARS_PER_128_BIT_REGISTER;
278  }
279  }
280 
281  while (remaining_len > 0) {
282  if (*buffer == escape_char_) {
285  } else {
286  last_char_is_escape_ = false;
287  }
288 
289  if (!last_char_is_escape_ &&
290  (*buffer == field_delim_ || *buffer == collection_item_delim_)) {
291  AddColumn<process_escapes>(buffer - next_column_start,
292  &next_column_start, num_fields, field_locations);
293  }
294 
295  --remaining_len;
296  ++buffer;
297  }
298 
299  // Last column does not have a delimiter after it. Add that column and also
300  // pad with empty cols if the input is ragged.
301  FillColumns<process_escapes>(buffer - next_column_start,
302  &next_column_start, num_fields, field_locations);
303 }
304 
305 }
306 
307 #endif
static const int SSE_BITMASK[CHARS_PER_128_BIT_REGISTER]
Precomputed mask values up to 16 bits.
Definition: sse-util.h:48
uint16_t low_mask_[16]
Precomputed masks to process escape characters.
__m128i xmm_delim_search_
SSE(xmm) register containing the delimiter search character.
char tuple_delim_
Character delimiting tuples.
void AddColumn(int len, char **next_column_start, int *num_fields, FieldLocation *field_locations)
int num_delims_
The number of delimiters contained in xmm_delim_search_, i.e. its length.
__m128i xmm_escape_search_
SSE(xmm) register containing the escape search character.
bool last_char_is_escape_
Whether or not the previous character was the escape character.
char collection_item_delim_
Character delimiting collection items (to become slots).
static const int STRCHR_MODE
Definition: sse-util.h:40
int num_cols_
Number of columns in the table (including partition columns)
char escape_char_
Escape character. Only used if process_escapes_ is true.
void FillColumns(int len, char **last_column, int *num_fields, impala::FieldLocation *field_locations)
static const int64_t SSE4_2
Definition: cpu-info.h:34
char field_delim_
Character delimiting fields (to become slots).
void ParseSse(int max_tuples, int64_t *remaining_len, char **byte_buffer_ptr, char **row_end_locations_, FieldLocation *field_locations, int *num_tuples, int *num_fields, char **next_column_start)
void ParseSingleTuple(int64_t len, char *buffer, FieldLocation *field_locations, int *num_fields)
Simplified version of ParseSSE which does not handle tuple delimiters.
#define UNLIKELY(expr)
Definition: compiler-util.h:33
void ProcessEscapeMask(uint16_t escape_mask, bool *last_char_is_escape, uint16_t *delim_mask)
#define LIKELY(expr)
Definition: compiler-util.h:32
bool unfinished_tuple_
True if the last tuple is unfinished (not ended with tuple delimiter).
static SSE_ALWAYS_INLINE __m128i SSE4_cmpestrm(__m128i str1, int len1, __m128i str2, int len2, const int mode)
Definition: sse-util.h:89
static bool IsSupported(long flag)
Returns whether of not the cpu supports this flag.
Definition: cpu-info.h:58
int column_idx_
Index to keep track of the current column in the current file.
int num_partition_keys_
Number of partition columns in the table.
static const int CHARS_PER_128_BIT_REGISTER
Definition: sse-util.h:28