Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
query-resource-mgr.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef STATESTORE_QUERY_RESOURCE_MGR_H
16 #define STATESTORE_QUERY_RESOURCE_MGR_H
17 
18 #include "common/atomic.h"
19 #include "common/status.h"
20 #include "gen-cpp/Types_types.h"
21 #include "gen-cpp/ResourceBrokerService.h"
22 #include "gen-cpp/ImpalaInternalService.h"
23 #include "gen-cpp/Frontend_types.h"
24 #include "util/promise.h"
25 #include "util/thread.h"
26 
27 #include <boost/function.hpp>
28 #include <boost/thread/mutex.hpp>
29 #include <boost/unordered_map.hpp>
30 #include <boost/unordered_set.hpp>
31 #include <boost/shared_ptr.hpp>
32 #include <string>
33 
34 namespace impala {
35 
36 class ResourceBroker;
37 
41  public:
42  ResourceResolver(const boost::unordered_set<TNetworkAddress>& unique_hosts);
43 
51  void GetResourceHostport(const TNetworkAddress& src, TNetworkAddress* dst);
52 
53  private:
58  boost::unordered_map<TNetworkAddress, TNetworkAddress> impalad_to_dn_;
59  boost::unordered_map<TNetworkAddress, TNetworkAddress> dn_to_impalad_;
60 
64  const boost::unordered_set<TNetworkAddress>& unique_hosts);
65 };
66 
71 //
78 //
88 //
90 //
95  public:
96  QueryResourceMgr(const TUniqueId& reservation_id,
97  const TNetworkAddress& local_resource_location, const TUniqueId& query_id);
98 
101  void InitVcoreAcquisition(int32_t init_vcores);
102 
106  //
110  inline bool IsVcoreOverSubscribed() {
111  boost::lock_guard<boost::mutex> l(threads_running_lock_);
113  }
114 
117  void NotifyThreadUsageChange(int delta);
118 
122  typedef boost::function<void ()> VcoreAvailableCb;
123  int32_t AddVcoreAvailableCb(const VcoreAvailableCb& callback);
124 
126  void RemoveVcoreAvailableCb(int32_t callback_id);
127 
130  Status CreateExpansionRequest(int64_t memory_mb, int64_t vcores,
131  TResourceBrokerExpansionRequest* request);
132 
136  void Shutdown();
137 
140 
141  private:
143  TUniqueId reservation_id_;
144 
146  TUniqueId query_id_;
147 
151  TNetworkAddress local_resource_location_;
152 
154  boost::mutex exit_lock_;
155  bool exit_;
156 
158  boost::mutex callbacks_lock_;
159 
161  typedef boost::unordered_map<int32_t, VcoreAvailableCb> CallbackMap;
163 
165  CallbackMap::iterator callbacks_it_;
166 
170 
172  boost::mutex threads_running_lock_;
173 
175  boost::condition_variable threads_changed_cv_;
176 
179 
181  int64_t vcores_;
182 
187 
189  boost::scoped_ptr<Thread> acquire_vcore_thread_;
190 
196  boost::shared_ptr<AtomicInt<int16_t> > early_exit_;
197 
201  boost::shared_ptr<AtomicInt<int16_t> > thread_in_expand_;
202 
206  void AcquireVcoreResources(boost::shared_ptr<AtomicInt<int16_t> > thread_in_expand,
207  boost::shared_ptr<AtomicInt<int16_t> > early_exit);
208 
212 
214  bool ShouldExit();
215 };
216 
217 }
218 
219 #endif
void GetResourceHostport(const TNetworkAddress &src, TNetworkAddress *dst)
TNetworkAddress local_resource_location_
int64_t vcores_
The number of VCores acquired for this node for this query.
boost::shared_ptr< AtomicInt< int16_t > > thread_in_expand_
int32_t AddVcoreAvailableCb(const VcoreAvailableCb &callback)
ResourceResolver(const boost::unordered_set< TNetworkAddress > &unique_hosts)
boost::mutex exit_lock_
Used to control shutdown of AcquireVcoreResources().
const TUniqueId & query_id() const
Definition: coordinator.h:152
boost::condition_variable threads_changed_cv_
Waited on by AcquireVcoreResources(), and notified by NotifyThreadUsageChange().
void CreateLocalLlamaNodeMapping(const boost::unordered_set< TNetworkAddress > &unique_hosts)
int64_t threads_running_
The number of threads we know to be running on behalf of this query.
QueryResourceMgr(const TUniqueId &reservation_id, const TNetworkAddress &local_resource_location, const TUniqueId &query_id)
boost::unordered_map< TNetworkAddress, TNetworkAddress > dn_to_impalad_
TUniqueId reservation_id_
ID of the single reservation corresponding to this query.
CallbackMap::iterator callbacks_it_
Round-robin iterator to notify callbacks about new VCores one at a time.
boost::function< void()> VcoreAvailableCb
boost::mutex threads_running_lock_
Protects threads_running_, threads_changed_cv_ and vcores_.
boost::unordered_map< int32_t, VcoreAvailableCb > CallbackMap
List of callbacks to notify when a new VCore resource is available.
~QueryResourceMgr()
Waits for the VCore acquisition thread to stop.
TUniqueId query_id_
Query ID of the query this class manages resources for.
void RemoveVcoreAvailableCb(int32_t callback_id)
Removes the callback with the given ID.
boost::scoped_ptr< Thread > acquire_vcore_thread_
Runs AcquireVcoreResources() after InitVcoreAcquisition() is called.
void AcquireVcoreResources(boost::shared_ptr< AtomicInt< int16_t > > thread_in_expand, boost::shared_ptr< AtomicInt< int16_t > > early_exit)
void NotifyThreadUsageChange(int delta)
void InitVcoreAcquisition(int32_t init_vcores)
boost::shared_ptr< AtomicInt< int16_t > > early_exit_
boost::mutex callbacks_lock_
Protects callbacks_ and callbacks_it_.
Only CPU-heavy threads need be managed using this class.
boost::unordered_map< TNetworkAddress, TNetworkAddress > impalad_to_dn_
Status CreateExpansionRequest(int64_t memory_mb, int64_t vcores, TResourceBrokerExpansionRequest *request)
bool ShouldExit()
Notifies acquire_vcore_thread_ that it should terminate. Does not block.