22 using namespace impala;
26 for (
int i = 0; i < batch->
num_rows(); ++i) {
32 template<
bool AGGREGATED_ROWS>
42 for (
int partition_idx = 0; partition_idx <
PARTITION_FANOUT; ++partition_idx) {
47 if (dst_partition->
hash_tbl->CheckAndResize(num_rows, ht_ctx))
break;
54 for (
int i = 0; i < num_rows; ++i) {
57 if (AGGREGATED_ROWS) {
69 DCHECK_NOTNULL(dst_partition->
hash_tbl.get());
73 if (!AGGREGATED_ROWS) {
85 DCHECK(ht->Find(ht_ctx, hash).AtEnd()) << ht->size();
88 Tuple* intermediate_tuple = NULL;
97 row, reinterpret_cast<uint8_t**>(&intermediate_tuple))) {
98 intermediate_tuple = NULL;
107 if (intermediate_tuple != NULL) {
109 intermediate_tuple, row, AGGREGATED_ROWS);
114 if (intermediate_tuple != NULL && ht->Insert(ht_ctx, intermediate_tuple, hash)) {
127 if (intermediate_tuple == NULL)
goto allocate_tuple;
128 if (ht->Insert(ht_ctx, intermediate_tuple, hash))
break;
136 if (intermediate_tuple != NULL)
continue;
143 DCHECK(dst_stream != NULL);
144 DCHECK(!dst_stream->
is_pinned()) << AGGREGATED_ROWS;
146 if (dst_stream->
AddRow(row))
continue;
148 DCHECK(!status.
ok()) << AGGREGATED_ROWS;
157 return ProcessBatch<false>(batch, ht_ctx);
162 return ProcessBatch<true>(batch, ht_ctx);
STL-like iterator interface.
The underlying memory management is done by the BufferedBlockMgr.
bool AtEnd() const
Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
bool needs_serialize_
Contains any evaluators that require the serialize step.
bool has_write_block() const
Status ProcessBatch_false(RowBatch *batch, HashTableCtx *ht_ctx)
Tuple * ConstructIntermediateTuple(const std::vector< impala_udf::FunctionContext * > &agg_fn_ctxs, MemPool *pool, BufferedTupleStream *stream)
boost::scoped_ptr< BufferedTupleStream > unaggregated_row_stream
Unaggregated rows that are spilled.
A tuple with 0 materialised slots is represented as NULL.
const StringSearch UrlParser::hash_search
#define RETURN_IF_ERROR(stmt)
Some generally useful macros.
TupleRow * GetRow(int row_idx)
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs_
static const int PARTITION_FANOUT
Number of initial partitions to create. Must be a power of 2.
Tuple * singleton_output_tuple_
Status SpillPartition(Partition *curr_partition=NULL, Tuple *curr_intermediate_tuple=NULL)
Status ProcessBatch_true(RowBatch *batch, HashTableCtx *ht_ctx)
static const int NUM_PARTITIONING_BITS
boost::scoped_ptr< HashTable > hash_tbl
std::vector< Partition * > hash_partitions_
Current partitions we are partitioning into.
bool AddRow(TupleRow *row, uint8_t **dst=NULL)
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs
Clone of parent's agg_fn_ctxs_ and backing MemPool.
boost::scoped_ptr< BufferedTupleStream > aggregated_row_stream
Status ProcessBatchNoGrouping(RowBatch *batch, HashTableCtx *ht_ctx=NULL)
bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow *row, uint32_t *hash)
void UpdateTuple(impala_udf::FunctionContext **agg_fn_ctxs, Tuple *tuple, TupleRow *row, bool is_merge=false)
bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow *row, uint32_t *hash)
RuntimeProfile::Counter * ht_resize_timer_
Total time spent resizing hash tables.
Status IR_ALWAYS_INLINE ProcessBatch(RowBatch *batch, HashTableCtx *ht_ctx)