#include <boost/bind.hpp>
#include <gutil/strings/substitute.h>

using namespace impala;
using namespace strings;

// Sizes of the initial small blocks in the stream (smaller than the IO-sized blocks).
static const int64_t INITIAL_BLOCK_SIZES[] = { 64 * 1024, 512 * 1024 };
// BufferedTupleStream::RowIdx::DebugString().
ss << "RowIdx block=" << block() << " offset=" << offset() << " idx=" << idx();
// BufferedTupleStream constructor (excerpt of the initializer list and per-tuple setup).
    bool delete_on_read, bool read_write)
  : use_small_buffers_(use_initial_small_buffers),
    delete_on_read_(delete_on_read),
    read_write_(read_write),
    nullable_tuple_(row_desc.IsAnyTupleNullable()),
    block_mgr_(block_mgr),
    block_mgr_client_(client),
    get_new_block_timer_(NULL) {
  const int tuple_byte_size = tuple_desc->byte_size();
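For orientation, a hedged, self-contained sketch of what the constructor does with tuple_byte_size, based on the member docs further down (fixed_tuple_row_size_ is the sum of the tuples' fixed-length sizes; string_slots_ groups string slots by tuple index). TupleInfo and ComputeFixedRowSize are illustrative names for this sketch, not Impala types:

#include <utility>
#include <vector>

// Stand-in for a tuple descriptor: its fixed-length footprint and its string slots.
struct TupleInfo {
  int byte_size;                         // fixed-length footprint of this tuple
  std::vector<int> string_slot_offsets;  // empty if the tuple has no string slots
};

// Sum each tuple's fixed-length size into the row's fixed size and remember which
// tuple indices carry string slots.
static int ComputeFixedRowSize(const std::vector<TupleInfo>& tuples,
    std::vector<std::pair<int, std::vector<int> > >* string_slots_by_tuple) {
  int fixed_tuple_row_size = 0;
  for (size_t i = 0; i < tuples.size(); ++i) {
    fixed_tuple_row_size += tuples[i].byte_size;
    if (tuples[i].string_slot_offsets.empty()) continue;
    string_slots_by_tuple->push_back(
        std::make_pair(static_cast<int>(i), tuples[i].string_slot_offsets));
  }
  return fixed_tuple_row_size;
}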
// Returns the number of pinned, io-sized blocks in 'blocks'.
int NumPinned(const list<BufferedBlockMgr::Block*>& blocks) {
  int num_pinned = 0;
  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks.begin();
      it != blocks.end(); ++it) {
    if ((*it)->is_pinned() && (*it)->is_max_size()) ++num_pinned;
  }
  return num_pinned;
}
// BufferedTupleStream::DebugString() (excerpt).
ss << "BufferedTupleStream num_rows=" << num_rows_ << " rows_returned=" << rows_returned_
   << " closed=" << (closed_ ? "true" : "false");
for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin();
    it != blocks_.end(); ++it) {
  ss << "{" << (*it)->DebugString() << "}";
  if (*it != blocks_.back()) ss << ",\n";
}
// BufferedTupleStream::Init() (excerpt).
if (profile != NULL) {  // register pin/unpin/get-new-block timers on the profile
bool got_block = false;
for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
    it != blocks_.end(); ++it) {

// Count bytes only for pinned, io-sized buffers.
for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin();
    it != blocks_.end(); ++it) {
  if (!(*it)->is_pinned()) continue;
  if (!(*it)->is_max_size()) continue;
  result += (*it)->buffer_len();
}
return Status(Substitute(
    "Cannot process row that is bigger than the IO size "
    "(row_size=$0). To run this query, increase the IO size (--read_size option).",
    PrettyPrinter::Print(row_size, TUnit::BYTES)));
*got_block = (new_block != NULL);
DCHECK(unpin_block == NULL);
if (unpin_block != NULL) {
if (block_to_free != NULL && !block_to_free->is_max_size()) {
  block_to_free = NULL;
if (block_to_free != NULL && !block_to_free->is_max_size()) block_to_free = NULL;
if (block_to_free != NULL) {
DCHECK(block_to_free == NULL) << "Should have been able to pin.";
if (block_to_free == NULL && pinned) ++num_pinned_;
// BufferedTupleStream::PrepareForRead() (excerpts).
for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
if (!(*it)->is_pinned()) {
if (!current_pinned) {
  DCHECK(got_buffer != NULL) << "Should have reserved enough blocks";
if ((*it)->is_max_size()) break;
if (got_buffer != NULL) *got_buffer = true;
// BufferedTupleStream::PinStream() (excerpts).
DCHECK_NOTNULL(pinned);
if (!already_reserved) {
for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
if ((*it)->is_pinned()) continue;
VLOG_QUERY << "Should have been reserved." << endl;
for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
// BufferedTupleStream::ComputeNumNullIndicatorBytes() (excerpt).
const uint32_t block_size_in_bits = 8 * block_size;
const uint32_t max_num_rows = block_size_in_bits / min_row_size_in_bits;
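A self-contained sketch (not the verbatim implementation) of the computation these two lines belong to, matching the description of ComputeNumNullIndicatorBytes() in the reference below: bound how many rows fit in the block when every row takes its fixed-length size plus one null-indicator bit per tuple, then size the bitmap rounded up to whole int64 words. Parameter names here are illustrative:

#include <cstdint>

int NumNullIndicatorBytesSketch(int block_size, int fixed_tuple_row_size,
    int tuples_per_row, bool any_nullable_tuple) {
  if (!any_nullable_tuple) return 0;  // No nullable tuples: no bitmap is needed.
  const uint32_t min_row_size_in_bits = 8 * fixed_tuple_row_size + tuples_per_row;
  const uint32_t block_size_in_bits = 8 * block_size;
  const uint32_t max_num_rows = block_size_in_bits / min_row_size_in_bits;
  // One indicator bit per tuple per row, rounded up to whole int64 words (8 bytes each).
  return static_cast<int>(((max_num_rows * tuples_per_row + 63) / 64) * 8);
}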
Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos, vector<RowIdx>* indices) {
  if (nullable_tuple_) return GetNextInternal<true>(batch, eos, indices);
  return GetNextInternal<false>(batch, eos, indices);
}

// Templated GetNext implementation (excerpts).
template <bool HasNullableTuple>
Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
    vector<RowIdx>* indices) {
  DCHECK_LE(read_tuple_idx_ / tuples_per_row, (*read_block_)->num_rows());
  if (UNLIKELY(rows_returned_curr_block == (*read_block_)->num_rows())) {
  rows_returned_curr_block = 0;
  DCHECK((*read_block_)->is_pinned()) << DebugString();

  int rows_to_fill = std::min(static_cast<int64_t>(batch->capacity() - batch->num_rows()),
      (*read_block_)->num_rows() - rows_returned_curr_block);
  DCHECK_GE(rows_to_fill, 1);
  uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows()));
  vector<RowIdx> local_indices;
  if (indices == NULL) {
    indices = &local_indices;
  }
  indices->reserve(rows_to_fill);

  uint8_t* null_word = NULL;
  uint32_t null_pos = 0;
  while (i < rows_to_fill) {
    if (UNLIKELY(rows_returned_curr_block + i == (*read_block_)->num_rows())) break;
    indices->push_back(RowIdx());
    DCHECK_EQ(indices->size(), i + 1);
    if (HasNullableTuple) {
      for (int j = 0; j < tuples_per_row; ++j) {
        // Test the null-indicator bit for this tuple of the row.
        const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
        // Branch-free select: multiplying the pointer value by 0/1 yields either the
        // tuple pointer or NULL (see the standalone demo after this listing).
        row->SetTuple(j, reinterpret_cast<Tuple*>(
            reinterpret_cast<uint64_t>(read_ptr_) * is_not_null));
    } else {
      // Non-nullable path: no null-indicator bits to consult.
      for (int j = 0; j < tuples_per_row; ++j) {
    tuple_row_mem += sizeof(Tuple*) * tuples_per_row;
    if (HasNullableTuple && tuple == NULL) continue;
    DCHECK_NOTNULL(tuple);
        rows_returned_curr_block + i == (*read_block_)->num_rows()) {
    DCHECK_EQ(indices->size(), i);
  DCHECK_NOTNULL(tuple);
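The SetTuple() call in the nullable path above relies on a branch-free selection: the pointer's integer value is multiplied by the 0/1 flag is_not_null, producing either the original pointer or NULL without a conditional. A standalone demonstration of just that trick, assuming pointers fit in 64 bits (Payload is a placeholder type for this sketch, not an Impala class):

#include <cstdint>
#include <cstdio>

struct Payload { int value; };

// Returns 'ptr' when is_not_null is true, NULL when it is false, with no branch.
static Payload* SelectPtr(Payload* ptr, bool is_not_null) {
  return reinterpret_cast<Payload*>(reinterpret_cast<uint64_t>(ptr) * is_not_null);
}

int main() {
  Payload p = { 42 };
  std::printf("%p\n", static_cast<void*>(SelectPtr(&p, true)));   // address of p
  std::printf("%p\n", static_cast<void*>(SelectPtr(&p, false)));  // null
  return 0;
}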
uint8_t * read_ptr_
Current read pointer into read_block_'s buffer.
uint32_t null_indicators_read_block_
int64_t total_byte_size_
Total size of blocks_, including small blocks.
Tuple * GetTuple(int tuple_idx)
void Close()
Must be called once at the end to clean up all resources. Idempotent.
std::vector< uint8_t * > block_start_idx_
std::list< BufferedBlockMgr::Block * >::iterator read_block_
Status MemLimitTooLowError(Client *client)
std::string DebugString() const
A tuple with 0 materialised slots is represented as NULL.
T * Allocate(int size)
Allocates the specified number of bytes from this block.
const RowDescriptor & row_desc() const
MemTracker * get_tracker(Client *client) const
Status PinStream(bool already_reserved, bool *pinned)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Status GetNextInternal(RowBatch *batch, bool *eos, std::vector< RowIdx > *indices)
Templated GetNext implementation.
TupleRow * GetRow(int row_idx)
Status GetNewBlock(Client *client, Block *unpin_block, Block **block, int64_t len=-1)
#define ADD_TIMER(profile, name)
RuntimeProfile::Counter * unpin_timer_
int64_t max_block_size() const
BufferedTupleStream(RuntimeState *state, const RowDescriptor &row_desc, BufferedBlockMgr *block_mgr, BufferedBlockMgr::Client *client, bool use_initial_small_buffers=true, bool delete_on_read=false, bool read_write=false)
int fixed_tuple_row_size_
Sum of the fixed-length portions of all the tuples in desc_.
Status NextBlockForRead()
std::vector< std::pair< int, std::vector< SlotDescriptor * > > > string_slots_
Vector of all the string slots, grouped by tuple_idx.
const RowDescriptor & desc_
Description of rows stored in the stream.
bool use_small_buffers_
If true, this stream is still using small buffers.
const NullIndicatorOffset & null_indicator_offset() const
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
bool IsNull(const NullIndicatorOffset &offset) const
const std::vector< SlotDescriptor * > & string_slots() const
int read_block_idx_
The block index of the current read block.
int blocks_unpinned() const
uint32_t write_tuple_idx_
Current idx of the tuple written to the write_block_ buffer.
int ComputeNumNullIndicatorBytes(int block_size) const
Computes the number of bytes needed for null indicators for a block of 'block_size'.
BufferedBlockMgr::Client * block_mgr_client_
uint32_t read_tuple_idx_
Current idx of the tuple read from the read_block_ buffer.
static const int64_t INITIAL_BLOCK_SIZES[]
const bool delete_on_read_
If true, blocks are deleted after they are read.
bool Equals(const RowDescriptor &other_desc) const
Returns true if the tuple ids of this descriptor match the tuple ids of the other descriptor.
const bool nullable_tuple_
Whether any tuple in the rows is nullable.
BufferedBlockMgr::Block * write_block_
The current block for writing. NULL if there is no available block to write to.
Status GetRows(boost::scoped_ptr< RowBatch > *batch, bool *got_rows)
const RowDescriptor & row_desc() const
bool TryAcquireTmpReservation(Client *client, int num_buffers)
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
Status UnpinBlock(BufferedBlockMgr::Block *block)
Unpins the block if it is an IO-sized block and updates tracking stats.
BufferedBlockMgr * block_mgr_
Block manager and client used to allocate, pin and release blocks. Not owned.
void SetTuple(int tuple_idx, Tuple *tuple)
RuntimeProfile::Counter * pin_timer_
Counters added by this object to the parent runtime profile.
uint32_t null_indicators_write_block_
Status Init(RuntimeProfile *profile=NULL, bool pinned=true)
StringValue * GetStringSlot(int offset)
std::string DebugString() const
Status PrepareForRead(bool *got_buffer=NULL)
int64_t read_bytes_
Bytes read in read_block_.
std::list< BufferedBlockMgr::Block * > blocks_
List of blocks in the stream.
uint8_t offset[7 * 64 - sizeof(uint64_t)]
int64_t num_rows() const
Number of rows in the stream.
Status NewBlockForWrite(int min_size, bool *got_block)
std::string DebugString(Client *client=NULL)
Dumps block mgr state. Grabs lock. If client is not NULL, also dumps its state.
RuntimeProfile::Counter * get_new_block_timer_
Status UnpinStream(bool all=false)
Status SwitchToIoBuffers(bool *got_buffer)
static const int NUM_SMALL_BLOCKS
int NumPinned(const list< BufferedBlockMgr::Block * > &blocks)
int num_small_blocks_
The total number of small blocks in blocks_.
static uint32_t RoundUpNumi64(uint32_t bits)
Returns 'bits' rounded up to a multiple of 64, as the number of int64 words needed; used to convert a bit count to i64 words (see the sketch at the end of this list).
int64_t num_rows_
Number of rows stored in the stream.
Status GetNext(RowBatch *batch, bool *eos, std::vector< RowIdx > *indices=NULL)
int ComputeRowSize(TupleRow *row) const
Returns the byte size of this row when encoded in a block.
int64_t rows_returned_
Number of rows returned to the caller from GetNext().
int64_t bytes_in_mem(bool ignore_current) const
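For the RoundUpNumi64() entry above, the conversion it describes is a ceiling division of a bit count by 64. A minimal sketch of that arithmetic, using an illustrative name rather than the real helper:

#include <cstdint>

// Number of int64 words needed to hold 'bits' bits (ceiling division by 64).
static inline uint32_t RoundUpNumi64Sketch(uint32_t bits) {
  return (bits + 63) >> 6;
}
// e.g. RoundUpNumi64Sketch(1) == 1, RoundUpNumi64Sketch(64) == 1,
//      RoundUpNumi64Sketch(65) == 2.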