Impala
Impala is the open source, native analytic database for Apache Hadoop.
impala::RowBatch Class Reference

#include <row-batch.h>

Public Member Functions

 RowBatch (const RowDescriptor &row_desc, int capacity, MemTracker *tracker)
 
 RowBatch (const RowDescriptor &row_desc, const TRowBatch &input_batch, MemTracker *tracker)
 
 ~RowBatch ()
 
int AddRows (int n)
 
int AddRow ()
 
void CommitRows (int n)
 
void CommitLastRow ()
 
void set_num_rows (int num_rows)
 
bool AtCapacity ()
 
bool AtCapacity (MemPool *tuple_pool)
 
int TotalByteSize ()
 
TupleRow * GetRow (int row_idx)
 
int row_byte_size ()
 
MemPool * tuple_data_pool ()
 
int num_io_buffers () const
 
int num_tuple_streams () const
 
void Reset ()
 Resets the row batch, returning all resources it has accumulated.
 
void AddIoBuffer (DiskIoMgr::BufferDescriptor *buffer)
 Add io buffer to this row batch.
 
void AddTupleStream (BufferedTupleStream *stream)
 
void MarkNeedToReturn ()
 
void TransferResourceOwnership (RowBatch *dest)
 
void CopyRow (TupleRow *src, TupleRow *dest)
 
void CopyRows (int dest, int src, int num_rows)
 
void ClearRow (TupleRow *row)
 
void AcquireState (RowBatch *src)
 
int Serialize (TRowBatch *output_batch)
 
int num_rows () const
 
int capacity () const
 
const RowDescriptor & row_desc () const
 
int MaxTupleBufferSize ()
 Computes the maximum size needed to store tuple data for this row batch.
 

Static Public Member Functions

static int GetBatchSize (const TRowBatch &batch)
 Utility function: returns total size of batch.
 

Static Public Attributes

static const int INVALID_ROW_INDEX = -1
 
static const int AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024
 

Private Attributes

MemTracker * mem_tracker_
 
bool has_in_flight_row_
 All members below need to be handled in RowBatch::AcquireState()
 
int num_rows_
 
int capacity_
 
int num_tuples_per_row_
 
RowDescriptor row_desc_
 
Tuple ** tuple_ptrs_
 
int tuple_ptrs_size_
 
int64_t auxiliary_mem_usage_
 
bool need_to_return_
 
boost::scoped_ptr< MemPool > tuple_data_pool_
 Memory pool holding (some of the) data referenced by rows.
 
std::vector< DiskIoMgr::BufferDescriptor * > io_buffers_
 
std::vector< BufferedTupleStream * > tuple_streams_
 Tuple streams currently owned by this row batch.
 
std::string compression_scratch_
 

Detailed Description

A RowBatch encapsulates a batch of rows, each composed of a number of tuples. The maximum number of rows is fixed at the time of construction. The row batch references a few different sources of memory.

  1. TupleRow ptrs - these are always owned and managed by the row batch.
  2. Tuple memory - this is allocated in (or transferred to) the row batch's tuple pool.
  3. Auxiliary tuple memory (e.g. string data) - this can either be stored externally (strings are not copied) or in the tuple pool (strings are copied). If external, the data is in an io buffer that may not be attached to this row batch. The creator of that row batch has to make sure that the io buffer is not recycled until all batches that reference the memory have been consumed.

In order to minimize memory allocations, RowBatches and TRowBatches that have been serialized and sent over the wire should be reused (this prevents compression_scratch_ from being needlessly reallocated).

Row batches and memory usage: We attempt to stream row batches through the plan tree without copying the data. This means that row batches are often not compact and reference memory outside of the row batch. As a result, most row batches have a very small memory footprint and a few have a very large one (they hold all the memory that other row batches reference). An example is IoBuffers, which are only attached to one row batch. The row batch memory is freed only when the row batch reaches a blocking operator or the root of the fragment. This means that in some cases (e.g. very selective queries), we still need to pass the row batch through the exec nodes (even if it contains no rows) to trigger memory deletion.

AtCapacity() encapsulates the check that we are not accumulating excessive memory. A row batch is considered at capacity if all the rows are full or it has accumulated auxiliary memory up to a soft cap (see the AT_CAPACITY_MEM_USAGE comment).
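The at-capacity rule described above can be illustrated with a small stand-alone model. This is a sketch only, not the Impala implementation: MiniBatch and its field names are hypothetical, the 8 MB soft cap is taken from AT_CAPACITY_MEM_USAGE, the need_to_return flag follows the need_to_return_ member description, and the use of >= for the memory comparison is an assumption.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified stand-in for RowBatch. It tracks only the
// counters that the at-capacity check needs.
struct MiniBatch {
  // Mirrors the documented value of AT_CAPACITY_MEM_USAGE (8 MB).
  static constexpr int64_t kSoftCapBytes = 8 * 1024 * 1024;

  int capacity;                     // maximum number of rows, fixed at construction
  int num_rows = 0;                 // committed rows
  int64_t auxiliary_mem_usage = 0;  // bytes of attached auxiliary memory
  bool need_to_return = false;      // set by streaming producers

  explicit MiniBatch(int cap) : capacity(cap) { assert(cap > 0); }

  // At capacity when all rows are used, when the soft memory cap is reached,
  // or when a producer has asked for the batch to be returned.
  bool AtCapacity() const {
    return num_rows == capacity ||
           auxiliary_mem_usage >= kSoftCapBytes ||
           need_to_return;
  }
};
```

A producer loop in this model would keep adding rows while AtCapacity() returns false and hand the batch to its parent as soon as it returns true.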

Definition at line 66 of file row-batch.h.

Constructor & Destructor Documentation

impala::RowBatch::RowBatch ( const RowDescriptor &  row_desc,
int  capacity,
MemTracker *  tracker 
)

Create RowBatch for a maximum of 'capacity' rows of tuples specified by 'row_desc'. tracker cannot be NULL.

Definition at line 34 of file row-batch.cc.

References capacity_, mem_tracker_, num_tuples_per_row_, tuple_data_pool_, tuple_ptrs_, and tuple_ptrs_size_.

impala::RowBatch::RowBatch ( const RowDescriptor &  row_desc,
const TRowBatch &  input_batch,
MemTracker *  tracker 
)

Populate a row batch from input_batch by copying input_batch's tuple_data into the row batch's mempool and converting all offsets in the data back into pointers. TODO: figure out how to transfer the data from input_batch to this RowBatch (so that we don't need to make yet another copy)

Definition at line 57 of file row-batch.cc.

References impala::Codec::CreateDecompressor(), impala::Status::GetDetail(), GetRow(), impala::Tuple::GetStringSlot(), impala::TupleRow::GetTuple(), mem_tracker_, num_rows_, offset, impala::Status::ok(), row_desc_, tuple_data_pool_, impala::RowDescriptor::tuple_descriptors(), tuple_ptrs_, and tuple_ptrs_size_.
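The "converting all offsets in the data back into pointers" step of this constructor can be sketched in isolation. The function below is a hypothetical illustration, not Impala code: it assumes the serialized form stores one byte offset per tuple into a single contiguous buffer, and that an offset of -1 marks a NULL tuple. Both are assumptions made for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Turns per-tuple byte offsets into pointers into 'pool', the batch-owned
// copy of the serialized tuple data. Hypothetical helper for illustration.
std::vector<const char*> OffsetsToPointers(
    const std::string& pool, const std::vector<int32_t>& tuple_offsets) {
  std::vector<const char*> ptrs;
  ptrs.reserve(tuple_offsets.size());
  for (int32_t off : tuple_offsets) {
    if (off == -1) {  // assumption: -1 encodes a NULL tuple
      ptrs.push_back(nullptr);
      continue;
    }
    assert(off >= 0 && static_cast<std::size_t>(off) < pool.size());
    ptrs.push_back(pool.data() + off);
  }
  return ptrs;
}
```

The pointers are valid only while the pool is alive and not reallocated, which is why the data is first copied into memory owned by the receiving batch.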

impala::RowBatch::~RowBatch ( )

Releases all resources accumulated at this row batch. This includes

  • tuple_ptrs
  • tuple mem pool data
  • buffer handles from the io mgr

Definition at line 137 of file row-batch.cc.

References io_buffers_, tuple_data_pool_, and tuple_streams_.

Member Function Documentation

void impala::RowBatch::AcquireState ( RowBatch *  src)

Acquires state from the 'src' row batch into this row batch. This includes all IO buffers and tuple data. This row batch must be empty and have the same row descriptor as the src batch. This is used for scan nodes which produce RowBatches asynchronously. Typically, an ExecNode is handed a row batch to populate (pull model) but ScanNodes have multiple threads which push row batches. TODO: this is wasteful and makes a copy that's unnecessary. Think about cleaning this up. TODO: rename this or unify with TransferResourceOwnership()

Definition at line 271 of file row-batch.cc.

References auxiliary_mem_usage_, impala::DiskIoMgr::BufferDescriptor::buffer_len(), capacity_, impala::RowDescriptor::Equals(), has_in_flight_row_, io_buffers_, mem_tracker_, need_to_return_, num_rows_, num_tuples_per_row_, row_desc_, impala::DiskIoMgr::BufferDescriptor::SetMemTracker(), tuple_data_pool_, tuple_ptrs_, tuple_ptrs_size_, and tuple_streams_.

Referenced by impala::HdfsScanNode::GetNextInternal().

int impala::RowBatch::AddRows ( int  n)
inline

Add n rows of tuple pointers after the last committed row and return its index. The rows are uninitialized and each tuple of the row must be set. Returns INVALID_ROW_INDEX if the row batch cannot fit n rows. Two consecutive AddRow() calls without a CommitLastRow() between them have the same effect as a single call.

Definition at line 94 of file row-batch.h.

References capacity_, has_in_flight_row_, INVALID_ROW_INDEX, and num_rows_.

Referenced by AddRow(), impala::BufferedTupleStream::GetNextInternal(), impala::CrossJoinNode::ProcessLeftChildBatch(), impala::HashJoinNode::ProcessProbeBatch(), and impala::HdfsScanner::WriteEmptyTuples().
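The add/commit protocol described for AddRows() can be modelled with row counters alone. This is a sketch under stated assumptions, not the Impala implementation: RowCounter is a hypothetical class, and it tracks only the counters (no tuple pointers) so the effect of uncommitted rows is easy to see.

```cpp
#include <cassert>

// Hypothetical model of the AddRows()/CommitRows() protocol.
class RowCounter {
 public:
  static const int INVALID_ROW_INDEX = -1;

  explicit RowCounter(int capacity) : capacity_(capacity) {
    assert(capacity > 0);
  }

  // Reserves n uncommitted rows after the last committed row and returns
  // the index of the first one, or INVALID_ROW_INDEX if they do not fit.
  int AddRows(int n) {
    if (num_rows_ + n > capacity_) return INVALID_ROW_INDEX;
    has_in_flight_row_ = true;
    return num_rows_;
  }
  int AddRow() { return AddRows(1); }

  // Makes the first n reserved rows part of the batch.
  void CommitRows(int n) {
    assert(n >= 0 && num_rows_ + n <= capacity_);
    num_rows_ += n;
    has_in_flight_row_ = false;
  }
  void CommitLastRow() { CommitRows(1); }

  int num_rows() const { return num_rows_; }
  bool has_in_flight_row() const { return has_in_flight_row_; }

 private:
  int capacity_;
  int num_rows_ = 0;
  bool has_in_flight_row_ = false;
};
```

Because AddRow() does not advance the committed row count, calling it twice without a commit returns the same index both times, which matches the "same effect as a single call" wording above.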

void impala::RowBatch::AddTupleStream ( BufferedTupleStream *  stream)

Add tuple stream to this row batch. The row batch must call Close() on the stream when freeing resources.

Definition at line 218 of file row-batch.cc.

References auxiliary_mem_usage_, impala::BufferedTupleStream::byte_size(), and tuple_streams_.

Referenced by impala::PartitionedHashJoinNode::Partition::Close().

bool impala::RowBatch::AtCapacity ( MemPool *  tuple_pool)
inline

Returns true if the row batch has filled all the rows or has accumulated enough memory. tuple_pool is an intermediate memory pool containing tuple data that will eventually be attached to this row batch. We need to make sure the tuple pool does not accumulate excessive memory.

Definition at line 129 of file row-batch.h.

References AT_CAPACITY_MEM_USAGE, AtCapacity(), num_rows_, and impala::MemPool::total_allocated_bytes().

void impala::RowBatch::ClearRow ( TupleRow *  row)
inline

Definition at line 187 of file row-batch.h.

References num_tuples_per_row_.

Referenced by impala::ExchangeNode::GetNext().

void impala::RowBatch::CopyRows ( int  dest,
int  src,
int  num_rows 
)
inline

Copy 'num_rows' rows from 'src' to 'dest' within the batch. Useful for exec nodes that skip an offset and copied more than necessary.

Definition at line 179 of file row-batch.h.

References capacity_, num_tuples_per_row_, and tuple_ptrs_.

Referenced by impala::SortNode::GetNext(), and impala::ExchangeNode::GetNextMerging().
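Copying rows within a batch amounts to moving a contiguous run of tuple pointers, since a row occupies num_tuples_per_row_ consecutive entries of tuple_ptrs_. The sketch below shows that idea on a plain vector. CopyRowsSketch and the Tuple struct are hypothetical, and the use of memmove (which permits overlapping source and destination ranges) is an assumption about a reasonable implementation rather than a statement of what row-batch.h does.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

struct Tuple { int id; };  // hypothetical placeholder for impala::Tuple

// Copies num_rows rows starting at row 'src' to row 'dest' inside a flat
// array holding num_tuples_per_row pointers per row.
void CopyRowsSketch(std::vector<Tuple*>& tuple_ptrs, int num_tuples_per_row,
                    int dest, int src, int num_rows) {
  assert(num_tuples_per_row > 0);
  const int capacity = static_cast<int>(tuple_ptrs.size()) / num_tuples_per_row;
  (void)capacity;  // only read by the asserts below
  assert(dest >= 0 && src >= 0 && num_rows >= 0);
  assert(dest + num_rows <= capacity && src + num_rows <= capacity);
  const std::size_t stride = static_cast<std::size_t>(num_tuples_per_row);
  // memmove is safe when the two ranges overlap.
  std::memmove(tuple_ptrs.data() + stride * static_cast<std::size_t>(dest),
               tuple_ptrs.data() + stride * static_cast<std::size_t>(src),
               static_cast<std::size_t>(num_rows) * stride * sizeof(Tuple*));
}
```

An exec node that must skip the first k rows of a filled batch could, in this model, call CopyRowsSketch(ptrs, n, 0, k, rows - k) and then shrink the row count.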

int impala::RowBatch::GetBatchSize ( const TRowBatch &  batch)
static

Utility function: returns total size of batch.

Definition at line 264 of file row-batch.cc.

Referenced by impala::DataStreamRecvr::SenderQueue::AddBatch(), Serialize(), and impala::DataStreamSender::SerializeBatch().

TupleRow* impala::RowBatch::GetRow ( int  row_idx)
inline

Definition at line 140 of file row-batch.h.

References has_in_flight_row_, num_rows_, num_tuples_per_row_, and tuple_ptrs_.

Referenced by impala::Sorter::Run::AddBatch(), impala::HBaseTableWriter::AppendRowBatch(), impala::HdfsSequenceTableWriter::AppendRowBatch(), impala::HdfsTextTableWriter::AppendRowBatch(), impala::HdfsParquetTableWriter::AppendRowBatch(), impala::HdfsAvroTableWriter::AppendRowBatch(), impala::SelectNode::CopyRows(), impala::SimpleTupleStreamTest::CreateIntBatch(), impala::RowBatchListTest::CreateRowBatch(), impala::DataStreamTest::CreateRowBatch(), impala::SimpleTupleStreamTest::CreateStringBatch(), impala::SortedRunMerger::BatchedRowSupplier::current_row(), impala::UnionNode::EvalAndMaterializeExprs(), impala::HdfsScanner::GetMemory(), impala::HBaseScanNode::GetNext(), impala::TopNNode::GetNext(), impala::ExchangeNode::GetNext(), impala::DataSourceScanNode::GetNext(), impala::HashJoinNode::GetNext(), impala::AggregationNode::GetNext(), impala::SortedRunMerger::GetNext(), impala::Sorter::Run::GetNext(), impala::PartitionedAggregationNode::GetNext(), impala::DataStreamTest::GetNextBatch(), impala::BufferedTupleStream::GetNextInternal(), impala::AnalyticEvalNode::GetNextOutputBatch(), impala::PlanFragmentExecutor::OpenInternal(), impala::PartitionedHashJoinNode::OutputNullAwareNullProbe(), impala::PartitionedHashJoinNode::OutputNullAwareProbeRows(), impala::PartitionedHashJoinNode::OutputUnmatchedBuild(), impala::PrintBatch(), impala::PartitionedAggregationNode::ProcessBatch(), impala::PartitionedAggregationNode::ProcessBatchNoGrouping(), impala::HashJoinNode::ProcessBuildBatch(), impala::PartitionedHashJoinNode::ProcessBuildBatch(), impala::CrossJoinNode::ProcessLeftChildBatch(), impala::HashJoinNode::ProcessProbeBatch(), impala::PartitionedHashJoinNode::ProcessProbeBatch(), impala::AggregationNode::ProcessRowBatchNoGrouping(), impala::AggregationNode::ProcessRowBatchWithGrouping(), impala::DataStreamTest::ReadStream(), impala::DataStreamTest::ReadStreamMerging(), impala::SimpleTupleStreamTest::ReadValues(), RowBatch(), impala::DataStreamSender::Send(), 
impala::HdfsTableSink::Send(), Serialize(), impala::TEST_F(), impala::SimpleTupleStreamTest::TestIntValuesInterleaved(), impala::SimpleTupleStreamTest::TestValues(), TotalByteSize(), and impala::HdfsScanner::WriteEmptyTuples().

void impala::RowBatch::MarkNeedToReturn ( )
inline

Called to indicate this row batch must be returned up the operator tree. This is used to control memory management for streaming rows. TODO: consider using this mechanism instead of AddIoBuffer/AddTupleStream. This is the property we need rather than meticulously passing resources up the operator tree.

Definition at line 167 of file row-batch.h.

References need_to_return_.

Referenced by impala::PartitionedAggregationNode::GetNext(), and impala::BufferedTupleStream::GetNextInternal().

int impala::RowBatch::MaxTupleBufferSize ( )

Computes the maximum size needed to store tuple data for this row batch.

Definition at line 325 of file row-batch.cc.

References AT_CAPACITY_MEM_USAGE, capacity_, impala::RowDescriptor::GetRowSize(), num_rows(), and row_desc_.

Referenced by impala::UnionNode::GetNext(), impala::HBaseScanNode::GetNext(), and impala::DataSourceScanNode::GetNext().

int impala::RowBatch::num_io_buffers ( ) const
inline

int impala::RowBatch::num_rows ( ) const
inline

Definition at line 215 of file row-batch.h.

References num_rows_.

Referenced by impala::Sorter::Run::AddBatch(), impala::RowBatchList::AddRowBatch(), impala::HBaseTableWriter::AppendRowBatch(), impala::HdfsSequenceTableWriter::AppendRowBatch(), impala::HdfsTextTableWriter::AppendRowBatch(), impala::HdfsParquetTableWriter::AppendRowBatch(), impala::HdfsAvroTableWriter::AppendRowBatch(), impala::HdfsScanner::CommitRows(), impala::HdfsScanner::GetMemory(), impala::SortNode::GetNext(), impala::CrossJoinNode::GetNext(), impala::PlanFragmentExecutor::GetNext(), impala::BufferedTupleStream::GetNextInternal(), impala::HdfsScanNode::GetNextInternal(), impala::ExchangeNode::GetNextMerging(), impala::HashJoinNode::LeftJoinGetNext(), MaxTupleBufferSize(), impala::SortedRunMerger::BatchedRowSupplier::Next(), impala::PlanFragmentExecutor::OpenInternal(), impala::PartitionedHashJoinNode::OutputUnmatchedBuild(), impala::PrintBatch(), impala::PartitionedAggregationNode::ProcessBatch(), impala::PartitionedAggregationNode::ProcessBatchNoGrouping(), impala::HashJoinNode::ProcessBuildBatch(), impala::PartitionedHashJoinNode::ProcessBuildBatch(), impala::CrossJoinNode::ProcessLeftChildBatch(), impala::HashJoinNode::ProcessProbeBatch(), impala::PartitionedHashJoinNode::ProcessProbeBatch(), impala::AggregationNode::ProcessRowBatchNoGrouping(), impala::AggregationNode::ProcessRowBatchWithGrouping(), impala::DataStreamTest::ReadStream(), impala::DataStreamTest::ReadStreamMerging(), impala::SimpleTupleStreamTest::ReadValues(), impala::HBaseTableSink::Send(), impala::DataStreamSender::Send(), impala::HdfsTableSink::Send(), impala::DataStreamSender::SerializeBatch(), set_num_rows(), impala::TEST_F(), impala::SimpleTupleStreamTest::TestIntValuesInterleaved(), impala::SimpleTupleStreamTest::TestValues(), and impala::HdfsScanner::WriteEmptyTuples().

int impala::RowBatch::num_tuple_streams ( ) const
inline

Definition at line 150 of file row-batch.h.

References tuple_streams_.

Referenced by AtCapacity().

const RowDescriptor& impala::RowBatch::row_desc ( ) const
inline

int impala::RowBatch::Serialize ( TRowBatch *  output_batch)

Create a serialized version of this row batch in output_batch, attaching all of the data it references to output_batch.tuple_data. output_batch.tuple_data will be snappy-compressed unless the compressed data is larger than the uncompressed data. Use output_batch.is_compressed to determine whether tuple_data is compressed. If an in-flight row is present in this row batch, it is ignored. This function does not Reset(). Returns the uncompressed serialized size (this will be the true size of output_batch if tuple_data is actually uncompressed).

Definition at line 147 of file row-batch.cc.

References compression_scratch_, impala::Codec::CreateCompressor(), impala::Tuple::DeepCopy(), GetBatchSize(), impala::Status::GetDetail(), GetRow(), impala::TupleRow::GetTuple(), LIKELY, num_rows_, num_tuples_per_row_, offset, impala::Status::ok(), row_desc_, TotalByteSize(), impala::RowDescriptor::ToThrift(), impala::RowDescriptor::tuple_descriptors(), and VLOG_ROW.

Referenced by impala::DataStreamSender::SerializeBatch().
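The "compress unless it does not help, then swap" behaviour described for Serialize() and compression_scratch_ can be sketched as follows. Everything here is illustrative: WireBatch is a hypothetical stand-in for TRowBatch, ToyCompress is a trivial run-length encoder substituted for Snappy so that the example has no external dependencies, and keeping the compressed form only when it is strictly smaller is an assumption about the tie case.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for the two TRowBatch fields used here.
struct WireBatch {
  std::string tuple_data;
  bool is_compressed = false;
};

// Toy run-length encoder standing in for Snappy: emits (count, byte) pairs
// with count <= 255.
void ToyCompress(const std::string& in, std::string* out) {
  out->clear();
  for (std::size_t i = 0; i < in.size();) {
    std::size_t run = 1;
    while (i + run < in.size() && in[i + run] == in[i] && run < 255) ++run;
    out->push_back(static_cast<char>(run));
    out->push_back(in[i]);
    i += run;
  }
}

// Compresses batch->tuple_data into *scratch and swaps the two strings only
// when compression made the data smaller. Returns the uncompressed size.
int FinishSerialize(WireBatch* batch, std::string* scratch) {
  assert(batch != nullptr && scratch != nullptr);
  const int uncompressed_size = static_cast<int>(batch->tuple_data.size());
  batch->is_compressed = false;
  if (uncompressed_size > 0) {
    ToyCompress(batch->tuple_data, scratch);
    if (scratch->size() < batch->tuple_data.size()) {
      batch->tuple_data.swap(*scratch);  // no copy, buffers change hands
      batch->is_compressed = true;
    }
  }
  return uncompressed_size;
}
```

Because the scratch string outlives the call, its allocation is reused by the next batch, which is the reason given above for reusing RowBatch and TRowBatch objects.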

void impala::RowBatch::set_num_rows ( int  num_rows)
inline

Set function can be used to reduce the number of rows in the batch. This is only used in the limit case where more rows were added than necessary.

Definition at line 113 of file row-batch.h.

References num_rows(), and num_rows_.

Referenced by impala::SortNode::GetNext(), impala::HdfsScanNode::GetNextInternal(), and impala::ExchangeNode::GetNextMerging().

int impala::RowBatch::TotalByteSize ( )

The total size of all data represented in this row batch (tuples and referenced string data). This is the size of the row batch after removing all gaps in the auxiliary memory (i.e. the smallest footprint for the row batch).

Definition at line 303 of file row-batch.cc.

References GetRow(), impala::Tuple::GetStringSlot(), impala::TupleRow::GetTuple(), impala::Tuple::IsNull(), impala::StringValue::len, num_rows_, row_desc_, and impala::RowDescriptor::tuple_descriptors().

Referenced by Serialize().
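A compact-size computation of this kind can be sketched as a sum over tuples of the fixed-length part plus the lengths of all non-NULL strings. The types below are hypothetical simplifications (SketchTuple is not impala::Tuple), and the sketch does not attempt to detect a tuple that is referenced from more than one row.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical tuple model: a fixed-length part plus string slots.
struct SketchTuple {
  int fixed_byte_size = 0;                  // bytes of the fixed-length part
  std::vector<const std::string*> strings;  // nullptr models a NULL slot
};

// Sums fixed-length tuple bytes and referenced string bytes, skipping NULL
// tuples and NULL string slots.
int64_t TotalByteSizeSketch(const std::vector<const SketchTuple*>& tuple_ptrs) {
  int64_t total = 0;
  for (const SketchTuple* t : tuple_ptrs) {
    if (t == nullptr) continue;
    assert(t->fixed_byte_size >= 0);
    total += t->fixed_byte_size;
    for (const std::string* s : t->strings) {
      if (s != nullptr) total += static_cast<int64_t>(s->size());
    }
  }
  return total;
}
```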

void impala::RowBatch::TransferResourceOwnership ( RowBatch *  dest)

Member Data Documentation

const int impala::RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024
static

Max memory that this row batch can accumulate in tuple_data_pool_ before it is considered at capacity.

Definition at line 222 of file row-batch.h.

Referenced by AtCapacity(), impala::Sorter::EstimateMergeMem(), and MaxTupleBufferSize().

int64_t impala::RowBatch::auxiliary_mem_usage_
private

Sum of all auxiliary bytes. This includes IoBuffers and memory from TransferResourceOwnership().

Definition at line 246 of file row-batch.h.

Referenced by AcquireState(), AddIoBuffer(), AddTupleStream(), AtCapacity(), Reset(), and TransferResourceOwnership().

int impala::RowBatch::capacity_
private

std::string impala::RowBatch::compression_scratch_
private

String to write compressed tuple data to in Serialize(). This is a string so we can swap() with the string in the TRowBatch we're serializing to (we don't compress directly into the TRowBatch in case the compressed data is longer than the uncompressed data). Swapping avoids copying data to the TRowBatch and avoids excess memory allocations: since we reuse RowBatches and TRowBatches, and assuming all row batches are roughly the same size, all strings will eventually be allocated to the right size.

Definition at line 270 of file row-batch.h.

Referenced by Serialize().

bool impala::RowBatch::has_in_flight_row_
private

All members below need to be handled in RowBatch::AcquireState()

Definition at line 232 of file row-batch.h.

Referenced by AcquireState(), AddRows(), CommitRows(), GetRow(), and Reset().

std::vector<DiskIoMgr::BufferDescriptor*> impala::RowBatch::io_buffers_
private

IO buffers currently owned by this row batch. Ownership of IO buffers transfers between row batches. Any IO buffer will be owned by at most one row batch (i.e. they are not ref counted) so most row batches don't own any.

Definition at line 258 of file row-batch.h.

Referenced by AcquireState(), AddIoBuffer(), num_io_buffers(), Reset(), TransferResourceOwnership(), and ~RowBatch().
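The single-owner rule for IO buffers can be illustrated with a minimal model in which a transfer moves every buffer pointer, and the associated byte count, from one batch to another. OwnerBatch, IoBuffer and TransferTo are hypothetical names chosen for this sketch. The real TransferResourceOwnership() also touches the tuple pool, tuple streams and the memory tracker, which are left out here.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

struct IoBuffer { int64_t len; };  // hypothetical buffer descriptor

// Hypothetical batch that tracks only its attached IO buffers.
struct OwnerBatch {
  std::vector<IoBuffer*> io_buffers;
  int64_t auxiliary_mem_usage = 0;

  // Attaches a buffer and accounts for its size.
  void AddIoBuffer(IoBuffer* buf) {
    assert(buf != nullptr);
    io_buffers.push_back(buf);
    auxiliary_mem_usage += buf->len;
  }

  // Hands every buffer to 'dest'. Afterwards this batch owns nothing, so
  // each buffer still has exactly one owner.
  void TransferTo(OwnerBatch* dest) {
    assert(dest != nullptr && dest != this);
    for (IoBuffer* buf : io_buffers) dest->AddIoBuffer(buf);
    io_buffers.clear();
    auxiliary_mem_usage = 0;
  }
};
```

Since buffers are not reference counted in this model, the source batch must drop its pointers in the same step that the destination acquires them.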

MemTracker* impala::RowBatch::mem_tracker_
private

Definition at line 228 of file row-batch.h.

Referenced by AcquireState(), AddIoBuffer(), Reset(), RowBatch(), and TransferResourceOwnership().

bool impala::RowBatch::need_to_return_
private

If true, this batch is considered at capacity. This is explicitly set by streaming components that return rows via row batches.

Definition at line 250 of file row-batch.h.

Referenced by AcquireState(), AtCapacity(), MarkNeedToReturn(), Reset(), and TransferResourceOwnership().

int impala::RowBatch::num_rows_
private

int impala::RowBatch::num_tuples_per_row_
private

RowDescriptor impala::RowBatch::row_desc_
private

boost::scoped_ptr<MemPool> impala::RowBatch::tuple_data_pool_
private

Memory pool holding (some of the) data referenced by rows.

Definition at line 253 of file row-batch.h.

Referenced by AcquireState(), Reset(), RowBatch(), TransferResourceOwnership(), tuple_data_pool(), and ~RowBatch().

Tuple** impala::RowBatch::tuple_ptrs_
private

array of pointers (w/ capacity_ * num_tuples_per_row_ elements) TODO: replace w/ tr1 array?

Definition at line 241 of file row-batch.h.

Referenced by AcquireState(), CopyRows(), GetRow(), Reset(), RowBatch(), and TransferResourceOwnership().

int impala::RowBatch::tuple_ptrs_size_
private

Definition at line 242 of file row-batch.h.

Referenced by AcquireState(), Reset(), and RowBatch().

std::vector<BufferedTupleStream*> impala::RowBatch::tuple_streams_
private

Tuple streams currently owned by this row batch.

Definition at line 261 of file row-batch.h.

Referenced by AcquireState(), AddTupleStream(), num_tuple_streams(), Reset(), TransferResourceOwnership(), and ~RowBatch().


The documentation for this class was generated from the following files: