Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
buffered-block-mgr.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef IMPALA_RUNTIME_BUFFERED_BLOCK_MGR
16 #define IMPALA_RUNTIME_BUFFERED_BLOCK_MGR
17 
18 #include <boost/shared_ptr.hpp>
19 
20 #include "runtime/disk-io-mgr.h"
21 #include "runtime/tmp-file-mgr.h"
22 
23 #include <openssl/aes.h>
24 #include <openssl/sha.h>
25 
26 namespace impala {
27 
28 class RuntimeState;
29 
35 //
46 //
52 //
56 //
61 //
66 //
77 //
82 //
93  private:
94  struct BufferDescriptor;
95 
96  public:
105  struct Client;
106 
117  //
127  //
132  //
136  class Block : public InternalQueue<Block>::Node {
137  public:
145  Status Pin(bool* pinned, Block* release_block = NULL, bool unpin = true);
146 
150  Status Unpin();
151 
154  Status Delete();
155 
156  void AddRow() { ++num_rows_; }  // Increments the count of rows stored in this block.
157  int num_rows() const { return num_rows_; }  // Returns the number of rows in this block.
158 
160  template <typename T> T* Allocate(int size) {  // Allocates the specified number of bytes from this block and returns them typed as T*.
161  DCHECK_GE(BytesRemaining(), size);  // The request must fit in the space still unallocated in the buffer.
162  uint8_t* current_location = buffer_desc_->buffer + valid_data_len_;  // First byte past the data allocated so far.
163  valid_data_len_ += size;  // Advance the allocation offset past the bytes being handed out.
164  return reinterpret_cast<T*>(current_location);  // No construction or initialization is performed here; the bytes are only reinterpreted.
165  }
166 
168  int BytesRemaining() const {  // Returns the number of remaining bytes that can be allocated in this block.
169  DCHECK_NOTNULL(buffer_desc_);  // A buffer descriptor must be attached before the remaining space can be computed.
170  return buffer_desc_->len - valid_data_len_;  // NOTE(review): len and valid_data_len_ are int64_t but the result is returned as int -- confirm buffers never exceed INT_MAX bytes.
171  }
172 
174  void ReturnAllocation(int size) {  // Returns `size` bytes from the most recent allocation back to the block.
175  DCHECK_GE(valid_data_len_, size);  // Cannot give back more bytes than are currently allocated.
176  valid_data_len_ -= size;  // Only the allocation offset is rewound; the buffer contents are not touched.
177  }
178 
181  uint8_t* buffer() const {  // Returns a pointer to the start of the buffer attached to this block.
182  DCHECK_NOTNULL(buffer_desc_);  // Calling this without an attached buffer descriptor is a programming error.
183  return buffer_desc_->buffer;
184  }
185 
187  int64_t valid_data_len() const { return valid_data_len_; }  // Returns the number of bytes allocated in this block.
188 
191  int64_t buffer_len() const {  // Returns the length, in bytes, of the buffer attached to this block.
192  DCHECK(is_pinned());  // Only expected to be called while the block is pinned.
193  return buffer_desc_->len;
194  }
195 
198  bool is_max_size() const {
199  DCHECK(is_pinned());
201  }
202 
203  bool is_pinned() const { return is_pinned_; }  // True while the block is pinned by a client.
204 
206  std::string DebugString() const;
207 
208  private:
209  friend class BufferedBlockMgr;
210 
211  Block(BufferedBlockMgr* block_mgr);
212 
214  void Init();
215 
218  bool Validate() const;
219 
223 
226 
229 
237 
240 
243 
247  boost::scoped_array<uint8_t> encrypted_write_buffer_;
248 
250  uint8_t key_[32];
251 
259  uint8_t iv_[AES_BLOCK_SIZE];
260 
263  uint8_t hash_[SHA256_DIGEST_LENGTH];
264 
268 
271 
274  bool in_write_;
275 
278 
283  boost::condition_variable write_complete_cv_;
284 
289  }; // class Block
290 
295  static Status Create(RuntimeState* state, MemTracker* parent,
296  RuntimeProfile* profile, int64_t mem_limit, int64_t buffer_size,
297  boost::shared_ptr<BufferedBlockMgr>* block_mgr);
298 
300 
311  Status RegisterClient(int num_reserved_buffers, MemTracker* tracker,
312  RuntimeState* state, Client** client);
313 
315  void ClearReservations(Client* client);
316 
325  bool TryAcquireTmpReservation(Client* client, int num_buffers);
326 
328  void ClearTmpReservation(Client* client);
329 
331  static int GetNumReservedBlocks() {
333  }
334 
348  Status GetNewBlock(Client* client, Block* unpin_block, Block** block, int64_t len = -1);
349 
352  void Cancel();
353 
355  std::string DebugString(Client* client = NULL);
356 
363  bool ConsumeMemory(Client* client, int64_t size);
364 
367  void ReleaseMemory(Client* client, int64_t size);
368 
371  int64_t available_buffers(Client* client) const;
372 
376 
380  int available_allocated_buffers() const { return all_io_buffers_.size(); }  // Number of entries in all_io_buffers_ (all allocated io-sized buffers); the list size is narrowed to int.
381  int num_free_buffers() const { return free_io_buffers_.size(); }  // Number of entries currently in the free_io_buffers_ queue.
382 
383  int num_pinned_buffers(Client* client) const;
384  int num_reserved_buffers_remaining(Client* client) const;
385  MemTracker* get_tracker(Client* client) const;
386  int64_t max_block_size() const { return max_block_size_; }  // Size of the largest/default block in bytes.
387  int64_t bytes_allocated() const;
388  RuntimeProfile* profile() { return profile_.get(); }  // Non-owning pointer to the profile held by the profile_ scoped_ptr.
389  int writes_issued() const { return writes_issued_; }  // Number of writes issued.
390 
391  private:
392  friend struct Client;
393 
395  struct BufferDescriptor : public InternalQueue<BufferDescriptor>::Node {
397  uint8_t* buffer;
398 
400  int64_t len;
401 
404 
406  std::list<BufferDescriptor*>::iterator all_buffers_it;
407 
408  BufferDescriptor(uint8_t* buf, int64_t len)
409  : buffer(buf), len(len), block(NULL) {
410  }
411  };
412 
413  BufferedBlockMgr(RuntimeState* state, int64_t block_size);
414 
416  void Init(DiskIoMgr* io_mgr, RuntimeProfile* profile,
417  MemTracker* parent_tracker, int64_t mem_limit);
418 
422 
425  Status PinBlock(Block* block, bool* pinned, Block* src, bool unpin);
426  Status UnpinBlock(Block* block);
427  Status DeleteBlock(Block* block);
428 
432  Status DeleteOrUnpinBlock(Block* block, bool unpin);
433 
439  Status TransferBuffer(Block* dst, Block* src, bool unpin);
440 
446  int64_t remaining_unreserved_buffers() const;
447 
454  Status FindBufferForBlock(Block* block, bool* in_mem);
455 
463  Status FindBuffer(boost::unique_lock<boost::mutex>& lock,
464  BufferDescriptor** buffer);
465 
471 
473  Status WriteUnpinnedBlock(Block* block);
474 
480  void WriteComplete(Block* block, const Status& write_status);
481 
485  void ReturnUnusedBlock(Block* block);
486 
489  Block* GetUnusedBlock(Client* client);
490 
492  bool Validate() const;
493  std::string DebugInternal() const;
494 
496  const int64_t max_block_size_;
497 
501 
504  const bool disable_spill_;
505 
506  const TUniqueId query_id_;
507 
509 
511  boost::scoped_ptr<MemTracker> mem_tracker_;
512 
518  boost::mutex lock_;
519 
522 
525 
528 
532 
534  boost::condition_variable buffer_available_cv_;
535 
541 
547 
554 
556  std::list<BufferDescriptor*> all_io_buffers_;
557 
560  boost::ptr_vector<TmpFileMgr::File> tmp_files_;
561 
565 
569 
574 
576  boost::scoped_ptr<RuntimeProfile> profile_;
577 
581 
584 
587 
590 
593 
596 
599 
602 
605 
608 
611 
614 
619  typedef boost::unordered_map<TUniqueId, boost::weak_ptr<BufferedBlockMgr> >
622 
625  Status Encrypt(Block* block, uint8_t** outbuf);
626 
628  void EncryptDone(Block* block);
629 
631  Status Decrypt(Block* block);
632 
634  void SetHash(Block* block);
635 
637  Status VerifyHash(Block* block);
638 
641  const bool encryption_;
642 
647  const bool check_integrity_;
648 }; // class BufferedBlockMgr
649 
650 } // namespace impala.
651 
652 #endif
Status FindBufferForBlock(Block *block, bool *in_mem)
bool initialized_
If true, Init() has been called.
const int64_t max_block_size_
Size of the largest/default block in bytes.
Descriptor for a single memory buffer in the pool.
int total_pinned_buffers_
The total number of pinned buffers across all clients.
RuntimeProfile::Counter * integrity_check_timer_
Time spent in disk spill integrity generation and checking.
BufferedBlockMgr * block_mgr_
Parent block manager object. Responsible for maintaining the state of the block.
int num_reserved_buffers_remaining(Client *client) const
int64_t remaining_unreserved_buffers() const
int writes_issued_
Number of writes issued.
boost::condition_variable buffer_available_cv_
Signal availability of free buffers.
Status UnpinBlock(Block *block)
MemTracker tracker
Status MemLimitTooLowError(Client *client)
std::list< BufferDescriptor * >::iterator all_buffers_it
Iterator into all_io_buffers_ for this buffer.
DiskIoMgr::WriteRange * write_range_
Status TransferBuffer(Block *dst, Block *src, bool unpin)
static int num_tmp_devices()
Definition: tmp-file-mgr.h:87
Status Pin(bool *pinned, Block *release_block=NULL, bool unpin=true)
RuntimeProfile::Counter * buffered_pin_counter_
Number of Pin() calls that did not require a disk read.
int available_allocated_buffers() const
RuntimeProfile::Counter * mem_limit_counter_
These have a fixed value for the lifetime of the manager and show memory usage.
T * Allocate(int size)
Allocates the specified number of bytes from this block.
Status DeleteOrUnpinBlock(Block *block, bool unpin)
MemTracker * get_tracker(Client *client) const
Lightweight spinlock.
Definition: spinlock.h:24
void ReleaseMemory(Client *client, int64_t size)
int num_pinned_buffers(Client *client) const
Status GetNewBlock(Client *client, Block *unpin_block, Block **block, int64_t len=-1)
Status RegisterClient(int num_reserved_buffers, MemTracker *tracker, RuntimeState *state, Client **client)
RuntimeProfile::Counter * created_block_counter_
Total number of blocks created.
bool ConsumeMemory(Client *client, int64_t size)
void Init()
Initialize the state of a block and set the number of bytes allocated to 0.
int64_t max_block_size() const
RuntimeProfile::Counter * disk_read_timer_
Time taken for disk reads.
boost::condition_variable write_complete_cv_
RuntimeProfile::Counter * buffer_wait_timer_
Time spent waiting for a free buffer.
RuntimeProfile::Counter * encryption_timer_
Time spent in disk spill encryption and decryption.
BufferedBlockMgr(RuntimeState *state, int64_t block_size)
uint8_t key_[32]
If encryption_ is on, an AES 256-bit key. Regenerated on each write.
static int GetNumReservedBlocks()
Return the number of blocks a block manager will reserve for its I/O buffers.
int num_rows_
Number of rows in this block.
bool is_pinned_
is_pinned_ is true while the block is pinned by a client.
void ClearTmpReservation(Client *client)
Sets tmp reservation to 0 on this client.
int unfullfilled_reserved_buffers_
The total number of reserved buffers across all clients that are not pinned.
RuntimeProfile * profile()
int BytesRemaining() const
Return the number of remaining bytes that can be allocated in this block.
void ClearReservations(Client *client)
Clears all reservations for this client.
int64_t valid_data_len() const
Return the number of bytes allocated in this block.
DiskIoMgr * io_mgr_
DiskIoMgr handles to read and write blocks.
boost::scoped_ptr< RuntimeProfile > profile_
Counters and timers to track behavior.
Block(BufferedBlockMgr *block_mgr)
void EncryptDone(Block *block)
Deallocates temporary buffer allocated in Encrypt().
InternalQueue< Block > unused_blocks_
std::string DebugInternal() const
static Status Create(RuntimeState *state, MemTracker *parent, RuntimeProfile *profile, int64_t mem_limit, int64_t buffer_size, boost::shared_ptr< BufferedBlockMgr > *block_mgr)
std::string DebugString() const
Debug helper method to print the state of a block.
int64_t valid_data_len_
Length of valid (i.e. allocated) data within the block.
boost::unordered_map< TUniqueId, boost::weak_ptr< BufferedBlockMgr > > BlockMgrsMap
int64_t available_buffers(Client *client) const
Status Encrypt(Block *block, uint8_t **outbuf)
uint8_t hash_[SHA256_DIGEST_LENGTH]
InternalQueue< BufferDescriptor > free_io_buffers_
This class is thread-safe.
Definition: mem-tracker.h:61
Block * GetUnusedBlock(Client *client)
Status Decrypt(Block *block)
Decrypts the contents of buffer() in place.
Status WriteUnpinnedBlock(Block *block)
Issues the write for this block to the DiskIoMgr.
Status VerifyHash(Block *block)
Verifies that the contents of buffer() match those that were set by SetHash()
bool TryAcquireTmpReservation(Client *client, int num_buffers)
void Init(DiskIoMgr *io_mgr, RuntimeProfile *profile, MemTracker *parent_tracker, int64_t mem_limit)
Initializes the block mgr. Idempotent and thread-safe.
bool is_deleted_
True if the block is deleted by the client.
Status FindBuffer(boost::unique_lock< boost::mutex > &lock, BufferDescriptor **buffer)
static SpinLock static_block_mgrs_lock_
Protects query_to_block_mgrs_.
T must be a subclass of InternalQueue::Node.
boost::ptr_vector< TmpFileMgr::File > tmp_files_
Client * client_
The client that owns this block.
DiskIoMgr::RequestContext * io_request_context_
Block * block
Block that this buffer is assigned to. May be NULL.
Status PinBlock(Block *block, bool *pinned, Block *src, bool unpin)
void WriteComplete(Block *block, const Status &write_status)
RuntimeProfile::Counter * recycled_blocks_counter_
Number of deleted blocks reused.
std::string DebugString(Client *client=NULL)
Dumps block mgr state. Grabs lock. If client is not NULL, also dumps its state.
RuntimeProfile::Counter * block_size_counter_
int64_t bytes_allocated() const
void SetHash(Block *block)
Takes a cryptographic hash of the data and sets hash_ with it.
boost::scoped_ptr< MemTracker > mem_tracker_
Track buffers allocated by the block manager.
void ReturnUnusedBlock(Block *block)
RuntimeProfile::Counter * outstanding_writes_counter_
Number of writes outstanding (issued but not completed).
std::list< BufferDescriptor * > all_io_buffers_
All allocated io-sized buffers.
boost::scoped_array< uint8_t > encrypted_write_buffer_
RuntimeProfile::Counter * bytes_written_counter_
Number of bytes written to disk (includes writes still queued in the IO manager). ...
InternalQueue< Block > unpinned_blocks_
Status DeleteBlock(Block *block)
void ReturnAllocation(int size)
Return size bytes from the most recent allocation.
bool Validate() const
Used to debug the state of the block manager. Lock must already be taken.
static BlockMgrsMap query_to_block_mgrs_