17 #include <thrift/protocol/TDebugProtocol.h>
18 #include <boost/date_time/posix_time/posix_time_types.hpp>
19 #include <boost/unordered_map.hpp>
20 #include <boost/foreach.hpp>
21 #include <gutil/strings/substitute.h>
// Boolean command-line flag `serialize_batch`, default false. Per its help
// text, when enabled each returned row batch is serialized and deserialized.
47 DEFINE_bool(serialize_batch,
false,
"serialize and deserialize each returned row batch");
// Integer command-line flag `status_report_interval`, default 5: the interval,
// in seconds, between profile reports.
// NOTE(review): this file also uses the flag as a modulus
// (rand() % FLAGS_status_report_interval), so a value of 0 would be a division
// by zero there -- confirm the flag is validated to be positive.
48 DEFINE_int32(status_report_interval, 5,
"interval between profile reports; in seconds");
53 namespace posix_time = boost::posix_time;
54 using boost::get_system_time;
55 using boost::system_time;
56 using namespace apache::thrift;
57 using namespace strings;
// Name of the counter that tracks per-query, per-host peak memory usage.
61 const string PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER =
"PerHostPeakMemUsage";
63 PlanFragmentExecutor::PlanFragmentExecutor(
ExecEnv* exec_env,
65 exec_env_(exec_env), plan_(NULL), report_status_cb_(report_status_cb),
66 report_thread_active_(false), done_(false), prepared_(false), closed_(false),
67 has_thread_token_(false), average_thread_tokens_(NULL),
68 mem_usage_sampled_counter_(NULL), thread_usage_sampled_counter_(NULL) {
82 const TPlanFragmentExecParams& params = request.params;
83 query_id_ = request.fragment_instance_ctx.query_ctx.query_id;
86 <<
PrintId(request.fragment_instance_ctx.fragment_instance_id);
87 VLOG(2) <<
"params:\n" << ThriftDebugString(params);
89 if (request.__isset.reserved_resource) {
90 VLOG_QUERY <<
"Executing fragment in reserved resource:\n"
91 << request.reserved_resource;
95 if (FLAGS_enable_rm && request.__isset.reserved_resource) {
106 if (FLAGS_enable_rm && !cgroup.empty() && request.__isset.reserved_resource) {
109 request.fragment_instance_ctx.fragment_instance_id, cgroup, &is_first));
113 DCHECK(request.__isset.reserved_resource);
115 request.reserved_resource.v_cpu_cores);
121 if (FLAGS_enable_rm && request.__isset.reserved_resource) {
122 TUniqueId reservation_id;
123 reservation_id << request.reserved_resource.reservation_id;
128 reservation_id, request.local_resource_address, &res_mgr);
129 DCHECK(res_mgr != NULL);
133 request.reserved_resource.v_cpu_cores);
138 int64_t bytes_limit = -1;
142 VLOG_QUERY <<
"Using query memory limit from query options: "
146 int64_t rm_reservation_size_bytes = -1;
147 if (request.__isset.reserved_resource && request.reserved_resource.memory_mb > 0) {
148 rm_reservation_size_bytes =
149 static_cast<int64_t
>(request.reserved_resource.memory_mb) * 1024L * 1024L;
153 if (rm_reservation_size_bytes > bytes_limit && bytes_limit != -1) {
155 PrettyPrinter::PrintBytes(rm_reservation_size_bytes),
156 PrettyPrinter::PrintBytes(bytes_limit)));
157 rm_reservation_size_bytes = bytes_limit;
159 VLOG_QUERY <<
"Using RM reservation memory limit from resource reservation: "
163 DCHECK(!params.request_pool.empty());
165 bytes_limit, rm_reservation_size_bytes);
189 DCHECK(request.__isset.desc_tbl);
193 VLOG_QUERY <<
"descriptor table for fragment="
194 << request.fragment_instance_ctx.fragment_instance_id
198 DCHECK(request.__isset.fragment);
203 if (request.params.__isset.debug_node_id) {
204 DCHECK(request.params.__isset.debug_action);
205 DCHECK(request.params.__isset.debug_phase);
207 request.params.debug_phase, request.params.debug_action,
plan_);
211 vector<ExecNode*> exch_nodes;
213 BOOST_FOREACH(
ExecNode* exch_node, exch_nodes)
215 DCHECK_EQ(exch_node->
type(), TPlanNodeType::EXCHANGE_NODE);
218 DCHECK_GT(num_senders, 0);
219 static_cast<ExchangeNode*
>(exch_node)->set_num_senders(num_senders);
223 vector<ExecNode*> scan_nodes;
224 vector<TScanRangeParams> no_scan_ranges;
226 for (
int i = 0; i < scan_nodes.size(); ++i) {
229 params.per_node_scan_ranges, scan_node->
id(), no_scan_ranges);
242 if (request.fragment.__isset.output_sink) {
244 obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
249 if (sink_profile != NULL) {
275 DCHECK_NOTNULL(codegen);
279 ss <<
"Error with codegen for this query: " << status.
GetDetail();
286 if (per_node_scan_ranges.empty())
return;
289 BOOST_FOREACH(
const PerNodeScanRanges::value_type& entry, per_node_scan_ranges) {
298 <<
"Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query="
310 new Thread(
"plan-fragment-executor",
"report-profile",
346 if (batch == NULL)
break;
349 for (
int i = 0; i < batch->
num_rows(); ++i) {
375 VLOG_FILE <<
"ReportProfile(): instance_id="
386 int report_fragment_offset = rand() % FLAGS_status_report_interval;
387 system_time timeout = get_system_time()
388 + posix_time::seconds(report_fragment_offset);
393 system_time timeout = get_system_time()
394 + posix_time::seconds(FLAGS_status_report_interval);
405 <<
"profile for instance " <<
runtime_state_->fragment_instance_id();
419 VLOG_FILE <<
"exiting reporting thread: instance_id="
465 if (*batch != NULL && (*batch)->
num_rows() == 0) *batch = NULL;
500 int64_t cpu_time = cpu_and_wait_time
515 if (status.
ok())
return;
556 runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
void UpdateStatus(const Status &status)
boost::scoped_ptr< RowBatch > row_batch_
void CollectNodes(TPlanNodeType::type node_type, std::vector< ExecNode * > *nodes)
int64_t consumption() const
Returns the memory consumed in bytes.
void CollectScanNodes(std::vector< ExecNode * > *nodes)
Collect all scan node types.
const std::string GetDetail() const
int32_t VirtualCoresToCpuShares(int16_t v_cpu_cores)
Counter * AddSamplingCounter(const std::string &name, Counter *src_counter)
void AddInfoString(const std::string &key, const std::string &value)
Status GetNextInternal(RowBatch **batch)
RuntimeProfile::TimeSeriesCounter * thread_usage_sampled_counter_
Sampled thread usage (tokens) at even time intervals.
Status RegisterFragment(const TUniqueId &fragment_instance_id, const std::string &cgroup, bool *is_first)
TODO: Consider allowing fragment IDs as category parameters.
void FragmentComplete()
Called when the fragment execution is complete to finalize counters.
RuntimeProfile::Counter * per_host_mem_usage_
int64_t num_threads() const
const RowDescriptor & row_desc()
static const std::string HDFS_SPLIT_STATS_DESC
Description string for the per volume stats output.
MonotonicStopWatch fragment_sw_
Stopwatch for this entire fragment. Started in Prepare(), stopped in Close().
std::string UniqueIdToCgroup(const std::string &unique_id) const
#define RETURN_IF_ERROR(stmt)
some generally useful macros
TupleRow * GetRow(int row_idx)
RuntimeProfile::Counter * average_thread_tokens_
#define ADD_TIMER(profile, name)
bool report_thread_active_
static Status CreateTree(ObjectPool *pool, const TPlan &plan, const DescriptorTbl &descs, ExecNode **root)
const DescriptorTbl & desc_tbl()
DEFINE_int32(status_report_interval, 5,"interval between profile reports; in seconds")
Status SetCpuShares(const std::string &cgroup, int32_t num_shares)
bool GetQueryResourceMgr(const TUniqueId &query_id, const TUniqueId &reservation_id, const TNetworkAddress &local_resource_address, QueryResourceMgr **res_mgr)
boost::condition_variable stop_report_thread_cv_
const RowDescriptor & row_desc() const
string PrintId(const TUniqueId &id, const string &separator)
#define COUNTER_ADD(c, v)
void OptimizeLlvmModule()
void Cancel()
Initiate cancellation. Must not be called until after Prepare() returned.
virtual void Set(int64_t value)
boost::condition_variable report_thread_started_cv_
ResourceBroker * resource_broker()
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
T Read()
Safe read of the value.
void UnregisterPool(ResourcePool *pool)
ReportStatusCallback report_status_cb_
profile reporting-related
Status Prepare(const TExecPlanFragmentParams &request)
RuntimeProfile * profile()
Profile information for plan and output sink.
void ReleaseThreadToken()
Releases the thread token for this fragment executor.
TPlanNodeType::type type() const
LLVM code generator. This is the top level object to generate jitted code.
bool closed_
true if Close() has been called
CgroupsMgr * cgroups_mgr()
std::string DebugString() const
Returns a string representation in DFS order of the plan rooted at this.
bool CompareAndSwap(T old_val, T new_val)
Returns true if the atomic compare-and-swap was successful.
boost::scoped_ptr< DataSink > sink_
void PrintVolumeIds(const TPlanExecParams &params)
Print stats about scan ranges for each volumeId in params to info log.
void UnregisterQueryResourceMgr(const TUniqueId &query_id)
virtual Status Prepare(RuntimeState *state)
#define ADD_COUNTER(profile, name, unit)
std::string DebugString() const
boost::mutex report_thread_lock_
void SetScanRanges(const std::vector< TScanRangeParams > &scan_range_params)
Abstract base class of all scan nodes; introduces SetScanRange().
uint64_t ElapsedTime() const
Returns time in nanosecond.
boost::scoped_ptr< RuntimeState > runtime_state_
static void PrintHdfsSplitStats(const PerVolumnStats &per_volume_stats, std::stringstream *ss)
TimeSeriesCounter * AddTimeSeriesCounter(const std::string &name, TUnit::type unit, DerivedCounterFunction sample_fn)
AtomicInt< int > completed_report_sent_
RuntimeProfile::Counter * rows_produced_counter_
Number of rows returned by this fragment.
static void StopSamplingCounter(RuntimeProfile::Counter *counter)
Stops updating the value of 'counter'.
void SendReport(bool done)
static void UpdateHdfsSplitStats(const std::vector< TScanRangeParams > &scan_range_params_list, PerVolumnStats *per_volume_stats)
Update the per volume stats with the given scan range params list.
RuntimeState * runtime_state()
call these only after Prepare()
void PrettyPrint(std::ostream *s, const std::string &prefix="") const
std::map< TPlanNodeId, std::vector< TScanRangeParams > > PerNodeScanRanges
typedef for TPlanFragmentExecParams.per_node_scan_ranges
boost::function< void(const Status &status, RuntimeProfile *profile, bool done)> ReportStatusCallback
boost::mutex status_lock_
static Status Create(ObjectPool *pool, const TDescriptorTable &thrift_tbl, DescriptorTbl **tbl)
bool ReachedLimit()
Returns true if this query has a limit and it has been reached.
const V & FindWithDefault(const std::map< K, V > &m, const K &key, const V &default_val)
DEFINE_bool(serialize_batch, false,"serialize and deserialize each returned row batch")
const ErrorMsg & msg() const
Returns the error message associated with a non-successful status.
RuntimeProfile::TimeSeriesCounter * mem_usage_sampled_counter_
Sampled memory usage at even time intervals.
bool done_
true if plan_->GetNext() indicated that it's done
void AddChild(RuntimeProfile *child, bool indent=true, RuntimeProfile *location=NULL)
virtual Status Open(RuntimeState *state)
Only CPU-heavy threads need be managed using this class.
string PrintRow(TupleRow *row, const RowDescriptor &d)
static const std::string PER_HOST_PEAK_MEM_COUNTER
Name of the counter that is tracking per query, per host peak mem usage.
bool prepared_
true if Prepare() returned OK
MemTracker * query_mem_tracker()
ThreadResourceMgr * thread_mgr()
virtual void Close(RuntimeState *state)
static void SetDebugOptions(int node_id, TExecNodePhase::type phase, TDebugAction::type action, ExecNode *tree)
Set debug action for node with given id in 'tree'.
boost::unordered_map< int32_t, std::pair< int, int64_t > > PerVolumnStats
map from volume id to <number of split, per volume split lengths>
Status UnregisterFragment(const TUniqueId &fragment_instance_id, const std::string &cgroup)
virtual Status GetNext(RuntimeState *state, RowBatch *row_batch, bool *eos)=0
Status GetNext(RowBatch **batch)
boost::scoped_ptr< Thread > report_thread_
bool IsMemLimitExceeded() const
static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter *counter)
Stops 'counter' from receiving any more samples.
RuntimeProfile * runtime_profile()
bool has_thread_token_
true if this fragment has not returned the thread token to the thread resource mgr ...
static Status CreateDataSink(ObjectPool *pool, const TDataSink &thrift_sink, const std::vector< TExpr > &output_exprs, const TPlanFragmentExecParams &params, const RowDescriptor &row_desc, boost::scoped_ptr< DataSink > *sink)