Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
buffered-tuple-stream.h
Go to the documentation of this file.
1 // Copyright 2013 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H
16 #define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H
17 
18 #include "common/status.h"
20 
21 namespace impala {
22 
23 class BufferedBlockMgr;
24 class RuntimeProfile;
25 class RuntimeState;
26 class RowBatch;
27 class RowDescriptor;
28 class SlotDescriptor;
29 class TupleRow;
30 
33 //
35 //
47 //
51 //
56 //
62 //
70 //
86 //
110  public:
121  struct RowIdx {
122  static const uint64_t BLOCK_MASK = 0xFFFF;
123  static const uint64_t BLOCK_SHIFT = 0;
124  static const uint64_t OFFSET_MASK = 0xFFFFFF0000;
125  static const uint64_t OFFSET_SHIFT = 16;
126  static const uint64_t IDX_MASK = 0xFFFFFF0000000000;
127  static const uint64_t IDX_SHIFT = 40;
128 
129  uint64_t block() const {
130  return (data & BLOCK_MASK);
131  };
132 
133  uint64_t offset() const {
134  return (data & OFFSET_MASK) >> OFFSET_SHIFT;
135  };
136 
137  uint64_t idx() const {
138  return (data & IDX_MASK) >> IDX_SHIFT;
139  }
140 
142  DCHECK_LE(block, BLOCK_MASK)
143  << "Cannot have more than 2^16 = 64K blocks in a tuple stream.";
144  DCHECK_LE(offset, OFFSET_MASK >> OFFSET_SHIFT)
145  << "Cannot have blocks larger than 2^24 = 16MB";
146  DCHECK_LE(idx, IDX_MASK >> IDX_SHIFT)
147  << "Cannot have more than 2^24 = 16M rows in a block.";
148  data = block | (offset << OFFSET_SHIFT) | (idx << IDX_SHIFT);
149  return data;
150  }
151 
152  std::string DebugString() const;
153 
155  };
156 
166  BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
167  bool use_initial_small_buffers = true,
168  bool delete_on_read = false, bool read_write = false);
169 
174  Status Init(RuntimeProfile* profile = NULL, bool pinned = true);
175 
178  Status SwitchToIoBuffers(bool* got_buffer);
179 
184  bool AddRow(TupleRow* row, uint8_t** dst = NULL);
185 
188  uint8_t* AllocateRow(int size);
189 
192  void GetTupleRow(const RowIdx& idx, TupleRow* row) const;
193 
200  Status PrepareForRead(bool* got_buffer = NULL);
201 
206  Status PinStream(bool already_reserved, bool* pinned);
207 
210  Status UnpinStream(bool all = false);
211 
216  Status GetNext(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices = NULL);
217 
221  Status GetRows(boost::scoped_ptr<RowBatch>* batch, bool* got_rows);
222 
224  void Close();
225 
228  Status status() const { return status_; }
229 
231  int64_t num_rows() const { return num_rows_; }
232 
234  int64_t rows_returned() const { return rows_returned_; }
235 
237  int64_t byte_size() const { return total_byte_size_; }
238 
241  int64_t bytes_in_mem(bool ignore_current) const;
242 
243  bool is_pinned() const { return pinned_; }
244  int blocks_pinned() const { return num_pinned_; }
245  int blocks_unpinned() const { return blocks_.size() - num_pinned_ - num_small_blocks_; }
246  bool has_read_block() const { return read_block_ != blocks_.end(); }
247  bool has_write_block() const { return write_block_ != NULL; }
248  bool using_small_buffers() const { return use_small_buffers_; }
249 
250  std::string DebugString() const;
251 
252  private:
255 
257  const bool delete_on_read_;
258 
262  const bool read_write_;
263 
266 
269 
271  const bool nullable_tuple_;
272 
275 
283 
285  std::vector<std::pair<int, std::vector<SlotDescriptor*> > > string_slots_;
286 
290 
292  std::list<BufferedBlockMgr::Block*> blocks_;
293 
296 
299  std::list<BufferedBlockMgr::Block*>::iterator read_block_;
300 
304  std::vector<uint8_t*> block_start_idx_;
305 
307  uint8_t* read_ptr_;
308 
310  uint32_t read_tuple_idx_;
311 
314 
316  int64_t read_bytes_;
317 
319  int64_t rows_returned_;
320 
323 
326 
331 
334 
335  bool closed_; // Used for debugging.
337 
339  int64_t num_rows_;
340 
345  bool pinned_;
346 
351 
356  template <bool HasNullableTuple>
357  bool DeepCopyInternal(TupleRow* row, uint8_t** dst);
358 
360  bool DeepCopy(TupleRow* row, uint8_t** dst);
361 
366  Status NewBlockForWrite(int min_size, bool* got_block);
367 
371 
373  int ComputeRowSize(TupleRow* row) const;
374 
377 
379  template <bool HasNullableTuple>
380  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices);
381 
383  int ComputeNumNullIndicatorBytes(int block_size) const;
384 };
385 
386 }
387 
388 #endif
The underlying memory management is done by the BufferedBlockMgr.
uint8_t * read_ptr_
Current ptr offset in read_block_'s buffer.
int64_t total_byte_size_
Total size of blocks_, including small blocks.
void Close()
Must be called once at the end to cleanup all resources. Idempotent.
std::vector< uint8_t * > block_start_idx_
std::list< BufferedBlockMgr::Block * >::iterator read_block_
int64_t rows_returned() const
Number of rows returned via GetNext().
Status PinStream(bool already_reserved, bool *pinned)
Status GetNextInternal(RowBatch *batch, bool *eos, std::vector< RowIdx > *indices)
Templated GetNext implementation.
RuntimeProfile::Counter * unpin_timer_
BufferedTupleStream(RuntimeState *state, const RowDescriptor &row_desc, BufferedBlockMgr *block_mgr, BufferedBlockMgr::Client *client, bool use_initial_small_buffers=true, bool delete_on_read=false, bool read_write=false)
bool DeepCopy(TupleRow *row, uint8_t **dst)
Wrapper of the templated DeepCopyInternal() function.
int fixed_tuple_row_size_
Sum of the fixed length portion of all the tuples in desc_.
uint64_t set(uint64_t block, uint64_t offset, uint64_t idx)
std::vector< std::pair< int, std::vector< SlotDescriptor * > > > string_slots_
Vector of all the string slots grouped by tuple_idx.
const RowDescriptor & desc_
Description of rows stored in the stream.
bool use_small_buffers_
If true, this stream is still using small buffers.
int64_t byte_size() const
Returns the byte size necessary to store the entire stream in memory.
int read_block_idx_
The block index of the current read block.
bool DeepCopyInternal(TupleRow *row, uint8_t **dst)
uint32_t write_tuple_idx_
Current idx of the tuple written at the write_block_ buffer.
int ComputeNumNullIndicatorBytes(int block_size) const
Computes the number of bytes needed for null indicators for a block of 'block_size'.
BufferedBlockMgr::Client * block_mgr_client_
uint32_t read_tuple_idx_
Current idx of the tuple read from the read_block_ buffer.
const bool delete_on_read_
If true, blocks are deleted after they are read.
const bool nullable_tuple_
Whether any tuple in the rows is nullable.
BufferedBlockMgr::Block * write_block_
The current block for writing. NULL if there is no available block to write to.
Status GetRows(boost::scoped_ptr< RowBatch > *batch, bool *got_rows)
const RowDescriptor & row_desc() const
Status UnpinBlock(BufferedBlockMgr::Block *block)
Unpins the block if it is an I/O-sized block, and updates tracking stats.
BufferedBlockMgr * block_mgr_
Block manager and client used to allocate, pin and release blocks. Not owned.
RuntimeProfile::Counter * pin_timer_
Counters added by this object to the parent runtime profile.
RuntimeState *const state_
Runtime state instance used to check for cancellation. Not owned.
Status Init(RuntimeProfile *profile=NULL, bool pinned=true)
bool AddRow(TupleRow *row, uint8_t **dst=NULL)
Status PrepareForRead(bool *got_buffer=NULL)
int64_t read_bytes_
Bytes read in read_block_.
std::list< BufferedBlockMgr::Block * > blocks_
List of blocks in the stream.
int64_t num_rows() const
Number of rows in the stream.
Status NewBlockForWrite(int min_size, bool *got_block)
RuntimeProfile::Counter * get_new_block_timer_
Status UnpinStream(bool all=false)
Status SwitchToIoBuffers(bool *got_buffer)
int num_small_blocks_
The total number of small blocks in blocks_.
int64_t num_rows_
Number of rows stored in the stream.
Status GetNext(RowBatch *batch, bool *eos, std::vector< RowIdx > *indices=NULL)
int ComputeRowSize(TupleRow *row) const
Returns the byte size of this row when encoded in a block.
int64_t rows_returned_
Number of rows returned to the caller from GetNext().
int64_t bytes_in_mem(bool ignore_current) const
void GetTupleRow(const RowIdx &idx, TupleRow *row) const