26 #include <openssl/rand.h>
27 #include <openssl/evp.h>
28 #include <openssl/sha.h>
29 #include <openssl/err.h>
31 #include <gutil/strings/substitute.h>
// NOTE(review): the leading numbers on the lines below look like line-number
// artifacts from extraction, not C++ tokens — confirm against the real file.
// Command-line flag 'disk_spill_encryption' (default: false). Per its help
// text, enabling it encrypts and integrity-checks all data spilled to disk
// during a query.
33 DEFINE_bool(disk_spill_encryption,
false,
"Set this to encrypt and perform an integrity "
34 "check on all data spilled to disk during a query");
// Brings the gutil 'strings' namespace into scope; presumably so that
// Substitute() (from gutil/strings/substitute.h) can be called unqualified
// later in this file.
38 using namespace strings;
// Out-of-class definitions of BufferedBlockMgr's static members.
// static_block_mgrs_lock_ protects query_to_block_mgrs_; presumably the map is
// keyed by query id (inferred from the name — TODO confirm).
42 BufferedBlockMgr::BlockMgrsMap BufferedBlockMgr::query_to_block_mgrs_;
43 SpinLock BufferedBlockMgr::static_block_mgrs_lock_;
51 query_tracker_(mgr_->mem_tracker_->parent()),
52 num_reserved_buffers_(num_reserved_buffers),
53 num_tmp_reserved_buffers_(0),
54 num_pinned_buffers_(0) {
86 DCHECK_NOTNULL(buffer);
87 if (buffer->
len == mgr_->max_block_size()) {
88 ++num_pinned_buffers_;
89 if (tracker_ != NULL) tracker_->ConsumeLocal(buffer->
len, query_tracker_);
94 DCHECK_NOTNULL(buffer);
95 if (buffer->
len == mgr_->max_block_size()) {
96 DCHECK_GT(num_pinned_buffers_, 0);
97 --num_pinned_buffers_;
98 if (tracker_ != NULL) tracker_->ReleaseLocal(buffer->
len, query_tracker_);
104 ss <<
"Client " <<
this << endl
105 <<
" num_reserved_buffers=" << num_reserved_buffers_ << endl
106 <<
" num_tmp_reserved_buffers=" << num_tmp_reserved_buffers_ << endl
107 <<
" num_pinned_buffers=" << num_pinned_buffers_;
114 : buffer_desc_(NULL),
115 block_mgr_(block_mgr),
123 return block_mgr_->PinBlock(
this, pinned, release_block, unpin);
127 return block_mgr_->UnpinBlock(
this);
131 return block_mgr_->DeleteBlock(
this);
145 if (is_deleted_ && (is_pinned_ || (!in_write_ && buffer_desc_ != NULL))) {
146 LOG(ERROR) <<
"Deleted block in use - " <<
DebugString();
150 if (buffer_desc_ == NULL && (is_pinned_ || in_write_)) {
151 LOG(ERROR) <<
"Block without buffer in use - " <<
DebugString();
155 if (buffer_desc_ == NULL && block_mgr_->unpinned_blocks_.Contains(
this)) {
156 LOG(ERROR) <<
"Unpersisted block without buffer - " <<
DebugString();
160 if (buffer_desc_ != NULL && (buffer_desc_->block !=
this)) {
161 LOG(ERROR) <<
"Block buffer inconsistency - " <<
DebugString();
170 ss <<
"Block: " <<
this << endl
171 <<
" Buffer Desc: " << buffer_desc_ << endl
172 <<
" Data Len: " << valid_data_len_ << endl
173 <<
" Num Rows: " << num_rows_ << endl;
174 if (is_pinned_) ss <<
" Buffer Len: " << buffer_len() << endl;
175 ss <<
" Deleted: " << is_deleted_ << endl
176 <<
" Pinned: " << is_pinned_ << endl
177 <<
" Write Issued: " << in_write_ << endl
178 <<
" Client Local: " << client_local_;
201 shared_ptr<BufferedBlockMgr>* block_mgr) {
202 DCHECK_NOTNULL(parent);
208 if (*block_mgr == NULL) {
217 (*block_mgr)->Init(state->
io_mgr(),
profile, parent, mem_limit);
237 DCHECK_GE(num_reserved_buffers, 0);
238 lock_guard<mutex> lock(
lock_);
246 lock_guard<mutex> lock(
lock_);
258 lock_guard<mutex> lock(
lock_);
264 if (num_buffers < 0)
return true;
273 lock_guard<mutex> lock(
lock_);
280 DCHECK_GT(buffers_needed, 0) <<
"Trying to consume 0 memory";
281 unique_lock<mutex> lock(
lock_);
302 int additional_tmp_reservations = 0;
312 int buffers_acquired = 0;
316 if (buffer_desc == NULL)
break;
319 delete[] buffer_desc->
buffer;
321 }
while (buffers_acquired != buffers_needed);
327 if (buffers_acquired != buffers_needed || !status.
ok()) {
332 if (buffers_acquired < additional_tmp_reservations) {
335 (additional_tmp_reservations - buffers_acquired);
337 (additional_tmp_reservations - buffers_acquired);
361 lock_guard<mutex> lock(
lock_);
372 status.
AddDetail(Substitute(
"The memory limit is set too low initialize the"
373 " spilling operator. The minimum required memory to spill this operator is $0.",
384 DCHECK_LE(len,
max_block_size_) <<
"Cannot request block bigger than max_len";
385 DCHECK_NE(len, 0) <<
"Cannot request block of zero size";
387 Block* new_block = NULL;
390 lock_guard<mutex> lock(
lock_);
394 DCHECK_EQ(new_block->
client_, client);
397 DCHECK(unpin_block == NULL);
400 uint8_t* buffer =
new uint8_t[len];
417 DCHECK(!in_mem) <<
"A new block cannot start in mem.";
420 if (unpin_block == NULL) {
430 }
else if (unpin_block != NULL) {
435 DCHECK(new_block == NULL || new_block->
is_pinned());
450 unique_lock<mutex> lock(
lock_);
530 lock_guard<mutex> lock(
lock_);
538 DCHECK_NOTNULL(block);
560 lock_guard<mutex> lock(
lock_);
581 release_block = NULL;
595 vector<DiskIoMgr::ScanRange*> ranges(1, scan_range);
604 offset += io_mgr_buffer->
len();
606 }
while (!io_mgr_buffer->
eosr());
619 DCHECK(!block->
is_deleted_) <<
"Unpin for deleted block.";
621 lock_guard<mutex> unpinned_lock(
lock_);
630 DCHECK(!
unpinned_blocks_.Contains(block)) <<
" Unpin for block in unpinned list";
674 int disk_id = tmp_file.
disk_id();
677 static unsigned int next_disk_id = 0;
678 disk_id = ++next_disk_id;
684 tmp_file.
path(), file_offset, disk_id, callback));
687 uint8_t* outbuf = NULL;
717 lock_guard<mutex> lock(
lock_);
739 <<
"Client should be waiting. No one should have pinned this block.";
745 <<
"Client should be waiting. No one should have deleted this block.";
749 <<
"Only io sized buffers should spill";
767 if (!write_status.
ok()) {
786 lock_guard<mutex> lock(
lock_);
804 <<
"Should never be writing a small buffer";
837 DCHECK_NOTNULL(block);
839 DCHECK_NOTNULL(client);
841 <<
"Pinned or deleted block " << endl << block->
DebugString();
844 unique_lock<mutex> l(
lock_);
848 bool is_reserved_request =
false;
850 is_reserved_request =
true;
852 is_reserved_request =
true;
891 if (buffer_desc == NULL) {
897 ss <<
"Query id=" <<
query_id_ <<
" was unable to get minimum required buffers."
903 status.
AddDetail(
"Query did not have enough memory to get the minimum required "
904 "buffers in the block manager.");
908 DCHECK_NOTNULL(buffer_desc);
909 if (buffer_desc->
block != NULL) {
915 buffer_desc->
block = block;
955 return Status(
"Spilling has been disabled for plans that do not have stats and "
956 "are not hinted to prevent potentially bad plans from using too many cluster "
957 "resources. Compute stats on these tables, hint the plan or disable this "
958 "behavior via query options to enable spilling.");
980 DCHECK_NOTNULL(client);
981 Block* new_block = NULL;
990 DCHECK_NOTNULL(new_block);
996 int num_free_io_buffers = 0;
1005 num_free_io_buffers += is_free;
1008 LOG(ERROR) <<
"All buffers list is corrupt. Buffer iterator is not valid.";
1012 if (buffer->
block == NULL && !is_free) {
1013 LOG(ERROR) <<
"Buffer with no block not in free list." << endl <<
DebugInternal();
1018 LOG(ERROR) <<
"Non-io sized buffers should not end up on free list.";
1022 if (buffer->
block != NULL) {
1024 LOG(ERROR) <<
"buffer->block inconsistent."
1031 LOG(ERROR) <<
"Block with buffer in free list and"
1034 <<
" Unpinned_blocks_.Contains = "
1043 LOG(ERROR) <<
"free_buffer_list_ inconsistency."
1044 <<
" num_free_io_buffers = " << num_free_io_buffers
1051 while (block != NULL) {
1053 LOG(ERROR) <<
"Block inconsistent in unpinned list."
1059 LOG(ERROR) <<
"Block in unpinned list with"
1061 <<
" free_io_buffers_.Contains = "
1066 block = block->
Next();
1077 LOG(ERROR) <<
"Missed writing unpinned blocks";
1084 unique_lock<mutex> l(
lock_);
1086 if (client != NULL) ss << endl << client->
DebugString();
1092 ss <<
"Buffered block mgr" << endl
1099 <<
" Remaining memory: " <<
mem_tracker_->SpareCapacity()
1106 MemTracker* parent_tracker, int64_t mem_limit) {
1107 unique_lock<mutex> l(
lock_);
1116 RAND_load_file(
"/dev/urandom", 4096);
1127 profile_.get(),
"BlocksCreated", TUnit::UNIT);
1129 profile_.get(),
"BlocksRecycled", TUnit::UNIT);
1131 profile_.get(),
"BytesWritten", TUnit::BYTES);
1142 profile(), mem_limit, -1,
"Block Manager", parent_tracker));
1152 if (num_tmp_devices == 0) {
1154 "No spilling directories configured. Cannot spill. Set --scratch_dirs.");
1158 for (
int i = 0; i < num_tmp_devices; ++i) {
1170 stringstream* errstream =
static_cast<stringstream*
>(ctx);
1177 stringstream errstream;
1178 errstream << msg <<
": ";
1180 return Status(Substitute(
"Openssl Error: $0", errstream.str()));
1194 RAND_bytes(block->
key_,
sizeof(block->
key_));
1195 RAND_bytes(block->
iv_,
sizeof(block->
iv_));
1202 EVP_CIPHER_CTX_init(&ctx);
1203 EVP_CIPHER_CTX_set_padding(&ctx, 0);
1208 if (EVP_EncryptInit_ex(&ctx, EVP_aes_256_cfb(), NULL, block->
key_, block->
iv_) != 1) {
1209 return OpenSSLErr(
"EVP_EncryptInit_ex failure");
1214 block->
buffer(), len) != 1) {
1215 return OpenSSLErr(
"EVP_EncryptUpdate failure");
1223 return OpenSSLErr(
"EVP_EncryptFinal failure");
1248 EVP_CIPHER_CTX_init(&ctx);
1249 EVP_CIPHER_CTX_set_padding(&ctx, 0);
1252 if (EVP_DecryptInit_ex(&ctx, EVP_aes_256_cfb(), NULL, block->
key_, block->
iv_) != 1) {
1253 return OpenSSLErr(
"EVP_DecryptInit_ex failure");
1257 if (EVP_DecryptUpdate(&ctx, block->
buffer(), &len, block->
buffer(), len) != 1) {
1258 return OpenSSLErr(
"EVP_DecryptUpdate failure");
1265 if (1 != EVP_DecryptFinal_ex(&ctx, block->
buffer() + len, &len)) {
1266 return OpenSSLErr(
"EVP_DecryptFinal failure");
1278 uint8_t* data = NULL;
1294 uint8_t test_hash[SHA256_DIGEST_LENGTH];
1296 if (memcmp(test_hash, block->
hash_, SHA256_DIGEST_LENGTH) != 0) {
1297 return Status(
"Block verification failure");
uint8_t * buffer
Start of the buffer.
virtual int64_t value() const
Status FindBufferForBlock(Block *block, bool *in_mem)
bool initialized_
If true, Init() has been called.
const int64_t max_block_size_
Size of the largest/default block in bytes.
Descriptor for a single memory buffer in the pool.
int total_pinned_buffers_
The total number of pinned buffers across all clients.
static Status OpenSSLErr(const string &msg)
RuntimeProfile::Counter * integrity_check_timer_
Time spent in disk spill integrity generation and checking.
void CancelContext(RequestContext *context, bool wait_for_disks_completion=false)
int num_reserved_buffers_remaining(Client *client) const
const TUniqueId & query_id() const
int64_t remaining_unreserved_buffers() const
int writes_issued_
Number of writes issued.
Client(BufferedBlockMgr *mgr, int num_reserved_buffers, MemTracker *tracker, RuntimeState *state)
boost::condition_variable buffer_available_cv_
Signal availability of free buffers.
Status UnpinBlock(Block *block)
boost::function< void(const Status &)> WriteDoneCallback
Status MemLimitTooLowError(Client *client)
std::list< BufferDescriptor * >::iterator all_buffers_it
Iterator into all_io_buffers_ for this buffer.
DiskIoMgr::WriteRange * write_range_
int num_reserved_buffers_
bool TryConsume(int64_t bytes)
Status TransferBuffer(Block *dst, Block *src, bool unpin)
static int num_tmp_devices()
static Status GetFile(int tmp_device_id, const TUniqueId &query_id, File **new_file)
Status Pin(bool *pinned, Block *release_block=NULL, bool unpin=true)
RuntimeProfile::Counter * buffered_pin_counter_
Number of Pin() calls that did not require a disk read.
MemTracker * query_tracker_
RuntimeProfile::Counter * mem_limit_counter_
These have a fixed value for the lifetime of the manager and show memory usage.
const TUniqueId & query_id() const
Status DeleteOrUnpinBlock(Block *block, bool unpin)
MemTracker * get_tracker(Client *client) const
Status AllocateSpace(int64_t write_size, int64_t *offset)
const bool disable_spill_
#define RETURN_IF_ERROR(stmt)
some generally useful macros
void ConsumeLocal(int64_t bytes, MemTracker *end_tracker)
void ReleaseMemory(Client *client, int64_t size)
int num_pinned_buffers(Client *client) const
Status AddWriteRange(RequestContext *writer, WriteRange *write_range)
Status GetNewBlock(Client *client, Block *unpin_block, Block **block, int64_t len=-1)
#define ADD_TIMER(profile, name)
Status RegisterClient(int num_reserved_buffers, MemTracker *tracker, RuntimeState *state, Client **client)
int num_tmp_reserved_buffers_
RuntimeProfile::Counter * created_block_counter_
Total number of blocks created.
bool ConsumeMemory(Client *client, int64_t size)
void Init()
Initialize the state of a block and set the number of bytes allocated to 0.
int64_t max_block_size() const
RuntimeProfile::Counter * disk_read_timer_
Time taken for disk reads.
void UnregisterContext(RequestContext *context)
const char * file() const
uint8_t iv_[AES_BLOCK_SIZE]
boost::condition_variable write_complete_cv_
Status AddScanRanges(RequestContext *reader, const std::vector< ScanRange * > &ranges, bool schedule_immediately=false)
const int block_write_threshold_
void AddDetail(const std::string &msg)
Adds a detail string. Calling this method is only defined on a non-OK status.
RuntimeProfile::Counter * buffer_wait_timer_
Time spent waiting for a free buffer.
RuntimeProfile::Counter * encryption_timer_
Time spent in disk spill encryption and decryption.
BufferedBlockMgr(RuntimeState *state, int64_t block_size)
uint8_t key_[32]
If encryption_ is on, an AES 256-bit key. Regenerated on each write.
bool is_pinned_
is_pinned_ is true while the block is pinned by a client.
virtual void Set(int64_t value)
const std::string & path() const
void ClearTmpReservation(Client *client)
Sets tmp reservation to 0 on this client.
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
int unfullfilled_reserved_buffers_
The total number of reserved buffers across all clients that are not pinned.
RuntimeProfile * profile()
BufferDescriptor * buffer_desc_
void SetData(const uint8_t *buffer, int64_t len)
int non_local_outstanding_writes_
void ClearReservations(Client *client)
Clears all reservations for this client.
DiskIoMgr * io_mgr_
DiskIoMgr handles to read and write blocks.
boost::scoped_ptr< RuntimeProfile > profile_
Counters and timers to track behavior.
void UnpinBuffer(BufferDescriptor *buffer)
bool LogError(const ErrorMsg &msg)
int num_local_disks() const
Returns the number of local disks attached to the system.
void EncryptDone(Block *block)
Deallocates the temporary buffer allocated in Encrypt().
InternalQueue< Block > unused_blocks_
std::string DebugInternal() const
static Status Create(RuntimeState *state, MemTracker *parent, RuntimeProfile *profile, int64_t mem_limit, int64_t buffer_size, boost::shared_ptr< BufferedBlockMgr > *block_mgr)
std::string DebugString() const
Debug helper method to print the state of a block.
int64_t valid_data_len_
Length of valid (i.e. allocated) data within the block.
static IntCounter * NUM_QUERIES_SPILLED
int64_t available_buffers(Client *client) const
#define ADD_COUNTER(profile, name, unit)
DEFINE_bool(disk_spill_encryption, false,"Set this to encrypt and perform an integrity ""check on all data spilled to disk during a query")
Status Encrypt(Block *block, uint8_t **outbuf)
uint8_t hash_[SHA256_DIGEST_LENGTH]
static int Ceil(int value, int divisor)
Returns the ceil of value/divisor.
InternalQueue< BufferDescriptor > free_io_buffers_
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
This class is thread-safe.
void Release(int64_t bytes)
Decreases consumption of this tracker and its ancestors by 'bytes'.
const bool check_integrity_
Block * GetUnusedBlock(Client *client)
static const Status CANCELLED
Status Decrypt(Block *block)
Decrypts the contents of buffer() in place.
Status WriteUnpinnedBlock(Block *block)
Issues the write for this block to the DiskIoMgr.
Status VerifyHash(Block *block)
Verifies that the contents of buffer() match those that were set by SetHash()
bool TryAcquireTmpReservation(Client *client, int num_buffers)
static const int64_t NEVER_CACHE
If the mtime is set to NEVER_CACHE, the file handle should never be cached.
void Init(DiskIoMgr *io_mgr, RuntimeProfile *profile, MemTracker *parent_tracker, int64_t mem_limit)
Initializes the block mgr. Idempotent and thread-safe.
static const Status MEM_LIMIT_EXCEEDED
bool is_cancelled() const
bool is_deleted_
True if the block is deleted by the client.
Status FindBuffer(boost::unique_lock< boost::mutex > &lock, BufferDescriptor **buffer)
Status RegisterContext(RequestContext **request_context, MemTracker *reader_mem_tracker=NULL)
static SpinLock static_block_mgrs_lock_
Protects query_to_block_mgrs_.
boost::ptr_vector< TmpFileMgr::File > tmp_files_
Client * client_
The client that owns this block.
Status GetNext(BufferDescriptor **buffer)
DiskIoMgr::RequestContext * io_request_context_
T * Next() const
Returns the Next/Prev node or NULL if this is the end/front.
Block * block
Block that this buffer is assigned to. May be NULL.
Status PinBlock(Block *block, bool *pinned, Block *src, bool unpin)
void WriteComplete(Block *block, const Status &write_status)
uint8_t offset[7 *64-sizeof(uint64_t)]
static int OpenSSLErrCallback(const char *buf, size_t len, void *ctx)
RuntimeProfile::Counter * recycled_blocks_counter_
Number of deleted blocks reused.
std::string DebugString(Client *client=NULL)
Dumps block mgr state. Grabs lock. If client is not NULL, also dumps its state.
const ErrorMsg & msg() const
Returns the error message associated with a non-successful status.
virtual void Add(int64_t delta)
RuntimeProfile::Counter * block_size_counter_
int64_t bytes_allocated() const
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
void SetHash(Block *block)
Takes a cryptographic hash of the data and sets hash_ with it.
int64_t len
Length of the buffer.
boost::scoped_ptr< MemTracker > mem_tracker_
Track buffers allocated by the block manager.
void ReleaseLocal(int64_t bytes, MemTracker *end_tracker)
const TUniqueId query_id_
Status WriteUnpinnedBlocks()
void ReturnUnusedBlock(Block *block)
void PinBuffer(BufferDescriptor *buffer)
RuntimeProfile::Counter * outstanding_writes_counter_
Number of writes outstanding (issued but not completed).
std::list< BufferDescriptor * > all_io_buffers_
All allocated io-sized buffers.
boost::scoped_array< uint8_t > encrypted_write_buffer_
RuntimeProfile::Counter * bytes_written_counter_
Number of bytes written to disk (includes writes still queued in the IO manager). ...
InternalQueue< Block > unpinned_blocks_
Status DeleteBlock(Block *block)
bool Validate() const
Used to debug the state of the block manager. Lock must already be taken.
static BlockMgrsMap query_to_block_mgrs_
string DebugString() const