Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
coordinator.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_RUNTIME_COORDINATOR_H
17 #define IMPALA_RUNTIME_COORDINATOR_H
18 
19 #include <vector>
20 #include <string>
21 #include <boost/scoped_ptr.hpp>
22 #include <boost/accumulators/accumulators.hpp>
23 #include <boost/accumulators/statistics/stats.hpp>
24 #include <boost/accumulators/statistics/min.hpp>
25 #include <boost/accumulators/statistics/mean.hpp>
26 #include <boost/accumulators/statistics/median.hpp>
27 #include <boost/accumulators/statistics/max.hpp>
28 #include <boost/accumulators/statistics/variance.hpp>
29 #include <boost/unordered_map.hpp>
30 #include <boost/unordered_set.hpp>
31 #include <boost/thread/thread.hpp>
32 #include <boost/thread/mutex.hpp>
33 #include <boost/thread/condition_variable.hpp>
34 
35 #include "common/status.h"
36 #include "common/global-types.h"
37 #include "util/progress-updater.h"
38 #include "util/runtime-profile.h"
39 #include "runtime/runtime-state.h"
41 #include "gen-cpp/Types_types.h"
42 #include "gen-cpp/Frontend_types.h"
43 
44 namespace impala {
45 
// NOTE(review): this text is a Doxygen source-browser extraction of coordinator.h,
// not compilable C++. Each line carries its original line number as a prefix and
// the numbering skips (e.g. 64, 66-77, 79-83), so the original comment blocks and
// several declarations did not survive extraction. The comments added below are
// restored from the Doxygen tooltip text that follows this listing.
46 class DataStreamMgr;
47 class DataSink;
48 class RowBatch;
49 class RowDescriptor;
50 class PlanFragmentExecutor;
51 class ObjectPool;
52 class RuntimeState;
53 class ImpalaInternalServiceClient;
54 class Expr;
55 class ExprContext;
56 class ExecEnv;
57 class TUpdateCatalogRequest;
58 class TQueryExecRequest;
59 class TReportExecStatusParams;
60 class TRowBatch;
61 class TPlanExecRequest;
62 class TRuntimeProfileTree;
63 class RuntimeProfile;
65 
// NOTE(review): the comment block that preceded the class (lines 66-86) was lost in
// extraction; only the empty "//" lines 78 and 84 survive below. Per the tooltip
// text, members declared in lines missing from this listing include exec_env_ (193),
// needs_finalization_ (219), progress_ (230), query_status_ (237),
// returned_all_results_ (243), row_desc_ (255), backend_exec_state_map_ (260),
// execution_completed_ (266), num_remote_fragements_complete_ (269),
// num_remaining_backends_ (278), per_partition_status_ (289), files_to_move_ (293),
// exec_summary_lock_ (299), query_events_ (309), coordinator_counters_ (342) and
// finalization_timer_ (348).
78 //
84 //
87 class Coordinator {
88  public:
89  Coordinator(ExecEnv* exec_env, RuntimeProfile::EventSequence* events);
90  ~Coordinator();
91 
99  Status Exec(QuerySchedule& schedule, std::vector<ExprContext*>* output_expr_ctxs);
100 
106  Status Wait();
107 
122  Status GetNext(RowBatch** batch, RuntimeState* state);
123 
128  void Cancel(const Status* cause = NULL);
129 
137  Status UpdateFragmentExecStatus(const TReportExecStatusParams& params);
138 
141  const RowDescriptor& row_desc() const;
142 
146 
// Returns the aggregate counters for the entire query. query_profile_ (a
// scoped_ptr) keeps ownership; callers get a non-owning pointer.
150  RuntimeProfile* query_profile() const { return query_profile_.get(); }
151 
152  const TUniqueId& query_id() const { return query_id_; }
153 
// NOTE(review): per_partition_status() is declared at line 155, which is missing
// from this listing; its tooltip says it "is safe to call only after Wait()".
156 
160  bool PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update);
161 
164  std::string GetErrorLog();
165 
// progress_ keeps track of the number of completed ranges and total scan ranges.
166  const ProgressUpdater& progress() { return progress_; }
167 
// Returns query_status_.
169  Status GetStatus();
170 
// Execution summary for this query.
// NOTE(review): body line 174 is missing from this listing. GetExecSummaryLock()
// (line 178, also missing) returns a SpinLock&, and the residue includes the
// spinlock.h DCheckLocked() tooltip, which suggests line 174 asserted that
// exec_summary_lock_ is held — confirm against the upstream header.
173  const TExecSummary& exec_summary() const {
175  return exec_summary_;
176  }
177 
179 
180  private:
181  class BackendExecState;
182 
// Accumulator over int64_t samples that tracks min, max, mean and variance.
185  typedef boost::accumulators::accumulator_set<int64_t,
186  boost::accumulators::features<
187  boost::accumulators::tag::min,
188  boost::accumulators::tag::max,
189  boost::accumulators::tag::mean,
190  boost::accumulators::tag::variance>
191  > SummaryStats;
192 
194  TUniqueId query_id_;
195 
// Copied from TQueryExecRequest; constant across all fragments.
197  TDescriptorTable desc_tbl_;
198  TQueryCtx query_ctx_;
199 
// Copied from TQueryExecRequest; governs when to call ReportQuerySummary().
201  TStmtType::type stmt_type_;
202 
// Map from the id of a scan node to a specific counter in that node's profile.
204  typedef std::map<PlanNodeId, RuntimeProfile::Counter*> CounterMap;
205 
210 
// NOTE(review): the "};" below closes struct FragmentInstanceCounters, the per
// fragment instance counters that will be aggregated by the coordinator (opened at
// line 207). Its opening line and its members throughput_counters (line 209,
// throughput counters per node) and scan_ranges_complete_counters (line 212, total
// finished scan ranges per node), both CounterMaps, are missing from this listing.
213  };
214 
// BackendExecStates owned by obj_pool().
216  std::vector<BackendExecState*> backend_exec_states_;
217 
220 
// Only valid if needs_finalization_ (true if the query needs a post-execution step
// to tidy up) is true.
222  TFinalizeParams finalize_params_;
223 
// Ensures single-threaded execution of Wait(); must not hold lock_ when acquiring
// this. (The rest of the original comment was truncated in extraction.)
225  boost::mutex wait_lock_;
226 
227  bool has_called_wait_; // if true, Wait() was called; protected by wait_lock_
228 
231 
// Protects all fields below.
233  boost::mutex lock_;
234 
238 
244 
246  boost::scoped_ptr<PlanFragmentExecutor> executor_;
247 
252  boost::shared_ptr<MemTracker> query_mem_tracker_;
253 
256 
259  typedef boost::unordered_map<TUniqueId, BackendExecState*> BackendExecStateMap;
261 
// Returns a local object pool. obj_pool_ keeps ownership.
263  ObjectPool* obj_pool() { return obj_pool_.get(); }
264 
267 
270 
274  boost::condition_variable backend_completion_cv_;
275 
279 
285 
290 
294 
// Object pool owned by the coordinator. Any executor will have its own pool.
296  boost::scoped_ptr<ObjectPool> obj_pool_;
297 
// Execution summary for this query; returned by exec_summary().
300  TExecSummary exec_summary_;
301 
// A mapping of plan node ids to index into exec_summary_.nodes.
303  boost::unordered_map<TPlanNodeId, int> plan_node_id_to_summary_map_;
304 
// Aggregate counters for the entire query.
306  boost::scoped_ptr<RuntimeProfile> query_profile_;
307 
310 
319 
322 
325 
328 
331 
// NOTE(review): the "};" below closes struct PerFragmentProfileData, the per
// fragment profile information (opened at line 312). Its opening line and its
// members are missing from this listing:
//   - averaged_profile (318)
//   - num_instances (321): number of instances running this fragment
//   - root_profile (324): root profile for all fragment instances
//   - bytes_assigned (327): SummaryStats of bytes assigned per instance
//   - completion_times (330): SummaryStats of instance completion times
//   - rates (333): SummaryStats of instance execution rates
334  };
335 
339  std::vector<PerFragmentProfileData> fragment_profiles_;
340 
343 
// The set of hosts that the query will run on. Populated in Exec.
345  boost::unordered_set<TNetworkAddress> unique_hosts_;
346 
// NOTE(review): the first line of the declaration below is missing from this
// listing. Per the tooltip text the full signature is
//   void SetExecPlanFragmentParams(QuerySchedule& schedule, int backend_num, ...)
// and it fills in rpc_params based on the other parameters.
349 
352  int backend_num, const TPlanFragment& fragment,
353  int fragment_idx, const FragmentExecParams& params, int instance_idx,
354  const TNetworkAddress& coord, TExecPlanFragmentParams* rpc_params);
355 
363  Status ExecRemoteFragment(void* exec_state);
364 
// Determine fragment number, given fragment id.
366  int GetFragmentNum(const TUniqueId& fragment_id);
367 
370  void PrintBackendInfo();
371 
// Create aggregate counters for all scan nodes in any of the fragments.
373  void CreateAggregateCounters(const std::vector<TPlanFragment>& fragments);
374 
378 
381  int64_t ComputeTotalThroughput(int node_id);
382 
385  int64_t ComputeTotalScanRangesComplete(int node_id);
386 
// Runs cancel logic. Assumes that lock_ is held.
388  void CancelInternal();
389 
393  void CancelRemoteFragments();
394 
400  Status UpdateStatus(const Status& status, const TUniqueId* failed_fragment);
401 
408 
413 
416 
419  void InitExecProfile(const TQueryExecRequest& request);
420 
426  void UpdateAverageProfile(BackendExecState* backend_exec_state);
427 
431  void ComputeFragmentSummaryStats(BackendExecState* backend_exec_state);
432 
435  void ReportQuerySummary();
436 
439  void UpdateExecSummary(int fragment_idx, int instance_idx, RuntimeProfile* profile);
440 
// NOTE(review): the meaning of the std::pair<bool, short> values is not recoverable
// from this listing; it was documented in the missing lines 441-449.
450  typedef boost::unordered_map<std::string, std::pair<bool, short> > PermissionCache;
451  void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
452  PermissionCache* permissions_cache);
453 };
454 
455 }
456 
457 #endif
Status UpdateFragmentExecStatus(const TReportExecStatusParams &params)
~Coordinator()
RuntimeState * runtime_state()
only valid after calling Exec(), and may return NULL if there is no executor
Status ExecRemoteFragment(void *exec_state)
client RuntimeProfile::EventSequence * events
Definition: coordinator.h:64
Status GetNext(RowBatch **batch, RuntimeState *state)
SummaryStats completion_times
Completion times for instances of this fragment.
Definition: coordinator.h:330
void DCheckLocked()
Definition: spinlock.h:43
TFinalizeParams finalize_params_
Only valid if needs_finalization_ is true.
Definition: coordinator.h:222
Struct for per fragment instance counters that will be aggregated by the coordinator.
Definition: coordinator.h:207
ProgressUpdater progress_
Keeps track of number of completed ranges and total scan ranges.
Definition: coordinator.h:230
TUniqueId query_id_
Definition: coordinator.h:194
boost::scoped_ptr< ObjectPool > obj_pool_
Object pool owned by the coordinator. Any executor will have its own pool.
Definition: coordinator.h:296
const TUniqueId & query_id() const
Definition: coordinator.h:152
TExecSummary exec_summary_
Definition: coordinator.h:300
void CreateAggregateCounters(const std::vector< TPlanFragment > &fragments)
Create aggregate counters for all scan nodes in any of the fragments.
void PopulatePathPermissionCache(hdfsFS fs, const std::string &path_str, PermissionCache *permissions_cache)
void SetExecPlanFragmentParams(QuerySchedule &schedule, int backend_num, const TPlanFragment &fragment, int fragment_idx, const FragmentExecParams &params, int instance_idx, const TNetworkAddress &coord, TExecPlanFragmentParams *rpc_params)
Fill in rpc_params based on parameters.
boost::mutex lock_
protects all fields below
Definition: coordinator.h:233
Lightweight spinlock.
Definition: spinlock.h:24
CounterMap throughput_counters
Throughput counters per node.
Definition: coordinator.h:209
TDescriptorTable desc_tbl_
copied from TQueryExecRequest; constant across all fragments
Definition: coordinator.h:197
FileMoveMap files_to_move_
Definition: coordinator.h:293
boost::unordered_map< TUniqueId, BackendExecState * > BackendExecStateMap
Definition: coordinator.h:259
void UpdateExecSummary(int fragment_idx, int instance_idx, RuntimeProfile *profile)
RuntimeProfile * query_profile() const
Definition: coordinator.h:150
void Cancel(const Status *cause=NULL)
std::vector< BackendExecState * > backend_exec_states_
BackendExecStates owned by obj_pool()
Definition: coordinator.h:216
int64_t ComputeTotalScanRangesComplete(int node_id)
Status Exec(QuerySchedule &schedule, std::vector< ExprContext * > *output_expr_ctxs)
Status FinalizeQuery()
Per fragment profile information.
Definition: coordinator.h:312
bool has_called_wait_
Definition: coordinator.h:227
void CancelRemoteFragments()
bool PrepareCatalogUpdate(TUpdateCatalogRequest *catalog_update)
Status Wait()
ObjectPool * obj_pool()
Returns a local object pool.
Definition: coordinator.h:263
SpinLock exec_summary_lock_
Execution summary for this query.
Definition: coordinator.h:299
bool returned_all_results_
Definition: coordinator.h:243
boost::accumulators::accumulator_set< int64_t, boost::accumulators::features< boost::accumulators::tag::min, boost::accumulators::tag::max, boost::accumulators::tag::mean, boost::accumulators::tag::variance > > SummaryStats
Definition: coordinator.h:181
Status query_status_
Definition: coordinator.h:237
std::map< PlanNodeId, RuntimeProfile::Counter * > CounterMap
map from id of a scan node to a specific counter in the node's profile
Definition: coordinator.h:204
Status GetStatus()
Returns query_status_.
std::map< std::string, std::string > FileMoveMap
Definition: runtime-state.h:65
bool execution_completed_
True if execution has completed, false otherwise.
Definition: coordinator.h:266
int64_t ComputeTotalThroughput(int node_id)
std::map< std::string, TInsertPartitionStatus > PartitionStatusMap
Definition: runtime-state.h:51
int num_remaining_backends_
Definition: coordinator.h:278
bool needs_finalization_
True if the query needs a post-execution step to tidy up.
Definition: coordinator.h:219
boost::unordered_map< TPlanNodeId, int > plan_node_id_to_summary_map_
A mapping of plan node ids to index into exec_summary_.nodes.
Definition: coordinator.h:303
FragmentInstanceCounters coordinator_counters_
Throughput counters for the coordinator fragment.
Definition: coordinator.h:342
This class is thread-safe.
Definition: mem-tracker.h:61
RuntimeProfile::Counter * finalization_timer_
Total time spent in finalization (typically 0 except for INSERT into hdfs tables) ...
Definition: coordinator.h:348
boost::mutex wait_lock_
ensures single-threaded execution of Wait(); must not hold lock_ when acquiring this ...
Definition: coordinator.h:225
const RowDescriptor & row_desc() const
CounterMap scan_ranges_complete_counters
Total finished scan ranges per node.
Definition: coordinator.h:212
boost::unordered_map< std::string, std::pair< bool, short > > PermissionCache
Definition: coordinator.h:450
PartitionStatusMap per_partition_status_
Definition: coordinator.h:289
void InitExecProfile(const TQueryExecRequest &request)
int num_instances
Number of instances running this fragment.
Definition: coordinator.h:321
RuntimeProfile * root_profile
Root profile for all fragment instances for this fragment.
Definition: coordinator.h:324
boost::shared_ptr< MemTracker > query_mem_tracker_
Definition: coordinator.h:252
Status WaitForAllBackends()
boost::condition_variable backend_completion_cv_
Definition: coordinator.h:274
std::vector< PerFragmentProfileData > fragment_profiles_
Definition: coordinator.h:339
const ProgressUpdater & progress()
Definition: coordinator.h:166
void UpdateAverageProfile(BackendExecState *backend_exec_state)
const TExecSummary & exec_summary() const
Definition: coordinator.h:173
const RowDescriptor * row_desc_
owned by plan root, which resides in runtime_state_'s pool
Definition: coordinator.h:255
BackendExecStateMap backend_exec_state_map_
Definition: coordinator.h:260
void ComputeFragmentSummaryStats(BackendExecState *backend_exec_state)
RuntimeProfile * averaged_profile
Definition: coordinator.h:318
int GetFragmentNum(const TUniqueId &fragment_id)
Determine fragment number, given fragment id.
ExecEnv * exec_env_
Definition: coordinator.h:193
SummaryStats bytes_assigned
Bytes assigned for instances of this fragment.
Definition: coordinator.h:327
RuntimeProfile::EventSequence * query_events_
Event timeline for this query. Unowned.
Definition: coordinator.h:309
std::string GetErrorLog()
Status UpdateStatus(const Status &status, const TUniqueId *failed_fragment)
SummaryStats rates
Execution rates for instances of this fragment.
Definition: coordinator.h:333
ImpaladQueryExecutor * executor_
execution state of coordinator fragment
Definition: expr-test.cc:71
SpinLock & GetExecSummaryLock() const
Definition: coordinator.h:178
const PartitionStatusMap & per_partition_status()
This is safe to call only after Wait()
Definition: coordinator.h:155
void CancelInternal()
Runs cancel logic. Assumes that lock_ is held.
void ReportQuerySummary()
MemTracker * query_mem_tracker()
Status FinalizeSuccessfulInsert()
Moves all temporary staging files to their final destinations.
TQueryCtx query_ctx_
Definition: coordinator.h:198
void CollectScanNodeCounters(RuntimeProfile *, FragmentInstanceCounters *result)
boost::scoped_ptr< RuntimeProfile > query_profile_
Aggregate counters for the entire query.
Definition: coordinator.h:306
TStmtType::type stmt_type_
copied from TQueryExecRequest, governs when to call ReportQuerySummary
Definition: coordinator.h:201
boost::unordered_set< TNetworkAddress > unique_hosts_
The set of hosts that the query will run on. Populated in Exec.
Definition: coordinator.h:345
int num_remote_fragements_complete_
Number of remote fragments that have completed.
Definition: coordinator.h:269
void PrintBackendInfo()