Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
mem-pool.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/mem-pool.h"
16 #include "runtime/mem-tracker.h"
17 #include "util/impalad-metrics.h"
18 
19 #include <algorithm>
20 #include <stdio.h>
21 #include <sstream>
22 
23 #include "common/names.h"
24 
25 using namespace impala;
26 
27 DECLARE_bool(disable_mem_pools);
28 
30 
31 const char* MemPool::LLVM_CLASS_NAME = "class.impala::MemPool";
32 
33 MemPool::MemPool(MemTracker* mem_tracker, int chunk_size)
34  : current_chunk_idx_(-1),
35  last_offset_conversion_chunk_idx_(-1),
36  // round up chunk size to nearest 8 bytes
37  chunk_size_(chunk_size == 0
38  ? 0
39  : ((chunk_size + 7) / 8) * 8),
40  total_allocated_bytes_(0),
41  peak_allocated_bytes_(0),
42  total_reserved_bytes_(0),
43  mem_tracker_(mem_tracker) {
44  DCHECK_GE(chunk_size_, 0);
45  DCHECK(mem_tracker != NULL);
46 }
47 
49  : owns_data(true),
50  data(reinterpret_cast<uint8_t*>(malloc(size))),
51  size(size),
52  cumulative_allocated_bytes(0),
53  allocated_bytes(0) {
55  ImpaladMetrics::MEM_POOL_TOTAL_BYTES->Increment(size);
56  }
57 }
58 
60  int64_t total_bytes_released = 0;
61  for (size_t i = 0; i < chunks_.size(); ++i) {
62  if (!chunks_[i].owns_data) continue;
63  total_bytes_released += chunks_[i].size;
64  free(chunks_[i].data);
65  }
66 
67  DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
69  ImpaladMetrics::MEM_POOL_TOTAL_BYTES->Increment(-total_bytes_released);
70  }
71 }
72 
74  int64_t total_bytes_released = 0;
75  for (size_t i = 0; i < chunks_.size(); ++i) {
76  if (!chunks_[i].owns_data) continue;
77  total_bytes_released += chunks_[i].size;
78  free(chunks_[i].data);
79  }
80  chunks_.clear();
81  current_chunk_idx_ = -1;
85 
86  mem_tracker_->Release(total_bytes_released);
88  ImpaladMetrics::MEM_POOL_TOTAL_BYTES->Increment(-total_bytes_released);
89  }
90 }
91 
92 bool MemPool::FindChunk(int min_size, bool check_limits) {
93  // Try to allocate from a free chunk. The first free chunk, if any, will be immediately
94  // after the current chunk.
95  int first_free_idx = current_chunk_idx_ + 1;
96  // (cast size() to signed int in order to avoid everything else being cast to
97  // unsigned long, in particular -1)
98  while (++current_chunk_idx_ < static_cast<int>(chunks_.size())) {
99  // we found a free chunk
100  DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0);
101 
102  if (chunks_[current_chunk_idx_].size >= min_size) {
103  // This chunk is big enough. Move it before the other free chunks.
104  if (current_chunk_idx_ != first_free_idx) {
105  std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]);
106  current_chunk_idx_ = first_free_idx;
107  }
108  break;
109  }
110  }
111 
112  if (current_chunk_idx_ == static_cast<int>(chunks_.size())) {
113  // need to allocate new chunk.
114  int chunk_size = chunk_size_;
115  if (chunk_size == 0) {
116  if (current_chunk_idx_ == 0) {
117  chunk_size = DEFAULT_INITIAL_CHUNK_SIZE;
118  } else {
119  // double the size of the last chunk in the list
120  chunk_size = chunks_[current_chunk_idx_ - 1].size * 2;
121  }
122  }
123  chunk_size = ::max(min_size, chunk_size);
124 
125  if (FLAGS_disable_mem_pools) chunk_size = min_size;
126 
127  if (check_limits) {
128  if (!mem_tracker_->TryConsume(chunk_size)) {
129  // We couldn't allocate a new chunk so current_chunk_idx_ is now be past the
130  // end of chunks_.
131  DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
132  current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
133  return false;
134  }
135  } else {
136  mem_tracker_->Consume(chunk_size);
137  }
138 
139  // If there are no free chunks put it at the end, otherwise before the first free.
140  if (first_free_idx == static_cast<int>(chunks_.size())) {
141  chunks_.push_back(ChunkInfo(chunk_size));
142  } else {
143  current_chunk_idx_ = first_free_idx;
144  vector<ChunkInfo>::iterator insert_chunk = chunks_.begin() + current_chunk_idx_;
145  chunks_.insert(insert_chunk, ChunkInfo(chunk_size));
146  }
147  total_reserved_bytes_ += chunk_size;
148  }
149 
150  if (current_chunk_idx_ > 0) {
151  ChunkInfo& prev_chunk = chunks_[current_chunk_idx_ - 1];
152  chunks_[current_chunk_idx_].cumulative_allocated_bytes =
153  prev_chunk.cumulative_allocated_bytes + prev_chunk.allocated_bytes;
154  }
155 
156  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
157  DCHECK(CheckIntegrity(true));
158  return true;
159 }
160 
161 void MemPool::AcquireData(MemPool* src, bool keep_current) {
162  DCHECK(src->CheckIntegrity(false));
163  int num_acquired_chunks;
164  if (keep_current) {
165  num_acquired_chunks = src->current_chunk_idx_;
166  } else if (src->GetFreeOffset() == 0) {
167  // nothing in the last chunk
168  num_acquired_chunks = src->current_chunk_idx_;
169  } else {
170  num_acquired_chunks = src->current_chunk_idx_ + 1;
171  }
172 
173  if (num_acquired_chunks <= 0) {
174  if (!keep_current) src->FreeAll();
175  return;
176  }
177 
178  vector<ChunkInfo>::iterator end_chunk = src->chunks_.begin() + num_acquired_chunks;
179  int64_t total_transfered_bytes = 0;
180  for (vector<ChunkInfo>::iterator i = src->chunks_.begin(); i != end_chunk; ++i) {
181  total_transfered_bytes += i->size;
182  }
183  src->total_reserved_bytes_ -= total_transfered_bytes;
184  total_reserved_bytes_ += total_transfered_bytes;
185 
186  src->mem_tracker_->Release(total_transfered_bytes);
187  mem_tracker_->Consume(total_transfered_bytes);
188 
189  // insert new chunks after current_chunk_idx_
190  vector<ChunkInfo>::iterator insert_chunk = chunks_.begin() + current_chunk_idx_ + 1;
191  chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk);
192  src->chunks_.erase(src->chunks_.begin(), end_chunk);
193  current_chunk_idx_ += num_acquired_chunks;
194 
195  if (keep_current) {
196  src->current_chunk_idx_ = 0;
197  DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0);
199  src->chunks_[0].cumulative_allocated_bytes = 0;
201  } else {
202  src->current_chunk_idx_ = -1;
204  src->total_allocated_bytes_ = 0;
205  }
207 
208  // recompute cumulative_allocated_bytes
209  int start_idx = chunks_.size() - num_acquired_chunks;
210  int cumulative_bytes = (start_idx == 0
211  ? 0
212  : chunks_[start_idx - 1].cumulative_allocated_bytes
213  + chunks_[start_idx - 1].allocated_bytes);
214  for (int i = start_idx; i <= current_chunk_idx_; ++i) {
215  chunks_[i].cumulative_allocated_bytes = cumulative_bytes;
216  cumulative_bytes += chunks_[i].allocated_bytes;
217  }
218 
219  if (!keep_current) src->FreeAll();
220  DCHECK(CheckIntegrity(false));
221 }
222 
223 bool MemPool::Contains(uint8_t* ptr, int size) {
224  for (int i = 0; i < chunks_.size(); ++i) {
225  const ChunkInfo& info = chunks_[i];
226  if (ptr >= info.data && ptr < info.data + info.allocated_bytes) {
227  if (ptr + size > info.data + info.allocated_bytes) {
228  DCHECK_LE(reinterpret_cast<size_t>(ptr + size),
229  reinterpret_cast<size_t>(info.data + info.allocated_bytes));
230  return false;
231  }
232  return true;
233  }
234  }
235  return false;
236 }
237 
239  stringstream out;
240  char str[16];
241  out << "MemPool(#chunks=" << chunks_.size() << " [";
242  for (int i = 0; i < chunks_.size(); ++i) {
243  sprintf(str, "0x%lx=", reinterpret_cast<size_t>(chunks_[i].data));
244  out << (i > 0 ? " " : "")
245  << str
246  << chunks_[i].size
247  << "/" << chunks_[i].cumulative_allocated_bytes
248  << "/" << chunks_[i].allocated_bytes;
249  }
250  out << "] current_chunk=" << current_chunk_idx_
251  << " total_sizes=" << GetTotalChunkSizes()
252  << " total_alloc=" << total_allocated_bytes_
253  << ")";
254  return out.str();
255 }
256 
258  int64_t result = 0;
259  for (int i = 0; i < chunks_.size(); ++i) {
260  result += chunks_[i].size;
261  }
262  return result;
263 }
264 
265 bool MemPool::CheckIntegrity(bool current_chunk_empty) {
266  // Without pooling, there are way too many chunks and this takes too long.
267  if (FLAGS_disable_mem_pools) return true;
268 
269  // check that current_chunk_idx_ points to the last chunk with allocated data
270  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
271  int64_t total_allocated = 0;
272  for (int i = 0; i < chunks_.size(); ++i) {
273  DCHECK_GT(chunks_[i].size, 0);
274  if (i < current_chunk_idx_) {
275  DCHECK_GT(chunks_[i].allocated_bytes, 0);
276  } else if (i == current_chunk_idx_) {
277  if (current_chunk_empty) {
278  DCHECK_EQ(chunks_[i].allocated_bytes, 0);
279  } else {
280  DCHECK_GT(chunks_[i].allocated_bytes, 0);
281  }
282  } else {
283  DCHECK_EQ(chunks_[i].allocated_bytes, 0);
284  }
285  if (i > 0 && i <= current_chunk_idx_) {
286  DCHECK_EQ(chunks_[i-1].cumulative_allocated_bytes + chunks_[i-1].allocated_bytes,
287  chunks_[i].cumulative_allocated_bytes);
288  }
289  if (chunk_size_ != 0) DCHECK_GE(chunks_[i].size, chunk_size_);
290  total_allocated += chunks_[i].allocated_bytes;
291  }
292  DCHECK_EQ(total_allocated, total_allocated_bytes_);
293  return true;
294 }
295 
296 int MemPool::GetOffsetHelper(uint8_t* data) {
297  if (chunks_.empty()) return -1;
298  // try to locate chunk containing 'data', starting with chunk following
299  // the last one we looked at
300  for (int i = 0; i < chunks_.size(); ++i) {
301  int idx = (last_offset_conversion_chunk_idx_ + i + 1) % chunks_.size();
302  const ChunkInfo& info = chunks_[idx];
303  if (info.data <= data && info.data + info.allocated_bytes > data) {
305  return info.cumulative_allocated_bytes + data - info.data;
306  }
307  }
308  return -1;
309 }
310 
312  if (offset > total_allocated_bytes_) return NULL;
313  for (int i = 0; i < chunks_.size(); ++i) {
314  int idx = (last_offset_conversion_chunk_idx_ + i + 1) % chunks_.size();
315  const ChunkInfo& info = chunks_[idx];
316  if (info.cumulative_allocated_bytes <= offset
317  && info.cumulative_allocated_bytes + info.allocated_bytes > offset) {
319  return info.data + offset - info.cumulative_allocated_bytes;
320  }
321  }
322  return NULL;
323 }
324 
325 void MemPool::GetChunkInfo(vector<pair<uint8_t*, int> >* chunk_info) {
326  chunk_info->clear();
327  for (vector<ChunkInfo>::iterator info = chunks_.begin(); info != chunks_.end(); ++info) {
328  chunk_info->push_back(make_pair(info->data, info->allocated_bytes));
329  }
330 }
331 
333  char str[3];
334  stringstream out;
335  for (int i = 0; i < chunks_.size(); ++i) {
336  ChunkInfo& info = chunks_[i];
337  if (info.allocated_bytes == 0) return out.str();
338 
339  for (int j = 0; j < info.allocated_bytes; ++j) {
340  sprintf(str, "%x ", info.data[j]);
341  out << str;
342  }
343  }
344  return out.str();
345 }
int last_offset_conversion_chunk_idx_
Definition: mem-pool.h:215
bool CheckIntegrity(bool current_chunk_empty)
Definition: mem-pool.cc:265
int GetFreeOffset() const
Return offset to unoccupied space in current chunk.
Definition: mem-pool.h:250
MemTracker * mem_tracker_
Definition: mem-pool.h:232
bool Contains(uint8_t *ptr, int size)
Definition: mem-pool.cc:223
int allocated_bytes
bytes allocated via Allocate() in this chunk
Definition: mem-pool.h:194
bool FindChunk(int min_size, bool check_limits)
Definition: mem-pool.cc:92
bool TryConsume(int64_t bytes)
Definition: mem-tracker.h:163
std::vector< ChunkInfo > chunks_
Definition: mem-pool.h:228
std::string DebugPrint()
Print allocated bytes from all chunks.
Definition: mem-pool.cc:332
static IntGauge * MEM_POOL_TOTAL_BYTES
int64_t GetTotalChunkSizes() const
Return sum of chunk_sizes_.
Definition: mem-pool.cc:257
void AcquireData(MemPool *src, bool keep_current)
Definition: mem-pool.cc:161
static const int DEFAULT_INITIAL_CHUNK_SIZE
Definition: mem-pool.h:181
uint8_t * GetDataPtrHelper(int offset)
Definition: mem-pool.cc:311
int64_t total_allocated_bytes_
sum of allocated_bytes_
Definition: mem-pool.h:220
void FreeAll()
Definition: mem-pool.cc:73
void GetChunkInfo(std::vector< std::pair< uint8_t *, int > > *chunk_info)
Return (data ptr, allocated bytes) pairs for all chunks owned by this mempool.
Definition: mem-pool.cc:325
int64_t peak_allocated_bytes_
Maximum number of bytes allocated from this pool at one time.
Definition: mem-pool.h:223
DECLARE_bool(disable_mem_pools)
This class is thread-safe.
Definition: mem-tracker.h:61
static const char * LLVM_CLASS_NAME
Definition: mem-pool.h:177
void Release(int64_t bytes)
Decreases consumption of this tracker and its ancestors by 'bytes'.
Definition: mem-tracker.h:209
MemPool(MemTracker *mem_tracker, int chunk_size=0)
Definition: mem-pool.cc:33
int current_chunk_idx_
Definition: mem-pool.h:211
int64_t total_reserved_bytes_
sum of all bytes allocated in chunks_
Definition: mem-pool.h:226
void Consume(int64_t bytes)
Increases consumption of this tracker and its ancestors by 'bytes'.
Definition: mem-tracker.h:118
uint8_t offset[7 *64-sizeof(uint64_t)]
std::string DebugString()
Definition: mem-pool.cc:238
int GetOffsetHelper(uint8_t *data)
Definition: mem-pool.cc:296