Impala
Impala is the open source, native analytic database for Apache Hadoop.
data-source-scan-node.cc
// Copyright 2014 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/data-source-scan-node.h"

#include <boost/foreach.hpp>
#include <vector>
#include <gutil/strings/substitute.h>

#include "exec/parquet-common.h"
#include "exec/read-write-util.h"
#include "exprs/expr.h"
#include "runtime/mem-pool.h"
#include "runtime/runtime-state.h"
#include "runtime/row-batch.h"
#include "runtime/string-value.h"
#include "runtime/tuple-row.h"
#include "util/jni-util.h"
#include "util/periodic-counter-updater.h"
#include "util/runtime-profile.h"

#include "common/names.h"

using namespace strings;
using namespace impala::extdatasource;

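// Like any gflag, the batch size can be overridden at daemon startup, e.g.
// "impalad --data_source_batch_size=4096" (value shown is illustrative).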
DEFINE_int32(data_source_batch_size, 1024, "Batch size for calls to GetNext() on "
    "external data sources.");

namespace impala {

// $0 = num expected cols, $1 = actual num columns
const string ERROR_NUM_COLUMNS = "Data source returned unexpected number of columns. "
    "Expected $0 but received $1. This likely indicates a problem with the data source "
    "library.";
const string ERROR_MISMATCHED_COL_SIZES = "Data source returned columns containing "
    "different numbers of rows. This likely indicates a problem with the data source "
    "library.";
// $0 = column type (e.g. INT)
const string ERROR_INVALID_COL_DATA = "Data source returned inconsistent column data. "
    "Expected value of type $0 based on column metadata. This likely indicates a "
    "problem with the data source library.";
const string ERROR_INVALID_TIMESTAMP = "Data source returned invalid timestamp data. "
    "This likely indicates a problem with the data source library.";
const string ERROR_INVALID_DECIMAL = "Data source returned invalid decimal data. "
    "This likely indicates a problem with the data source library.";

// Size of an encoded TIMESTAMP: an 8-byte time-of-day component followed by a 4-byte
// date component (see the TYPE_TIMESTAMP case in MaterializeNextRow()).
const size_t TIMESTAMP_SIZE = sizeof(int64_t) + sizeof(int32_t);

DataSourceScanNode::DataSourceScanNode(ObjectPool* pool, const TPlanNode& tnode,
    const DescriptorTbl& descs)
  : ScanNode(pool, tnode, descs),
    data_src_node_(tnode.data_source_node),
    tuple_idx_(0),
    num_rows_(0),
    next_row_idx_(0) {
}

DataSourceScanNode::~DataSourceScanNode() {
}

Status DataSourceScanNode::Prepare(RuntimeState* state) {
  RETURN_IF_ERROR(ScanNode::Prepare(state));
  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(data_src_node_.tuple_id);
  DCHECK(tuple_desc_ != NULL);

  data_source_executor_.reset(new ExternalDataSourceExecutor());
  RETURN_IF_ERROR(data_source_executor_->Init(data_src_node_.data_source.hdfs_location,
      data_src_node_.data_source.class_name, data_src_node_.data_source.api_version));

  // Initialize materialized_slots_ and cols_next_val_idx_.
  BOOST_FOREACH(SlotDescriptor* slot, tuple_desc_->slots()) {
    if (!slot->is_materialized()) continue;
    materialized_slots_.push_back(slot);
    cols_next_val_idx_.push_back(0);
  }
  return Status::OK;
}

Status DataSourceScanNode::Open(RuntimeState* state) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  RETURN_IF_ERROR(ExecNode::Open(state));
  RETURN_IF_CANCELLED(state);

  // Prepare the schema for TOpenParams.row_schema
  vector<extdatasource::TColumnDesc> cols;
  BOOST_FOREACH(const SlotDescriptor* slot, materialized_slots_) {
    extdatasource::TColumnDesc col;
    int col_idx = slot->col_pos();
    col.__set_name(tuple_desc_->table_desc()->col_names()[col_idx]);
    col.__set_type(slot->type().ToThrift());
    cols.push_back(col);
  }
  extdatasource::TTableSchema row_schema;
  row_schema.__set_cols(cols);

  TOpenParams params;
  params.__set_query_id(state->query_id());
  params.__set_table_name(tuple_desc_->table_desc()->name());
  params.__set_init_string(data_src_node_.init_string);
  params.__set_authenticated_user_name(state->effective_user());
  params.__set_row_schema(row_schema);
  params.__set_batch_size(FLAGS_data_source_batch_size);
  params.__set_predicates(data_src_node_.accepted_predicates);
  TOpenResult result;
  RETURN_IF_ERROR(data_source_executor_->Open(params, &result));
  RETURN_IF_ERROR(Status(result.status));
  scan_handle_ = result.scan_handle;
  return GetNextInputBatch();
}

Status DataSourceScanNode::ValidateRowBatchSize() {
  if (!input_batch_->__isset.rows) return Status::OK;
  const vector<TColumnData>& cols = input_batch_->rows.cols;
  if (materialized_slots_.size() != cols.size()) {
    return Status(Substitute(ERROR_NUM_COLUMNS, materialized_slots_.size(), cols.size()));
  }

  num_rows_ = -1;
  // If num_rows was set, use that, otherwise we set it to be the number of rows in
  // the first TColumnData and then ensure the number of rows in other columns are
  // consistent.
  if (input_batch_->rows.__isset.num_rows) num_rows_ = input_batch_->rows.num_rows;
  for (int i = 0; i < materialized_slots_.size(); ++i) {
    const TColumnData& col_data = cols[i];
    if (num_rows_ < 0) num_rows_ = col_data.is_null.size();
    if (num_rows_ != col_data.is_null.size()) return Status(ERROR_MISMATCHED_COL_SIZES);
  }
  return Status::OK;
}

Status DataSourceScanNode::GetNextInputBatch() {
  input_batch_.reset(new TGetNextResult());
  next_row_idx_ = 0;
  // Reset all the indexes into the column value arrays to 0
  memset(&cols_next_val_idx_[0], 0, sizeof(int) * cols_next_val_idx_.size());
  TGetNextParams params;
  params.__set_scan_handle(scan_handle_);
  RETURN_IF_ERROR(data_source_executor_->GetNext(params, input_batch_.get()));
  RETURN_IF_ERROR(Status(input_batch_->status));
  RETURN_IF_ERROR(ValidateRowBatchSize());
  if (!InputBatchHasNext() && !input_batch_->eos) {
    // The data source should have set eos, but if it didn't we should just log a
    // warning and continue as if it had.
    VLOG_QUERY << "Data source " << data_src_node_.data_source.name << " returned no "
        << "rows but did not set 'eos'. No more rows will be fetched from the "
        << "data source.";
    input_batch_->eos = true;
  }
  return Status::OK;
}

// Sets the decimal value in the slot. Inline method to avoid nested switch statements.
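// The value bytes are decoded with ParquetPlainEncoder, i.e. they are expected to use
// Parquet's plain decimal encoding (big-endian two's complement).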
inline Status SetDecimalVal(const ColumnType& type, char* bytes, int len,
    void* slot) {
  uint8_t* buffer = reinterpret_cast<uint8_t*>(bytes);
  switch (type.GetByteSize()) {
    case 4: {
      Decimal4Value* val = reinterpret_cast<Decimal4Value*>(slot);
      if (len > sizeof(Decimal4Value)) return Status(ERROR_INVALID_DECIMAL);
      // TODO: Move Decode() to a more generic utils class (here and below)
      ParquetPlainEncoder::Decode(buffer, len, val);
      break;
    }
    case 8: {
      Decimal8Value* val = reinterpret_cast<Decimal8Value*>(slot);
      if (len > sizeof(Decimal8Value)) return Status(ERROR_INVALID_DECIMAL);
      ParquetPlainEncoder::Decode(buffer, len, val);
      break;
    }
    case 16: {
      Decimal16Value* val = reinterpret_cast<Decimal16Value*>(slot);
      if (len > sizeof(Decimal16Value)) return Status(ERROR_INVALID_DECIMAL);
      ParquetPlainEncoder::Decode(buffer, len, val);
      break;
    }
    default: DCHECK(false);
  }
  return Status::OK;
}

Status DataSourceScanNode::MaterializeNextRow(MemPool* tuple_pool) {
  const vector<TColumnData>& cols = input_batch_->rows.cols;
  tuple_->Init(tuple_desc_->byte_size());

  for (int i = 0; i < materialized_slots_.size(); ++i) {
    const SlotDescriptor* slot_desc = materialized_slots_[i];
    void* slot = tuple_->GetSlot(slot_desc->tuple_offset());
    const TColumnData& col = cols[i];

    if (col.is_null[next_row_idx_]) {
      tuple_->SetNull(slot_desc->null_indicator_offset());
      continue;
    }

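    // NULL values do not consume an entry in the type-specific value arrays, so this
    // per-column index can lag behind next_row_idx_.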
    // Get and increment the index into the values array (e.g. int_vals) for this col.
    int val_idx = cols_next_val_idx_[i]++;
    switch (slot_desc->type().type) {
      case TYPE_STRING: {
        if (val_idx >= col.string_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "STRING"));
        }
        const string& val = col.string_vals[val_idx];
        size_t val_size = val.size();
        char* buffer = reinterpret_cast<char*>(tuple_pool->Allocate(val_size));
        memcpy(buffer, val.data(), val_size);
        reinterpret_cast<StringValue*>(slot)->ptr = buffer;
        reinterpret_cast<StringValue*>(slot)->len = val_size;
        break;
      }
      case TYPE_TINYINT:
        if (val_idx >= col.byte_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "TINYINT"));
        }
        *reinterpret_cast<int8_t*>(slot) = col.byte_vals[val_idx];
        break;
      case TYPE_SMALLINT:
        if (val_idx >= col.short_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "SMALLINT"));
        }
        *reinterpret_cast<int16_t*>(slot) = col.short_vals[val_idx];
        break;
      case TYPE_INT:
        if (val_idx >= col.int_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "INT"));
        }
        *reinterpret_cast<int32_t*>(slot) = col.int_vals[val_idx];
        break;
      case TYPE_BIGINT:
        if (val_idx >= col.long_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "BIGINT"));
        }
        *reinterpret_cast<int64_t*>(slot) = col.long_vals[val_idx];
        break;
      case TYPE_DOUBLE:
        if (val_idx >= col.double_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "DOUBLE"));
        }
        *reinterpret_cast<double*>(slot) = col.double_vals[val_idx];
        break;
      case TYPE_FLOAT:
        if (val_idx >= col.double_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "FLOAT"));
        }
        *reinterpret_cast<float*>(slot) = col.double_vals[val_idx];
        break;
      case TYPE_BOOLEAN:
        if (val_idx >= col.bool_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "BOOLEAN"));
        }
        *reinterpret_cast<int8_t*>(slot) = col.bool_vals[val_idx];
        break;
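      // TIMESTAMP and DECIMAL values arrive in binary_vals as raw byte strings; a
      // TIMESTAMP is exactly TIMESTAMP_SIZE bytes, the 8-byte time-of-day component
      // followed by the 4-byte date component, read back with ReadWriteUtil::GetInt.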
      case TYPE_TIMESTAMP: {
        if (val_idx >= col.binary_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "TIMESTAMP"));
        }
        const string& val = col.binary_vals[val_idx];
        if (val.size() != TIMESTAMP_SIZE) return Status(ERROR_INVALID_TIMESTAMP);
        const uint8_t* bytes = reinterpret_cast<const uint8_t*>(val.data());
        *reinterpret_cast<TimestampValue*>(slot) = TimestampValue(
            ReadWriteUtil::GetInt<uint64_t>(bytes),
            ReadWriteUtil::GetInt<uint32_t>(bytes + sizeof(int64_t)));
        break;
      }
      case TYPE_DECIMAL: {
        if (val_idx >= col.binary_vals.size()) {
          return Status(Substitute(ERROR_INVALID_COL_DATA, "DECIMAL"));
        }
        const string& val = col.binary_vals[val_idx];
        RETURN_IF_ERROR(SetDecimalVal(slot_desc->type(), const_cast<char*>(val.data()),
            val.size(), slot));
        break;
      }
      default:
        DCHECK(false);
    }
  }
  return Status::OK;
}

Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
  RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
  RETURN_IF_CANCELLED(state);
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  if (ReachedLimit()) {
    *eos = true;
    return Status::OK;
  }
  *eos = false;

  // create new tuple buffer for row_batch
  MemPool* tuple_pool = row_batch->tuple_data_pool();
  int tuple_buffer_size = row_batch->MaxTupleBufferSize();
  void* tuple_buffer = tuple_pool->Allocate(tuple_buffer_size);
  tuple_ = reinterpret_cast<Tuple*>(tuple_buffer);
  ExprContext** ctxs = &conjunct_ctxs_[0];
  int num_ctxs = conjunct_ctxs_.size();

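  // Materialize rows from the current input batch until the output batch is full or
  // the limit is reached; when the input batch is exhausted, fetch the next one.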
  while (true) {
    {
      SCOPED_TIMER(materialize_tuple_timer());
      // copy rows until we hit the limit/capacity or until we exhaust input_batch_
      while (!ReachedLimit() && !row_batch->AtCapacity(tuple_pool) &&
          InputBatchHasNext()) {
        RETURN_IF_ERROR(MaterializeNextRow(tuple_pool));
        int row_idx = row_batch->AddRow();
        TupleRow* tuple_row = row_batch->GetRow(row_idx);
        tuple_row->SetTuple(tuple_idx_, tuple_);

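        // Advance tuple_ past this tuple only if the row survives the conjuncts;
        // otherwise the same tuple memory is reused for the next row.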
        if (ExecNode::EvalConjuncts(ctxs, num_ctxs, tuple_row)) {
          row_batch->CommitLastRow();
          char* new_tuple = reinterpret_cast<char*>(tuple_);
          new_tuple += tuple_desc_->byte_size();
          tuple_ = reinterpret_cast<Tuple*>(new_tuple);
          ++num_rows_returned_;
        }
        ++next_row_idx_;
      }
      COUNTER_SET(rows_returned_counter_, num_rows_returned_);

      if (ReachedLimit() || row_batch->AtCapacity() || input_batch_->eos) {
        *eos = ReachedLimit() || input_batch_->eos;
        return Status::OK;
      }
    }

    // Need more rows
    DCHECK(!InputBatchHasNext());
    RETURN_IF_ERROR(GetNextInputBatch());
  }
}

Status DataSourceScanNode::Reset(RuntimeState* state) {
  DCHECK(false) << "NYI";
  return Status("NYI");
}

void DataSourceScanNode::Close(RuntimeState* state) {
  if (is_closed()) return;
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
  input_batch_.reset();
  TCloseParams params;
  params.__set_scan_handle(scan_handle_);
  TCloseResult result;
  Status status = data_source_executor_->Close(params, &result);
  if (!status.ok()) state->LogError(status.msg());
  ExecNode::Close(state);
}

void DataSourceScanNode::DebugString(int indentation_level, stringstream* out) const {
  string indent(indentation_level * 2, ' ');
  *out << indent << "DataSourceScanNode(tupleid=" << data_src_node_.tuple_id << ")";
}

} // namespace impala
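
How a parent drives this node, as a minimal sketch: the usual ExecNode lifecycle is
Prepare(), Open(), repeated GetNext() until eos, then Close(). In Impala the plan
fragment executor owns this loop; DriveScan() and ConsumeBatch() below are hypothetical
names used only for illustration.

Status DriveScan(DataSourceScanNode* scan, RuntimeState* state, MemTracker* tracker) {
  RETURN_IF_ERROR(scan->Prepare(state));
  RETURN_IF_ERROR(scan->Open(state));
  bool eos = false;
  while (!eos) {
    // Each call fills the batch with rows that passed the scan's conjuncts; eos is
    // set once the data source reports end-of-stream or the limit is reached.
    RowBatch batch(scan->row_desc(), state->batch_size(), tracker);
    RETURN_IF_ERROR(scan->GetNext(state, &batch, &eos));
    ConsumeBatch(&batch);  // hypothetical consumer
  }
  scan->Close(state);
  return Status::OK;
}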