17 #include <boost/foreach.hpp>
19 #include <gutil/strings/substitute.h>
35 using namespace strings;
36 using namespace impala::extdatasource;
38 DEFINE_int32(data_source_batch_size, 1024,
"Batch size for calls to GetNext() on "
39 "external data sources.");
45 "Expected $0 but received $1. This likely indicates a problem with the data source "
48 "different numbers of rows. This likely indicates a problem with the data source "
52 "Expected value of type $0 based on column metadata. This likely indicates a "
53 "problem with the data source library.";
55 "This likely indicates a problem with the data source library.";
57 "This likely indicates a problem with the data source library.";
62 DataSourceScanNode::DataSourceScanNode(
ObjectPool*
pool,
const TPlanNode& tnode,
65 data_src_node_(tnode.data_source_node),
98 vector<extdatasource::TColumnDesc> cols;
100 extdatasource::TColumnDesc col;
106 extdatasource::TTableSchema row_schema;
107 row_schema.__set_cols(cols);
110 params.__set_query_id(state->
query_id());
114 params.__set_row_schema(row_schema);
115 params.__set_batch_size(FLAGS_data_source_batch_size);
126 const vector<TColumnData>& cols =
input_batch_->rows.cols;
137 const TColumnData& col_data = cols[i];
149 TGetNextParams params;
158 <<
"rows but did not set 'eos'. No more rows will be fetched from the "
168 uint8_t* buffer =
reinterpret_cast<uint8_t*
>(bytes);
188 default: DCHECK(
false);
194 const vector<TColumnData>& cols =
input_batch_->rows.cols;
200 const TColumnData& col = cols[i];
211 if (val_idx >= col.string_vals.size()) {
214 const string& val = col.string_vals[val_idx];
215 size_t val_size = val.size();
216 char* buffer =
reinterpret_cast<char*
>(tuple_pool->
Allocate(val_size));
217 memcpy(buffer, val.data(), val_size);
218 reinterpret_cast<StringValue*
>(slot)->ptr = buffer;
219 reinterpret_cast<StringValue*
>(slot)->len = val_size;
223 if (val_idx >= col.byte_vals.size()) {
226 *
reinterpret_cast<int8_t*
>(slot) = col.byte_vals[val_idx];
229 if (val_idx >= col.short_vals.size()) {
232 *
reinterpret_cast<int16_t*
>(slot) = col.short_vals[val_idx];
235 if (val_idx >= col.int_vals.size()) {
238 *
reinterpret_cast<int32_t*
>(slot) = col.int_vals[val_idx];
241 if (val_idx >= col.long_vals.size()) {
244 *
reinterpret_cast<int64_t*
>(slot) = col.long_vals[val_idx];
247 if (val_idx >= col.double_vals.size()) {
250 *
reinterpret_cast<double*
>(slot) = col.double_vals[val_idx];
253 if (val_idx >= col.double_vals.size()) {
256 *
reinterpret_cast<float*
>(slot) = col.double_vals[val_idx];
259 if (val_idx >= col.bool_vals.size()) {
262 *
reinterpret_cast<int8_t*
>(slot) = col.bool_vals[val_idx];
265 if (val_idx >= col.binary_vals.size()) {
268 const string& val = col.binary_vals[val_idx];
270 const uint8_t* bytes =
reinterpret_cast<const uint8_t*
>(val.data());
272 ReadWriteUtil::GetInt<uint64_t>(bytes),
273 ReadWriteUtil::GetInt<uint32_t>(bytes +
sizeof(int64_t)));
277 if (val_idx >= col.binary_vals.size()) {
280 const string& val = col.binary_vals[val_idx];
296 if (ReachedLimit()) {
305 void* tuple_buffer = tuple_pool->
Allocate(tuple_buffer_size);
306 tuple_ =
reinterpret_cast<Tuple*
>(tuple_buffer);
308 int num_ctxs = conjunct_ctxs_.size();
314 while (!ReachedLimit() && !row_batch->
AtCapacity(tuple_pool) &&
315 InputBatchHasNext()) {
317 int row_idx = row_batch->
AddRow();
319 tuple_row->
SetTuple(tuple_idx_, tuple_);
321 if (ExecNode::EvalConjuncts(ctxs, num_ctxs, tuple_row)) {
323 char* new_tuple =
reinterpret_cast<char*
>(tuple_);
324 new_tuple += tuple_desc_->byte_size();
325 tuple_ =
reinterpret_cast<Tuple*
>(new_tuple);
326 ++num_rows_returned_;
330 COUNTER_SET(rows_returned_counter_, num_rows_returned_);
332 if (ReachedLimit() || row_batch->
AtCapacity() || input_batch_->eos) {
333 *eos = ReachedLimit() || input_batch_->eos;
339 DCHECK(!InputBatchHasNext());
345 DCHECK(
false) <<
"NYI";
350 if (is_closed())
return;
352 PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
353 PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
354 input_batch_.reset();
356 params.__set_scan_handle(scan_handle_);
358 Status status = data_source_executor_->Close(params, &result);
360 ExecNode::Close(state);
364 string indent(indentation_level * 2,
' ');
365 *out << indent <<
"DataSourceScanNode(tupleid=" << data_src_node_.tuple_id <<
")";
const TableDescriptor * table_desc() const
const string ERROR_INVALID_TIMESTAMP
std::string scan_handle_
The opaque handle returned by the data source for the scan.
virtual Status Prepare(RuntimeState *state)
void SetNull(const NullIndicatorOffset &offset)
const TUniqueId & query_id() const
bool InputBatchHasNext()
True if input_batch_ has more rows.
Status GetNext(RowBatch **batch, RuntimeState *state)
boost::scoped_ptr< RuntimeProfile > runtime_profile_
A tuple with 0 materialized slots is represented as NULL.
const string ERROR_NUM_COLUMNS
#define RETURN_IF_ERROR(stmt)
some generally useful macros
TupleRow * GetRow(int row_idx)
virtual Status Open(RuntimeState *state)
Open the data source and initialize the first row batch.
void * GetSlot(int offset)
const std::vector< SlotDescriptor * > & slots() const
Status ValidateRowBatchSize()
const NullIndicatorOffset & null_indicator_offset() const
Status MaterializeNextRow(MemPool *mem_pool)
Materializes the next row (next_row_idx_) into tuple_.
TupleDescriptor * GetTupleDescriptor(TupleId id) const
const string ERROR_MISMATCHED_COL_SIZES
boost::scoped_ptr< ExternalDataSourceExecutor > data_source_executor_
Used to call the external data source.
std::vector< int > cols_next_val_idx_
std::vector< SlotDescriptor * > materialized_slots_
const TDataSourceScanNode data_src_node_
Thrift structure describing the data source scan node.
const string ERROR_INVALID_DECIMAL
bool LogError(const ErrorMsg &msg)
const ColumnType & type() const
#define RETURN_IF_CANCELLED(state)
std::string DebugString(const T &val)
int GetByteSize() const
Returns the byte size of this type. Returns 0 for variable length types.
Tuple * tuple_
Current tuple.
DEFINE_int32(data_source_batch_size, 1024,"Batch size for calls to GetNext() on ""external data sources.")
static int Decode(uint8_t *buffer, int fixed_len_size, T *v)
const DescriptorTbl & desc_tbl() const
Abstract base class of all scan nodes; introduces SetScanRange().
const std::vector< std::string > & col_names() const
#define COUNTER_SET(c, v)
const string ERROR_INVALID_COL_DATA
MemPool * tuple_data_pool()
const std::string & effective_user() const
const size_t TIMESTAMP_SIZE
const std::string & name() const
void SetTuple(int tuple_idx, Tuple *tuple)
Status SetDecimalVal(const ColumnType &type, char *bytes, int len, void *slot)
boost::scoped_ptr< extdatasource::TGetNextResult > input_batch_
Status GetNextInputBatch()
Gets the next batch from the data source, stored in input_batch_.
const TupleDescriptor * tuple_desc_
Descriptor of tuples read.
int MaxTupleBufferSize()
Computes the maximum size needed to store tuple data for this row batch.
bool is_materialized() const
const ErrorMsg & msg() const
Returns the error message associated with a non-successful status.
TColumnType ToThrift() const
virtual Status Open(RuntimeState *state)
virtual Status Prepare(RuntimeState *state)
Load the data source library and create the ExternalDataSourceExecutor.
uint8_t * Allocate(int size)