Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
row-batch.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_RUNTIME_ROW_BATCH_H
17 #define IMPALA_RUNTIME_ROW_BATCH_H
18 
19 #include <vector>
20 #include <cstring>
21 #include <boost/scoped_ptr.hpp>
22 
23 #include "common/logging.h"
24 #include "runtime/descriptors.h"
25 #include "runtime/disk-io-mgr.h"
26 #include "runtime/mem-pool.h"
27 #include "runtime/mem-tracker.h"
28 
29 namespace impala {
30 
// Forward declarations for types this header only refers to by pointer or
// reference (e.g. BufferedTupleStream*, TupleRow*, Tuple*, const TRowBatch&).
// NOTE(review): MemTracker is already defined by the included
// runtime/mem-tracker.h, and TupleDescriptor is not referenced anywhere in the
// visible part of this header, so those two declarations look redundant --
// verify against the full header before removing them.
31 class BufferedTupleStream;
32 class MemTracker;
33 class TRowBatch;
34 class Tuple;
35 class TupleRow;
36 class TupleDescriptor;
37 
51 //
63 //
// NOTE(review): this is a Doxygen source-listing scrape of row-batch.h. The
// leading numbers are the listing's line numbers; gaps in them (e.g. 38-50,
// 115, 121-122, 148, 183, 236-252) are lines the scrape dropped. Unnumbered
// comment lines were added during review.
//
// RowBatch holds up to capacity_ rows. Row i is stored as num_tuples_per_row_
// consecutive Tuple* entries starting at tuple_ptrs_ + i * num_tuples_per_row_
// (see GetRow()). Rows are first reserved with AddRows()/AddRow() and only
// counted in num_rows() once committed with CommitRows()/CommitLastRow().
66 class RowBatch {
67  public:
72 
    // NOTE(review): the capacity-based constructor
    // RowBatch(const RowDescriptor&, int capacity, MemTracker*) (row-batch.cc:34)
    // falls in the dropped listing lines 68-71.
    // Builds a batch from a serialized TRowBatch. Presumably the inverse of
    // Serialize(), with memory charged to `tracker` -- confirm in row-batch.cc.
78  RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
79  MemTracker* tracker);
80 
85  ~RowBatch();
86 
    // Returned by AddRows()/AddRow() when the requested rows do not fit.
87  static const int INVALID_ROW_INDEX = -1;
88 
    // Reserves n rows at the end of the batch without committing them.
    // Returns the index of the first reserved row (the current num_rows_), or
    // INVALID_ROW_INDEX if num_rows_ + n would exceed capacity_. On success, marks
    // a row as in flight. Unlike CommitRows(), n is not checked for being >= 0.
94  int AddRows(int n) {
95  if (num_rows_ + n > capacity_) return INVALID_ROW_INDEX;
96  has_in_flight_row_ = true;
97  return num_rows_;
98  }
99 
    // Reserves a single row; equivalent to AddRows(1).
100  int AddRow() { return AddRows(1); }
101 
    // Commits n previously reserved rows, making them count in num_rows(), and
    // clears the in-flight flag. The n >= 0 and capacity bounds are enforced only
    // by DCHECKs.
102  void CommitRows(int n) {
103  DCHECK_GE(n, 0);
104  DCHECK_LE(num_rows_ + n, capacity_);
105  num_rows_ += n;
106  has_in_flight_row_ = false;
107  }
108 
    // Commits the single row reserved by AddRow(); equivalent to CommitRows(1).
109  void CommitLastRow() { CommitRows(1); }
110 
    // Truncates the batch to num_rows rows. The DCHECK forbids growing it.
    // NOTE(review): the body line (listing line 115) is missing from this scrape.
    // It is presumably `num_rows_ = num_rows;` -- verify against row-batch.h.
113  void set_num_rows(int num_rows) {
114  DCHECK_LE(num_rows, num_rows_);
116  }
117 
    // Reports whether the batch should be considered full.
    // NOTE(review): the body (listing lines 121-122) is missing from this scrape,
    // so the exact conditions cannot be read here. The members
    // auxiliary_mem_usage_ and need_to_return_ and MarkNeedToReturn() (listed in
    // the tooltips) may factor in -- unverified.
120  bool AtCapacity() {
123  }
124 
    // True if AtCapacity() holds, or if tuple_pool has allocated more than
    // AT_CAPACITY_MEM_USAGE bytes while the batch holds at least one committed
    // row. The num_rows_ > 0 guard means an empty batch is never reported full
    // because of memory alone. tuple_pool must be non-NULL (DCHECK).
129  bool AtCapacity(MemPool* tuple_pool) {
130  DCHECK(tuple_pool != NULL);
131  return AtCapacity() ||
132  (tuple_pool->total_allocated_bytes() > AT_CAPACITY_MEM_USAGE && num_rows_ > 0);
133  }
134 
138  int TotalByteSize();
139 
    // Returns row row_idx as a view into tuple_ptrs_; no tuple data is copied.
    // Valid indices are [0, num_rows_), plus num_rows_ itself while a row is in
    // flight. NOTE(review): after AddRows(n) with n > 1, only the first reserved
    // row passes the DCHECK below; rows num_rows_+1 .. num_rows_+n-1 would trip it.
140  TupleRow* GetRow(int row_idx) {
141  DCHECK(tuple_ptrs_ != NULL);
142  DCHECK_GE(row_idx, 0);
143  DCHECK_LT(row_idx, num_rows_ + (has_in_flight_row_ ? 1 : 0));
144  return reinterpret_cast<TupleRow*>(tuple_ptrs_ + row_idx * num_tuples_per_row_);
145  }
146 
    // Size in bytes of one row's tuple-pointer array (not of the tuple data).
147  int row_byte_size() { return num_tuples_per_row_ * sizeof(Tuple*); }
    // NOTE(review): listing line 148, the tuple_data_pool() accessor, is missing
    // from this scrape.
149  int num_io_buffers() const { return io_buffers_.size(); }
150  int num_tuple_streams() const { return tuple_streams_.size(); }
151 
    // Resets the row batch, returning all resources it has accumulated.
153  void Reset();
154 
157 
    // Adds `stream` to tuple_streams_, which holds the tuple streams owned by this
    // batch. Ownership transfer is presumed from that member's description --
    // confirm in row-batch.cc:218.
160  void AddTupleStream(BufferedTupleStream* stream);
161 
    // NOTE(review): the declarations around lines 154-172 (AddIoBuffer(),
    // MarkNeedToReturn() at line 167, TransferResourceOwnership()) were dropped by
    // the scrape; see the tooltip entries.
168 
172 
    // Shallow-copies src's tuple pointers into dest; tuple data is shared, not
    // duplicated. Uses memcpy, so src and dest must not overlap.
173  void CopyRow(TupleRow* src, TupleRow* dest) {
174  memcpy(dest, src, num_tuples_per_row_ * sizeof(Tuple*));
175  }
176 
    // Moves num_rows rows starting at index src down to index dest (dest <= src),
    // e.g. to compact the batch. memmove tolerates the overlapping ranges.
    // NOTE(review): listing line 183, the memmove source argument, is missing from
    // this scrape. It is presumably `tuple_ptrs_ + num_tuples_per_row_ * src,`.
179  void CopyRows(int dest, int src, int num_rows) {
180  DCHECK_LE(dest, src);
181  DCHECK_LE(src + num_rows, capacity_);
182  memmove(tuple_ptrs_ + num_tuples_per_row_ * dest,
184  num_rows * num_tuples_per_row_ * sizeof(Tuple*));
185  }
186 
    // Zeroes all of row's tuple pointers.
187  void ClearRow(TupleRow* row) {
188  memset(row, 0, num_tuples_per_row_ * sizeof(Tuple*));
189  }
190 
    // Defined in row-batch.cc:271. Every private member from has_in_flight_row_
    // onward must be handled there.
200  void AcquireState(RowBatch* src);
201 
    // Writes this batch into output_batch. See row-batch.cc:147 for the meaning of
    // the returned int.
210  int Serialize(TRowBatch* output_batch);
211 
    // Utility function: returns the total size of a serialized batch.
213  static int GetBatchSize(const TRowBatch& batch);
214 
215  int num_rows() const { return num_rows_; }
216  int capacity() const { return capacity_; }
217 
218  const RowDescriptor& row_desc() const { return row_desc_; }
219 
    // Allocation threshold in bytes that AtCapacity(MemPool*) compares against
    // MemPool::total_allocated_bytes(). The value is defined out of line (not shown).
222  static const int AT_CAPACITY_MEM_USAGE;
223 
    // Computes the maximum size needed to store tuple data for this row batch.
225  int MaxTupleBufferSize();
226 
227  private:
228  MemTracker* mem_tracker_; // not owned
229 
    // All members below need to be handled in RowBatch::AcquireState().
231 
232  bool has_in_flight_row_; // if true, last row hasn't been committed yet
233  int num_rows_; // # of committed rows
234  int capacity_; // maximum # of rows
235 
    // NOTE(review): listing lines 236-252 were dropped by the scrape. Per the
    // tooltips they declare num_tuples_per_row_ (236), row_desc_ (237),
    // tuple_ptrs_ (241), auxiliary_mem_usage_ (246) and need_to_return_ (250).
238 
243 
247 
251 
    // Pool holding (some of the) data referenced by rows.
253  boost::scoped_ptr<MemPool> tuple_data_pool_;
254 
258  std::vector<DiskIoMgr::BufferDescriptor*> io_buffers_;
259 
    // Tuple streams currently owned by this row batch.
261  std::vector<BufferedTupleStream*> tuple_streams_;
262 
    // NOTE(review): purpose not visible here. It is presumably a scratch buffer for
    // Serialize()'s compression -- confirm.
270  std::string compression_scratch_;
271 };
272 
273 }
274 
275 #endif
Tuple ** tuple_ptrs_
Definition: row-batch.h:241
The underlying memory management is done by the BufferedBlockMgr.
int num_rows() const
Definition: row-batch.h:215
int AddRows(int n)
Definition: row-batch.h:94
RowBatch(const RowDescriptor &row_desc, int capacity, MemTracker *tracker)
Definition: row-batch.cc:34
MemTracker tracker
void ClearRow(TupleRow *row)
Definition: row-batch.h:187
int64_t total_allocated_bytes() const
Definition: mem-pool.h:148
RowDescriptor row_desc_
Definition: row-batch.h:237
int num_io_buffers() const
Definition: row-batch.h:149
A tuple with 0 materialised slots is represented as NULL.
Definition: tuple.h:48
const RowDescriptor & row_desc() const
Definition: row-batch.h:218
TupleRow * GetRow(int row_idx)
Definition: row-batch.h:140
void AcquireState(RowBatch *src)
Definition: row-batch.cc:271
bool AtCapacity()
Definition: row-batch.h:120
void CopyRows(int dest, int src, int num_rows)
Definition: row-batch.h:179
bool AtCapacity(MemPool *tuple_pool)
Definition: row-batch.h:129
std::vector< DiskIoMgr::BufferDescriptor * > io_buffers_
Definition: row-batch.h:258
int row_byte_size()
Definition: row-batch.h:147
bool has_in_flight_row_
All members below need to be handled in RowBatch::AcquireState()
Definition: row-batch.h:232
void AddTupleStream(BufferedTupleStream *stream)
Definition: row-batch.cc:218
int64_t auxiliary_mem_usage_
Definition: row-batch.h:246
static const int AT_CAPACITY_MEM_USAGE
Definition: row-batch.h:222
void Reset()
Resets the row batch, returning all resources it has accumulated.
Definition: row-batch.cc:224
void set_num_rows(int num_rows)
Definition: row-batch.h:113
void AddIoBuffer(DiskIoMgr::BufferDescriptor *buffer)
Add io buffer to this row batch.
Definition: row-batch.cc:211
int num_tuples_per_row_
Definition: row-batch.h:236
static int GetBatchSize(const TRowBatch &batch)
Utility function: returns total size of batch.
Definition: row-batch.cc:264
void MarkNeedToReturn()
Definition: row-batch.h:167
bool need_to_return_
Definition: row-batch.h:250
void TransferResourceOwnership(RowBatch *dest)
Definition: row-batch.cc:243
MemTracker * mem_tracker_
Definition: row-batch.h:228
This class is thread-safe.
Definition: mem-tracker.h:61
void CommitLastRow()
Definition: row-batch.h:109
std::string compression_scratch_
Definition: row-batch.h:270
MemPool * tuple_data_pool()
Definition: row-batch.h:148
int Serialize(TRowBatch *output_batch)
Definition: row-batch.cc:147
std::vector< BufferedTupleStream * > tuple_streams_
Tuple streams currently owned by this row batch.
Definition: row-batch.h:261
int capacity() const
Definition: row-batch.h:216
void CopyRow(TupleRow *src, TupleRow *dest)
Definition: row-batch.h:173
int MaxTupleBufferSize()
Computes the maximum size needed to store tuple data for this row batch.
Definition: row-batch.cc:325
int num_tuple_streams() const
Definition: row-batch.h:150
void CommitRows(int n)
Definition: row-batch.h:102
boost::scoped_ptr< MemPool > tuple_data_pool_
holding (some of the) data referenced by rows
Definition: row-batch.h:253
static const int INVALID_ROW_INDEX
Definition: row-batch.h:87