Impala
Impala is the open source, native analytic database for Apache Hadoop.
buffered-block-mgr.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "runtime/runtime-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/mem-pool.h"
#include "runtime/buffered-block-mgr.h"
#include "runtime/tmp-file-mgr.h"
#include "util/runtime-profile.h"
#include "util/disk-info.h"
#include "util/filesystem-util.h"
#include "util/impalad-metrics.h"
#include "util/uid-util.h"

#include <openssl/rand.h>
#include <openssl/evp.h>
#include <openssl/sha.h>
#include <openssl/err.h>

#include <gutil/strings/substitute.h>

DEFINE_bool(disk_spill_encryption, false, "Set this to encrypt and perform an integrity "
    "check on all data spilled to disk during a query");

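// DEFINE_bool() is a gflags macro: it registers a --disk_spill_encryption
// command-line flag and exposes its runtime value as
// FLAGS_disk_spill_encryption, which the BufferedBlockMgr constructor reads
// below. A minimal usage sketch (the command line is illustrative):
//
//   // impalad ... --disk_spill_encryption=true
//   if (FLAGS_disk_spill_encryption) {
//     // every block spilled to scratch disk is encrypted and hashed
//   }
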
#include "common/names.h"

using namespace strings; // for Substitute

namespace impala {

BufferedBlockMgr::BlockMgrsMap BufferedBlockMgr::query_to_block_mgrs_;
SpinLock BufferedBlockMgr::static_block_mgrs_lock_;

struct BufferedBlockMgr::Client {
  Client(BufferedBlockMgr* mgr, int num_reserved_buffers, MemTracker* tracker,
      RuntimeState* state)
    : mgr_(mgr),
      state_(state),
      tracker_(tracker),
      query_tracker_(mgr_->mem_tracker_->parent()),
      num_reserved_buffers_(num_reserved_buffers),
      num_tmp_reserved_buffers_(0),
      num_pinned_buffers_(0) {
  }

  // Unowned.
  BufferedBlockMgr* mgr_;

  // Unowned.
  RuntimeState* state_;

  // Tracker for this client. Can be NULL. Unowned.
  // If this is set, when the client gets a buffer, we update the consumption on this
  // tracker. However, we don't want to transfer the buffer from the block mgr to the
  // client (i.e. release from the block mgr), since the block mgr is where the
  // block mem usage limit is enforced. Even when we give a buffer to a client, the
  // buffer is still owned and counts against the block mgr tracker (i.e. there is a
  // fixed pool of buffers regardless of whether they are in the block mgr or the
  // clients).
  MemTracker* tracker_;

  // This is the common ancestor between the block mgr tracker and the client tracker.
  // When memory is transferred to the client, we want it to stop at this tracker.
  MemTracker* query_tracker_;

  // Number of buffers reserved by this client.
  int num_reserved_buffers_;

  // Number of buffers temporarily reserved.
  int num_tmp_reserved_buffers_;

  // Number of buffers pinned by this client.
  int num_pinned_buffers_;

  void PinBuffer(BufferDescriptor* buffer) {
    DCHECK_NOTNULL(buffer);
    if (buffer->len == mgr_->max_block_size()) {
      ++num_pinned_buffers_;
      if (tracker_ != NULL) tracker_->ConsumeLocal(buffer->len, query_tracker_);
    }
  }

  void UnpinBuffer(BufferDescriptor* buffer) {
    DCHECK_NOTNULL(buffer);
    if (buffer->len == mgr_->max_block_size()) {
      DCHECK_GT(num_pinned_buffers_, 0);
      --num_pinned_buffers_;
      if (tracker_ != NULL) tracker_->ReleaseLocal(buffer->len, query_tracker_);
    }
  }

  string DebugString() const {
    stringstream ss;
    ss << "Client " << this << endl
       << " num_reserved_buffers=" << num_reserved_buffers_ << endl
       << " num_tmp_reserved_buffers=" << num_tmp_reserved_buffers_ << endl
       << " num_pinned_buffers=" << num_pinned_buffers_;
    return ss.str();
  }
};

// BufferedBlockMgr::Block methods.
BufferedBlockMgr::Block::Block(BufferedBlockMgr* block_mgr)
  : buffer_desc_(NULL),
    block_mgr_(block_mgr),
    client_(NULL),
    write_range_(NULL),
    valid_data_len_(0),
    num_rows_(0) {
}

Status BufferedBlockMgr::Block::Pin(bool* pinned, Block* release_block, bool unpin) {
  return block_mgr_->PinBlock(this, pinned, release_block, unpin);
}

Status BufferedBlockMgr::Block::Unpin() {
  return block_mgr_->UnpinBlock(this);
}

Status BufferedBlockMgr::Block::Delete() {
  return block_mgr_->DeleteBlock(this);
}

void BufferedBlockMgr::Block::Init() {
  // No locks are taken because the block is new or has previously been deleted.
  is_pinned_ = false;
  in_write_ = false;
  is_deleted_ = false;
  valid_data_len_ = 0;
  client_ = NULL;
  num_rows_ = 0;
}

bool BufferedBlockMgr::Block::Validate() const {
  if (is_deleted_ && (is_pinned_ || (!in_write_ && buffer_desc_ != NULL))) {
    LOG(ERROR) << "Deleted block in use - " << DebugString();
    return false;
  }

  if (buffer_desc_ == NULL && (is_pinned_ || in_write_)) {
    LOG(ERROR) << "Block without buffer in use - " << DebugString();
    return false;
  }

  if (buffer_desc_ == NULL && block_mgr_->unpinned_blocks_.Contains(this)) {
    LOG(ERROR) << "Unpersisted block without buffer - " << DebugString();
    return false;
  }

  if (buffer_desc_ != NULL && (buffer_desc_->block != this)) {
    LOG(ERROR) << "Block buffer inconsistency - " << DebugString();
    return false;
  }

  return true;
}

string BufferedBlockMgr::Block::DebugString() const {
  stringstream ss;
  ss << "Block: " << this << endl
     << " Buffer Desc: " << buffer_desc_ << endl
     << " Data Len: " << valid_data_len_ << endl
     << " Num Rows: " << num_rows_ << endl;
  if (is_pinned_) ss << " Buffer Len: " << buffer_len() << endl;
  ss << " Deleted: " << is_deleted_ << endl
     << " Pinned: " << is_pinned_ << endl
     << " Write Issued: " << in_write_ << endl
     << " Client Local: " << client_local_;
  return ss.str();
}

BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, int64_t block_size)
  : max_block_size_(block_size),
    // Keep two writes in flight per scratch disk so the disks can stay busy.
    block_write_threshold_(TmpFileMgr::num_tmp_devices() * 2),
    disable_spill_(state->query_ctx().disable_spilling),
    query_id_(state->query_id()),
    initialized_(false),
    unfullfilled_reserved_buffers_(0),
    total_pinned_buffers_(0),
    non_local_outstanding_writes_(0),
    io_mgr_(state->io_mgr()),
    is_cancelled_(false),
    writes_issued_(0),
    encryption_(FLAGS_disk_spill_encryption),
    check_integrity_(FLAGS_disk_spill_encryption) {
}

Status BufferedBlockMgr::Create(RuntimeState* state, MemTracker* parent,
    RuntimeProfile* profile, int64_t mem_limit, int64_t block_size,
    shared_ptr<BufferedBlockMgr>* block_mgr) {
  DCHECK_NOTNULL(parent);
  block_mgr->reset();
  {
    lock_guard<SpinLock> lock(static_block_mgrs_lock_);
    BlockMgrsMap::iterator it = query_to_block_mgrs_.find(state->query_id());
    if (it != query_to_block_mgrs_.end()) *block_mgr = it->second.lock();
    if (*block_mgr == NULL) {
      // weak_ptr::lock returns NULL if the weak_ptr is expired. This means
      // all shared_ptr references have gone to 0 and it is in the process of
      // being deleted. This can happen if the last shared reference is released
      // but before the weak ptr is removed from the map.
      block_mgr->reset(new BufferedBlockMgr(state, block_size));
      query_to_block_mgrs_[state->query_id()] = *block_mgr;
    }
  }
  (*block_mgr)->Init(state->io_mgr(), profile, parent, mem_limit);
  return Status::OK;
}
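
// The per-query sharing above is a cache of weak_ptrs guarded by a static
// lock: the first Create() call for a query id builds the manager, later
// calls share it, and expired entries are rebuilt. A minimal self-contained
// sketch of the same pattern (Resource, GetOrCreate and the int key are
// illustrative, not Impala APIs; uses std:: for brevity):
//
//   #include <map>
//   #include <memory>
//   #include <mutex>
//
//   struct Resource { int id; };
//   std::mutex cache_lock;
//   std::map<int, std::weak_ptr<Resource> > cache;
//
//   std::shared_ptr<Resource> GetOrCreate(int id) {
//     std::lock_guard<std::mutex> l(cache_lock);
//     std::shared_ptr<Resource> r = cache[id].lock();  // empty if expired/absent
//     if (r == nullptr) {
//       r = std::make_shared<Resource>();
//       r->id = id;
//       cache[id] = r;  // the map keeps only a non-owning reference
//     }
//     return r;  // entry expires once all shared_ptr owners are gone
//   }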

int64_t BufferedBlockMgr::available_buffers(Client* client) const {
  int64_t unused_reserved = client->num_reserved_buffers_ +
      client->num_tmp_reserved_buffers_ - client->num_pinned_buffers_;
  return max(0L, remaining_unreserved_buffers()) + max(0L, unused_reserved);
}
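
// A worked example of the accounting above (numbers are illustrative): a
// client with 5 reserved plus 2 tmp-reserved buffers that has pinned 4 of
// them has unused_reserved = 5 + 2 - 4 = 3; if the manager also has 10
// unreserved buffers remaining, the client may acquire up to 10 + 3 = 13
// more buffers.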

int64_t BufferedBlockMgr::remaining_unreserved_buffers() const {
  int64_t num_buffers = free_io_buffers_.size() +
      unpinned_blocks_.size() + non_local_outstanding_writes_;
  num_buffers += mem_tracker_->SpareCapacity() / max_block_size();
  num_buffers -= unfullfilled_reserved_buffers_;
  return num_buffers;
}

Status BufferedBlockMgr::RegisterClient(int num_reserved_buffers, MemTracker* tracker,
    RuntimeState* state, Client** client) {
  DCHECK_GE(num_reserved_buffers, 0);
  lock_guard<mutex> lock(lock_);
  *client = obj_pool_.Add(new Client(this, num_reserved_buffers, tracker, state));
  unfullfilled_reserved_buffers_ += num_reserved_buffers;
  return Status::OK;
}

void BufferedBlockMgr::ClearReservations(Client* client) {
  // TODO: The modifications to the client's mem variables can be made w/o the lock.
  lock_guard<mutex> lock(lock_);
  if (client->num_pinned_buffers_ < client->num_reserved_buffers_) {
    unfullfilled_reserved_buffers_ -=
        client->num_reserved_buffers_ - client->num_pinned_buffers_;
  }
  client->num_reserved_buffers_ = 0;

  unfullfilled_reserved_buffers_ -= client->num_tmp_reserved_buffers_;
  client->num_tmp_reserved_buffers_ = 0;
}

bool BufferedBlockMgr::TryAcquireTmpReservation(Client* client, int num_buffers) {
  lock_guard<mutex> lock(lock_);
  DCHECK_EQ(client->num_tmp_reserved_buffers_, 0);
  if (client->num_pinned_buffers_ < client->num_reserved_buffers_) {
    // If client has unused reserved buffers, we use those first.
    num_buffers -= client->num_reserved_buffers_ - client->num_pinned_buffers_;
  }
  if (num_buffers < 0) return true;
  if (available_buffers(client) < num_buffers) return false;

  client->num_tmp_reserved_buffers_ = num_buffers;
  unfullfilled_reserved_buffers_ += num_buffers;
  return true;
}
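
// Example of the arithmetic above: a client with 4 reserved buffers and 1
// pinned asks for num_buffers = 2. Its 3 unused reserved buffers cover the
// request (2 - 3 = -1 < 0), so the call returns true without reserving
// anything new; a request for 5 leaves num_buffers = 2 and succeeds only if
// available_buffers(client) >= 2.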

void BufferedBlockMgr::ClearTmpReservation(Client* client) {
  lock_guard<mutex> lock(lock_);
  unfullfilled_reserved_buffers_ -= client->num_tmp_reserved_buffers_;
  client->num_tmp_reserved_buffers_ = 0;
}

bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) {
  int buffers_needed = BitUtil::Ceil(size, max_block_size());
  DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory";
  unique_lock<mutex> lock(lock_);

  if (size < max_block_size() && mem_tracker_->TryConsume(size)) {
    // For small allocations (less than a block size), just let the allocation through.
    client->tracker_->ConsumeLocal(size, client->query_tracker_);
    return true;
  }

  if (available_buffers(client) + client->num_tmp_reserved_buffers_ <
      buffers_needed) {
    return false;
  }

  if (mem_tracker_->TryConsume(size)) {
    // There was still unallocated memory, don't need to recycle allocated blocks.
    client->tracker_->ConsumeLocal(size, client->query_tracker_);
    return true;
  }

  // Bump up client->num_tmp_reserved_buffers_ to satisfy this request. We don't want
  // another client to grab the buffer.
  int additional_tmp_reservations = 0;
  if (client->num_tmp_reserved_buffers_ < buffers_needed) {
    additional_tmp_reservations = buffers_needed - client->num_tmp_reserved_buffers_;
    client->num_tmp_reserved_buffers_ += additional_tmp_reservations;
    unfullfilled_reserved_buffers_ += additional_tmp_reservations;
  }

  // Loop until we have freed enough memory.
  // We free all the memory at the end. We don't want another component to steal the
  // memory.
  int buffers_acquired = 0;
  do {
    BufferDescriptor* buffer_desc = NULL;
    FindBuffer(lock, &buffer_desc); // This waits on the lock.
    if (buffer_desc == NULL) break;
    all_io_buffers_.erase(buffer_desc->all_buffers_it);
    if (buffer_desc->block != NULL) buffer_desc->block->buffer_desc_ = NULL;
    delete[] buffer_desc->buffer;
    ++buffers_acquired;
  } while (buffers_acquired != buffers_needed);

  Status status = Status::OK;
  if (buffers_acquired == buffers_needed) status = WriteUnpinnedBlocks();
  // If we either couldn't acquire enough buffers or WriteUnpinnedBlocks() failed, undo
  // the reservation.
  if (buffers_acquired != buffers_needed || !status.ok()) {
    if (!status.ok()) {
      VLOG_QUERY << "Query: " << query_id_ << " write unpinned buffers failed.";
      client->state_->LogError(status.msg());
    }
    if (buffers_acquired < additional_tmp_reservations) {
      // TODO: what is the reasoning behind this calculation?
      client->num_tmp_reserved_buffers_ -=
          (additional_tmp_reservations - buffers_acquired);
      unfullfilled_reserved_buffers_ -=
          (additional_tmp_reservations - buffers_acquired);
    }
    mem_tracker_->Release(buffers_acquired * max_block_size());
    return false;
  }

  client->num_tmp_reserved_buffers_ -= buffers_acquired;
  unfullfilled_reserved_buffers_ -= buffers_acquired;

  DCHECK_GE(buffers_acquired * max_block_size(), size);
  mem_tracker_->Release(buffers_acquired * max_block_size());
  if (!mem_tracker_->TryConsume(size)) return false;
  client->tracker_->ConsumeLocal(size, client->query_tracker_);
  DCHECK(Validate()) << endl << DebugInternal();
  return true;
}
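
// BitUtil::Ceil() above is plain ceiling division: it converts a byte count
// into the number of whole io-sized buffers needed. A minimal sketch of the
// same arithmetic (CeilDiv is an illustrative name, not Impala's API):
//
//   #include <cstdint>
//
//   int64_t CeilDiv(int64_t value, int64_t divisor) {
//     return (value + divisor - 1) / divisor;
//   }
//   // With 8MB blocks, CeilDiv(10 << 20, 8 << 20) == 2: a 10MB request must
//   // acquire two whole buffers even though it only consumes 10MB of quota.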

void BufferedBlockMgr::ReleaseMemory(Client* client, int64_t size) {
  mem_tracker_->Release(size);
  client->tracker_->ReleaseLocal(size, client->query_tracker_);
}

void BufferedBlockMgr::Cancel() {
  {
    lock_guard<mutex> lock(lock_);
    if (is_cancelled_) return;
    is_cancelled_ = true;
  }
  // Cancel the underlying io mgr to unblock any waiting threads.
  io_mgr_->CancelContext(io_request_context_);
}

Status BufferedBlockMgr::MemLimitTooLowError(Client* client) {
  // TODO: what to print here. We can't know the value of the entire query here.
  Status status = Status::MEM_LIMIT_EXCEEDED;
  status.AddDetail(Substitute("The memory limit is set too low to initialize the"
      " spilling operator. The minimum required memory to spill this operator is $0.",
      PrettyPrinter::Print(client->num_reserved_buffers_ * max_block_size(),
      TUnit::BYTES)));
  VLOG_QUERY << "Query: " << query_id_ << " ran out of memory: " << endl
             << DebugInternal() << endl << client->DebugString() << endl
             << GetStackTrace();
  return status;
}

Status BufferedBlockMgr::GetNewBlock(Client* client, Block* unpin_block, Block** block,
    int64_t len) {
  DCHECK_LE(len, max_block_size_) << "Cannot request block bigger than max_len";
  DCHECK_NE(len, 0) << "Cannot request block of zero size";
  *block = NULL;
  Block* new_block = NULL;

  {
    lock_guard<mutex> lock(lock_);
    if (is_cancelled_) return Status::CANCELLED;
    new_block = GetUnusedBlock(client);
    DCHECK(new_block->Validate()) << endl << new_block->DebugString();
    DCHECK_EQ(new_block->client_, client);

    if (len > 0 && len < max_block_size_) {
      DCHECK(unpin_block == NULL);
      if (client->tracker_->TryConsume(len)) {
        // TODO: Have a cache of unused blocks of size 'len' (0, max_block_size_)
        uint8_t* buffer = new uint8_t[len];
        new_block->buffer_desc_ = obj_pool_.Add(new BufferDescriptor(buffer, len));
        new_block->buffer_desc_->block = new_block;
        new_block->is_pinned_ = true;
        client->PinBuffer(new_block->buffer_desc_);
        ++total_pinned_buffers_;
        *block = new_block;
      } else {
        new_block->is_deleted_ = true;
        ReturnUnusedBlock(new_block);
      }
      return Status::OK;
    }
  }

  bool in_mem;
  RETURN_IF_ERROR(FindBufferForBlock(new_block, &in_mem));
  DCHECK(!in_mem) << "A new block cannot start in mem.";

  if (!new_block->is_pinned()) {
    if (unpin_block == NULL) {
      // We couldn't get a new block and no unpin block was provided. Can't return
      // a block.
      new_block->is_deleted_ = true;
      ReturnUnusedBlock(new_block);
      new_block = NULL;
    } else {
      // We need to transfer the buffer from unpin_block to new_block.
      RETURN_IF_ERROR(TransferBuffer(new_block, unpin_block, true));
    }
  } else if (unpin_block != NULL) {
    // Got a new block without needing to transfer. Just unpin this block.
    RETURN_IF_ERROR(unpin_block->Unpin());
  }

  DCHECK(new_block == NULL || new_block->is_pinned());
  *block = new_block;
  return Status::OK;
}
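
// A hedged sketch of the typical caller pattern for GetNewBlock() (block_mgr
// and client are assumed to exist; omitting 'len' requests a full io-sized
// block):
//
//   BufferedBlockMgr::Block* block = NULL;
//   RETURN_IF_ERROR(block_mgr->GetNewBlock(client, NULL, &block));
//   if (block == NULL) {
//     // No buffer was available and no unpin_block was offered; the caller
//     // must unpin something else or fail with MemLimitTooLowError().
//   }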

Status BufferedBlockMgr::TransferBuffer(Block* dst, Block* src, bool unpin) {
  Status status = Status::OK;
  // First write out the src block.
  DCHECK(src->is_pinned_);
  DCHECK(!dst->is_pinned_);
  DCHECK(dst->buffer_desc_ == NULL);
  DCHECK_EQ(src->buffer_desc_->len, max_block_size_);
  src->is_pinned_ = false;

  if (unpin) {
    unique_lock<mutex> lock(lock_);
    src->client_local_ = true;
    status = WriteUnpinnedBlock(src);
    if (!status.ok()) {
      // The transfer failed, return the buffer to src.
      src->is_pinned_ = true;
      return status;
    }
    // Wait for the write to complete.
    while (src->in_write_ && !is_cancelled_) {
      src->write_complete_cv_.wait(lock);
    }
    if (is_cancelled_) {
      // We can't be sure the write succeeded, so return the buffer to src.
      src->is_pinned_ = true;
      return Status::CANCELLED;
    }
    DCHECK(!src->in_write_);
  }
  // Assign the buffer to the new block.
  dst->buffer_desc_ = src->buffer_desc_;
  dst->buffer_desc_->block = dst;
  src->buffer_desc_ = NULL;
  dst->is_pinned_ = true;
  if (!unpin) {
    src->is_deleted_ = true;
    ReturnUnusedBlock(src);
  }
  return Status::OK;
}
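
// The wait above is the standard condition-variable idiom: sleep while the
// predicate holds and re-check it after every wakeup, since a wait can be
// interrupted or another thread can win the race. A minimal self-contained
// sketch (names are illustrative; std:: is used instead of boost:: for
// brevity):
//
//   #include <condition_variable>
//   #include <mutex>
//
//   std::mutex m;
//   std::condition_variable cv;
//   bool in_write = true;
//
//   void Waiter() {
//     std::unique_lock<std::mutex> lock(m);
//     while (in_write) cv.wait(lock);  // the lock is released while sleeping
//   }
//
//   void Completer() {
//     { std::lock_guard<std::mutex> lock(m); in_write = false; }
//     cv.notify_one();  // wake the single known waiter
//   }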

BufferedBlockMgr::~BufferedBlockMgr() {
  {
    lock_guard<SpinLock> lock(static_block_mgrs_lock_);
    DCHECK(query_to_block_mgrs_.find(query_id_) != query_to_block_mgrs_.end());
    query_to_block_mgrs_.erase(query_id_);
  }

  if (io_request_context_ != NULL) io_mgr_->UnregisterContext(io_request_context_);

  // If there are any outstanding writes and we are here it means that when the
  // WriteComplete() callback gets executed it is going to access invalid memory.
  // See IMPALA-1890.
  DCHECK_EQ(non_local_outstanding_writes_, 0) << endl << DebugInternal();
  // Delete tmp files.
  BOOST_FOREACH(TmpFileMgr::File& file, tmp_files_) {
    file.Remove();
  }
  tmp_files_.clear();

  // Free memory resources.
  BOOST_FOREACH(BufferDescriptor* buffer, all_io_buffers_) {
    mem_tracker_->Release(buffer->len);
    delete[] buffer->buffer;
  }
  DCHECK_EQ(mem_tracker_->consumption(), 0);
  mem_tracker_->UnregisterFromParent();
  mem_tracker_.reset();
}

int64_t BufferedBlockMgr::bytes_allocated() const {
  return mem_tracker_->consumption();
}

int BufferedBlockMgr::num_pinned_buffers(Client* client) const {
  return client->num_pinned_buffers_;
}

int BufferedBlockMgr::num_reserved_buffers_remaining(Client* client) const {
  return max(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
}

MemTracker* BufferedBlockMgr::get_tracker(Client* client) const {
  return client->tracker_;
}

// TODO: It would be good if we had a sync primitive that supports is_mine() calls, see
// IMPALA-1884.
Status BufferedBlockMgr::DeleteOrUnpinBlock(Block* block, bool unpin) {
  if (block == NULL) {
    lock_guard<mutex> lock(lock_);
    return is_cancelled_ ? Status::CANCELLED : Status::OK;
  }
  return unpin ? block->Unpin() : block->Delete();
}

Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_block,
    bool unpin) {
  DCHECK_NOTNULL(block);
  DCHECK(!block->is_deleted_);
  *pinned = false;
  if (block->is_pinned_) {
    *pinned = true;
    return DeleteOrUnpinBlock(release_block, unpin);
  }

  bool in_mem = false;
  RETURN_IF_ERROR(FindBufferForBlock(block, &in_mem));
  *pinned = block->is_pinned_;

  // Block was not evicted or had no data, nothing left to do.
  if (in_mem || block->valid_data_len_ == 0) {
    return DeleteOrUnpinBlock(release_block, unpin);
  }

  if (!block->is_pinned_) {
    if (release_block == NULL) return Status::OK;

    if (block->buffer_desc_ != NULL) {
      {
        lock_guard<mutex> lock(lock_);
        if (free_io_buffers_.Contains(block->buffer_desc_)) {
          DCHECK(!block->is_pinned_ && !block->in_write_ &&
              !unpinned_blocks_.Contains(block)) << endl << block->DebugString();
          free_io_buffers_.Remove(block->buffer_desc_);
        } else if (unpinned_blocks_.Contains(block)) {
          unpinned_blocks_.Remove(block);
        } else {
          DCHECK(block->in_write_);
        }
        block->is_pinned_ = true;
        *pinned = true;
        block->client_->PinBuffer(block->buffer_desc_);
        ++total_pinned_buffers_;
      }
      return DeleteOrUnpinBlock(release_block, unpin);
    }

    RETURN_IF_ERROR(TransferBuffer(block, release_block, unpin));
    DCHECK(!release_block->is_pinned_);
    release_block = NULL; // Handled by transfer.
    DCHECK(block->is_pinned_);
    *pinned = true;
  }

  // Read the block from disk if it was not in memory.
  DCHECK(block->write_range_ != NULL) << block->DebugString() << endl << release_block;
  SCOPED_TIMER(disk_read_timer_);
  // Create a ScanRange to perform the read.
  DiskIoMgr::ScanRange* scan_range =
      obj_pool_.Add(new DiskIoMgr::ScanRange());
  scan_range->Reset(NULL, block->write_range_->file(), block->write_range_->len(),
      block->write_range_->offset(), block->write_range_->disk_id(), false, block,
      DiskIoMgr::ScanRange::NEVER_CACHE);
  vector<DiskIoMgr::ScanRange*> ranges(1, scan_range);
  RETURN_IF_ERROR(io_mgr_->AddScanRanges(io_request_context_, ranges, true));

  // Read from the io mgr buffer into the block's assigned buffer.
  int64_t offset = 0;
  DiskIoMgr::BufferDescriptor* io_mgr_buffer;
  do {
    RETURN_IF_ERROR(scan_range->GetNext(&io_mgr_buffer));
    memcpy(block->buffer() + offset, io_mgr_buffer->buffer(), io_mgr_buffer->len());
    offset += io_mgr_buffer->len();
    io_mgr_buffer->Return();
  } while (!io_mgr_buffer->eosr());
  DCHECK_EQ(offset, block->write_range_->len());

  // Verify integrity first, because the hash was generated from encrypted data.
  if (check_integrity_) RETURN_IF_ERROR(VerifyHash(block));

  // Decryption is done in-place, since the buffer can't be accessed by anyone else.
  if (encryption_) RETURN_IF_ERROR(Decrypt(block));

  return DeleteOrUnpinBlock(release_block, unpin);
}

Status BufferedBlockMgr::UnpinBlock(Block* block) {
  DCHECK(!block->is_deleted_) << "Unpin for deleted block.";

  lock_guard<mutex> unpinned_lock(lock_);
  if (is_cancelled_) return Status::CANCELLED;
  DCHECK(block->Validate()) << endl << block->DebugString();
  if (!block->is_pinned_) return Status::OK;
  DCHECK_EQ(block->buffer_desc_->len, max_block_size_) << "Can only unpin io blocks.";
  DCHECK(Validate()) << endl << DebugInternal();
  // Add 'block' to the list of unpinned blocks and set is_pinned_ to false.
  // Cache its position in the list for later removal.
  block->is_pinned_ = false;
  DCHECK(!unpinned_blocks_.Contains(block)) << " Unpin for block in unpinned list";
  if (!block->in_write_) unpinned_blocks_.Enqueue(block);
  block->client_->UnpinBuffer(block->buffer_desc_);
  if (block->is_max_size()) {
    --total_pinned_buffers_;
  }
  RETURN_IF_ERROR(WriteUnpinnedBlocks());
  DCHECK(Validate()) << endl << DebugInternal();
  DCHECK(block->Validate()) << endl << block->DebugString();
  return Status::OK;
}

Status BufferedBlockMgr::WriteUnpinnedBlocks() {
  if (disable_spill_) return Status::OK;

  // Assumes block manager lock is already taken.
  while (non_local_outstanding_writes_ + free_io_buffers_.size() < block_write_threshold_
      && !unpinned_blocks_.empty()) {
    // Pop a block from the back of the list (LIFO).
    Block* write_block = unpinned_blocks_.PopBack();
    write_block->client_local_ = false;
    RETURN_IF_ERROR(WriteUnpinnedBlock(write_block));
    ++non_local_outstanding_writes_;
  }
  DCHECK(Validate()) << endl << DebugInternal();
  return Status::OK;
}

Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) {
  // Assumes block manager lock is already taken.
  DCHECK(!block->is_pinned_) << block->DebugString();
  DCHECK(!block->in_write_) << block->DebugString();
  DCHECK_EQ(block->buffer_desc_->len, max_block_size_);

  if (block->write_range_ == NULL) {
    if (tmp_files_.empty()) RETURN_IF_ERROR(InitTmpFiles());

    // First time the block is being persisted. Find the next physical file in
    // round-robin order and create a write range for it.
    next_block_index_ = (next_block_index_ + 1) % tmp_files_.size();
    TmpFileMgr::File& tmp_file = tmp_files_[next_block_index_];
    int64_t file_offset;
    RETURN_IF_ERROR(tmp_file.AllocateSpace(max_block_size_, &file_offset));
    int disk_id = tmp_file.disk_id();
    if (disk_id < 0) {
      // Assign a valid disk id to the write range if the tmp file was not assigned one.
      static unsigned int next_disk_id = 0;
      disk_id = ++next_disk_id;
    }
    disk_id %= io_mgr_->num_local_disks();
    DiskIoMgr::WriteRange::WriteDoneCallback callback =
        bind(mem_fn(&BufferedBlockMgr::WriteComplete), this, block, _1);
    block->write_range_ = obj_pool_.Add(new DiskIoMgr::WriteRange(
        tmp_file.path(), file_offset, disk_id, callback));
  }

  uint8_t* outbuf = NULL;
  if (encryption_) {
    // The block->buffer() could be accessed during the write path, so we have to
    // make a copy of it while writing.
    RETURN_IF_ERROR(Encrypt(block, &outbuf));
  } else {
    outbuf = block->buffer();
  }

  if (check_integrity_) SetHash(block);

  block->write_range_->SetData(outbuf, block->valid_data_len_);

  // Issue write through DiskIoMgr.
  RETURN_IF_ERROR(io_mgr_->AddWriteRange(io_request_context_, block->write_range_));
  block->in_write_ = true;
  DCHECK(block->Validate()) << endl << block->DebugString();
  outstanding_writes_counter_->Add(1);
  bytes_written_counter_->Add(block->valid_data_len_);
  ++writes_issued_;
  if (writes_issued_ == 1) {
    if (ImpaladMetrics::NUM_QUERIES_SPILLED != NULL) {
      ImpaladMetrics::NUM_QUERIES_SPILLED->Increment(1);
    }
  }
  return Status::OK;
}
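
// The callback above is the usual boost::bind recipe for adapting a member
// function to a boost::function: mem_fn makes WriteComplete callable as
// f(this, block, status), and bind fixes the first two arguments, leaving
// the write status as _1. A minimal sketch of the same idea (Mgr and Done
// are illustrative names):
//
//   #include <boost/bind.hpp>
//   #include <boost/function.hpp>
//
//   struct Mgr {
//     void Done(int id, const Status& status) { /* handle completion */ }
//   };
//
//   Mgr mgr;
//   boost::function<void(const Status&)> cb =
//       bind(mem_fn(&Mgr::Done), &mgr, 42, _1);
//   // cb(status) now invokes mgr.Done(42, status).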

void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) {
  Status status = Status::OK;
  lock_guard<mutex> lock(lock_);
  outstanding_writes_counter_->Add(-1);
  DCHECK(Validate()) << endl << DebugInternal();
  DCHECK(is_cancelled_ || block->in_write_) << "WriteComplete() for block not in write."
      << endl << block->DebugString();
  if (!block->client_local_) {
    DCHECK_GT(non_local_outstanding_writes_, 0) << block->DebugString();
    --non_local_outstanding_writes_;
  }
  block->in_write_ = false;

  // Explicitly release our temporarily allocated buffer here so that it doesn't
  // hang around needlessly.
  if (encryption_) EncryptDone(block);

  // ReturnUnusedBlock() will clear the block, so save the client pointer.
  RuntimeState* state = block->client_->state_;
  // If the block was re-pinned when it was in the IOMgr queue, don't free it.
  if (block->is_pinned_) {
    // The number of outstanding writes has decreased but the number of free buffers
    // hasn't.
    DCHECK(!block->client_local_)
        << "Client should be waiting. No one should have pinned this block.";
    if (write_status.ok() && !is_cancelled_ && !state->is_cancelled()) {
      status = WriteUnpinnedBlocks();
    }
  } else if (block->client_local_) {
    DCHECK(!block->is_deleted_)
        << "Client should be waiting. No one should have deleted this block.";
    block->write_complete_cv_.notify_one();
  } else {
    DCHECK_EQ(block->buffer_desc_->len, max_block_size_)
        << "Only io sized buffers should spill";
    free_io_buffers_.Enqueue(block->buffer_desc_);
    // Finish the DeleteBlock() work.
    if (block->is_deleted_) {
      block->buffer_desc_->block = NULL;
      block->buffer_desc_ = NULL;
      ReturnUnusedBlock(block);
    }
    // Multiple threads may be waiting for the same block in FindBuffer(). Wake them
    // all up. One thread will get this block, and the others will re-evaluate whether
    // they should continue waiting and if another write needs to be initiated.
    buffer_available_cv_.notify_all();
  }
  DCHECK(Validate()) << endl << DebugInternal();

  if (!write_status.ok() || !status.ok() || is_cancelled_) {
    // If the instance is already cancelled, don't confuse things with these errors.
    if (!state->is_cancelled()) {
      if (!write_status.ok()) {
        VLOG_QUERY << "Query: " << query_id_ << " write complete callback with error.";
        state->LogError(write_status.msg());
      }
      if (!status.ok()) {
        VLOG_QUERY << "Query: " << query_id_ << " error while writing unpinned blocks.";
        state->LogError(status.msg());
      }
    }
    // Set cancelled and wake up waiting threads if an error occurred. Note that in
    // the case of client_local_, that thread was woken up above.
    is_cancelled_ = true;
    buffer_available_cv_.notify_all();
  }
}

Status BufferedBlockMgr::DeleteBlock(Block* block) {
  DCHECK(!block->is_deleted_);

  lock_guard<mutex> lock(lock_);
  DCHECK(block->Validate()) << endl << DebugInternal();
  block->is_deleted_ = true;

  if (block->is_pinned_) {
    if (block->is_max_size()) --total_pinned_buffers_;
    block->is_pinned_ = false;
    block->client_->UnpinBuffer(block->buffer_desc_);
  } else if (unpinned_blocks_.Contains(block)) {
    // Remove block from unpinned list.
    unpinned_blocks_.Remove(block);
  }

  if (block->in_write_) {
    DCHECK(block->buffer_desc_ != NULL && block->buffer_desc_->len == max_block_size_)
        << "Should never be writing a small buffer";
    // If a write is still pending, return. Cleanup will be done in WriteComplete().
    DCHECK(block->Validate()) << endl << block->DebugString();
    return Status::OK;
  }

  if (block->buffer_desc_ != NULL) {
    if (block->buffer_desc_->len != max_block_size_) {
      // Just delete the block for now.
      delete[] block->buffer_desc_->buffer;
      block->client_->tracker_->Release(block->buffer_desc_->len);
    } else if (!free_io_buffers_.Contains(block->buffer_desc_)) {
      free_io_buffers_.Enqueue(block->buffer_desc_);
      buffer_available_cv_.notify_one();
    }
    block->buffer_desc_->block = NULL;
    block->buffer_desc_ = NULL;
  }
  ReturnUnusedBlock(block);
  DCHECK(block->Validate()) << endl << block->DebugString();
  DCHECK(Validate()) << endl << DebugInternal();
  return Status::OK;
}

void BufferedBlockMgr::ReturnUnusedBlock(Block* block) {
  DCHECK(block->is_deleted_) << block->DebugString();
  DCHECK(!block->is_pinned_) << block->DebugString();
  DCHECK(block->buffer_desc_ == NULL);
  block->Init();
  unused_blocks_.Enqueue(block);
}

Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) {
  DCHECK_NOTNULL(block);
  Client* client = block->client_;
  DCHECK_NOTNULL(client);
  DCHECK(!block->is_pinned_ && !block->is_deleted_)
      << "Pinned or deleted block " << endl << block->DebugString();
  *in_mem = false;

  unique_lock<mutex> l(lock_);
  if (is_cancelled_) return Status::CANCELLED;

  // First check if there is enough reserved memory to satisfy this request.
  bool is_reserved_request = false;
  if (client->num_pinned_buffers_ < client->num_reserved_buffers_) {
    is_reserved_request = true;
  } else if (client->num_tmp_reserved_buffers_ > 0) {
    is_reserved_request = true;
    --client->num_tmp_reserved_buffers_;
  }

  DCHECK(Validate()) << endl << DebugInternal();
  if (is_reserved_request) --unfullfilled_reserved_buffers_;

  if (!is_reserved_request && remaining_unreserved_buffers() < 1) {
    // The client already has its quota and there are no unreserved blocks left.
    // Note that even if this passes, it is still possible for the path below to
    // see OOM because another query consumed memory from the process tracker. This
    // only happens if the buffer has not already been allocated by the block mgr.
    // This check should ensure that the memory cannot be consumed by another client
    // of the block mgr.
    return Status::OK;
  }

  if (block->buffer_desc_ != NULL) {
    // The block is in memory. It may be in 3 states:
    // 1. In the unpinned list. The buffer will not be in the free list.
    // 2. in_write_ == true. The buffer will not be in the free list.
    // 3. The buffer is free, but hasn't yet been reassigned to a different block.
    DCHECK(unpinned_blocks_.Contains(block) ||
        block->in_write_ ||
        free_io_buffers_.Contains(block->buffer_desc_));
    if (unpinned_blocks_.Contains(block)) {
      unpinned_blocks_.Remove(block);
      DCHECK(!free_io_buffers_.Contains(block->buffer_desc_));
    } else if (block->in_write_) {
      DCHECK(block->in_write_ && !free_io_buffers_.Contains(block->buffer_desc_));
    } else {
      free_io_buffers_.Remove(block->buffer_desc_);
    }
    buffered_pin_counter_->Add(1);
    *in_mem = true;
  } else {
    BufferDescriptor* buffer_desc = NULL;
    RETURN_IF_ERROR(FindBuffer(l, &buffer_desc));

    if (buffer_desc == NULL) {
      // There are no free buffers or blocks we can evict. We need to fail this request.
      // If this is an optional request, return OK. If it is required, return OOM.
      if (!is_reserved_request) return Status::OK;
      if (VLOG_QUERY_IS_ON) {
        stringstream ss;
        ss << "Query id=" << query_id_ << " was unable to get minimum required buffers."
           << endl << DebugInternal() << endl << client->DebugString()
           << endl << GetStackTrace();
        VLOG_QUERY << ss.str();
      }
      Status status = Status::MEM_LIMIT_EXCEEDED;
      status.AddDetail("Query did not have enough memory to get the minimum required "
          "buffers in the block manager.");
      return status;
    }

    DCHECK_NOTNULL(buffer_desc);
    if (buffer_desc->block != NULL) {
      // This buffer was assigned to a block but now we are reusing it. Reset the
      // previous block->buffer link.
      DCHECK(buffer_desc->block->Validate()) << endl << buffer_desc->block->DebugString();
      buffer_desc->block->buffer_desc_ = NULL;
    }
    buffer_desc->block = block;
    block->buffer_desc_ = buffer_desc;
  }
  DCHECK_NOTNULL(block->buffer_desc_);
  block->is_pinned_ = true;
  client->PinBuffer(block->buffer_desc_);
  ++total_pinned_buffers_;

  DCHECK(block->Validate()) << endl << block->DebugString();
  // The number of free buffers has decreased. Write unpinned blocks if it has
  // fallen below the threshold.
  RETURN_IF_ERROR(WriteUnpinnedBlocks());
  DCHECK(Validate()) << endl << DebugInternal();
  return Status::OK;
}
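
// A worked example of the reservation check above: a client with
// num_reserved_buffers_ = 3 and num_pinned_buffers_ = 2 still has a reserved
// buffer outstanding, so its request counts as reserved and proceeds even
// when remaining_unreserved_buffers() is 0. A client with all 3 reserved
// buffers pinned and no tmp reservations competes for unreserved buffers
// instead, and when none remain it simply gets Status::OK back with the
// block left unpinned.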

// We need to find a new buffer. We prefer getting this buffer in this order:
// 1. Allocate a new block if the number of free blocks is less than the write
//    threshold, until we run out of memory.
// 2. Pick a buffer from the free list.
// 3. Wait and evict an unpinned buffer.
Status BufferedBlockMgr::FindBuffer(unique_lock<mutex>& lock,
    BufferDescriptor** buffer_desc) {
  *buffer_desc = NULL;

  // First, try to allocate a new buffer.
  if (free_io_buffers_.size() < block_write_threshold_ &&
      mem_tracker_->TryConsume(max_block_size_)) {
    uint8_t* new_buffer = new uint8_t[max_block_size_];
    *buffer_desc = obj_pool_.Add(new BufferDescriptor(new_buffer, max_block_size_));
    (*buffer_desc)->all_buffers_it = all_io_buffers_.insert(
        all_io_buffers_.end(), *buffer_desc);
    return Status::OK;
  }

  // Second, try to pick a buffer from the free list.
  if (free_io_buffers_.empty()) {
    // There are no free buffers. If spills are disabled or there are no unpinned
    // blocks we can write, return. We can't get a buffer.
    if (disable_spill_) {
      return Status("Spilling has been disabled for plans that do not have stats and "
          "are not hinted to prevent potentially bad plans from using too many cluster "
          "resources. Compute stats on these tables, hint the plan or disable this "
          "behavior via query options to enable spilling.");
    }

    // Third, this block needs to use a buffer that was unpinned from another block.
    // Get a free buffer from the front of the queue and assign it to the block.
    do {
      if (unpinned_blocks_.empty() && non_local_outstanding_writes_ == 0) {
        return Status::OK;
      }
      SCOPED_TIMER(buffer_wait_timer_);
      // Try to evict unpinned blocks before waiting.
      RETURN_IF_ERROR(WriteUnpinnedBlocks());
      DCHECK_GT(non_local_outstanding_writes_, 0) << endl << DebugInternal();
      buffer_available_cv_.wait(lock);
      if (is_cancelled_) return Status::CANCELLED;
    } while (free_io_buffers_.empty());
  }
  *buffer_desc = free_io_buffers_.Dequeue();
  return Status::OK;
}

BufferedBlockMgr::Block* BufferedBlockMgr::GetUnusedBlock(Client* client) {
  DCHECK_NOTNULL(client);
  Block* new_block = NULL;
  if (unused_blocks_.empty()) {
    new_block = obj_pool_.Add(new Block(this));
    new_block->Init();
    created_block_counter_->Add(1);
  } else {
    new_block = unused_blocks_.Dequeue();
    recycled_blocks_counter_->Add(1);
  }
  DCHECK_NOTNULL(new_block);
  new_block->client_ = client;
  return new_block;
}

bool BufferedBlockMgr::Validate() const {
  int num_free_io_buffers = 0;

  if (total_pinned_buffers_ < 0) {
    LOG(ERROR) << "total_pinned_buffers_ < 0: " << total_pinned_buffers_;
    return false;
  }

  BOOST_FOREACH(BufferDescriptor* buffer, all_io_buffers_) {
    bool is_free = free_io_buffers_.Contains(buffer);
    num_free_io_buffers += is_free;

    if (*buffer->all_buffers_it != buffer) {
      LOG(ERROR) << "All buffers list is corrupt. Buffer iterator is not valid.";
      return false;
    }

    if (buffer->block == NULL && !is_free) {
      LOG(ERROR) << "Buffer with no block not in free list." << endl << DebugInternal();
      return false;
    }

    if (buffer->len != max_block_size_) {
      LOG(ERROR) << "Non-io sized buffers should not end up on free list.";
      return false;
    }

    if (buffer->block != NULL) {
      if (!buffer->block->Validate()) {
        LOG(ERROR) << "buffer->block inconsistent."
            << endl << buffer->block->DebugString();
        return false;
      }

      if (is_free && (buffer->block->is_pinned_ || buffer->block->in_write_ ||
          unpinned_blocks_.Contains(buffer->block))) {
        LOG(ERROR) << "Block with buffer in free list and"
            << " is_pinned_ = " << buffer->block->is_pinned_
            << " in_write_ = " << buffer->block->in_write_
            << " Unpinned_blocks_.Contains = "
            << unpinned_blocks_.Contains(buffer->block)
            << endl << buffer->block->DebugString();
        return false;
      }
    }
  }

  if (free_io_buffers_.size() != num_free_io_buffers) {
    LOG(ERROR) << "free_buffer_list_ inconsistency."
        << " num_free_io_buffers = " << num_free_io_buffers
        << " free_io_buffers_.size() = " << free_io_buffers_.size()
        << endl << DebugInternal();
    return false;
  }

  Block* block = unpinned_blocks_.head();
  while (block != NULL) {
    if (!block->Validate()) {
      LOG(ERROR) << "Block inconsistent in unpinned list."
          << endl << block->DebugString();
      return false;
    }

    if (block->in_write_ || free_io_buffers_.Contains(block->buffer_desc_)) {
      LOG(ERROR) << "Block in unpinned list with"
          << " in_write_ = " << block->in_write_
          << " free_io_buffers_.Contains = "
          << free_io_buffers_.Contains(block->buffer_desc_)
          << endl << block->DebugString();
      return false;
    }
    block = block->Next();
  }

  // Check if we're writing blocks when the number of free buffers falls below
  // threshold. We don't write blocks after cancellation.
  if (!is_cancelled_ && !unpinned_blocks_.empty() && !disable_spill_ &&
      free_io_buffers_.size() + non_local_outstanding_writes_ <
          block_write_threshold_) {
    // TODO: this isn't correct when WriteUnpinnedBlocks() fails during the call to
    // WriteUnpinnedBlock(), so just log the condition but don't return false. Figure
    // out a way to re-enable this check.
    LOG(ERROR) << "Missed writing unpinned blocks";
  }
  return true;
}

string BufferedBlockMgr::DebugString(Client* client) {
  stringstream ss;
  unique_lock<mutex> l(lock_);
  ss << DebugInternal();
  if (client != NULL) ss << endl << client->DebugString();
  return ss.str();
}

string BufferedBlockMgr::DebugInternal() const {
  stringstream ss;
  ss << "Buffered block mgr" << endl
     << " Num writes outstanding: " << outstanding_writes_counter_->value() << endl
     << " Num free io buffers: " << free_io_buffers_.size() << endl
     << " Num unpinned blocks: " << unpinned_blocks_.size() << endl
     << " Num available buffers: " << remaining_unreserved_buffers() << endl
     << " Total pinned buffers: " << total_pinned_buffers_ << endl
     << " Unfullfilled reserved buffers: " << unfullfilled_reserved_buffers_ << endl
     << " Remaining memory: " << mem_tracker_->SpareCapacity()
     << " (#blocks=" << (mem_tracker_->SpareCapacity() / max_block_size_) << ")" << endl
     << " Block write threshold: " << block_write_threshold_;
  return ss.str();
}

void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile,
    MemTracker* parent_tracker, int64_t mem_limit) {
  unique_lock<mutex> l(lock_);
  if (initialized_) return;

  io_mgr->RegisterContext(&io_request_context_);

  if (encryption_) {
    // Seed the random number generator
    // TODO: Try non-blocking read from /dev/random and add that, too.
    // TODO: We don't need to re-seed every BufferedBlockMgr::Init. Consider doing this
    // every X iterations.
    RAND_load_file("/dev/urandom", 4096);
  }

  profile_.reset(new RuntimeProfile(&obj_pool_, "BlockMgr"));
  parent_profile->AddChild(profile_.get());

  mem_limit_counter_ = ADD_COUNTER(profile_.get(), "MemoryLimit", TUnit::BYTES);
  mem_limit_counter_->Set(mem_limit);
  block_size_counter_ = ADD_COUNTER(profile_.get(), "MaxBlockSize", TUnit::BYTES);
  block_size_counter_->Set(max_block_size_);
  created_block_counter_ = ADD_COUNTER(
      profile_.get(), "BlocksCreated", TUnit::UNIT);
  recycled_blocks_counter_ = ADD_COUNTER(
      profile_.get(), "BlocksRecycled", TUnit::UNIT);
  bytes_written_counter_ = ADD_COUNTER(
      profile_.get(), "BytesWritten", TUnit::BYTES);
  outstanding_writes_counter_ =
      ADD_COUNTER(profile_.get(), "BlockWritesOutstanding", TUnit::UNIT);
  buffered_pin_counter_ = ADD_COUNTER(profile_.get(), "BufferedPins", TUnit::UNIT);
  disk_read_timer_ = ADD_TIMER(profile_.get(), "TotalReadBlockTime");
  buffer_wait_timer_ = ADD_TIMER(profile_.get(), "TotalBufferWaitTime");
  encryption_timer_ = ADD_TIMER(profile_.get(), "TotalEncryptionTime");
  integrity_check_timer_ = ADD_TIMER(profile_.get(), "TotalIntegrityCheckTime");

  // Create a new mem_tracker and allocate buffers.
  mem_tracker_.reset(new MemTracker(
      profile(), mem_limit, -1, "Block Manager", parent_tracker));

  initialized_ = true;
}

Status BufferedBlockMgr::InitTmpFiles() {
  DCHECK(tmp_files_.empty());

  // Initialize the tmp files and the initial file to use.
  int num_tmp_devices = TmpFileMgr::num_tmp_devices();
  if (num_tmp_devices == 0) {
    return Status(
        "No spilling directories configured. Cannot spill. Set --scratch_dirs.");
  }

  tmp_files_.reserve(num_tmp_devices);
  for (int i = 0; i < num_tmp_devices; ++i) {
    TmpFileMgr::File* tmp_file;
    RETURN_IF_ERROR(TmpFileMgr::GetFile(i, query_id_, &tmp_file));
    tmp_files_.push_back(tmp_file);
  }
  next_block_index_ = rand() % num_tmp_devices;
  return Status::OK;
}

// Callback used by OpenSSLErr() - write the error given to us through buf to the
// stringstream that's passed in through ctx.
static int OpenSSLErrCallback(const char* buf, size_t len, void* ctx) {
  stringstream* errstream = static_cast<stringstream*>(ctx);
  *errstream << buf;
  return 1;
}

// Called upon OpenSSL errors; returns a non-OK status with an error message.
static Status OpenSSLErr(const string& msg) {
  stringstream errstream;
  errstream << msg << ": ";
  ERR_print_errors_cb(OpenSSLErrCallback, &errstream);
  return Status(Substitute("Openssl Error: $0", errstream.str()));
}

Status BufferedBlockMgr::Encrypt(Block* block, uint8_t** outbuf) {
  DCHECK(encryption_);
  DCHECK(block->buffer());
  DCHECK(!block->is_pinned_);
  DCHECK(!block->in_write_);
  DCHECK(outbuf);
  SCOPED_TIMER(encryption_timer_);

  // Since we're using AES-CFB mode, we must take care not to reuse a key/iv pair.
  // Regenerate a new key and iv for every block of data we write, including between
  // writes of the same Block.
  RAND_bytes(block->key_, sizeof(block->key_));
  RAND_bytes(block->iv_, sizeof(block->iv_));
  block->encrypted_write_buffer_.reset(new uint8_t[block->valid_data_len_]);

  EVP_CIPHER_CTX ctx;
  int len = static_cast<int>(block->valid_data_len_);

  // Create and initialize the context for encryption
  EVP_CIPHER_CTX_init(&ctx);
  EVP_CIPHER_CTX_set_padding(&ctx, 0);

  // Start encryption. We use a 256-bit AES key, and the cipher block mode
  // is CFB because this gives us a stream cipher, which supports arbitrary
  // length ciphertexts - it doesn't have to be a multiple of 16 bytes.
  if (EVP_EncryptInit_ex(&ctx, EVP_aes_256_cfb(), NULL, block->key_, block->iv_) != 1) {
    return OpenSSLErr("EVP_EncryptInit_ex failure");
  }

  // Encrypt block->buffer() into the new encrypted_write_buffer_
  if (EVP_EncryptUpdate(&ctx, block->encrypted_write_buffer_.get(), &len,
      block->buffer(), len) != 1) {
    return OpenSSLErr("EVP_EncryptUpdate failure");
  }

  // This is safe because we're using CFB mode without padding.
  DCHECK_EQ(len, block->valid_data_len_);

  // Finalize encryption.
  if (1 != EVP_EncryptFinal_ex(&ctx, block->encrypted_write_buffer_.get() + len, &len)) {
    return OpenSSLErr("EVP_EncryptFinal failure");
  }

  // Again safe due to CFB with no padding
  DCHECK_EQ(len, 0);

  *outbuf = block->encrypted_write_buffer_.get();
  return Status::OK;
}
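
// Encrypt()/Decrypt() round-trip because AES-CFB is a stream mode: the
// ciphertext is exactly as long as the plaintext and no padding block is
// added. A minimal standalone sketch of the same OpenSSL 1.0-era EVP calls
// (error handling elided; buffer sizes are illustrative):
//
//   unsigned char key[32], iv[16];  // AES-256 key and 16-byte IV
//   unsigned char in[100], out[100], back[100];
//   RAND_bytes(key, sizeof(key));
//   RAND_bytes(iv, sizeof(iv));
//   RAND_bytes(in, sizeof(in));
//
//   EVP_CIPHER_CTX ctx;
//   int len;
//   EVP_CIPHER_CTX_init(&ctx);
//   EVP_CIPHER_CTX_set_padding(&ctx, 0);
//   EVP_EncryptInit_ex(&ctx, EVP_aes_256_cfb(), NULL, key, iv);
//   EVP_EncryptUpdate(&ctx, out, &len, in, sizeof(in));  // len == 100
//   EVP_EncryptFinal_ex(&ctx, out + len, &len);          // len == 0, no padding
//
//   EVP_DecryptInit_ex(&ctx, EVP_aes_256_cfb(), NULL, key, iv);  // full re-init
//   EVP_DecryptUpdate(&ctx, back, &len, out, sizeof(out));       // back == in
//   EVP_DecryptFinal_ex(&ctx, back + len, &len);
//   EVP_CIPHER_CTX_cleanup(&ctx);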

void BufferedBlockMgr::EncryptDone(Block* block) {
  DCHECK(encryption_);
  DCHECK(block->encrypted_write_buffer_.get());
  block->encrypted_write_buffer_.reset();
}

Status BufferedBlockMgr::Decrypt(Block* block) {
  DCHECK(encryption_);
  DCHECK(block->buffer());
  SCOPED_TIMER(encryption_timer_);

  EVP_CIPHER_CTX ctx;
  int len = static_cast<int>(block->valid_data_len_);

  // Create and initialize the context for decryption
  EVP_CIPHER_CTX_init(&ctx);
  EVP_CIPHER_CTX_set_padding(&ctx, 0);

  // Start decryption; same parameters as encryption for obvious reasons
  if (EVP_DecryptInit_ex(&ctx, EVP_aes_256_cfb(), NULL, block->key_, block->iv_) != 1) {
    return OpenSSLErr("EVP_DecryptInit_ex failure");
  }

  // Decrypt block->buffer() in-place. Safe because no one is accessing it.
  if (EVP_DecryptUpdate(&ctx, block->buffer(), &len, block->buffer(), len) != 1) {
    return OpenSSLErr("EVP_DecryptUpdate failure");
  }

  // This is safe because we're using CFB mode without padding.
  DCHECK_EQ(len, block->valid_data_len_);

  // Finalize decryption.
  if (1 != EVP_DecryptFinal_ex(&ctx, block->buffer() + len, &len)) {
    return OpenSSLErr("EVP_DecryptFinal failure");
  }

  // Again safe due to CFB with no padding
  DCHECK_EQ(len, 0);

  return Status::OK;
}

void BufferedBlockMgr::SetHash(Block* block) {
  DCHECK(check_integrity_);
  SCOPED_TIMER(integrity_check_timer_);
  uint8_t* data = NULL;
  if (encryption_) {
    DCHECK(block->encrypted_write_buffer_.get());
    data = block->encrypted_write_buffer_.get();
  } else {
    DCHECK(block->buffer());
    data = block->buffer();
  }
  // Explicitly ignore the return value from SHA256(); it can't fail.
  (void) SHA256(data, block->valid_data_len_, block->hash_);
}

Status BufferedBlockMgr::VerifyHash(Block* block) {
  DCHECK(check_integrity_);
  DCHECK(block->buffer());
  SCOPED_TIMER(integrity_check_timer_);
  uint8_t test_hash[SHA256_DIGEST_LENGTH];
  (void) SHA256(block->buffer(), block->valid_data_len_, test_hash);
  if (memcmp(test_hash, block->hash_, SHA256_DIGEST_LENGTH) != 0) {
    return Status("Block verification failure");
  }
  return Status::OK;
}

} // namespace impala