Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
fragment-mgr.cc
Go to the documentation of this file.
1 // Copyright 2014 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "service/fragment-mgr.h"
16 
17 #include <boost/lexical_cast.hpp>
18 #include <google/malloc_extension.h>
19 #include <gutil/strings/substitute.h>
20 
22 #include "runtime/exec-env.h"
23 #include "util/impalad-metrics.h"
24 #include "util/uid-util.h"
25 
26 #include "common/names.h"
27 
28 using namespace impala;
29 using namespace strings;
30 
31 // TODO: this logging should go into a per query log.
32 DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
33  "every log_mem_usage_interval'th fragment completion.");
34 
35 Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) {
36  VLOG_QUERY << "ExecPlanFragment() instance_id="
37  << exec_params.fragment_instance_ctx.fragment_instance_id
38  << " coord=" << exec_params.fragment_instance_ctx.query_ctx.coord_address
39  << " backend#=" << exec_params.fragment_instance_ctx.backend_num;
40 
41  if (!exec_params.fragment.__isset.output_sink) {
42  return Status("missing sink in plan fragment");
43  }
44 
45  shared_ptr<FragmentExecState> exec_state(
46  new FragmentExecState(exec_params.fragment_instance_ctx, ExecEnv::GetInstance()));
47  // Call Prepare() now, before registering the exec state, to avoid calling
48  // exec_state->Cancel().
49  // We might get an async cancellation, and the executor requires that Cancel() not
50  // be called before Prepare() returns.
51  RETURN_IF_ERROR(exec_state->Prepare(exec_params));
52 
53  {
54  lock_guard<mutex> l(fragment_exec_state_map_lock_);
55  // register exec_state before starting exec thread
56  fragment_exec_state_map_.insert(
57  make_pair(exec_params.fragment_instance_ctx.fragment_instance_id, exec_state));
58  }
59 
60  // execute plan fragment in new thread
61  // TODO: manage threads via global thread pool
62  exec_state->set_exec_thread(new Thread("impala-server", "exec-plan-fragment",
63  &FragmentMgr::FragmentExecThread, this, exec_state.get()));
64 
65  return Status::OK;
66 }
67 
70  exec_state->Exec();
71  // we're done with this plan fragment
72 
73  // The last reference to the FragmentExecState is in the map. We don't
74  // want the destructor to be called while the fragment_exec_state_map_lock_
75  // is taken so we'll first grab a reference here before removing the entry
76  // from the map.
77  shared_ptr<FragmentExecState> exec_state_reference;
78  {
79  lock_guard<mutex> l(fragment_exec_state_map_lock_);
80  FragmentExecStateMap::iterator i =
81  fragment_exec_state_map_.find(exec_state->fragment_instance_id());
82  if (i != fragment_exec_state_map_.end()) {
83  exec_state_reference = i->second;
84  fragment_exec_state_map_.erase(i);
85  } else {
86  LOG(ERROR) << "missing entry in fragment exec state map: instance_id="
87  << exec_state->fragment_instance_id();
88  }
89  }
90 #ifndef ADDRESS_SANITIZER
91  // tcmalloc and address sanitizer can not be used together
92  if (FLAGS_log_mem_usage_interval > 0) {
94  if (num_complete % FLAGS_log_mem_usage_interval == 0) {
95  char buf[2048];
96  // This outputs how much memory is currently being used by this impalad
97  MallocExtension::instance()->GetStats(buf, 2048);
98  LOG(INFO) << buf;
99  }
100  }
101 #endif
102 }
103 
104 shared_ptr<FragmentMgr::FragmentExecState> FragmentMgr::GetFragmentExecState(
105  const TUniqueId& fragment_instance_id) {
106  lock_guard<mutex> l(fragment_exec_state_map_lock_);
107  FragmentExecStateMap::iterator i = fragment_exec_state_map_.find(fragment_instance_id);
108  if (i == fragment_exec_state_map_.end()) {
109  return shared_ptr<FragmentExecState>();
110  } else {
111  return i->second;
112  }
113 }
114 
115 void FragmentMgr::CancelPlanFragment(TCancelPlanFragmentResult& return_val,
116  const TCancelPlanFragmentParams& params) {
117  VLOG_QUERY << "CancelPlanFragment(): instance_id=" << params.fragment_instance_id;
118  shared_ptr<FragmentExecState> exec_state =
119  GetFragmentExecState(params.fragment_instance_id);
120  if (exec_state.get() == NULL) {
121  Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR, Substitute("Unknown fragment id: $0",
122  lexical_cast<string>(params.fragment_instance_id))));
123  status.SetTStatus(&return_val);
124  return;
125  }
126  // we only initiate cancellation here, the map entry as well as the exec state
127  // are removed when fragment execution terminates (which is at present still
128  // running in exec_state->exec_thread_)
129  exec_state->Cancel().SetTStatus(&return_val);
130 }
const TUniqueId & fragment_instance_id() const
void FragmentExecThread(FragmentExecState *exec_state)
Definition: fragment-mgr.cc:68
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
void Exec()
Main loop of plan fragment execution. Blocks until execution finishes.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
#define VLOG_QUERY
Definition: logging.h:57
static IntCounter * IMPALA_SERVER_NUM_FRAGMENTS
void SetTStatus(T *status_container) const
Definition: status.h:213
static ExecEnv * GetInstance()
Definition: exec-env.h:63
static const Status OK
Definition: status.h:87
Status ExecPlanFragment(const TExecPlanFragmentParams &params)
Definition: fragment-mgr.cc:35
boost::shared_ptr< FragmentExecState > GetFragmentExecState(const TUniqueId &fragment_instance_id)
DEFINE_int32(periodic_counter_update_period_ms, 500,"Period to update rate counters and"" sampling counters in ms")
void CancelPlanFragment(TCancelPlanFragmentResult &return_val, const TCancelPlanFragmentParams &params)
Cancels a plan fragment that is running asynchronously.
Execution state of a single plan fragment.