Impala
Impala is the open source, native analytic database for Apache Hadoop.
query-resource-mgr.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "runtime/query-resource-mgr.h"

#include <boost/foreach.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <gutil/strings/substitute.h>
#include <sstream>

#include "runtime/exec-env.h"
#include "resourcebroker/resource-broker.h"
#include "util/container-util.h"
#include "util/network-util.h"
#include "util/promise.h"
#include "util/time.h"

#include "common/names.h"

using boost::uuids::random_generator;
using boost::uuids::uuid;
using namespace impala;
using namespace strings;

const int64_t DEFAULT_EXPANSION_REQUEST_TIMEOUT_MS = 5000;

DEFINE_double(max_vcore_oversubscription_ratio, 2.5, "(Advanced) The maximum ratio "
    "allowed between running threads and acquired VCore resources for a query's fragments"
    " on a single node");

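// Populates the Impalad-to-datanode address mapping when Impala runs against a
// pseudo-distributed Llama; otherwise no mapping is required (see GetResourceHostport()).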
ResourceResolver::ResourceResolver(const unordered_set<TNetworkAddress>& unique_hosts) {
  if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) {
    CreateLocalLlamaNodeMapping(unique_hosts);
  }
}

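// Returns in 'dest' the address that resource requests for 'src' should be directed to:
// the mapped local datanode in pseudo-distributed Llama mode, otherwise the Impalad's
// hostname with port 0.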
void ResourceResolver::GetResourceHostport(const TNetworkAddress& src,
    TNetworkAddress* dest) {
  if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) {
    *dest = impalad_to_dn_[src];
  } else {
    dest->hostname = src.hostname;
    dest->port = 0;
  }
}

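// Builds the bidirectional mapping between unique Impalad hosts and the Llama's
// registered datanodes, assigning datanodes round-robin.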
void ResourceResolver::CreateLocalLlamaNodeMapping(
    const unordered_set<TNetworkAddress>& unique_hosts) {
  DCHECK(ExecEnv::GetInstance()->is_pseudo_distributed_llama());
  const vector<string>& llama_nodes =
      ExecEnv::GetInstance()->resource_broker()->llama_nodes();
  DCHECK(!llama_nodes.empty());
  int llama_node_ix = 0;
  BOOST_FOREACH(const TNetworkAddress& host, unique_hosts) {
    TNetworkAddress dn_hostport = MakeNetworkAddress(llama_nodes[llama_node_ix]);
    impalad_to_dn_[host] = dn_hostport;
    dn_to_impalad_[dn_hostport] = host;
    LOG(INFO) << "Mapping Datanode " << dn_hostport << " to Impalad: " << host;
    // Round robin the registered Llama nodes.
    llama_node_ix = (llama_node_ix + 1) % llama_nodes.size();
  }
}

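// The constructor only records the reservation, query ID and local resource location;
// VCore acquisition does not start until InitVcoreAcquisition() is called.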
QueryResourceMgr::QueryResourceMgr(const TUniqueId& reservation_id,
    const TNetworkAddress& local_resource_location, const TUniqueId& query_id)
    : reservation_id_(reservation_id), query_id_(query_id),
      local_resource_location_(local_resource_location), exit_(false), callback_count_(0),
      threads_running_(0), vcores_(0) {
  max_vcore_oversubscription_ratio_ = FLAGS_max_vcore_oversubscription_ratio;
}

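// Starts the background thread that acquires additional VCores for this query on demand.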
void QueryResourceMgr::InitVcoreAcquisition(int32_t init_vcores) {
  LOG(INFO) << "Initialising vcore acquisition thread for query " << PrintId(query_id_)
            << " (" << init_vcores << " initial vcores)";
  DCHECK(acquire_vcore_thread_.get() == NULL)
      << "Double initialisation of QueryResourceMgr::InitVcoreAcquisition()";
  vcores_ = init_vcores;

  // These shared pointers to atomic values are used to communicate between the vcore
  // acquisition thread and the class destructor. If the acquisition thread is in the
  // middle of an Expand() call, the destructor might have to wait 5s (the default
  // timeout) to return, which holds up query close operations. So instead the destructor
  // checks whether the thread is in Expand(); if so it sets a synchronised flag,
  // early_exit_, which the thread inspects immediately after exiting Expand() and, if
  // set, exits before touching any of the class-wide state (because the destructor may
  // have finished before this point).
  thread_in_expand_.reset(new AtomicInt<int16_t>());
  early_exit_.reset(new AtomicInt<int16_t>());
  acquire_vcore_thread_.reset(
      new Thread("resource-mgmt", Substitute("acquire-cpu-$0", PrintId(query_id_)),
          bind<void>(mem_fn(&QueryResourceMgr::AcquireVcoreResources), this,
              thread_in_expand_, early_exit_)));
}

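// Fills in 'request' with an expansion of this query's existing reservation, asking for
// 'memory_mb' MB of memory and 'vcores' VCores at the resolved local resource address.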
Status QueryResourceMgr::CreateExpansionRequest(int64_t memory_mb, int64_t vcores,
    TResourceBrokerExpansionRequest* request) {
  DCHECK(request != NULL);
  DCHECK(memory_mb > 0 || vcores > 0);
  DCHECK(reservation_id_ != TUniqueId()) << "Expansion requires existing reservation";

  unordered_set<TNetworkAddress> hosts;
  hosts.insert(local_resource_location_);
  ResourceResolver resolver(hosts);
  llama::TResource res;
  res.memory_mb = memory_mb;
  res.v_cpu_cores = vcores;
  TNetworkAddress res_address;
  resolver.GetResourceHostport(local_resource_location_, &res_address);
  res.__set_askedLocation(TNetworkAddressToString(res_address));

  random_generator uuid_generator;
  uuid id = uuid_generator();
  res.client_resource_id.hi = *reinterpret_cast<uint64_t*>(&id.data[0]);
  res.client_resource_id.lo = *reinterpret_cast<uint64_t*>(&id.data[8]);
  res.enforcement = llama::TLocationEnforcement::MUST;

  request->__set_resource(res);
  request->__set_reservation_id(reservation_id_);
  request->__set_request_timeout(DEFAULT_EXPANSION_REQUEST_TIMEOUT_MS);

  return Status::OK;
}

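// Returns true if more threads are running for this query than the acquired VCores
// allow, given the maximum oversubscription ratio.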
bool QueryResourceMgr::AboveVcoreSubscriptionThreshold() {
  return threads_running_ > vcores_ * max_vcore_oversubscription_ratio_;
}

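// Adjusts the number of threads known to be running for this query and wakes the VCore
// acquisition thread so it can re-check the oversubscription threshold.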
void QueryResourceMgr::NotifyThreadUsageChange(int delta) {
  lock_guard<mutex> l(threads_running_lock_);
  threads_running_ += delta;
  DCHECK(threads_running_ >= 0L);
  threads_changed_cv_.notify_all();
}

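// Registers a callback to run when a new VCore is acquired. Returns an ID that can be
// passed to RemoveVcoreAvailableCb() to unregister it.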
int32_t QueryResourceMgr::AddVcoreAvailableCb(const VcoreAvailableCb& callback) {
  lock_guard<mutex> l(callbacks_lock_);
  callbacks_[callback_count_] = callback;
  callbacks_it_ = callbacks_.begin();
  return callback_count_++;
}

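// Removes the callback with the given ID.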
void QueryResourceMgr::RemoveVcoreAvailableCb(int32_t callback_id) {
  lock_guard<mutex> l(callbacks_lock_);
  CallbackMap::iterator it = callbacks_.find(callback_id);
  DCHECK(it != callbacks_.end()) << "Could not find callback with id: " << callback_id;
  callbacks_.erase(it);
  callbacks_it_ = callbacks_.begin();
}

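// Body of the VCore acquisition thread: waits until the query is oversubscribed, then
// asks the resource broker to expand the reservation by one VCore, updates the query's
// cgroup CPU shares, and notifies one registered callback.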
void QueryResourceMgr::AcquireVcoreResources(
    shared_ptr<AtomicInt<int16_t> > thread_in_expand,
    shared_ptr<AtomicInt<int16_t> > early_exit) {
  // Take a copy because we'd like to print it in some cases after the destructor has run.
  TUniqueId reservation_id = reservation_id_;
  VLOG_QUERY << "Starting Vcore acquisition for: " << reservation_id;
  while (!ShouldExit()) {
    {
      unique_lock<mutex> l(threads_running_lock_);
      while (!AboveVcoreSubscriptionThreshold() && !ShouldExit()) {
        threads_changed_cv_.wait(l);
      }
    }
    if (ShouldExit()) break;

    TResourceBrokerExpansionRequest request;
    CreateExpansionRequest(0L, 1, &request);
    TResourceBrokerExpansionResponse response;
    VLOG_QUERY << "Expanding VCore allocation: " << reservation_id_;

    // First signal that we are about to enter a blocking Expand() call.
    thread_in_expand->FetchAndUpdate(1L);
    // TODO: Could cause problems if called during or after a system-wide shutdown
    Status status = ExecEnv::GetInstance()->resource_broker()->Expand(request, &response);
    thread_in_expand->FetchAndUpdate(-1L);
    // If signalled to exit quickly by the destructor, exit the loop now. It's important
    // to do so without accessing any class variables since they may no longer be valid.
    if (early_exit->FetchAndUpdate(0L) != 0) {
      VLOG_QUERY << "Fragment finished during Expand(): " << reservation_id;
      break;
    }
    if (!status.ok()) {
      VLOG_QUERY << "Could not expand CPU resources for query " << PrintId(query_id_)
                 << ", reservation: " << PrintId(reservation_id_) << ". Error was: "
                 << status.GetDetail();
      // Sleep to avoid flooding the resource broker, particularly if requests are being
      // rejected quickly (and therefore we stay oversubscribed).
      // TODO: configurable timeout
      SleepForMs(250);
      continue;
    }

    const llama::TAllocatedResource& resource =
        response.allocated_resources.begin()->second;
    DCHECK(resource.v_cpu_cores == 1)
        << "Asked for 1 core, got: " << resource.v_cpu_cores;
    vcores_ += resource.v_cpu_cores;

    ExecEnv* exec_env = ExecEnv::GetInstance();
    const string& cgroup =
        exec_env->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_"));
    int32_t num_shares = exec_env->cgroups_mgr()->VirtualCoresToCpuShares(vcores_);
    exec_env->cgroups_mgr()->SetCpuShares(cgroup, num_shares);

    // TODO: Only call one callback no matter how many VCores we just added; maybe call
    // all of them?
    {
      lock_guard<mutex> l(callbacks_lock_);
      if (callbacks_.size() != 0) {
        callbacks_it_->second();
        if (++callbacks_it_ == callbacks_.end()) callbacks_it_ = callbacks_.begin();
      }
    }
  }
  VLOG_QUERY << "Leaving VCore acquisition thread: " << reservation_id;
}

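// Returns true once Shutdown() has been called; checked by the acquisition thread to
// decide when to terminate.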
bool QueryResourceMgr::ShouldExit() {
  lock_guard<mutex> l(exit_lock_);
  return exit_;
}

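// Signals the acquisition thread to terminate and clears any registered callbacks. Does
// not block.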
void QueryResourceMgr::Shutdown() {
  {
    lock_guard<mutex> l(exit_lock_);
    if (exit_) return;
    exit_ = true;
  }
  {
    lock_guard<mutex> l(callbacks_lock_);
    callbacks_.clear();
  }
  threads_changed_cv_.notify_all();

  // Delete all non-reservation requests associated with this reservation ID. If this is
  // the coordinator, the SimpleScheduler will actually release the resources by
  // releasing the original reservation ID.
  ExecEnv::GetInstance()->resource_broker()->ClearRequests(reservation_id_, false);
}

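// Waits for the VCore acquisition thread to stop, unless it is blocked inside Expand(),
// in which case it is signalled to exit early and is not joined.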
QueryResourceMgr::~QueryResourceMgr() {
  if (acquire_vcore_thread_.get() == NULL) return;
  if (!ShouldExit()) Shutdown();
  // First, set the early exit flag. Then check to see if the thread is in Expand(). If
  // so, the acquisition thread is guaranteed to see early_exit_ == 1L once it finishes
  // Expand(), and will exit immediately. It's therefore safe not to wait for it.
  early_exit_->FetchAndUpdate(1L);
  if (thread_in_expand_->FetchAndUpdate(0L) == 0L) {
    acquire_vcore_thread_->Join();
  }
}