Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
row-batch.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/row-batch.h"
16 
17 #include <stdint.h> // for intptr_t
18 #include <boost/scoped_ptr.hpp>
19 
21 #include "runtime/mem-tracker.h"
22 #include "runtime/string-value.h"
23 #include "runtime/tuple-row.h"
24 #include "util/compress.h"
25 #include "util/decompress.h"
26 #include "gen-cpp/Results_types.h"
27 
28 #include "common/names.h"
29 
30 namespace impala {
31 
// 8 MB bound on tuple data per batch: MaxTupleBufferSize() limits its row count so
// that num_rows * row_size stays within this value (a single row larger than this is
// still returned as-is).
32 const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024;
33 
35  MemTracker* mem_tracker)
36  : mem_tracker_(mem_tracker),
37  has_in_flight_row_(false),
38  num_rows_(0),
39  capacity_(capacity),
40  num_tuples_per_row_(row_desc.tuple_descriptors().size()),
41  row_desc_(row_desc),
42  auxiliary_mem_usage_(0),
43  need_to_return_(false),
44  tuple_data_pool_(new MemPool(mem_tracker_)) {
45  DCHECK(mem_tracker_ != NULL);
46  DCHECK_GT(capacity, 0);
48  tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_->Allocate(tuple_ptrs_size_));
49 }
50 
51 // TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
52 // global runtime memory segment; how do we get thrift to allocate it from there?
53 // maybe change line (in Data_types.cc generated from Data.thrift)
54 // xfer += iprot->readString(this->tuple_data[_i9]);
55 // to allocated string data in special mempool
56 // (change via python script that runs over Data_types.cc)
57 RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
58  MemTracker* mem_tracker)
59  : mem_tracker_(mem_tracker),
60  has_in_flight_row_(false),
61  num_rows_(input_batch.num_rows),
62  capacity_(num_rows_),
63  num_tuples_per_row_(input_batch.row_tuples.size()),
64  row_desc_(row_desc),
65  auxiliary_mem_usage_(0),
66  tuple_data_pool_(new MemPool(mem_tracker)) {
67  DCHECK(mem_tracker_ != NULL);
68  tuple_ptrs_size_ = num_rows_ * input_batch.row_tuples.size() * sizeof(Tuple*);
69  tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_->Allocate(tuple_ptrs_size_));
70  if (input_batch.compression_type != THdfsCompression::NONE) {
71  // Decompress tuple data into data pool
72  uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str();
73  size_t compressed_size = input_batch.tuple_data.size();
74 
75  scoped_ptr<Codec> decompressor;
76  Status status = Codec::CreateDecompressor(NULL, false, input_batch.compression_type,
77  &decompressor);
78  DCHECK(status.ok()) << status.GetDetail();
79 
80  int64_t uncompressed_size = input_batch.uncompressed_size;
81  DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
82  uint8_t* data = tuple_data_pool_->Allocate(uncompressed_size);
83  status = decompressor->ProcessBlock(true, compressed_size, compressed_data,
84  &uncompressed_size, &data);
85  DCHECK(status.ok()) << "RowBatch decompression failed.";
86  decompressor->Close();
87  } else {
88  // Tuple data uncompressed, copy directly into data pool
89  uint8_t* data = tuple_data_pool_->Allocate(input_batch.tuple_data.size());
90  memcpy(data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
91  }
92 
93  // convert input_batch.tuple_offsets into pointers
94  int tuple_idx = 0;
95  for (vector<int32_t>::const_iterator offset = input_batch.tuple_offsets.begin();
96  offset != input_batch.tuple_offsets.end(); ++offset) {
97  if (*offset == -1) {
98  tuple_ptrs_[tuple_idx++] = NULL;
99  } else {
100  tuple_ptrs_[tuple_idx++] = reinterpret_cast<Tuple*>(
101  tuple_data_pool_->GetDataPtr(*offset + tuple_ptrs_size_));
102  }
103  }
104 
105  // check whether we have string slots
106  // TODO: do that during setup (part of RowDescriptor c'tor?)
107  bool has_string_slots = false;
108  const vector<TupleDescriptor*>& tuple_descs = row_desc_.tuple_descriptors();
109  for (int i = 0; i < tuple_descs.size(); ++i) {
110  if (!tuple_descs[i]->string_slots().empty()) {
111  has_string_slots = true;
112  break;
113  }
114  }
115  if (!has_string_slots) return;
116 
117  // convert string offsets contained in tuple data into pointers
118  for (int i = 0; i < num_rows_; ++i) {
119  TupleRow* row = GetRow(i);
120  vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
121  for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
122  if ((*desc)->string_slots().empty()) continue;
123  Tuple* t = row->GetTuple(j);
124  if (t == NULL) continue;
125 
126  vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
127  for (; slot != (*desc)->string_slots().end(); ++slot) {
128  DCHECK((*slot)->type().IsVarLen());
129  StringValue* string_val = t->GetStringSlot((*slot)->tuple_offset());
130  int offset = reinterpret_cast<intptr_t>(string_val->ptr) + tuple_ptrs_size_;
131  string_val->ptr = reinterpret_cast<char*>(tuple_data_pool_->GetDataPtr(offset));
132  }
133  }
134  }
135 }
136 
138  tuple_data_pool_->FreeAll();
139  for (int i = 0; i < io_buffers_.size(); ++i) {
140  io_buffers_[i]->Return();
141  }
142  for (int i = 0; i < tuple_streams_.size(); ++i) {
143  tuple_streams_[i]->Close();
144  }
145 }
146 
147 int RowBatch::Serialize(TRowBatch* output_batch) {
148  // why does Thrift not generate a Clear() function?
149  output_batch->row_tuples.clear();
150  output_batch->tuple_offsets.clear();
151  output_batch->compression_type = THdfsCompression::NONE;
152 
153  output_batch->num_rows = num_rows_;
154  row_desc_.ToThrift(&output_batch->row_tuples);
155  output_batch->tuple_offsets.reserve(num_rows_ * num_tuples_per_row_);
156 
157  int size = TotalByteSize();
158  output_batch->tuple_data.resize(size);
159  output_batch->uncompressed_size = size;
160 
161  // Copy tuple data, including strings, into output_batch (converting string
162  // pointers into offsets in the process)
163  int offset = 0; // current offset into output_batch->tuple_data
164  char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
165  for (int i = 0; i < num_rows_; ++i) {
166  TupleRow* row = GetRow(i);
167  const vector<TupleDescriptor*>& tuple_descs = row_desc_.tuple_descriptors();
168  vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
169  for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
170  if (row->GetTuple(j) == NULL) {
171  // NULLs are encoded as -1
172  output_batch->tuple_offsets.push_back(-1);
173  continue;
174  }
175  // Record offset before creating copy (which increments offset and tuple_data)
176  output_batch->tuple_offsets.push_back(offset);
177  row->GetTuple(j)->DeepCopy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
178  DCHECK_LE(offset, size);
179  }
180  }
181  DCHECK_EQ(offset, size);
182 
183  if (size > 0) {
184  // Try compressing tuple_data to compression_scratch_, swap if compressed data is
185  // smaller
186  scoped_ptr<Codec> compressor;
187  Status status = Codec::CreateCompressor(NULL, false, THdfsCompression::LZ4,
188  &compressor);
189  DCHECK(status.ok()) << status.GetDetail();
190 
191  int64_t compressed_size = compressor->MaxOutputLen(size);
192  if (compression_scratch_.size() < compressed_size) {
193  compression_scratch_.resize(compressed_size);
194  }
195  uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str();
196  uint8_t* compressed_output = (uint8_t*)compression_scratch_.c_str();
197  compressor->ProcessBlock(true, size, input, &compressed_size, &compressed_output);
198  if (LIKELY(compressed_size < size)) {
199  compression_scratch_.resize(compressed_size);
200  output_batch->tuple_data.swap(compression_scratch_);
201  output_batch->compression_type = THdfsCompression::LZ4;
202  }
203  VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
204  }
205 
206  // The size output_batch would be if we didn't compress tuple_data (will be equal to
207  // actual batch size if tuple_data isn't compressed)
208  return GetBatchSize(*output_batch) - output_batch->tuple_data.size() + size;
209 }
210 
212  DCHECK(buffer != NULL);
213  io_buffers_.push_back(buffer);
214  auxiliary_mem_usage_ += buffer->buffer_len();
215  buffer->SetMemTracker(mem_tracker_);
216 }
217 
219  DCHECK(stream != NULL);
220  tuple_streams_.push_back(stream);
221  auxiliary_mem_usage_ += stream->byte_size();
222 }
223 
225  DCHECK(tuple_data_pool_.get() != NULL);
226  num_rows_ = 0;
227  has_in_flight_row_ = false;
228  tuple_data_pool_->FreeAll();
230  for (int i = 0; i < io_buffers_.size(); ++i) {
231  io_buffers_[i]->Return();
232  }
233  io_buffers_.clear();
234  for (int i = 0; i < tuple_streams_.size(); ++i) {
235  tuple_streams_[i]->Close();
236  }
237  tuple_streams_.clear();
239  tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_->Allocate(tuple_ptrs_size_));
240  need_to_return_ = false;
241 }
242 
244  dest->auxiliary_mem_usage_ += tuple_data_pool_->total_allocated_bytes();
245  dest->tuple_data_pool_->AcquireData(tuple_data_pool_.get(), false);
246  for (int i = 0; i < io_buffers_.size(); ++i) {
248  dest->io_buffers_.push_back(buffer);
249  dest->auxiliary_mem_usage_ += buffer->buffer_len();
250  buffer->SetMemTracker(dest->mem_tracker_);
251  }
252  io_buffers_.clear();
253  for (int i = 0; i < tuple_streams_.size(); ++i) {
254  dest->tuple_streams_.push_back(tuple_streams_[i]);
255  dest->auxiliary_mem_usage_ += tuple_streams_[i]->byte_size();
256  }
257  tuple_streams_.clear();
260  tuple_ptrs_ = NULL;
261  Reset();
262 }
263 
264 int RowBatch::GetBatchSize(const TRowBatch& batch) {
265  int result = batch.tuple_data.size();
266  result += batch.row_tuples.size() * sizeof(TTupleId);
267  result += batch.tuple_offsets.size() * sizeof(int32_t);
268  return result;
269 }
270 
272  DCHECK(row_desc_.Equals(src->row_desc_));
273  DCHECK_EQ(num_tuples_per_row_, src->num_tuples_per_row_);
274  DCHECK_EQ(tuple_ptrs_size_, src->tuple_ptrs_size_);
275  DCHECK_EQ(capacity_, src->capacity_);
276  DCHECK_EQ(auxiliary_mem_usage_, 0);
277 
278  // The destination row batch should be empty.
279  DCHECK(!has_in_flight_row_);
280  DCHECK_EQ(num_rows_, 0);
281 
282  for (int i = 0; i < src->io_buffers_.size(); ++i) {
283  DiskIoMgr::BufferDescriptor* buffer = src->io_buffers_[i];
284  io_buffers_.push_back(buffer);
285  auxiliary_mem_usage_ += buffer->buffer_len();
286  buffer->SetMemTracker(mem_tracker_);
287  }
288  src->io_buffers_.clear();
289  src->auxiliary_mem_usage_ = 0;
290 
291  DCHECK(src->tuple_streams_.empty());
292 
294  num_rows_ = src->num_rows_;
295  capacity_ = src->capacity_;
297  std::swap(tuple_ptrs_, src->tuple_ptrs_);
298  tuple_data_pool_->AcquireData(src->tuple_data_pool_.get(), false);
299  auxiliary_mem_usage_ += src->tuple_data_pool_->total_allocated_bytes();
300 }
301 
302 // TODO: consider computing size of batches as they are built up
304  int result = 0;
305  for (int i = 0; i < num_rows_; ++i) {
306  TupleRow* row = GetRow(i);
307  const vector<TupleDescriptor*>& tuple_descs = row_desc_.tuple_descriptors();
308  vector<TupleDescriptor*>::const_iterator desc = tuple_descs.begin();
309  for (int j = 0; desc != tuple_descs.end(); ++desc, ++j) {
310  Tuple* tuple = row->GetTuple(j);
311  if (tuple == NULL) continue;
312  result += (*desc)->byte_size();
313  vector<SlotDescriptor*>::const_iterator slot = (*desc)->string_slots().begin();
314  for (; slot != (*desc)->string_slots().end(); ++slot) {
315  DCHECK((*slot)->type().IsVarLen());
316  if (tuple->IsNull((*slot)->null_indicator_offset())) continue;
317  StringValue* string_val = tuple->GetStringSlot((*slot)->tuple_offset());
318  result += string_val->len;
319  }
320  }
321  }
322  return result;
323 }
324 
326  int row_size = row_desc_.GetRowSize();
327  if (row_size > AT_CAPACITY_MEM_USAGE) return row_size;
328  int num_rows = 0;
329  if (row_size != 0) {
330  num_rows = std::min(capacity_, AT_CAPACITY_MEM_USAGE / row_size);
331  }
332  int tuple_buffer_size = num_rows * row_size;
333  DCHECK_LE(tuple_buffer_size, AT_CAPACITY_MEM_USAGE);
334  return tuple_buffer_size;
335 }
336 }
Tuple ** tuple_ptrs_
Definition: row-batch.h:241
The underlying memory management is done by the BufferedBlockMgr.
int GetRowSize() const
Definition: descriptors.cc:320
int num_rows() const
Definition: row-batch.h:215
const std::string GetDetail() const
Definition: status.cc:184
RowBatch(const RowDescriptor &row_desc, int capacity, MemTracker *tracker)
Definition: row-batch.cc:34
static Status CreateCompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *compressor)
Tuple * GetTuple(int tuple_idx)
Definition: tuple-row.h:30
static Status CreateDecompressor(MemPool *mem_pool, bool reuse, THdfsCompression::type format, boost::scoped_ptr< Codec > *decompressor)
RowDescriptor row_desc_
Definition: row-batch.h:237
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
void AcquireState(RowBatch *src)
Definition: row-batch.cc:271
std::vector< DiskIoMgr::BufferDescriptor * > io_buffers_
Definition: row-batch.h:258
int64_t byte_size() const
Returns the byte size necessary to store the entire stream in memory.
Tuple * DeepCopy(const TupleDescriptor &desc, MemPool *pool, bool convert_ptrs=false)
Definition: tuple.cc:34
bool IsNull(const NullIndicatorOffset &offset) const
Definition: tuple.h:112
bool has_in_flight_row_
All members below need to be handled in RowBatch::AcquireState()
Definition: row-batch.h:232
void AddTupleStream(BufferedTupleStream *stream)
Definition: row-batch.cc:218
int64_t auxiliary_mem_usage_
Definition: row-batch.h:246
static const int AT_CAPACITY_MEM_USAGE
Definition: row-batch.h:222
void Reset()
Resets the row batch, returning all resources it has accumulated.
Definition: row-batch.cc:224
void AddIoBuffer(DiskIoMgr::BufferDescriptor *buffer)
Add io buffer to this row batch.
Definition: row-batch.cc:211
int num_tuples_per_row_
Definition: row-batch.h:236
void SetMemTracker(MemTracker *tracker)
Definition: disk-io-mgr.cc:187
bool Equals(const RowDescriptor &other_desc) const
Return true if the tuple ids of this descriptor match tuple ids of other desc.
Definition: descriptors.cc:361
void ToThrift(std::vector< TTupleId > *row_tuple_ids)
Populate row_tuple_ids with our ids.
Definition: descriptors.cc:345
static int GetBatchSize(const TRowBatch &batch)
Utility function: returns total size of batch.
Definition: row-batch.cc:264
bool need_to_return_
Definition: row-batch.h:250
void TransferResourceOwnership(RowBatch *dest)
Definition: row-batch.cc:243
MemTracker * mem_tracker_
Definition: row-batch.h:228
#define VLOG_ROW
Definition: logging.h:59
This class is thread-safe.
Definition: mem-tracker.h:61
std::string compression_scratch_
Definition: row-batch.h:270
const RowDescriptor & row_desc() const
const std::vector< TupleDescriptor * > & tuple_descriptors() const
Return descriptors for all tuples in this row, in order of appearance.
Definition: descriptors.h:412
int Serialize(TRowBatch *output_batch)
Definition: row-batch.cc:147
std::vector< BufferedTupleStream * > tuple_streams_
Tuple streams currently owned by this row batch.
Definition: row-batch.h:261
StringValue * GetStringSlot(int offset)
Definition: tuple.h:128
#define LIKELY(expr)
Definition: compiler-util.h:32
const RowDescriptor * row_desc_
owned by plan root, which resides in runtime_state_'s pool
Definition: coordinator.h:255
uint8_t offset[7 *64-sizeof(uint64_t)]
int MaxTupleBufferSize()
Computes the maximum size needed to store tuple data for this row batch.
Definition: row-batch.cc:325
bool ok() const
Definition: status.h:172
boost::scoped_ptr< MemPool > tuple_data_pool_
holding (some of the) data referenced by rows
Definition: row-batch.h:253