Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
partitioned-aggregation-node-ir.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
#include "exec/partitioned-aggregation-node.h"

#include "exec/hash-table.inline.h"
#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/row-batch.h"
#include "runtime/tuple-row.h"
21 
22 using namespace impala;
23 
25  RowBatch* batch, HashTableCtx* ht_ctx) {
26  for (int i = 0; i < batch->num_rows(); ++i) {
28  }
29  return Status::OK;
30 }
31 
32 template<bool AGGREGATED_ROWS>
34  DCHECK(!hash_partitions_.empty());
35 
36  // Make sure that no resizes will happen when inserting individual rows to the hash
37  // table of each partition by pessimistically assuming that all the rows in each batch
38  // will end up to the same partition.
39  // TODO: Once we have a histogram with the number of rows per partition, we will have
40  // accurate resize calls.
41  int num_rows = batch->num_rows();
42  for (int partition_idx = 0; partition_idx < PARTITION_FANOUT; ++partition_idx) {
43  Partition* dst_partition = hash_partitions_[partition_idx];
44  while (!dst_partition->is_spilled()) {
45  {
47  if (dst_partition->hash_tbl->CheckAndResize(num_rows, ht_ctx)) break;
48  }
49  // There was not enough memory for the resize. Spill a partition and retry.
51  }
52  }
53 
54  for (int i = 0; i < num_rows; ++i) {
55  TupleRow* row = batch->GetRow(i);
56  uint32_t hash = 0;
57  if (AGGREGATED_ROWS) {
58  if (!ht_ctx->EvalAndHashBuild(row, &hash)) continue;
59  } else {
60  if (!ht_ctx->EvalAndHashProbe(row, &hash)) continue;
61  }
62 
63  // To process this row, we first see if it can be aggregated or inserted into this
64  // partition's hash table. If we need to insert it and that fails, due to OOM, we
65  // spill the partition. The partition to spill is not necessarily dst_partition,
66  // so we can try again to insert the row.
67  Partition* dst_partition = hash_partitions_[hash >> (32 - NUM_PARTITIONING_BITS)];
68  if (!dst_partition->is_spilled()) {
69  DCHECK_NOTNULL(dst_partition->hash_tbl.get());
70  DCHECK(dst_partition->aggregated_row_stream->is_pinned());
71 
72  HashTable* ht = dst_partition->hash_tbl.get();
73  if (!AGGREGATED_ROWS) {
74  // If the row is already an aggregate row, it cannot match anything in the
75  // hash table since we process the aggregate rows first. These rows should
76  // have been aggregated in the initial pass.
77  // TODO: change HT interface to use a FindOrInsert() call
78  HashTable::Iterator it = ht->Find(ht_ctx, hash);
79  if (!it.AtEnd()) {
80  // Row is already in hash table. Do the aggregation and we're done.
81  UpdateTuple(&dst_partition->agg_fn_ctxs[0], it.GetTuple(), row);
82  continue;
83  }
84  } else {
85  DCHECK(ht->Find(ht_ctx, hash).AtEnd()) << ht->size();
86  }
87 
88  Tuple* intermediate_tuple = NULL;
89 allocate_tuple:
90 #if 0
91  // TODO: this optimization doesn't work. Why?
92  // First construct the intermediate tuple in the dst partition's stream.
93  // TODO: needs_serialize can be removed with codegen.
94  if (AGGREGATED_ROWS && !needs_serialize_) {
95  // We can just copy the row into the stream.
96  if (!dst_partition->aggregated_row_stream->AddRow(
97  row, reinterpret_cast<uint8_t**>(&intermediate_tuple))) {
98  intermediate_tuple = NULL;
99  }
100  }
101 #endif
102  // If this aggregate function requires serialize, or we are seeing this
103  // result row the first time, we need to construct the result row and
104  // initialize it.
105  intermediate_tuple = ConstructIntermediateTuple(dst_partition->agg_fn_ctxs,
106  NULL, dst_partition->aggregated_row_stream.get());
107  if (intermediate_tuple != NULL) {
108  UpdateTuple(&dst_partition->agg_fn_ctxs[0],
109  intermediate_tuple, row, AGGREGATED_ROWS);
110  }
111 
112  // After copying and initialize it, try to insert the tuple into the hash table.
113  // If it inserts, we are done.
114  if (intermediate_tuple != NULL && ht->Insert(ht_ctx, intermediate_tuple, hash)) {
115  continue;
116  }
117 
118  // In this case, we either did not have enough memory to add the intermediate_tuple
119  // to the stream or we did not have enough memory to insert it into the hash table.
120  // We need to spill until there is enough memory to insert this tuple or
121  // dst_partition is spilled.
122  while (true) {
123  RETURN_IF_ERROR(SpillPartition(dst_partition, intermediate_tuple));
124  if (!dst_partition->is_spilled()) {
125  DCHECK(dst_partition->aggregated_row_stream->is_pinned());
126  // We spilled a different partition, try to insert this tuple.
127  if (intermediate_tuple == NULL) goto allocate_tuple;
128  if (ht->Insert(ht_ctx, intermediate_tuple, hash)) break;
129  } else {
130  break;
131  }
132  }
133 
134  // In this case, we were able to add the tuple to the stream but not enough
135  // to put it in the hash table. Nothing left to do, the tuple is spilled.
136  if (intermediate_tuple != NULL) continue;
137  }
138 
139  // This partition is already spilled, just append the row.
140  BufferedTupleStream* dst_stream = AGGREGATED_ROWS ?
141  dst_partition->aggregated_row_stream.get() :
142  dst_partition->unaggregated_row_stream.get();
143  DCHECK(dst_stream != NULL);
144  DCHECK(!dst_stream->is_pinned()) << AGGREGATED_ROWS;
145  DCHECK(dst_stream->has_write_block()) << AGGREGATED_ROWS;
146  if (dst_stream->AddRow(row)) continue;
147  Status status = dst_stream->status();
148  DCHECK(!status.ok()) << AGGREGATED_ROWS;
149  return status;
150  }
151 
152  return Status::OK;
153 }
154 
156  RowBatch* batch, HashTableCtx* ht_ctx) {
157  return ProcessBatch<false>(batch, ht_ctx);
158 }
159 
161  RowBatch* batch, HashTableCtx* ht_ctx) {
162  return ProcessBatch<true>(batch, ht_ctx);
163 }
stl-like iterator interface.
Definition: hash-table.h:450
The underlying memory management is done by the BufferedBlockMgr.
bool AtEnd() const
Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
Definition: hash-table.h:492
int num_rows() const
Definition: row-batch.h:215
bool needs_serialize_
Contains any evaluators that require the serialize step.
Status ProcessBatch_false(RowBatch *batch, HashTableCtx *ht_ctx)
Tuple * ConstructIntermediateTuple(const std::vector< impala_udf::FunctionContext * > &agg_fn_ctxs, MemPool *pool, BufferedTupleStream *stream)
boost::scoped_ptr< BufferedTupleStream > unaggregated_row_stream
Unaggregated rows that are spilled.
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
const StringSearch UrlParser::hash_search & hash
Definition: url-parser.cc:41
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs_
static const int PARTITION_FANOUT
Number of initial partitions to create. Must be a power of 2.
#define SCOPED_TIMER(c)
Status SpillPartition(Partition *curr_partition=NULL, Tuple *curr_intermediate_tuple=NULL)
Status ProcessBatch_true(RowBatch *batch, HashTableCtx *ht_ctx)
std::vector< Partition * > hash_partitions_
Current partitions we are partitioning into.
bool AddRow(TupleRow *row, uint8_t **dst=NULL)
std::vector< impala_udf::FunctionContext * > agg_fn_ctxs
Clone of parent's agg_fn_ctxs_ and backing MemPool.
static const Status OK
Definition: status.h:87
boost::scoped_ptr< BufferedTupleStream > aggregated_row_stream
Status ProcessBatchNoGrouping(RowBatch *batch, HashTableCtx *ht_ctx=NULL)
bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow *row, uint32_t *hash)
bool ok() const
Definition: status.h:172
void UpdateTuple(impala_udf::FunctionContext **agg_fn_ctxs, Tuple *tuple, TupleRow *row, bool is_merge=false)
bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow *row, uint32_t *hash)
RuntimeProfile::Counter * ht_resize_timer_
Total time spent resizing hash tables.
Status IR_ALWAYS_INLINE ProcessBatch(RowBatch *batch, HashTableCtx *ht_ctx)