Impala
Impalaistheopensource,nativeanalyticdatabaseforApacheHadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
parallel-executor.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <boost/thread/thread.hpp>
18 
19 #include "util/stopwatch.h"
20 #include "util/thread.h"
21 
22 #include "common/names.h"
23 
24 using namespace impala;
25 
26 Status ParallelExecutor::Exec(Function function, void** args, int num_args,
27  StatsMetric<double>* latencies) {
28  Status status;
29  ThreadGroup worker_threads;
30  mutex lock;
31 
32  for (int i = 0; i < num_args; ++i) {
33  stringstream ss;
34  ss << "worker-thread(" << i << ")";
35  worker_threads.AddThread(new Thread("parallel-executor", ss.str(),
36  &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies));
37  }
38  worker_threads.JoinAll();
39 
40  return status;
41 }
42 
43 void ParallelExecutor::Worker(Function function, void* arg, mutex* lock, Status* status,
44  StatsMetric<double>* latencies) {
46  if (latencies != NULL) sw.Start();
47  Status local_status = function(arg);
48  if (!local_status.ok()) {
49  unique_lock<mutex> l(*lock);
50  if (status->ok()) *status = local_status;
51  }
52 
53  if (latencies != NULL) {
54  latencies->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
55  }
56 }
static void Worker(Function function, void *arg, boost::mutex *lock, Status *status, StatsMetric< double > *latencies)
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
boost::function< Status(void *arg)> Function
void Update(const T &value)
uint64_t ElapsedTime() const
Returns time in nanosecond.
Definition: stopwatch.h:105
Status AddThread(Thread *thread)
Definition: thread.cc:318
bool ok() const
Definition: status.h:172
static Status Exec(Function function, void **args, int num_args, StatsMetric< double > *latencies=NULL)
Callers may pass a StatsMetric to gather the latency distribution of task execution.