#include <boost/scoped_ptr.hpp>
#include "gen-cpp/Results_types.h"
  : mem_tracker_(mem_tracker),
    has_in_flight_row_(false),
    num_tuples_per_row_(row_desc.tuple_descriptors().size()),
    auxiliary_mem_usage_(0),
    need_to_return_(false),
    tuple_data_pool_(new MemPool(mem_tracker_)) {
  DCHECK_GT(capacity, 0);
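
// The initializer list below belongs to the second constructor, which rebuilds
// a RowBatch from its serialized TRowBatch form; Serialize() further down
// produces that form.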
  : mem_tracker_(mem_tracker),
    has_in_flight_row_(false),
    num_rows_(input_batch.num_rows),
    num_tuples_per_row_(input_batch.row_tuples.size()),
    auxiliary_mem_usage_(0),
    tuple_data_pool_(new MemPool(mem_tracker)) {
  if (input_batch.compression_type != THdfsCompression::NONE) {
    uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str();
    size_t compressed_size = input_batch.tuple_data.size();

    scoped_ptr<Codec> decompressor;

    int64_t uncompressed_size = input_batch.uncompressed_size;
    DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
    status = decompressor->ProcessBlock(true, compressed_size, compressed_data,
        &uncompressed_size, &data);
    DCHECK(status.ok()) << "RowBatch decompression failed.";
    decompressor->Close();

    memcpy(data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
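
// If the sender compressed the tuple data (compression_type != NONE), the
// bytes are decompressed through a Codec; otherwise they are memcpy'd from
// input_batch.tuple_data unchanged.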
  for (vector<int32_t>::const_iterator offset = input_batch.tuple_offsets.begin();

  bool has_string_slots = false;
  for (int i = 0; i < tuple_descs.size(); ++i) {
    if (!tuple_descs[i]->string_slots().empty()) {
      has_string_slots = true;

  if (!has_string_slots) return;

    vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
    for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
      if ((*desc)->string_slots().empty()) continue;
      if (t == NULL) continue;

      vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
      for (; slot != (*desc)->string_slots().end(); ++slot) {
        DCHECK((*slot)->type().IsVarLen());
        string_val->ptr = reinterpret_cast<char*>(tuple_data_pool_->GetDataPtr(offset));
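
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of row-batch.cc: the fix-up loop above works
// because string slots in serialized tuple data store an *offset* into the
// tuple-data buffer rather than a raw pointer. The standalone example below
// (hypothetical SimpleStringValue type, plain char buffer, no Impala
// dependencies) shows the same offset <-> pointer round trip in isolation.
#include <cstdint>

struct SimpleStringValue {  // simplified stand-in for impala::StringValue
  char* ptr;
  int len;
};

// Before sending: replace the pointer with its offset into the buffer.
inline void PtrToOffset(SimpleStringValue* v, const char* buffer_start) {
  v->ptr = reinterpret_cast<char*>(static_cast<std::intptr_t>(v->ptr - buffer_start));
}

// After receiving: turn the stored offset back into a pointer into the
// receiver's copy of the buffer, as the loop above does for each string slot.
inline void OffsetToPtr(SimpleStringValue* v, char* buffer_start) {
  v->ptr = buffer_start + reinterpret_cast<std::intptr_t>(v->ptr);
}
// ---------------------------------------------------------------------------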
  output_batch->row_tuples.clear();
  output_batch->tuple_offsets.clear();
  output_batch->compression_type = THdfsCompression::NONE;

  output_batch->tuple_data.resize(size);
  output_batch->uncompressed_size = size;

  char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());

    vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
    for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
        output_batch->tuple_offsets.push_back(-1);
      output_batch->tuple_offsets.push_back(offset);
      DCHECK_LE(offset, size);
  DCHECK_EQ(offset, size);

    scoped_ptr<Codec> compressor;

    int64_t compressed_size = compressor->MaxOutputLen(size);
    uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str();
    compressor->ProcessBlock(true, size, input, &compressed_size, &compressed_output);
    if (LIKELY(compressed_size < size)) {
      output_batch->compression_type = THdfsCompression::LZ4;

    VLOG_ROW << "uncompressed size: " << size
        << ", compressed size: " << compressed_size;

  return GetBatchSize(*output_batch) - output_batch->tuple_data.size() + size;
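
// The value Serialize() returns above is the batch's size as if tuple_data
// were left uncompressed: GetBatchSize(*output_batch) counts the (possibly
// LZ4-compressed) tuple_data string, so its length is subtracted and the
// uncompressed 'size' is added back.

// The two argument checks that follow appear to come from AddIoBuffer() and
// AddTupleStream(), which attach I/O buffers and tuple streams to the batch.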
  DCHECK(buffer != NULL);

  DCHECK(stream != NULL);
  int result = batch.tuple_data.size();
  result += batch.row_tuples.size() * sizeof(TTupleId);
  result += batch.tuple_offsets.size() * sizeof(int32_t);
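
// Worked example (illustrative, assuming a 4-byte TTupleId): a serialized
// batch carrying 120 bytes of tuple_data, 2 entries in row_tuples and
// 6 entries in tuple_offsets (3 rows x 2 tuples per row) comes out to
// 120 + 2*4 + 6*4 = 152 bytes.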
  for (int i = 0; i < src->io_buffers_.size(); ++i) {
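
// This loop walks the I/O buffers owned by 'src'; it is presumably part of
// AcquireState()/TransferResourceOwnership(), which hand a batch's auxiliary
// resources (I/O buffers, tuple streams, tuple data pool) to another batch.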
    vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
    for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
      if (tuple == NULL) continue;
      result += (*desc)->byte_size();
      vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
      for (; slot != (*desc)->string_slots().end(); ++slot) {
        DCHECK((*slot)->type().IsVarLen());
        if (tuple->IsNull((*slot)->null_indicator_offset())) continue;
        result += string_val->len;
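
// Per tuple, the running total adds the fixed-length byte_size() plus the
// length of every non-NULL variable-length (string) slot.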
  int tuple_buffer_size = num_rows * row_size;
  return tuple_buffer_size;
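
// Maximum fixed-length tuple storage for the batch: row count times the
// per-row byte size; variable-length string data is presumably accounted
// for separately.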