17 #include <boost/lexical_cast.hpp>
18 #include <google/malloc_extension.h>
19 #include <gutil/strings/substitute.h>
28 using namespace impala;
29 using namespace strings;
// Command-line flag log_mem_usage_interval (int32, default 0). Per its help
// text, a non-zero value makes impalad output memory usage on every
// log_mem_usage_interval'th fragment completion.
32 DEFINE_int32(log_mem_usage_interval, 0,
"If non-zero, impalad will output memory usage "
33 "every log_mem_usage_interval'th fragment completion.");
36 VLOG_QUERY <<
"ExecPlanFragment() instance_id="
37 << exec_params.fragment_instance_ctx.fragment_instance_id
38 <<
" coord=" << exec_params.fragment_instance_ctx.query_ctx.coord_address
39 <<
" backend#=" << exec_params.fragment_instance_ctx.backend_num;
41 if (!exec_params.fragment.__isset.output_sink) {
42 return Status(
"missing sink in plan fragment");
45 shared_ptr<FragmentExecState> exec_state(
54 lock_guard<mutex> l(fragment_exec_state_map_lock_);
56 fragment_exec_state_map_.insert(
57 make_pair(exec_params.fragment_instance_ctx.fragment_instance_id, exec_state));
62 exec_state->set_exec_thread(
new Thread(
"impala-server",
"exec-plan-fragment",
77 shared_ptr<FragmentExecState> exec_state_reference;
79 lock_guard<mutex> l(fragment_exec_state_map_lock_);
80 FragmentExecStateMap::iterator i =
82 if (i != fragment_exec_state_map_.end()) {
83 exec_state_reference = i->second;
84 fragment_exec_state_map_.erase(i);
86 LOG(ERROR) <<
"missing entry in fragment exec state map: instance_id="
90 #ifndef ADDRESS_SANITIZER
92 if (FLAGS_log_mem_usage_interval > 0) {
94 if (num_complete % FLAGS_log_mem_usage_interval == 0) {
97 MallocExtension::instance()->GetStats(buf, 2048);
105 const TUniqueId& fragment_instance_id) {
106 lock_guard<mutex> l(fragment_exec_state_map_lock_);
107 FragmentExecStateMap::iterator i = fragment_exec_state_map_.find(fragment_instance_id);
108 if (i == fragment_exec_state_map_.end()) {
109 return shared_ptr<FragmentExecState>();
116 const TCancelPlanFragmentParams& params) {
117 VLOG_QUERY <<
"CancelPlanFragment(): instance_id=" << params.fragment_instance_id;
118 shared_ptr<FragmentExecState> exec_state =
119 GetFragmentExecState(params.fragment_instance_id);
120 if (exec_state.get() == NULL) {
121 Status status(
ErrorMsg(TErrorCode::INTERNAL_ERROR, Substitute(
"Unknown fragment id: $0",
122 lexical_cast<string>(params.fragment_instance_id))));
129 exec_state->Cancel().SetTStatus(&return_val);
const TUniqueId & fragment_instance_id() const
void FragmentExecThread(FragmentExecState *exec_state)
TODO: Consider allowing fragment IDs as category parameters.
void Exec()
Main loop of plan fragment execution. Blocks until execution finishes.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
static IntCounter * IMPALA_SERVER_NUM_FRAGMENTS
void SetTStatus(T *status_container) const
static ExecEnv * GetInstance()
Status ExecPlanFragment(const TExecPlanFragmentParams &params)
boost::shared_ptr< FragmentExecState > GetFragmentExecState(const TUniqueId &fragment_instance_id)
DEFINE_int32(periodic_counter_update_period_ms, 500,"Period to update rate counters and"" sampling counters in ms")
void CancelPlanFragment(TCancelPlanFragmentResult &return_val, const TCancelPlanFragmentParams &params)
Cancels a plan fragment that is running asynchronously.
Execution state of a single plan fragment.