23 using namespace impala;
39 template<
int const JoinOp>
50 int num_rows_added = 0;
56 DCHECK(matched_build_row != NULL);
58 if ((JoinOp == TJoinOp::RIGHT_SEMI_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN) &&
64 if (JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
65 JoinOp == TJoinOp::RIGHT_ANTI_JOIN || JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
66 JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
69 if (num_other_join_conjuncts > 0) {
80 if (JoinOp == TJoinOp::LEFT_ANTI_JOIN || JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
81 JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
84 out_batch->
CopyRow(matched_build_row, out_row);
99 if (JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
100 JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
107 if (JoinOp == TJoinOp::LEFT_SEMI_JOIN) {
110 if (JoinOp == TJoinOp::RIGHT_OUTER_JOIN || JoinOp == TJoinOp::RIGHT_ANTI_JOIN ||
111 JoinOp == TJoinOp::FULL_OUTER_JOIN || JoinOp == TJoinOp::RIGHT_SEMI_JOIN) {
119 if ((JoinOp != TJoinOp::RIGHT_ANTI_JOIN) &&
122 out_row = out_row->next_row(out_batch);
123 if (num_rows_added == max_rows)
goto end;
127 if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !
matched_probe_) {
133 if (num_other_join_conjuncts == 0)
goto next_row;
142 if ((JoinOp == TJoinOp::LEFT_OUTER_JOIN || JoinOp == TJoinOp::FULL_OUTER_JOIN) &&
149 out_row = out_row->next_row(out_batch);
150 if (num_rows_added == max_rows)
goto end;
153 if ((JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
154 JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) &&
161 out_row = out_row->next_row(out_batch);
162 if (num_rows_added == max_rows)
goto end;
182 if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
191 if (num_other_join_conjuncts == 0)
goto next_row;
223 DCHECK_LE(num_rows_added, max_rows);
224 return num_rows_added;
230 case TJoinOp::INNER_JOIN:
231 return ProcessProbeBatch<TJoinOp::INNER_JOIN>(out_batch, ht_ctx);
232 case TJoinOp::LEFT_OUTER_JOIN:
233 return ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(out_batch, ht_ctx);
234 case TJoinOp::LEFT_SEMI_JOIN:
235 return ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(out_batch, ht_ctx);
236 case TJoinOp::LEFT_ANTI_JOIN:
237 return ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(out_batch, ht_ctx);
238 case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
239 return ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(out_batch, ht_ctx);
240 case TJoinOp::RIGHT_OUTER_JOIN:
241 return ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(out_batch, ht_ctx);
242 case TJoinOp::RIGHT_SEMI_JOIN:
243 return ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(out_batch, ht_ctx);
244 case TJoinOp::RIGHT_ANTI_JOIN:
245 return ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(out_batch, ht_ctx);
246 case TJoinOp::FULL_OUTER_JOIN:
247 return ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(out_batch, ht_ctx);
249 DCHECK(
false) <<
"Unknown join type";
255 for (
int i = 0; i < build_batch->
num_rows(); ++i) {
258 if (!
ht_ctx_->EvalAndHashBuild(build_row, &hash)) {
271 const bool result =
AppendRow(partition->build_rows(), build_row);
bool AtEnd() const
Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
bool AppendRow(BufferedTupleStream *stream, TupleRow *row)
bool non_empty_build_
If true, the build side has at least one row.
void SetAtEnd()
Resets everything but the pointer to the hash table.
std::vector< ExprContext * > other_join_conjunct_ctxs_
Non-equi-join conjuncts from the JOIN clause.
void CreateOutputRow(TupleRow *out_row, TupleRow *probe_row, TupleRow *build_row)
Status ProcessBuildBatch(RowBatch *build_batch)
Reads the rows in build_batch and partitions them in hash_partitions_.
TupleRow * current_probe_row_
const StringSearch UrlParser::hash_search & hash
TupleRow * GetRow(int row_idx)
HashTable * hash_tbls_[PARTITION_FANOUT]
void IR_ALWAYS_INLINE NextDuplicate()
BufferedTupleStream * probe_rows()
std::vector< bool > matched_null_probe_
const std::vector< ExprContext * > & conjunct_ctxs() const
std::vector< Partition * > hash_partitions_
BufferedTupleStream * null_probe_rows_
bool IR_NO_INLINE EvalOtherJoinConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
BufferedTupleStream * build_rows()
void CopyRow(TupleRow *src, TupleRow *dest)
bool AddRow(TupleRow *row, uint8_t **dst=NULL)
Iterator IR_ALWAYS_INLINE Find(HashTableCtx *ht_ctx, uint32_t hash)
Partition * null_aware_partition_
int64_t num_rows() const
Number of rows in the stream.
int ProcessProbeBatch(RowBatch *out_batch, HashTableCtx *ht_ctx)
static bool EvalConjuncts(ExprContext *const *ctxs, int num_ctxs, TupleRow *row)
boost::scoped_ptr< HashTableCtx > ht_ctx_
boost::scoped_ptr< RowBatch > probe_batch_
HashTable::Iterator hash_tbl_iterator_
The iterator that corresponds to the look up of current_probe_row_.
TupleRow * semi_join_staging_row_
std::vector< ExprContext * > conjunct_ctxs_
State state_
State of the algorithm. Used just for debugging.
TupleRow * GetRow() const
bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow *row, uint32_t *hash)
static const int NUM_PARTITIONING_BITS
Needs to be the log(PARTITION_FANOUT)