Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
buffered-tuple-stream.inline.h
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H
16 #define IMPALA_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H
17 
19 
20 #include "runtime/descriptors.h"
21 #include "runtime/tuple-row.h"
22 
23 namespace impala {
24 
25 inline bool BufferedTupleStream::AddRow(TupleRow* row, uint8_t** dst) {
26  DCHECK(!closed_);
27  if (LIKELY(DeepCopy(row, dst))) return true;
28  bool got_block = false;
29  status_ = NewBlockForWrite(ComputeRowSize(row), &got_block);
30  if (!status_.ok() || !got_block) return false;
31  return DeepCopy(row, dst);
32 }
33 
// Reserves 'size' bytes for a row in the current write block and returns a pointer to
// the reserved bytes. If there is no write block, or the current one has fewer than
// 'size' bytes remaining, a new block is requested first. Returns NULL if that request
// fails or yields no block; the Status of the request is kept in 'status_'.
// On success the stream's row count ('num_rows_') is incremented.
34 inline uint8_t* BufferedTupleStream::AllocateRow(int size) {
35  DCHECK(!closed_);
  // Unexpected case: nothing to write into, or not enough room left for this row.
36  if (UNLIKELY(write_block_ == NULL || write_block_->BytesRemaining() < size)) {
37  bool got_block = false;
38  status_ = NewBlockForWrite(size, &got_block);
  // Bail out on either an error status or an OK status that produced no block.
39  if (!status_.ok() || !got_block) return NULL;
40  }
  // From here on the write block must exist, be pinned and have room for 'size' bytes.
  // Presumably NewBlockForWrite() installs the new block as 'write_block_' -- confirm.
41  DCHECK(write_block_ != NULL);
42  DCHECK(write_block_->is_pinned());
43  DCHECK_GE(write_block_->BytesRemaining(), size);
44  ++num_rows_;
  // NOTE(review): the embedded line numbers of this listing jump from 44 to 46 here,
  // so one source line may be missing from this extract -- confirm against the
  // original header before relying on this body.
46  return write_block_->Allocate<uint8_t>(size);
47 }
48 
49 inline void BufferedTupleStream::GetTupleRow(const RowIdx& idx, TupleRow* row) const {
50  DCHECK_NOTNULL(row);
51  DCHECK(!closed_);
52  DCHECK(is_pinned());
53  DCHECK(!delete_on_read_);
54  DCHECK_EQ(blocks_.size(), block_start_idx_.size());
55  DCHECK_LT(idx.block(), blocks_.size());
56 
57  uint8_t* data = block_start_idx_[idx.block()] + idx.offset();
58  if (nullable_tuple_) {
59  // Stitch together the tuples from the block and the NULL ones.
60  const int tuples_per_row = desc_.tuple_descriptors().size();
61  uint32_t tuple_idx = idx.idx() * tuples_per_row;
62  for (int i = 0; i < tuples_per_row; ++i) {
63  const uint8_t* null_word = block_start_idx_[idx.block()] + (tuple_idx >> 3);
64  const uint32_t null_pos = tuple_idx & 7;
65  const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
66  row->SetTuple(i, reinterpret_cast<Tuple*>(
67  reinterpret_cast<uint64_t>(data) * is_not_null));
68  data += desc_.tuple_descriptors()[i]->byte_size() * is_not_null;
69  ++tuple_idx;
70  }
71  } else {
72  for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
73  row->SetTuple(i, reinterpret_cast<Tuple*>(data));
74  data += desc_.tuple_descriptors()[i]->byte_size();
75  }
76  }
77 }
78 
79 }
80 
81 #endif
std::vector< uint8_t * > block_start_idx_
T * Allocate(int size)
Allocates the specified number of bytes from this block.
bool DeepCopy(TupleRow *row, uint8_t **dst)
Wrapper of the templated DeepCopyInternal() function.
const RowDescriptor & desc_
Description of rows stored in the stream.
int BytesRemaining() const
Return the number of remaining bytes that can be allocated in this block.
const bool delete_on_read_
If true, blocks are deleted after they are read.
const bool nullable_tuple_
Whether any tuple in the rows is nullable.
BufferedBlockMgr::Block * write_block_
The current block for writing. NULL if there is no available block to write to.
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
Definition: descriptors.h:412
void SetTuple(int tuple_idx, Tuple *tuple)
Definition: tuple-row.h:34
#define UNLIKELY(expr)
Definition: compiler-util.h:33
bool AddRow(TupleRow *row, uint8_t **dst=NULL)
#define LIKELY(expr)
Definition: compiler-util.h:32
std::list< BufferedBlockMgr::Block * > blocks_
List of blocks in the stream.
Status NewBlockForWrite(int min_size, bool *got_block)
bool ok() const
Definition: status.h:172
int64_t num_rows_
Number of rows stored in the stream.
int ComputeRowSize(TupleRow *row) const
Returns the byte size of this row when encoded in a block.
void GetTupleRow(const RowIdx &idx, TupleRow *row) const