Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
delimited-text-parser.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include "exec/hdfs-scanner.h"
18 #include "util/cpu-info.h"
19 
20 #include "common/names.h"
21 
22 using namespace impala;
23 
25  int num_cols, int num_partition_keys, const bool* is_materialized_col,
26  char tuple_delim, char field_delim, char collection_item_delim, char escape_char)
27  : num_delims_(0),
28  field_delim_(field_delim),
29  process_escapes_(escape_char != '\0'),
30  escape_char_(escape_char),
31  collection_item_delim_(collection_item_delim),
32  tuple_delim_(tuple_delim),
33  current_column_has_escape_(false),
34  last_char_is_escape_(false),
35  last_row_delim_offset_(-1),
36  num_cols_(num_cols),
37  num_partition_keys_(num_partition_keys),
38  is_materialized_col_(is_materialized_col),
39  column_idx_(0),
40  unfinished_tuple_(false){
41  // Escape character should not be the same as tuple or col delim unless it is the
42  // empty delimiter.
43  DCHECK(escape_char == '\0' || escape_char != tuple_delim);
44  DCHECK(escape_char == '\0' || escape_char != field_delim);
45  DCHECK(escape_char == '\0' || escape_char != collection_item_delim);
46 
47  // Initialize the sse search registers.
48  char search_chars[SSEUtil::CHARS_PER_128_BIT_REGISTER];
49  memset(search_chars, 0, sizeof(search_chars));
50  if (process_escapes_) {
51  search_chars[0] = escape_char_;
52  xmm_escape_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
53 
54  // To process escape characters, we need to check if there was an escape
55  // character between (col_start,col_end). The SSE instructions return
56  // a bit mask for 16 bits so we need to mask off the bits below col_start
57  // and after col_end.
58  low_mask_[0] = 0xffff;
59  high_mask_[15] = 0xffff;
60  for (int i = 1; i < 16; ++i) {
61  low_mask_[i] = low_mask_[i - 1] << 1;
62  }
63  for (int i = 14; i >= 0; --i) {
64  high_mask_[i] = high_mask_[i + 1] >> 1;
65  }
66  } else {
67  memset(high_mask_, 0, sizeof(high_mask_));
68  memset(low_mask_, 0, sizeof(low_mask_));
69  }
70 
71  if (tuple_delim != '\0') {
72  search_chars[num_delims_++] = tuple_delim_;
73  // Hive treats \r (^M) as an alternate tuple delimiter, but \r\n is a
74  // single tuple delimiter.
75  if (tuple_delim_ == '\n') search_chars[num_delims_++] = '\r';
76  xmm_tuple_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
77  }
78 
79  if (field_delim != '\0' || collection_item_delim != '\0') {
80  search_chars[num_delims_++] = field_delim_;
81  search_chars[num_delims_++] = collection_item_delim_;
82  }
83 
84  DCHECK_GT(num_delims_, 0);
85  xmm_delim_search_ = _mm_loadu_si128(reinterpret_cast<__m128i*>(search_chars));
86 
87  ParserReset();
88 }
89 
92  last_char_is_escape_ = false;
95 }
96 
97 // Parsing raw csv data into FieldLocation descriptors.
98 Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remaining_len,
99  char** byte_buffer_ptr, char** row_end_locations,
100  FieldLocation* field_locations,
101  int* num_tuples, int* num_fields, char** next_column_start) {
102  // Start of this batch.
103  *next_column_start = *byte_buffer_ptr;
104  // If there was a '\r' at the end of the last batch, set the offset to
105  // just before the beginning. Otherwise make it invalid.
106  if (last_row_delim_offset_ == 0) {
107  last_row_delim_offset_ = remaining_len;
108  } else {
110  }
111 
113  if (process_escapes_) {
114  ParseSse<true>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
115  field_locations, num_tuples, num_fields, next_column_start);
116  } else {
117  ParseSse<false>(max_tuples, &remaining_len, byte_buffer_ptr, row_end_locations,
118  field_locations, num_tuples, num_fields, next_column_start);
119  }
120  }
121 
122  if (*num_tuples == max_tuples) return Status::OK;
123 
124  // Handle the remaining characters
125  while (remaining_len > 0) {
126  bool new_tuple = false;
127  bool new_col = false;
128  unfinished_tuple_ = true;
129 
130  if (!last_char_is_escape_) {
131  if (tuple_delim_ != '\0' && (**byte_buffer_ptr == tuple_delim_ ||
132  (tuple_delim_ == '\n' && **byte_buffer_ptr == '\r'))) {
133  new_tuple = true;
134  new_col = true;
135  } else if (**byte_buffer_ptr == field_delim_
136  || **byte_buffer_ptr == collection_item_delim_) {
137  new_col = true;
138  }
139  }
140 
141  if (process_escapes_ && **byte_buffer_ptr == escape_char_) {
144  } else {
145  last_char_is_escape_ = false;
146  }
147 
148  if (new_tuple) {
149  if (last_row_delim_offset_ == remaining_len && **byte_buffer_ptr == '\n') {
150  // If the row ended in \r\n then move to the \n
151  ++*next_column_start;
152  } else {
153  AddColumn<true>(*byte_buffer_ptr - *next_column_start,
154  next_column_start, num_fields, field_locations);
155  FillColumns<false>(0, NULL, num_fields, field_locations);
157  row_end_locations[*num_tuples] = *byte_buffer_ptr;
158  ++(*num_tuples);
159  }
160  unfinished_tuple_ = false;
161  last_row_delim_offset_ = **byte_buffer_ptr == '\r' ? remaining_len - 1 : -1;
162  if (*num_tuples == max_tuples) {
163  ++*byte_buffer_ptr;
164  --remaining_len;
165  if (last_row_delim_offset_ == remaining_len) last_row_delim_offset_ = 0;
166  return Status::OK;
167  }
168  } else if (new_col) {
169  AddColumn<true>(*byte_buffer_ptr - *next_column_start,
170  next_column_start, num_fields, field_locations);
171  }
172 
173  --remaining_len;
174  ++*byte_buffer_ptr;
175  }
176 
177  // For formats that store the length of the row, the row is not delimited:
178  // e.g. Sequence files.
179  if (tuple_delim_ == '\0') {
180  DCHECK_EQ(remaining_len, 0);
181  AddColumn<true>(*byte_buffer_ptr - *next_column_start,
182  next_column_start, num_fields, field_locations);
183  FillColumns<false>(0, NULL, num_fields, field_locations);
185  ++(*num_tuples);
186  unfinished_tuple_ = false;
187  }
188  return Status::OK;
189 }
190 
191 // Find the first instance of the tuple delimiter. This will
192 // find the start of the first full tuple in buffer by looking for the end of
193 // the previous tuple.
194 int DelimitedTextParser::FindFirstInstance(const char* buffer, int len) {
195  int tuple_start = 0;
196  const char* buffer_start = buffer;
197  bool found = false;
198 
199  // If the last char in the previous buffer was \r then either return the start of
200  // this buffer or skip a \n at the beginning of the buffer.
201  if (last_row_delim_offset_ != -1) {
202  if (*buffer_start == '\n') return 1;
203  return 0;
204  }
205 restart:
206  found = false;
207 
209  __m128i xmm_buffer, xmm_tuple_mask;
210  while (len - tuple_start >= SSEUtil::CHARS_PER_128_BIT_REGISTER) {
211  // TODO: can we parallelize this as well? Are there multiple sse execution units?
212  // Load the next 16 bytes into the xmm register and do strchr for the
213  // tuple delimiter.
214  xmm_buffer = _mm_loadu_si128(reinterpret_cast<const __m128i*>(buffer));
215  xmm_tuple_mask = SSE4_cmpestrm(xmm_tuple_search_, 1, xmm_buffer,
217  int tuple_mask = _mm_extract_epi16(xmm_tuple_mask, 0);
218  if (tuple_mask != 0) {
219  found = true;
220  for (int i = 0; i < SSEUtil::CHARS_PER_128_BIT_REGISTER; ++i) {
221  if ((tuple_mask & SSEUtil::SSE_BITMASK[i]) != 0) {
222  tuple_start += i + 1;
223  buffer += i + 1;
224  break;
225  }
226  }
227  break;
228  }
231  }
232  }
233  if (!found) {
234  for (; tuple_start < len; ++tuple_start) {
235  char c = *buffer++;
236  if (c == tuple_delim_ || (c == '\r' && tuple_delim_ == '\n')) {
237  ++tuple_start;
238  found = true;
239  break;
240  }
241  }
242  }
243 
244  if (!found) return -1;
245 
246  if (process_escapes_) {
247  // Scan backwards for escape characters. We do this after
248  // finding the tuple break rather than during the (above)
249  // forward scan to make the forward scan faster. This will
250  // perform worse if there are many characters right before the
251  // tuple break that are all escape characters, but that is
252  // unlikely.
253  int num_escape_chars = 0;
254  int before_tuple_end = tuple_start - 2;
255  // TODO: If scan range is split between escape character and tuple delimiter,
256  // before_tuple_end will be -1. Need to scan previous range for escape characters
257  // in this case.
258  for (; before_tuple_end >= 0; --before_tuple_end) {
259  if (buffer_start[before_tuple_end] == escape_char_) {
260  ++num_escape_chars;
261  } else {
262  break;
263  }
264  }
265 
266  // TODO: This sucks. All the preceding characters before the tuple delim were
267  // escape characters. We need to read from the previous block to see what to do.
268  if (before_tuple_end < 0) {
269  static bool warning_logged = false;
270  if (!warning_logged) {
271  LOG(WARNING) << "Unhandled code path. This might cause a tuple to be "
272  << "skipped or repeated.";
273  warning_logged = true;
274  }
275  }
276 
277  // An even number of escape characters means they cancel out and this tuple break
278  // is *not* escaped.
279  if (num_escape_chars % 2 != 0) goto restart;
280  }
281 
282  if (tuple_start == len - 1 && buffer_start[tuple_start] == '\r') {
283  // If \r is the last char we need to wait to see if the next one is \n or not.
285  return -1;
286  }
287  if (tuple_start < len && buffer_start[tuple_start] == '\n' &&
288  buffer_start[tuple_start - 1] == '\r') {
289  // We have \r\n, move to the next character.
290  ++tuple_start;
291  }
292  return tuple_start;
293 }
static const int SSE_BITMASK[CHARS_PER_128_BIT_REGISTER]
Precomputed mask values up to 16 bits.
Definition: sse-util.h:48
uint16_t low_mask_[16]
Precomputed masks to process escape characters.
__m128i xmm_delim_search_
SSE(xmm) register containing the delimiter search character.
DelimitedTextParser(int num_cols, int num_partition_keys, const bool *is_materialized_col, char tuple_delim, char field_delim_= '\0', char collection_item_delim= '^', char escape_char= '\0')
num_cols is the total number of columns including partition keys.
char tuple_delim_
Character delimiting tuples.
int num_delims_
The number of delimiters contained in xmm_delim_search_, i.e. its length.
__m128i xmm_escape_search_
SSE(xmm) register containing the escape search character.
bool last_char_is_escape_
Whether or not the previous character was the escape character.
char collection_item_delim_
Character delimiting collection items (to become slots).
static const int STRCHR_MODE
Definition: sse-util.h:40
__m128i xmm_tuple_search_
SSE(xmm) register containing the tuple search character.
char escape_char_
Escape character. Only used if process_escapes_ is true.
int FindFirstInstance(const char *buffer, int len)
static const int64_t SSE4_2
Definition: cpu-info.h:34
char field_delim_
Character delimiting fields (to become slots).
Status ParseFieldLocations(int max_tuples, int64_t remaining_len, char **byte_buffer_ptr, char **row_end_locations, FieldLocation *field_locations, int *num_tuples, int *num_fields, char **next_column_start)
bool process_escapes_
True if this parser should handle escape characters.
static const Status OK
Definition: status.h:87
bool unfinished_tuple_
True if the last tuple is unfinished (not ended with tuple delimiter).
static SSE_ALWAYS_INLINE __m128i SSE4_cmpestrm(__m128i str1, int len1, __m128i str2, int len2, const int mode)
Definition: sse-util.h:89
void ParserReset()
Called to initialize parser at beginning of scan range.
static bool IsSupported(long flag)
Returns whether or not the CPU supports this flag.
Definition: cpu-info.h:58
int column_idx_
Index to keep track of the current column in the current file.
int num_partition_keys_
Number of partition columns in the table.
static const int CHARS_PER_128_BIT_REGISTER
Definition: sse-util.h:28