Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
runtime-state.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_RUNTIME_RUNTIME_STATE_H
17 #define IMPALA_RUNTIME_RUNTIME_STATE_H
18 
20 #include "common/object-pool.h"
21 
22 #include <boost/scoped_ptr.hpp>
23 #include <boost/shared_ptr.hpp>
24 #include <vector>
25 #include <string>
27 #include <sstream>
28 
30 #include "runtime/exec-env.h"
31 #include "runtime/descriptors.h" // for PlanNodeId
32 #include "runtime/disk-io-mgr.h" // for DiskIoMgr::RequestContext
33 #include "runtime/mem-tracker.h"
35 #include "gen-cpp/PlanNodes_types.h"
36 #include "gen-cpp/Types_types.h" // for TUniqueId
37 #include "gen-cpp/ImpalaInternalService_types.h" // for TQueryOptions
38 #include "util/runtime-profile.h"
39 
40 namespace impala {
41 
42 class Bitmap;
43 class BufferedBlockMgr;
44 class DescriptorTbl;
45 class ObjectPool;
46 class Status;
47 class ExecEnv;
48 class Expr;
49 class LlvmCodeGen;
50 class TimestampValue;
52 
// Map from a string key (presumably the partition name -- confirm) to that
// partition's TInsertPartitionStatus; this is the type of per_partition_status_.
57 typedef std::map<std::string, TInsertPartitionStatus> PartitionStatusMap;
58 
// Stats per partition for insert queries. The key is the same as for
// PartitionRowCount.
60 typedef std::map<std::string, TInsertStats> PartitionInsertStats;
61 
// Map from string to string; this is the type of hdfs_files_to_move_.
// NOTE(review): presumably source path -> destination path of HDFS files to move
// when an insert finishes -- confirm against the code that fills it.
65 typedef std::map<std::string, std::string> FileMoveMap;
66 
69 class RuntimeState {
70  public:
// Builds the state for one plan fragment instance from its context, the cgroup name
// and the process-wide ExecEnv. NOTE(review): presumably fills fragment_instance_ctx_
// and cgroup_ from the arguments -- the definition lives in the .cc; confirm.
71  RuntimeState(const TPlanFragmentInstanceCtx& fragment_instance_ctx,
72  const std::string& cgroup, ExecEnv* exec_env);
73 
// Builds a state from a query context alone (no fragment instance context or ExecEnv
// argument). NOTE(review): presumably for callers without a running fragment -- confirm.
75  RuntimeState(const TQueryCtx& query_ctx);
76 
// Empty d'tor to avoid issues with scoped_ptr: it is declared out of line because
// members such as now_ and codegen_ are scoped_ptrs to forward-declared types
// (TimestampValue, LlvmCodeGen), which must be complete where they are destroyed.
78  ~RuntimeState();
79 
// Sets up the memory trackers for this query/fragment instance.
// query_id: the owning query. request_pool: pointer to the request pool name.
// query_bytes_limit: byte limit for the query. query_rm_reservation_limit_bytes:
// defaults to -1. NOTE(review): presumably creates query_mem_tracker_ and
// instance_mem_tracker_ -- confirm in the .cc.
86  void InitMemTrackers(const TUniqueId& query_id, const std::string* request_pool,
87  int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes = -1);
88 
91 
92  ObjectPool* obj_pool() const { return obj_pool_.get(); }
93  const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
95  const TQueryOptions& query_options() const {
96  return query_ctx().request.query_options;
97  }
98  int batch_size() const { return query_ctx().request.query_options.batch_size; }
99  bool abort_on_error() const {
100  return query_ctx().request.query_options.abort_on_error;
101  }
103  return query_ctx().request.query_options.abort_on_default_limit_exceeded;
104  }
105  int max_errors() const { return query_options().max_errors; }
106  const TQueryCtx& query_ctx() const { return fragment_instance_ctx_.query_ctx; }
107  const TPlanFragmentInstanceCtx& fragment_ctx() const { return fragment_instance_ctx_; }
108  const std::string& effective_user() const {
109  if (query_ctx().session.__isset.delegated_user &&
110  !query_ctx().session.delegated_user.empty()) {
111  return do_as_user();
112  }
113  return connected_user();
114  }
115  const std::string& do_as_user() const { return query_ctx().session.delegated_user; }
116  const std::string& connected_user() const {
117  return query_ctx().session.connected_user;
118  }
119  const TimestampValue* now() const { return now_.get(); }
120  void set_now(const TimestampValue* now);
121  const ErrorLogMap& error_log() const { return error_log_; }
122  const std::vector<std::pair<std::string, int> >& file_errors() const {
123  return file_errors_;
124  }
125  const TUniqueId& query_id() const { return query_ctx().query_id; }
126  const TUniqueId& fragment_instance_id() const {
127  return fragment_instance_ctx_.fragment_instance_id;
128  }
129  const std::string& cgroup() const { return cgroup_; }
130  ExecEnv* exec_env() { return exec_env_; }
135  }
138  }
143 
145  std::vector<DiskIoMgr::RequestContext*>* reader_contexts() { return &reader_contexts_; }
146 
148  DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
149  root_node_id_ = id;
150  }
151 
154  uint32_t fragment_hash_seed() const { return root_node_id_ + 1; }
155 
159  uint32_t slot_filter_bitmap_size() const { return 32213; }
160 
166  void AddBitmapFilter(SlotId slot, Bitmap* bitmap, bool* acquired_ownership);
167 
173  if (slot_bitmap_filters_.find(slot) == slot_bitmap_filters_.end()) return NULL;
174  return slot_bitmap_filters_[slot];
175  }
176 
178 
181 
183  bool codegen_enabled() const { return !query_options().disable_codegen; }
184 
187  bool codegen_created() const { return codegen_.get() != NULL; }
188 
192  Status GetCodegen(LlvmCodeGen** codegen, bool initialize = true);
193 
195  DCHECK(block_mgr_.get() != NULL);
196  return block_mgr_.get();
197  }
198 
200  boost::lock_guard<SpinLock> l(query_status_lock_);
201  return query_status_;
202  };
203 
208  bool LogError(const ErrorMsg& msg);
209 
211  bool LogHasSpace() {
212  boost::lock_guard<SpinLock> l(error_log_lock_);
213  return error_log_.size() < query_options().max_errors;
214  }
215 
217  void ReportFileErrors(const std::string& file_name, int num_errors);
218 
220  void ClearFileErrors() { file_errors_.clear(); }
221 
223  bool ErrorLogIsEmpty();
224 
226  std::string ErrorLog();
227 
230  void GetUnreportedErrors(ErrorLogMap* new_errors);
231 
233  std::string FileErrors();
234 
235  bool is_cancelled() const { return is_cancelled_; }
236  void set_is_cancelled(bool v) { is_cancelled_ = v; }
237 
241  }
244  }
247  }
248 
250  void set_query_status(const std::string& err_msg) {
251  boost::lock_guard<SpinLock> l(query_status_lock_);
252  if (!query_status_.ok()) return;
253  query_status_ = Status(err_msg);
254  }
255 
262  int64_t failed_allocation_size = 0);
263 
268 
271 
272  private:
// Set per-fragment state, using exec_env.
274  Status Init(ExecEnv* exec_env);
275 
279 
// Default rows-per-batch value. Note that batch_size() reads the query option
// directly. NOTE(review): presumably used when that option is unset -- confirm.
280  static const int DEFAULT_BATCH_SIZE = 1024;
281 
// Owns the pool returned by obj_pool().
283  boost::scoped_ptr<ObjectPool> obj_pool_;
284 
287 
290 
293 
// Stores the number of parse errors per file as (file name, count) pairs; protected
// by file_errors_lock_.
295  std::vector<std::pair<std::string, int> > file_errors_;
296 
// Source of query_ctx(), fragment_ctx() and fragment_instance_id().
299  TPlanFragmentInstanceCtx fragment_instance_ctx_;
300 
// Returned by now() and set via set_now(). TimestampValue is only forward-declared
// here, which is why the destructor is declared out of line.
303  boost::scoped_ptr<TimestampValue> now_;
304 
// Returned by cgroup(). NOTE(review): presumably initialized from the constructor's
// cgroup argument -- confirm.
307  std::string cgroup_;
// LLVM code generator; non-NULL once created (see codegen_created()/GetCodegen()).
309  boost::scoped_ptr<LlvmCodeGen> codegen_;
310 
314 
318 
321 
323 
326 
329 
332 
335 
// Query-level memory tracker. Held in a shared_ptr (unlike instance_mem_tracker_),
// presumably because it outlives / is shared beyond this instance -- confirm.
338  boost::shared_ptr<MemTracker> query_mem_tracker_;
339 
// Memory usage of this fragment instance.
341  boost::scoped_ptr<MemTracker> instance_mem_tracker_;
342 
345 
351 
355 
// Reader contexts that need to be closed when the fragment is closed.
357  std::vector<DiskIoMgr::RequestContext*> reader_contexts_;
358 
// Query-wide block manager returned by block_mgr() (which DCHECKs it is non-NULL).
362  boost::shared_ptr<BufferedBlockMgr> block_mgr_;
363 
372 
375 
// Bitmap filters per slot, populated by AddBitmapFilter; protected by bitmap_lock_.
378  boost::unordered_map<SlotId, Bitmap*> slot_bitmap_filters_;
379 
// Private copy constructor (pre-C++11 idiom) so RuntimeState cannot be copied.
// NOTE(review): presumably never defined -- confirm.
381  RuntimeState(const RuntimeState&);
382 };
383 
// Returns Status::CANCELLED from the enclosing function when the RuntimeState*
// `state` reports cancellation. `state` is evaluated exactly once. The do/while(false)
// wrapper makes the expansion a single statement that needs a trailing semicolon,
// so it is safe inside an unbraced if/else.
#define RETURN_IF_CANCELLED(state)               \
  do {                                           \
    if (UNLIKELY((state)->is_cancelled())) {     \
      return Status::CANCELLED;                  \
    }                                            \
  } while (false)
388 
389 }
390 
391 #endif
boost::shared_ptr< MemTracker > query_mem_tracker_
PartitionStatusMap per_partition_status_
Records summary statistics for the results of inserts into Hdfs partitions.
void set_fragment_root_id(PlanNodeId id)
uint32_t slot_filter_bitmap_size() const
QueryResourceMgr * query_resource_mgr_
boost::shared_ptr< BufferedBlockMgr > block_mgr_
RuntimeProfile::Counter * total_storage_wait_timer_
Total time waiting in storage (across all threads)
const TUniqueId & query_id() const
int PlanNodeId
Definition: global-types.h:26
bool codegen_created() const
CatalogServiceClientCache * catalogd_client_cache()
MemTracker tracker
BufferedBlockMgr * block_mgr()
std::string ErrorLog()
Returns the error log lines as a string joined with ' '.
FileMoveMap * hdfs_files_to_move()
void InitMemTrackers(const TUniqueId &query_id, const std::string *request_pool, int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes=-1)
boost::scoped_ptr< ObjectPool > obj_pool_
RuntimeProfile * runtime_profile()
Returns runtime state profile.
void GetUnreportedErrors(ErrorLogMap *new_errors)
void set_desc_tbl(DescriptorTbl *desc_tbl)
Definition: runtime-state.h:94
Lightweight spinlock.
Definition: spinlock.h:24
std::map< std::string, TInsertStats > PartitionInsertStats
Stats per partition for insert queries. The key is the same as for PartitionRowCount.
Definition: runtime-state.h:60
bool LogHasSpace()
Returns true if the error log holds fewer entries than the max_errors query option.
PlanNodeId root_node_id_
MemTracker * query_mem_tracker()
PartitionStatusMap * per_partition_status()
Status Init(ExecEnv *exec_env)
Set per-fragment state.
const std::string & cgroup() const
HBaseTableFactory * htable_factory()
const std::string & do_as_user() const
HBaseTableFactory * htable_factory()
Definition: exec-env.h:82
SpinLock error_log_lock_
Lock protecting error_log_ and unreported_error_idx_.
void set_query_status(const std::string &err_msg)
Sets query_status_ with err_msg if no error has been set yet.
const ErrorLogMap & error_log() const
void AddBitmapFilter(SlotId slot, Bitmap *bitmap, bool *acquired_ownership)
boost::scoped_ptr< LlvmCodeGen > codegen_
std::vector< std::pair< std::string, int > > file_errors_
Stores the number of parse errors per file.
SpinLock bitmap_lock_
Lock protecting slot_bitmap_filters_.
LLVM code generator. This is the top level object to generate jitted code.
Definition: llvm-codegen.h:107
RuntimeProfile::Counter * total_network_receive_timer()
RuntimeProfile::Counter * total_network_send_timer()
int max_errors() const
QueryResourceMgr * query_resource_mgr() const
FileMoveMap hdfs_files_to_move_
boost::unordered_map< SlotId, Bitmap * > slot_bitmap_filters_
bool LogError(const ErrorMsg &msg)
const TQueryOptions & query_options() const
Definition: runtime-state.h:95
const Bitmap * GetBitmapFilter(SlotId slot)
std::map< std::string, std::string > FileMoveMap
Definition: runtime-state.h:65
void ReportFileErrors(const std::string &file_name, int num_errors)
Report that num_errors occurred while parsing file_name.
const std::string & connected_user() const
CatalogServiceClientCache * catalogd_client_cache()
Definition: exec-env.h:79
ObjectPool * obj_pool() const
Definition: runtime-state.h:92
std::map< std::string, TInsertPartitionStatus > PartitionStatusMap
Definition: runtime-state.h:51
void ClearFileErrors()
Clear the file errors.
int SlotId
Definition: global-types.h:24
const std::vector< std::pair< std::string, int > > & file_errors() const
boost::scoped_ptr< MemTracker > instance_mem_tracker_
Memory usage of this fragment instance.
~RuntimeState()
Empty d'tor to avoid issues with scoped_ptr.
RuntimeProfile::Counter * total_cpu_timer_
Total CPU time (across all threads), including all wait times.
uint32_t fragment_hash_seed() const
const TUniqueId & fragment_instance_id() const
std::string FileErrors()
Returns a string representation of the file_errors_.
void set_is_cancelled(bool v)
const DescriptorTbl & desc_tbl() const
Definition: runtime-state.h:93
This class is thread-safe.
Definition: mem-tracker.h:61
RuntimeState(const TPlanFragmentInstanceCtx &fragment_instance_ctx, const std::string &cgroup, ExecEnv *exec_env)
SpinLock file_errors_lock_
Lock protecting file_errors_.
RuntimeProfile profile_
int batch_size() const
Definition: runtime-state.h:98
const std::string & effective_user() const
bool is_cancelled() const
MemTracker * instance_mem_tracker()
Status SetMemLimitExceeded(MemTracker *tracker=NULL, int64_t failed_allocation_size=0)
static const int DEFAULT_BATCH_SIZE
DataStreamMgr * stream_mgr()
ImpalaInternalServiceClientCache * impalad_client_cache()
const TQueryCtx & query_ctx() const
bool codegen_enabled() const
Returns true if codegen is enabled for this query.
bool ErrorLogIsEmpty()
Return true if error log is empty.
Status CreateBlockMgr()
Gets/Creates the query wide block mgr.
std::vector< DiskIoMgr::RequestContext * > * reader_contexts()
DiskIoMgr * disk_io_mgr()
Definition: exec-env.h:83
RuntimeProfile::Counter * total_network_send_timer_
Total time spent sending over the network (across all threads)
void set_now(const TimestampValue *now)
bool is_cancelled_
If true, execution should stop with a CANCELLED status.
Status GetCodegen(LlvmCodeGen **codegen, bool initialize=true)
DataStreamMgr * stream_mgr()
Definition: exec-env.h:75
ErrorLogMap error_log_
Logs error messages.
DescriptorTbl * desc_tbl_
const TimestampValue * now() const
ThreadResourceMgr::ResourcePool * resource_pool_
bool abort_on_error() const
Definition: runtime-state.h:99
Only CPU-heavy threads need to be managed using this class.
bool ok() const
Definition: status.h:172
RuntimeProfile::Counter * total_storage_wait_timer()
ThreadResourceMgr::ResourcePool * resource_pool()
DiskIoMgr * io_mgr()
const TPlanFragmentInstanceCtx & fragment_ctx() const
RuntimeProfile::Counter * total_cpu_timer()
std::map< TErrorCode::type, TErrorLogEntry > ErrorLogMap
Tracks log messages per error code.
Definition: error-util.h:144
TPlanFragmentInstanceCtx fragment_instance_ctx_
RuntimeProfile::Counter * total_network_receive_timer_
Total time spent receiving over the network (across all threads)
bool abort_on_default_limit_exceeded() const
std::vector< DiskIoMgr::RequestContext * > reader_contexts_
Reader contexts that need to be closed when the fragment is closed.
ImpalaInternalServiceClientCache * impalad_client_cache()
Definition: exec-env.h:76
void SetQueryResourceMgr(QueryResourceMgr *res_mgr)
boost::scoped_ptr< TimestampValue > now_