Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
fragment-exec-state.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <sstream>
18 
19 #include "codegen/llvm-codegen.h"
20 #include "gen-cpp/ImpalaInternalService.h"
21 #include "rpc/thrift-util.h"
22 
23 #include "common/names.h"
24 
25 using namespace apache::thrift;
26 using namespace impala;
27 
// NOTE(review): the signature line for this body (listing line 28) is absent from
// this extracted listing; presumably FragmentExecState::UpdateStatus(status) -- confirm.
// While holding status_lock_, stores 'status' into exec_status_ only when 'status' is
// an error and exec_status_ is still ok, so an already-recorded error is never
// overwritten. Returns the resulting exec_status_.
29  lock_guard<mutex> l(status_lock_);
30  if (!status.ok() && exec_status_.ok()) exec_status_ = status;
31  return exec_status_;
32 }
33 
// NOTE(review): the signature line for this body (listing line 34) is absent from
// this extracted listing; presumably FragmentExecState::Cancel() -- confirm.
// Takes status_lock_ for the whole body, so executor_.Cancel() runs with the lock held.
// RETURN_IF_ERROR is defined in status.h (not shown here); presumably it returns
// exec_status_ to the caller when that status is an error, in which case the executor
// is not cancelled -- confirm. Otherwise cancels the executor and returns Status::OK.
35  lock_guard<mutex> l(status_lock_);
36  RETURN_IF_ERROR(exec_status_);
37  executor_.Cancel();
38  return Status::OK;
39 }
40 
41 Status FragmentMgr::FragmentExecState::Prepare(
42  const TExecPlanFragmentParams& exec_params) {
43  exec_params_ = exec_params;
44  RETURN_IF_ERROR(executor_.Prepare(exec_params));
45  return Status::OK;
46 }
47 
// NOTE(review): the signature line for this body (listing line 48) is absent from
// this extracted listing; presumably FragmentExecState::Exec() -- confirm.
// Runs the executor and then closes it. Nothing returned by Open() or Close() is
// inspected here.
49  // Open() does the full execution, because all plan fragments have sinks
50  executor_.Open();
51  executor_.Close();
52 }
53 
54 // There can only be one of these callbacks in-flight at any moment, because
55 // it is only invoked from the executor's reporting thread.
56 // Also, the reported status will always reflect the most recent execution status,
57 // including the final status when execution finishes.
58 void FragmentMgr::FragmentExecState::ReportStatusCb(
59  const Status& status, RuntimeProfile* profile, bool done) {
60  DCHECK(status.ok() || done); // if !status.ok() => done
61  Status exec_status = UpdateStatus(status);
62 
63  Status coord_status;
64  ImpalaInternalServiceConnection coord(client_cache_, coord_address(), &coord_status);
65  if (!coord_status.ok()) {
66  stringstream s;
67  s << "couldn't get a client for " << coord_address();
68  UpdateStatus(Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, s.str())));
69  return;
70  }
71 
72  TReportExecStatusParams params;
73  params.protocol_version = ImpalaInternalServiceVersion::V1;
74  params.__set_query_id(fragment_instance_ctx_.query_ctx.query_id);
75  params.__set_backend_num(fragment_instance_ctx_.backend_num);
76  params.__set_fragment_instance_id(fragment_instance_ctx_.fragment_instance_id);
77  exec_status.SetTStatus(&params);
78  params.__set_done(done);
79  profile->ToThrift(&params.profile);
80  params.__isset.profile = true;
81 
82  RuntimeState* runtime_state = executor_.runtime_state();
83  DCHECK(runtime_state != NULL);
84  // Only send updates to insert status if fragment is finished, the coordinator
85  // waits until query execution is done to use them anyhow.
86  if (done) {
87  TInsertExecStatus insert_status;
88 
89  if (runtime_state->hdfs_files_to_move()->size() > 0) {
90  insert_status.__set_files_to_move(*runtime_state->hdfs_files_to_move());
91  }
92  if (runtime_state->per_partition_status()->size() > 0) {
93  insert_status.__set_per_partition_status(*runtime_state->per_partition_status());
94  }
95 
96  params.__set_insert_exec_status(insert_status);
97  }
98 
99  // Send new errors to coordinator
100  runtime_state->GetUnreportedErrors(&(params.error_log));
101  params.__isset.error_log = (params.error_log.size() > 0);
102 
103  TReportExecStatusResult res;
104  Status rpc_status =
105  coord.DoRpc(&ImpalaInternalServiceClient::ReportExecStatus, params, &res);
106  if (rpc_status.ok()) rpc_status = Status(res.status);
107  if (!rpc_status.ok()) {
108  UpdateStatus(rpc_status);
109  // we need to cancel the execution of this fragment
110  executor_.Cancel();
111  }
112 }
RuntimeState * runtime_state()
only valid after calling Exec(), and may return NULL if there is no executor
FileMoveMap * hdfs_files_to_move()
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
void GetUnreportedErrors(ErrorLogMap *new_errors)
PartitionStatusMap * per_partition_status()
void Cancel(const Status *cause=NULL)
Status Exec(QuerySchedule &schedule, std::vector< ExprContext * > *output_expr_ctxs)
Status Close()
call beeswax.close() for current query, if one in progress
Status DoRpc(const F &f, const Request &request, Response *response)
Definition: client-cache.h:225
void SetTStatus(T *status_container) const
Definition: status.h:213
Status UpdateStatus(const Status &status, const TUniqueId *failed_fragment)
ImpaladQueryExecutor * executor_
execution state of coordinator fragment
Definition: expr-test.cc:71
bool ok() const
Definition: status.h:172
void ToThrift(TRuntimeProfileTree *tree) const