Impala
Impala is the open source, native analytic database for Apache Hadoop.
buffered-tuple-stream.cc
// Copyright 2013 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "runtime/buffered-tuple-stream.h"

#include <boost/bind.hpp>
#include <gutil/strings/substitute.h>

#include "runtime/descriptors.h"
#include "runtime/row-batch.h"
#include "runtime/tuple-row.h"
#include "util/bit-util.h"
#include "util/debug-util.h"

#include "common/names.h"

using namespace impala;
using namespace strings;

// The first NUM_SMALL_BLOCKS of the tuple stream are made of blocks less than the
// IO size. These blocks never spill.
static const int64_t INITIAL_BLOCK_SIZES[] = { 64 * 1024, 512 * 1024 };
static const int NUM_SMALL_BLOCKS = sizeof(INITIAL_BLOCK_SIZES) / sizeof(int64_t);
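// Block size progression, for illustration (the 8MB figure is an assumed,
// typical IO size; the actual value comes from block_mgr_->max_block_size()):
//
//   block 0:  64 * 1024       =  64KB  (small, never spills)
//   block 1:  512 * 1024      = 512KB  (small, never spills)
//   block 2+: max_block_size() = ~8MB  (io-sized, eligible for unpinning/spilling)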

string BufferedTupleStream::RowIdx::DebugString() const {
  stringstream ss;
  ss << "RowIdx block=" << block() << " offset=" << offset() << " idx=" << idx();
  return ss.str();
}

BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
    const RowDescriptor& row_desc, BufferedBlockMgr* block_mgr,
    BufferedBlockMgr::Client* client, bool use_initial_small_buffers,
    bool delete_on_read, bool read_write)
  : use_small_buffers_(use_initial_small_buffers),
    delete_on_read_(delete_on_read),
    read_write_(read_write),
    state_(state),
    desc_(row_desc),
    nullable_tuple_(row_desc.IsAnyTupleNullable()),
    block_mgr_(block_mgr),
    block_mgr_client_(client),
    total_byte_size_(0),
    read_ptr_(NULL),
    read_tuple_idx_(0),
    read_bytes_(0),
    rows_returned_(0),
    read_block_idx_(-1),
    write_block_(NULL),
    num_pinned_(0),
    num_small_blocks_(0),
    closed_(false),
    num_rows_(0),
    pinned_(true),
    pin_timer_(NULL),
    unpin_timer_(NULL),
    get_new_block_timer_(NULL) {
  null_indicators_read_block_ = null_indicators_write_block_ = -1;
  read_block_ = blocks_.end();
  fixed_tuple_row_size_ = 0;
  for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
    const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i];
    const int tuple_byte_size = tuple_desc->byte_size();
    fixed_tuple_row_size_ += tuple_byte_size;
    if (tuple_desc->string_slots().empty()) continue;
    string_slots_.push_back(make_pair(i, tuple_desc->string_slots()));
  }
}

// Returns the number of pinned blocks in the list. Only called in DCHECKs to validate
// num_pinned_.
int NumPinned(const list<BufferedBlockMgr::Block*>& blocks) {
  int num_pinned = 0;
  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks.begin();
      it != blocks.end(); ++it) {
    if ((*it)->is_pinned() && (*it)->is_max_size()) ++num_pinned;
  }
  return num_pinned;
}

string BufferedTupleStream::DebugString() const {
  stringstream ss;
  ss << "BufferedTupleStream num_rows=" << num_rows_ << " rows_returned="
     << rows_returned_ << " pinned=" << (pinned_ ? "true" : "false")
     << " delete_on_read=" << (delete_on_read_ ? "true" : "false")
     << " closed=" << (closed_ ? "true" : "false")
     << " num_pinned=" << num_pinned_
     << " write_block=" << write_block_ << " read_block_=";
  if (read_block_ == blocks_.end()) {
    ss << "<end>";
  } else {
    ss << *read_block_;
  }
  ss << " blocks=[\n";
  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin();
      it != blocks_.end(); ++it) {
    ss << "{" << (*it)->DebugString() << "}";
    if (*it != blocks_.back()) ss << ",\n";
  }
  ss << "]";
  return ss.str();
}

Status BufferedTupleStream::Init(RuntimeProfile* profile, bool pinned) {
  if (profile != NULL) {
    pin_timer_ = ADD_TIMER(profile, "PinTime");
    unpin_timer_ = ADD_TIMER(profile, "UnpinTime");
    get_new_block_timer_ = ADD_TIMER(profile, "GetNewBlockTime");
  }

  if (block_mgr_->max_block_size() < INITIAL_BLOCK_SIZES[0]) {
    use_small_buffers_ = false;
  }

  bool got_block = false;
  RETURN_IF_ERROR(NewBlockForWrite(fixed_tuple_row_size_, &got_block));
  if (!got_block) return block_mgr_->MemLimitTooLowError(block_mgr_client_);
  DCHECK(write_block_ != NULL);

  if (!pinned) RETURN_IF_ERROR(UnpinStream());
  return Status::OK;
}
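// A minimal setup sketch (illustrative, not part of this file; 'state',
// 'row_desc', 'block_mgr', 'client' and 'profile' are assumed to come from the
// calling exec node, and AddRow() is declared in the stream's header):
//
//   BufferedTupleStream stream(state, row_desc, block_mgr, client);
//   RETURN_IF_ERROR(stream.Init(profile, true /* pinned */));
//   // ... AddRow() until a row no longer fits in the small blocks, then:
//   bool got_buffer;
//   RETURN_IF_ERROR(stream.SwitchToIoBuffers(&got_buffer));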

Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) {
  if (!use_small_buffers_) {
    *got_buffer = (write_block_ != NULL);
    return Status::OK;
  }
  use_small_buffers_ = false;
  return NewBlockForWrite(block_mgr_->max_block_size(), got_buffer);
}

void BufferedTupleStream::Close() {
  for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
      it != blocks_.end(); ++it) {
    (*it)->Delete();
  }
  blocks_.clear();
  num_pinned_ = 0;
  DCHECK_EQ(num_pinned_, NumPinned(blocks_));
  closed_ = true;
}

int64_t BufferedTupleStream::bytes_in_mem(bool ignore_current) const {
  int64_t result = 0;
  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin();
      it != blocks_.end(); ++it) {
    if (!(*it)->is_pinned()) continue;
    if (!(*it)->is_max_size()) continue;
    if (*it == write_block_ && ignore_current) continue;
    result += (*it)->buffer_len();
  }
  return result;
}

Status BufferedTupleStream::UnpinBlock(BufferedBlockMgr::Block* block) {
  SCOPED_TIMER(unpin_timer_);
  DCHECK(block->is_pinned());
  if (!block->is_max_size()) return Status::OK;
  RETURN_IF_ERROR(block->Unpin());
  --num_pinned_;
  DCHECK_EQ(num_pinned_, NumPinned(blocks_));
  return Status::OK;
}

Status BufferedTupleStream::NewBlockForWrite(int min_size, bool* got_block) {
  DCHECK(!closed_);
  if (min_size > block_mgr_->max_block_size()) {
    return Status(Substitute("Cannot process row that is bigger than the IO size "
        "(row_size=$0). To run this query, increase the IO size (--read_size option).",
        PrettyPrinter::Print(min_size, TUnit::BYTES)));
  }

  BufferedBlockMgr::Block* unpin_block = write_block_;
  if (write_block_ != NULL) {
    DCHECK(write_block_->is_pinned());
    if (pinned_ || write_block_ == *read_block_) {
      // In these cases, don't unpin the current write block.
      unpin_block = NULL;
    }
  }

  int64_t block_len = block_mgr_->max_block_size();
  if (use_small_buffers_) {
    if (blocks_.size() < NUM_SMALL_BLOCKS) {
      block_len = min(block_len, INITIAL_BLOCK_SIZES[blocks_.size()]);
      if (block_len < min_size) block_len = block_mgr_->max_block_size();
    }
    if (block_len == block_mgr_->max_block_size()) {
      // Cannot switch to non small buffers automatically. Don't get a buffer.
      *got_block = false;
      return Status::OK;
    }
  }

  BufferedBlockMgr::Block* new_block = NULL;
  {
    SCOPED_TIMER(get_new_block_timer_);
    RETURN_IF_ERROR(block_mgr_->GetNewBlock(
        block_mgr_client_, unpin_block, &new_block, block_len));
  }
  *got_block = (new_block != NULL);

  if (!*got_block) {
    DCHECK(unpin_block == NULL);
    return Status::OK;
  }

  if (unpin_block != NULL) {
    DCHECK(unpin_block == write_block_);
    DCHECK(!write_block_->is_pinned());
    --num_pinned_;
    DCHECK_EQ(num_pinned_, NumPinned(blocks_));
  }

  // Compute and allocate the block header with the null indicators.
  null_indicators_write_block_ = ComputeNumNullIndicatorBytes(block_len);
  new_block->Allocate<uint8_t>(null_indicators_write_block_);
  write_tuple_idx_ = 0;

  blocks_.push_back(new_block);
  block_start_idx_.push_back(new_block->buffer());
  write_block_ = new_block;
  DCHECK(write_block_->is_pinned());
  DCHECK_EQ(write_block_->num_rows(), 0);
  if (write_block_->is_max_size()) {
    ++num_pinned_;
    DCHECK_EQ(num_pinned_, NumPinned(blocks_));
  } else {
    ++num_small_blocks_;
  }
  total_byte_size_ += block_len;
  return Status::OK;
}

Status BufferedTupleStream::NextBlockForRead() {
  DCHECK(!closed_);
  DCHECK(read_block_ != blocks_.end());
  DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << pinned_;

  // If non-NULL, this will be the current block if we are going to free it while
  // grabbing the next block. This will stay NULL if we don't want to free the
  // current block.
  BufferedBlockMgr::Block* block_to_free =
      (!pinned_ || delete_on_read_) ? *read_block_ : NULL;
  if (delete_on_read_) {
    // TODO: this is weird. We are deleting even if it is pinned. The analytic
    // eval node needs this.
    DCHECK(read_block_ == blocks_.begin());
    DCHECK(*read_block_ != write_block_);
    blocks_.pop_front();
    read_block_ = blocks_.begin();
    read_block_idx_ = 0;
    if (block_to_free != NULL && !block_to_free->is_max_size()) {
      RETURN_IF_ERROR(block_to_free->Delete());
      block_to_free = NULL;
      DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << DebugString();
    }
  } else {
    ++read_block_;
    ++read_block_idx_;
    if (block_to_free != NULL && !block_to_free->is_max_size()) block_to_free = NULL;
  }

  read_ptr_ = NULL;
  read_tuple_idx_ = 0;
  read_bytes_ = 0;

  bool pinned = false;
  if (read_block_ == blocks_.end() || (*read_block_)->is_pinned()) {
    // End of the blocks or already pinned, just handle block_to_free.
    if (block_to_free != NULL) {
      SCOPED_TIMER(unpin_timer_);
      if (delete_on_read_) {
        RETURN_IF_ERROR(block_to_free->Delete());
        --num_pinned_;
      } else {
        RETURN_IF_ERROR(UnpinBlock(block_to_free));
      }
    }
  } else {
    // Call into the block mgr to atomically unpin/delete the old block and pin the
    // new block.
    SCOPED_TIMER(pin_timer_);
    RETURN_IF_ERROR((*read_block_)->Pin(&pinned, block_to_free, !delete_on_read_));
    if (!pinned) {
      DCHECK(block_to_free == NULL) << "Should have been able to pin."
          << endl << block_mgr_->DebugString(block_mgr_client_);
    }
    if (block_to_free == NULL && pinned) ++num_pinned_;
  }

  if (read_block_ != blocks_.end() && (*read_block_)->is_pinned()) {
    null_indicators_read_block_ =
        ComputeNumNullIndicatorBytes((*read_block_)->buffer_len());
    read_ptr_ = (*read_block_)->buffer() + null_indicators_read_block_;
  }
  DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << DebugString();
  return Status::OK;
}

Status BufferedTupleStream::PrepareForRead(bool* got_buffer) {
  DCHECK(!closed_);
  if (blocks_.empty()) return Status::OK;

  if (!read_write_ && write_block_ != NULL) {
    DCHECK(write_block_->is_pinned());
    if (!pinned_ && write_block_ != blocks_.front()) {
      RETURN_IF_ERROR(UnpinBlock(write_block_));
    }
    write_block_ = NULL;
  }

  // Walk the blocks and pin the first non-io sized block.
  for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
      it != blocks_.end(); ++it) {
    if (!(*it)->is_pinned()) {
      SCOPED_TIMER(pin_timer_);
      bool current_pinned;
      RETURN_IF_ERROR((*it)->Pin(&current_pinned));
      if (!current_pinned) {
        DCHECK(got_buffer != NULL) << "Should have reserved enough blocks";
        *got_buffer = false;
        return Status::OK;
      }
      ++num_pinned_;
      DCHECK_EQ(num_pinned_, NumPinned(blocks_));
    }
    if ((*it)->is_max_size()) break;
  }

  read_block_ = blocks_.begin();
  DCHECK(read_block_ != blocks_.end());
  null_indicators_read_block_ =
      ComputeNumNullIndicatorBytes((*read_block_)->buffer_len());
  read_ptr_ = (*read_block_)->buffer() + null_indicators_read_block_;
  read_tuple_idx_ = 0;
  read_bytes_ = 0;
  rows_returned_ = 0;
  read_block_idx_ = 0;
  if (got_buffer != NULL) *got_buffer = true;
  return Status::OK;
}

Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) {
  DCHECK(!closed_);
  DCHECK_NOTNULL(pinned);
  if (!already_reserved) {
    // If we can't get all the blocks, don't try at all.
    if (!block_mgr_->TryAcquireTmpReservation(block_mgr_client_, blocks_unpinned())) {
      *pinned = false;
      return Status::OK;
    }
  }

  for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
      it != blocks_.end(); ++it) {
    if ((*it)->is_pinned()) continue;
    {
      SCOPED_TIMER(pin_timer_);
      RETURN_IF_ERROR((*it)->Pin(pinned));
    }
    VLOG_QUERY << "Should have been reserved." << endl
        << block_mgr_->DebugString(block_mgr_client_);
    if (!*pinned) return Status::OK;
    ++num_pinned_;
    DCHECK_EQ(num_pinned_, NumPinned(blocks_));
  }

  if (!delete_on_read_) {
    // Populate block_start_idx_ on pin.
    DCHECK_EQ(block_start_idx_.size(), blocks_.size());
    block_start_idx_.clear();
    for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
        it != blocks_.end(); ++it) {
      block_start_idx_.push_back((*it)->buffer());
    }
  }
  *pinned = true;
  pinned_ = true;
  return Status::OK;
}

Status BufferedTupleStream::UnpinStream(bool all) {
  DCHECK(!closed_);
  SCOPED_TIMER(unpin_timer_);

  BOOST_FOREACH(BufferedBlockMgr::Block* block, blocks_) {
    if (!block->is_pinned()) continue;
    if (!all && (block == write_block_ || (read_write_ && block == *read_block_))) {
      continue;
    }
    RETURN_IF_ERROR(UnpinBlock(block));
  }
  if (all) {
    read_block_ = blocks_.end();
    write_block_ = NULL;
  }
  pinned_ = false;
  return Status::OK;
}

int BufferedTupleStream::ComputeNumNullIndicatorBytes(int block_size) const {
  if (nullable_tuple_) {
    // We assume that all rows will use their max size, so we may be underutilizing the
    // space, i.e. we may have some unused space in case of rows with NULL tuples.
    const uint32_t tuples_per_row = desc_.tuple_descriptors().size();
    const uint32_t min_row_size_in_bits = 8 * fixed_tuple_row_size_ + tuples_per_row;
    const uint32_t block_size_in_bits = 8 * block_size;
    const uint32_t max_num_rows = block_size_in_bits / min_row_size_in_bits;
    return BitUtil::RoundUpNumi64(max_num_rows * tuples_per_row) * 8;
  } else {
    // If there are no nullable tuples then no need to waste space for null indicators.
    return 0;
  }
}
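// Worked example for the formula above (assumed, illustrative values): for an
// 8MB block (block_size = 8388608), two tuples per row and
// fixed_tuple_row_size_ = 16:
//   min_row_size_in_bits = 8 * 16 + 2     = 130
//   block_size_in_bits   = 8 * 8388608    = 67108864
//   max_num_rows         = 67108864 / 130 = 516222
//   returned bytes       = RoundUpNumi64(516222 * 2) * 8
//                        = 16132 * 8 = 129056 (~1.5% of the block)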

Status BufferedTupleStream::GetRows(scoped_ptr<RowBatch>* batch, bool* got_rows) {
  RETURN_IF_ERROR(PinStream(false, got_rows));
  if (!*got_rows) return Status::OK;
  RETURN_IF_ERROR(PrepareForRead());
  batch->reset(
      new RowBatch(desc_, num_rows(), block_mgr_->get_tracker(block_mgr_client_)));
  bool eos = false;
  // Loop until GetNext fills the entire batch. Each call can stop at block
  // boundaries. We generally want it to stop, so that blocks can be freed
  // as we read. It is safe in this case because we pin the entire stream.
  while (!eos) {
    RETURN_IF_ERROR(GetNext(batch->get(), &eos));
  }
  return Status::OK;
}

Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos,
    vector<RowIdx>* indices) {
  if (nullable_tuple_) {
    return GetNextInternal<true>(batch, eos, indices);
  } else {
    return GetNextInternal<false>(batch, eos, indices);
  }
}

template <bool HasNullableTuple>
Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
    vector<RowIdx>* indices) {
  DCHECK(!closed_);
  DCHECK(batch->row_desc().Equals(desc_));
  *eos = (rows_returned_ == num_rows_);
  if (*eos) return Status::OK;
  DCHECK_GE(null_indicators_read_block_, 0);

  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
  DCHECK_LE(read_tuple_idx_ / tuples_per_row, (*read_block_)->num_rows());
  DCHECK_EQ(read_tuple_idx_ % tuples_per_row, 0);
  int rows_returned_curr_block = read_tuple_idx_ / tuples_per_row;

  int64_t data_len = (*read_block_)->valid_data_len() - null_indicators_read_block_;
  if (UNLIKELY(rows_returned_curr_block == (*read_block_)->num_rows())) {
    // Get the next block in the stream. We need to do this at the beginning of
    // the GetNext() call to ensure the buffer management semantics. NextBlockForRead()
    // will recycle the memory for the rows returned from the *previous* call to
    // GetNext().
    RETURN_IF_ERROR(NextBlockForRead());
    DCHECK(read_block_ != blocks_.end()) << DebugString();
    DCHECK_GE(null_indicators_read_block_, 0);
    data_len = (*read_block_)->valid_data_len() - null_indicators_read_block_;
    rows_returned_curr_block = 0;
  }

  DCHECK(read_block_ != blocks_.end());
  DCHECK((*read_block_)->is_pinned()) << DebugString();
  DCHECK(read_ptr_ != NULL);

  int64_t rows_left = num_rows_ - rows_returned_;
  int rows_to_fill = std::min(
      static_cast<int64_t>(batch->capacity() - batch->num_rows()), rows_left);
  DCHECK_GE(rows_to_fill, 1);
  batch->AddRows(rows_to_fill);
  uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows()));

  // Produce tuple rows from the current block and the corresponding position on the
  // null tuple indicator.
  vector<RowIdx> local_indices;
  if (indices == NULL) {
    // A hack so that we do not need to check whether 'indices' is not null in the
    // tight loop.
    indices = &local_indices;
  } else {
    DCHECK(is_pinned());
    DCHECK(!delete_on_read_);
    DCHECK_EQ(batch->num_rows(), 0);
    indices->clear();
  }
  indices->reserve(rows_to_fill);

  int i = 0;
  uint8_t* null_word = NULL;
  uint32_t null_pos = 0;
  // Start reading from position read_tuple_idx_ in the block.
  uint64_t last_read_ptr = 0;
  uint64_t last_read_row = read_tuple_idx_ / tuples_per_row;
  while (i < rows_to_fill) {
    // Check if current block is done.
    if (UNLIKELY(rows_returned_curr_block + i == (*read_block_)->num_rows())) break;

    // Copy the row into the output batch.
    TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
    last_read_ptr = reinterpret_cast<uint64_t>(read_ptr_);
    indices->push_back(RowIdx());
    DCHECK_EQ(indices->size(), i + 1);
    (*indices)[i].set(read_block_idx_, read_bytes_ + null_indicators_read_block_,
        last_read_row);
    if (HasNullableTuple) {
      for (int j = 0; j < tuples_per_row; ++j) {
        // Stitch together the tuples from the block and the NULL ones.
        null_word = (*read_block_)->buffer() + (read_tuple_idx_ >> 3);
        null_pos = read_tuple_idx_ & 7;
        ++read_tuple_idx_;
        const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
        // Copy the tuple and advance read_ptr_. If it is a NULL tuple, SetTuple is
        // called with a Tuple* of 0x0. To achieve that we multiply the current
        // read_ptr_ by false (0x0).
        row->SetTuple(j, reinterpret_cast<Tuple*>(
            reinterpret_cast<uint64_t>(read_ptr_) * is_not_null));
        read_ptr_ += desc_.tuple_descriptors()[j]->byte_size() * is_not_null;
      }
      const uint64_t row_read_bytes =
          reinterpret_cast<uint64_t>(read_ptr_) - last_read_ptr;
      DCHECK_GE(fixed_tuple_row_size_, row_read_bytes);
      read_bytes_ += row_read_bytes;
      last_read_ptr = reinterpret_cast<uint64_t>(read_ptr_);
    } else {
      // When we know that there are no nullable tuples we can safely copy them without
      // checking for nullability.
      for (int j = 0; j < tuples_per_row; ++j) {
        row->SetTuple(j, reinterpret_cast<Tuple*>(read_ptr_));
        read_ptr_ += desc_.tuple_descriptors()[j]->byte_size();
      }
      read_bytes_ += fixed_tuple_row_size_;
      read_tuple_idx_ += tuples_per_row;
    }
    tuple_row_mem += sizeof(Tuple*) * tuples_per_row;

    // Update string slot ptrs.
    for (int j = 0; j < string_slots_.size(); ++j) {
      Tuple* tuple = row->GetTuple(string_slots_[j].first);
      if (HasNullableTuple && tuple == NULL) continue;
      DCHECK_NOTNULL(tuple);
      for (int k = 0; k < string_slots_[j].second.size(); ++k) {
        const SlotDescriptor* slot_desc = string_slots_[j].second[k];
        if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;

        StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset());
        DCHECK_LE(sv->len, data_len - read_bytes_);
        sv->ptr = reinterpret_cast<char*>(read_ptr_);
        read_ptr_ += sv->len;
        read_bytes_ += sv->len;
      }
    }
    ++last_read_row;
    ++i;
  }

  batch->CommitRows(i);
  rows_returned_ += i;
  *eos = (rows_returned_ == num_rows_);
  if ((!pinned_ || delete_on_read_) &&
      rows_returned_curr_block + i == (*read_block_)->num_rows()) {
    // No more data in this block. Mark this batch as needing to return so
    // the caller can pass the rows up the operator tree.
    batch->MarkNeedToReturn();
  }
  DCHECK_EQ(indices->size(), i);
  return Status::OK;
}
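// Example of the null-indicator addressing in the nullable branch above
// (illustrative): for read_tuple_idx_ = 13, null_word points at byte
// 13 >> 3 = 1 of the block and null_pos = 13 & 7 = 5, so the tuple's NULL bit
// is mask 1 << (7 - 5) = 0x4 in that byte. Bits are assigned MSB-first within
// each byte, one per tuple, in stream order.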

// TODO: Move this somewhere in general. We don't want this function inlined
// for the buffered tuple stream case though.
// TODO: In case of null-able tuples we ignore the space we could have saved from the
// null tuples of this row.
int BufferedTupleStream::ComputeRowSize(TupleRow* row) const {
  int size = fixed_tuple_row_size_;
  for (int i = 0; i < string_slots_.size(); ++i) {
    Tuple* tuple = row->GetTuple(string_slots_[i].first);
    if (nullable_tuple_ && tuple == NULL) continue;
    DCHECK_NOTNULL(tuple);
    for (int j = 0; j < string_slots_[i].second.size(); ++j) {
      const SlotDescriptor* slot_desc = string_slots_[i].second[j];
      if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
      StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset());
      size += sv->len;
    }
  }
  return size;
}
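// A minimal read-side sketch (illustrative, not part of this file): a caller
// can either pin the whole stream and drain it in one call with GetRows(), or
// iterate batch by batch; 'row_desc', 'state' and 'mem_tracker' are assumed to
// come from the caller's context:
//
//   RETURN_IF_ERROR(stream.PrepareForRead());
//   RowBatch batch(row_desc, state->batch_size(), mem_tracker);
//   bool eos = false;
//   while (!eos) {
//     batch.Reset();
//     RETURN_IF_ERROR(stream.GetNext(&batch, &eos));
//     // ... pass the rows up the operator tree ...
//   }
//   stream.Close();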