Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
runtime-state.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <iostream>
16 #include <jni.h>
17 #include <sstream>
18 #include <string>
19 
20 #include "common/logging.h"
21 #include <boost/algorithm/string/join.hpp>
22 #include <gutil/strings/substitute.h>
23 
24 #include "codegen/llvm-codegen.h"
25 #include "common/object-pool.h"
26 #include "common/status.h"
27 #include "exprs/expr.h"
29 #include "runtime/descriptors.h"
30 #include "runtime/runtime-state.h"
34 #include "util/bitmap.h"
35 #include "util/cpu-info.h"
36 #include "util/debug-util.h"
37 #include "util/disk-info.h"
38 #include "util/error-util.h"
39 #include "util/jni-util.h"
40 #include "util/mem-info.h"
41 #include "util/pretty-printer.h"
42 
43 #include "common/names.h"
44 
45 using namespace llvm;
46 
47 DECLARE_int32(max_errors);
48 
// Share of the query mem limit that is handed to the block mgr. Every operator that
// accumulates memory goes through the block mgr, so it should receive the bulk of
// the limit. Whatever is left over serves the non-spilling operators, whose
// footprint should be independent of data size.
static const float BLOCK_MGR_MEM_FRACTION = 0.8f;

// Lower bound on the memory that must be left once the block mgr has reserved its
// BLOCK_MGR_MEM_FRACTION share. The block limit is:
//   min(query_limit * BLOCK_MGR_MEM_FRACTION, query_limit - BLOCK_MGR_MEM_MIN_REMAINING)
// TODO: this value was picked arbitrarily and the tests are written to rely on this
// for the minimum memory required to run the query. Revisit.
static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100LL * 1024 * 1024;
61 
62 namespace impala {
63 
64 RuntimeState::RuntimeState(const TPlanFragmentInstanceCtx& fragment_instance_ctx,
65  const string& cgroup, ExecEnv* exec_env)
66  : obj_pool_(new ObjectPool()),
67  fragment_instance_ctx_(fragment_instance_ctx),
68  now_(new TimestampValue(fragment_instance_ctx_.query_ctx.now_string.c_str(),
69  fragment_instance_ctx_.query_ctx.now_string.size())),
70  cgroup_(cgroup),
71  profile_(obj_pool_.get(),
72  "Fragment " + PrintId(fragment_instance_ctx_.fragment_instance_id)),
73  is_cancelled_(false),
74  query_resource_mgr_(NULL),
75  root_node_id_(-1) {
76  Status status = Init(exec_env);
77  DCHECK(status.ok()) << status.GetDetail();
78 }
79 
80 RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
81  : obj_pool_(new ObjectPool()),
82  now_(new TimestampValue(query_ctx.now_string.c_str(),
83  query_ctx.now_string.size())),
84  exec_env_(ExecEnv::GetInstance()),
85  profile_(obj_pool_.get(), "<unnamed>"),
86  is_cancelled_(false),
87  query_resource_mgr_(NULL),
88  root_node_id_(-1) {
89  fragment_instance_ctx_.__set_query_ctx(query_ctx);
90  fragment_instance_ctx_.query_ctx.request.query_options.__set_batch_size(
92 }
93 
95  block_mgr_.reset();
96 
97  typedef boost::unordered_map<SlotId, Bitmap*>::iterator SlotBitmapIterator;
98  for (SlotBitmapIterator it = slot_bitmap_filters_.begin();
99  it != slot_bitmap_filters_.end(); ++it) {
100  if (it->second != NULL) {
101  delete it->second;
102  it->second = NULL;
103  }
104  }
105 
106  // query_mem_tracker_ must be valid as long as instance_mem_tracker_ is so
107  // delete instance_mem_tracker_ first.
108  // LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded.
109  // Break the link between the instance_mem_tracker and its parent (query_mem_tracker_)
110  // before the instance_mem_tracker_ and its children are destroyed.
111  if (instance_mem_tracker_.get() != NULL) {
112  // May be NULL if InitMemTrackers() is not called, for example from tests.
113  instance_mem_tracker_->UnregisterFromParent();
114  }
115 
116  instance_mem_tracker_.reset();
117  query_mem_tracker_.reset();
118 }
119 
123  TQueryOptions& query_options =
124  fragment_instance_ctx_.query_ctx.request.query_options;
125 
126  // max_errors does not indicate how many errors in total have been recorded, but rather
127  // how many are distinct. It is defined as the sum of the number of generic errors and
128  // the number of distinct other errors.
129  if (query_options.max_errors <= 0) {
130  // TODO: fix linker error and uncomment this
131  //query_options_.max_errors = FLAGS_max_errors;
132  query_options.max_errors = 100;
133  }
134  if (query_options.batch_size <= 0) {
135  query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
136  }
137 
138  // Register with the thread mgr
139  if (exec_env != NULL) {
140  resource_pool_ = exec_env->thread_mgr()->RegisterPool();
141  DCHECK(resource_pool_ != NULL);
142  }
143 
144  total_cpu_timer_ = ADD_TIMER(runtime_profile(), "TotalCpuTime");
145  total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");
146  total_network_send_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkSendTime");
147  total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime");
148 
149  return Status::OK;
150 }
151 
152 void RuntimeState::InitMemTrackers(const TUniqueId& query_id, const string* pool_name,
153  int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes) {
154  MemTracker* query_parent_tracker = exec_env_->process_mem_tracker();
155  if (pool_name != NULL) {
156  query_parent_tracker = MemTracker::GetRequestPoolMemTracker(*pool_name,
157  query_parent_tracker);
158  }
160  MemTracker::GetQueryMemTracker(query_id, query_bytes_limit,
161  query_rm_reservation_limit_bytes, query_parent_tracker, query_resource_mgr());
164 }
165 
167  DCHECK(block_mgr_.get() == NULL);
168 
169  // Compute the max memory the block mgr will use.
170  int64_t block_mgr_limit = query_mem_tracker_->lowest_limit();
171  if (block_mgr_limit < 0) block_mgr_limit = numeric_limits<int64_t>::max();
172  block_mgr_limit = min(static_cast<int64_t>(block_mgr_limit * BLOCK_MGR_MEM_FRACTION),
173  block_mgr_limit - BLOCK_MGR_MEM_MIN_REMAINING);
174  if (block_mgr_limit < 0) block_mgr_limit = 0;
175  if (query_options().__isset.max_block_mgr_memory &&
176  query_options().max_block_mgr_memory > 0) {
177  block_mgr_limit = query_options().max_block_mgr_memory;
178  LOG(ERROR) << "Block mgr mem limit: "
179  << PrettyPrinter::Print(block_mgr_limit, TUnit::BYTES);
180  }
181 
183  runtime_profile(), block_mgr_limit, io_mgr()->max_read_buffer_size(),
184  &block_mgr_));
185  return Status::OK;
186 }
187 
189  if (codegen_.get() != NULL) return Status::OK;
190  // TODO: add the fragment ID to the codegen ID as well
193  codegen_->EnableOptimizations(true);
194  profile_.AddChild(codegen_->runtime_profile());
195  return Status::OK;
196 }
197 
199  lock_guard<SpinLock> l(error_log_lock_);
200  return (error_log_.size() == 0);
201 }
202 
204  lock_guard<SpinLock> l(error_log_lock_);
206 }
207 
209  stringstream out;
210  {
211  lock_guard<SpinLock> l(file_errors_lock_);
212  for (int i = 0; i < file_errors_.size(); ++i) {
213  out << file_errors_[i].second << " errors in " << file_errors_[i].first << endl;
214  }
215  }
216  return out.str();
217 }
218 
219 void RuntimeState::ReportFileErrors(const std::string& file_name, int num_errors) {
220  lock_guard<SpinLock> l(file_errors_lock_);
221  file_errors_.push_back(make_pair(file_name, num_errors));
222 }
223 
224 bool RuntimeState::LogError(const ErrorMsg& message) {
225  lock_guard<SpinLock> l(error_log_lock_);
226  // All errors go to the log. unreported_error_count_ is counted independently of the
227  // size of error_log_ to account for errors that were already reported to the coordinator.
228  VLOG_QUERY << "Error from query " << query_id() << ": " << message.msg();
230  AppendError(&error_log_, message);
231  return true;
232  }
233  return false;
234 }
235 
237  lock_guard<SpinLock> l(error_log_lock_);
238  *new_errors = error_log_;
239  // Reset the map, but keep all already reported keys so that we do not
240  // report the same errors multiple times.
241  BOOST_FOREACH(ErrorLogMap::value_type v, error_log_) {
242  v.second.messages.clear();
243  v.second.count = 0;
244  }
245 }
246 
248  int64_t failed_allocation_size) {
249  DCHECK_GE(failed_allocation_size, 0);
250  {
251  lock_guard<SpinLock> l(query_status_lock_);
252  if (query_status_.ok()) {
254  } else {
255  return query_status_;
256  }
257  }
258 
259  DCHECK(query_mem_tracker_.get() != NULL);
260  stringstream ss;
261  ss << "Memory Limit Exceeded\n";
262  if (failed_allocation_size != 0) {
263  DCHECK_NOTNULL(tracker);
264  ss << " " << tracker->label() << " could not allocate "
265  << PrettyPrinter::Print(failed_allocation_size, TUnit::BYTES)
266  << " without exceeding limit." << endl;
267  }
268 
271  } else {
272  ss << query_mem_tracker_->LogUsage();
273  }
274  LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
275  // Add warning about missing stats except for compute stats child queries.
276  if (!query_ctx().__isset.parent_query_id &&
277  query_ctx().__isset.tables_missing_stats &&
278  !query_ctx().tables_missing_stats.empty()) {
279  LogError(ErrorMsg(TErrorCode::GENERAL,
280  GetTablesMissingStatsWarning(query_ctx().tables_missing_stats)));
281  }
283  return query_status_;
284 }
285 
287  // TODO: it would be nice if this also checked for cancellation, but doing so breaks
288  // cases where we use Status::CANCELLED to indicate that the limit was reached.
289  if (instance_mem_tracker_->AnyLimitExceeded()) return SetMemLimitExceeded();
290  lock_guard<SpinLock> l(query_status_lock_);
291  return query_status_;
292 }
293 
295  bool* acquired_ownership) {
296  *acquired_ownership = false;
297  if (bitmap != NULL) {
298  lock_guard<SpinLock> l(bitmap_lock_);
299  if (slot_bitmap_filters_.find(slot) != slot_bitmap_filters_.end()) {
300  Bitmap* existing_bitmap = slot_bitmap_filters_[slot];
301  DCHECK_NOTNULL(existing_bitmap);
302  existing_bitmap->And(bitmap);
303  } else {
304  // This is the first time we set the slot_bitmap_filters_[slot]. We avoid
305  // allocating a new bitmap by using the passed bitmap.
306  slot_bitmap_filters_[slot] = bitmap;
307  *acquired_ownership = true;
308  }
309  }
310 }
311 
312 Status RuntimeState::GetCodegen(LlvmCodeGen** codegen, bool initialize) {
313  if (codegen_.get() == NULL && initialize) RETURN_IF_ERROR(CreateCodegen());
314  *codegen = codegen_.get();
315  return Status::OK;
316 }
317 
318 }
boost::shared_ptr< MemTracker > query_mem_tracker_
static boost::shared_ptr< MemTracker > GetQueryMemTracker(const TUniqueId &id, int64_t byte_limit, int64_t rm_reserved_limit, MemTracker *parent, QueryResourceMgr *res_mgr)
Definition: mem-tracker.cc:156
const std::string & msg() const
Returns the formatted error string.
Definition: error-util.h:118
string GetTablesMissingStatsWarning(const vector< TTableName > &tables_missing_stats)
Definition: error-util.cc:40
static Status LoadImpalaIR(ObjectPool *, const std::string &id, boost::scoped_ptr< LlvmCodeGen > *codegen)
void And(const Bitmap *src)
Bitwise ANDs the src bitmap into this one.
Definition: bitmap.h:63
string PrintErrorMapToString(const ErrorLogMap &errors)
Definition: error-util.cc:153
boost::shared_ptr< BufferedBlockMgr > block_mgr_
RuntimeProfile::Counter * total_storage_wait_timer_
Total time waiting in storage (across all threads)
const TUniqueId & query_id() const
const std::string & label() const
Definition: mem-tracker.h:283
MemTracker tracker
std::string ErrorLog()
Returns the error log lines as a string joined with ' '.
boost::scoped_ptr< ObjectPool > obj_pool_
Object pool owned by the coordinator. Any executor will have its own pool.
Definition: coordinator.h:296
void InitMemTrackers(const TUniqueId &query_id, const std::string *request_pool, int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes=-1)
const TUniqueId & query_id() const
Definition: coordinator.h:152
boost::scoped_ptr< ObjectPool > obj_pool_
RuntimeProfile * runtime_profile()
Returns runtime state profile.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
void GetUnreportedErrors(ErrorLogMap *new_errors)
#define ADD_TIMER(profile, name)
MemTracker * query_mem_tracker()
Status Init(ExecEnv *exec_env)
Set per-fragment state.
string PrintId(const TUniqueId &id, const string &separator)
Definition: debug-util.cc:97
SpinLock error_log_lock_
Lock protecting error_log_ and unreported_error_idx_.
#define SCOPED_TIMER(c)
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
void AddBitmapFilter(SlotId slot, Bitmap *bitmap, bool *acquired_ownership)
boost::scoped_ptr< LlvmCodeGen > codegen_
std::vector< std::pair< std::string, int > > file_errors_
Stores the number of parse errors per file.
MemTracker * process_mem_tracker()
Definition: exec-env.h:86
SpinLock bitmap_lock_
Lock protecting slot_bitmap_filters_.
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
void AppendError(ErrorLogMap *map, const ErrorMsg &e)
Definition: error-util.cc:177
#define VLOG_QUERY
Definition: logging.h:57
int max_errors() const
QueryResourceMgr * query_resource_mgr() const
boost::unordered_map< SlotId, Bitmap * > slot_bitmap_filters_
bool LogError(const ErrorMsg &msg)
const TQueryOptions & query_options() const
Definition: runtime-state.h:95
void ReportFileErrors(const std::string &file_name, int num_errors)
Report that num_errors occurred while parsing file_name.
static Status Create(RuntimeState *state, MemTracker *parent, RuntimeProfile *profile, int64_t mem_limit, int64_t buffer_size, boost::shared_ptr< BufferedBlockMgr > *block_mgr)
int SlotId
Definition: global-types.h:24
boost::scoped_ptr< MemTracker > instance_mem_tracker_
Memory usage of this fragment instance.
~RuntimeState()
Empty d'tor to avoid issues with scoped_ptr.
RuntimeProfile::Counter * total_cpu_timer_
Total CPU time (across all threads), including all wait times.
const TUniqueId & fragment_instance_id() const
std::string FileErrors()
Returns a string representation of the file_errors_.
This class is thread-safe.
Definition: mem-tracker.h:61
SpinLock file_errors_lock_
Lock protecting file_errors_.
RuntimeProfile profile_
static const Status MEM_LIMIT_EXCEEDED
Definition: status.h:89
size_t ErrorCount(const ErrorLogMap &errors)
Definition: error-util.cc:191
Status SetMemLimitExceeded(MemTracker *tracker=NULL, int64_t failed_allocation_size=0)
static const int DEFAULT_BATCH_SIZE
static MemTracker * GetRequestPoolMemTracker(const std::string &pool_name, MemTracker *parent)
Definition: mem-tracker.cc:134
const TQueryCtx & query_ctx() const
DECLARE_int32(max_errors)
bool ErrorLogIsEmpty()
Return true if error log is empty.
Status CreateBlockMgr()
Gets/Creates the query wide block mgr.
std::string LogUsage(const std::string &prefix="") const
Logs the usage of this tracker and all of its children (recursively).
static const Status OK
Definition: status.h:87
RuntimeProfile::Counter * total_network_send_timer_
Total time spent sending over the network (across all threads)
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
static const int64_t BLOCK_MGR_MEM_MIN_REMAINING
ExecEnv * exec_env_
Definition: coordinator.h:193
ErrorLogMap error_log_
Logs error messages.
ThreadResourceMgr::ResourcePool * resource_pool_
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
bool ok() const
Definition: status.h:172
DiskIoMgr * io_mgr()
ThreadResourceMgr * thread_mgr()
Definition: exec-env.h:87
static const float BLOCK_MGR_MEM_FRACTION
string name
Definition: cpu-info.cc:50
std::map< TErrorCode::type, TErrorLogEntry > ErrorLogMap
Tracks log messages per error code.
Definition: error-util.h:144
TPlanFragmentInstanceCtx fragment_instance_ctx_
RuntimeProfile::Counter * total_network_receive_timer_
Total time spent receiving over the network (across all threads)
bool IsMemLimitExceeded() const
Definition: status.h:178
Counter * total_time_counter()
Returns the counter for the total elapsed time.