Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
resource-broker.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <boost/algorithm/string/join.hpp>
18 #include <boost/foreach.hpp>
19 #include <boost/uuid/uuid.hpp>
20 #include <boost/uuid/uuid_generators.hpp>
21 #include <boost/uuid/uuid_io.hpp>
22 #include <boost/lexical_cast.hpp>
23 #include <gutil/strings/substitute.h>
24 #include <thrift/Thrift.h>
25 
26 #include "common/status.h"
27 #include "rpc/thrift-util.h"
28 #include "rpc/thrift-server.h"
30 #include "statestore/scheduler.h"
31 #include "util/debug-util.h"
32 #include "util/stopwatch.h"
33 #include "util/uid-util.h"
34 #include "util/network-util.h"
35 #include "util/llama-util.h"
36 #include "util/time.h"
37 #include "gen-cpp/ResourceBrokerService.h"
38 #include "gen-cpp/Llama_types.h"
39 
40 #include "common/names.h"
41 
42 using boost::algorithm::join;
43 using boost::algorithm::to_lower;
44 using boost::uuids::random_generator;
45 using namespace ::apache::thrift::server;
46 using namespace ::apache::thrift;
47 using namespace impala;
48 using namespace strings;
49 
50 DECLARE_int64(llama_registration_timeout_secs);
51 DECLARE_int64(llama_registration_wait_secs);
52 DECLARE_int64(llama_max_request_attempts);
53 
54 DECLARE_int32(resource_broker_cnxn_attempts);
55 DECLARE_int32(resource_broker_cnxn_retry_interval_ms);
56 DECLARE_int32(resource_broker_send_timeout);
57 DECLARE_int32(resource_broker_recv_timeout);
58 
59 static const string LLAMA_KERBEROS_SERVICE_NAME = "llama";
60 
61 namespace impala {
62 
63 // String to search for in Llama error messages to detect that Llama has restarted,
64 // and hence the resource broker must re-register.
65 const string LLAMA_RESTART_SEARCH_STRING = "unknown handle";
66 
67 class LlamaNotificationThriftIf : public llama::LlamaNotificationServiceIf {
68  public:
70  : resource_broker_(resource_broker) {}
71 
72  virtual void AMNotification(llama::TLlamaAMNotificationResponse& response,
73  const llama::TLlamaAMNotificationRequest& request) {
74  resource_broker_->AMNotification(request, response);
75  }
76 
77  virtual void NMNotification(llama::TLlamaNMNotificationResponse& response,
78  const llama::TLlamaNMNotificationRequest& request) {
79  LOG(WARNING) << "Ignoring node-manager notification. Handling not yet implemented.";
80  response.status.__set_status_code(llama::TStatusCode::OK);
81  }
82 
84 
85  private:
87 };
88 
89 ResourceBroker::ResourceBroker(const vector<TNetworkAddress>& llama_addresses,
90  const TNetworkAddress& llama_callback_address, MetricGroup* metrics) :
91  llama_addresses_(llama_addresses),
92  active_llama_addr_idx_(-1),
93  llama_callback_address_(llama_callback_address),
94  metrics_(metrics),
95  scheduler_(NULL),
96  llama_callback_thrift_iface_(new LlamaNotificationThriftIf(this)),
97  llama_client_cache_(new ClientCache<llama::LlamaAMServiceClient>(
98  FLAGS_resource_broker_cnxn_attempts,
99  FLAGS_resource_broker_cnxn_retry_interval_ms,
100  FLAGS_resource_broker_send_timeout,
101  FLAGS_resource_broker_recv_timeout,
103  DCHECK(metrics != NULL);
104  active_llama_metric_ = metrics->AddProperty<string>(
105  "resource-broker.active-llama", "none");
106  active_llama_handle_metric_ = metrics->AddProperty<string>(
107  "resource-broker.active-llama-handle", "none");
108 
110  new StatsMetric<double>("resource-broker.reservation-request-rpc-time",
111  TUnit::TIME_S, "The time, in seconds, that a Reserve() RPC takes to "
112  "Llama"));
114  new StatsMetric<double>("resource-broker.reservation-request-response-time",
115  TUnit::TIME_S, "The time, in seconds, that a reservation request takes "
116  "to be fulfilled by Llama"));
118  "resource-broker.reservation-requests-total", 0, TUnit::UNIT,
119  "The total number of reservation requests made by this Impala daemon to Llama");
121  "resource-broker.reservation-requests-fulfilled", 0, TUnit::UNIT,
122  "The number of reservation requests made by this Impala daemon to Llama "
123  "which succeeded");
125  "resource-broker.reservation-requests-failed", 0, TUnit::UNIT,
126  "The number of reservation requests made by this Impala daemon to Llama which "
127  "failed");
129  "resource-broker.reservation-requests-rejected", 0, TUnit::UNIT,
130  "The number of reservation requests made by this Impala daemon to Llama "
131  "which were rejected");
133  "resource-broker.reservation-requests-timedout", 0, TUnit::UNIT,
134  "The number of reservation requests made by this Impala daemon to Llama "
135  "which timed out");
136 
138  new StatsMetric<double>("resource-broker.expansion-request-rpc-time",
139  TUnit::TIME_S,
140  "The time, in seconds, that a Reserve() RPC takes to Llama"));
142  new StatsMetric<double>("resource-broker.expansion-request-response-time",
143  TUnit::TIME_S, "The time, in seconds, that a expansion request takes "
144  "to be fulfilled by Llama"));
145  expansion_requests_total_metric_ = metrics->AddCounter<int64_t>(
146  "resource-broker.expansion-requests-total", 0, TUnit::UNIT,
147  "The total number of expansion requests made by this Impala daemon to Llama");
149  "resource-broker.expansion-requests-fulfilled", 0, TUnit::UNIT,
150  "The number of expansion requests made by this Impala daemon to Llama "
151  "which succeeded");
152  expansion_requests_failed_metric_ = metrics->AddCounter<int64_t>(
153  "resource-broker.expansion-requests-failed", 0, TUnit::UNIT,
154  "The number of expansion requests made by this Impala daemon to Llama which "
155  "failed");
157  "resource-broker.expansion-requests-rejected", 0, TUnit::UNIT,
158  "The number of expansion requests made by this Impala daemon to Llama "
159  "which were rejected");
161  "resource-broker.expansion-requests-timedout", 0, TUnit::UNIT,
162  "The number of expansion requests made by this Impala daemon to Llama "
163  "which timed out");
164 
165  requests_released_metric_ = metrics->AddCounter<int64_t>(
166  "resource-broker.requests-released", 0, TUnit::UNIT,
167  "The number of resource-release requests received from Llama");
168 
170  "resource-broker.memory-resources-in-use", 0L, TUnit::BYTES, "The total"
171  " number of bytes currently allocated to this Impala daemon by Llama");
172 
174  "resource-broker.vcpu-resources-in-use", 0, TUnit::UNIT, "The total number "
175  "of vcpus currently allocated to this Impala daemon by Llama");
176 
177  requests_released_metric_ = metrics->AddCounter<int64_t>(
178  "resource-broker.requests-released", 0, TUnit::UNIT, "The total number of "
179  "resource allocations released by this Impala daemon");
180 }
181 
183  // The scheduler must have been set before calling Init().
184  DCHECK(scheduler_ != NULL);
185  DCHECK(llama_callback_thrift_iface_ != NULL);
186  if (llama_addresses_.size() == 0) {
187  return Status("No Llama addresses configured (see --llama_addresses)");
188  }
189 
190  shared_ptr<TProcessor> llama_callback_proc(
191  new llama::LlamaNotificationServiceProcessor(llama_callback_thrift_iface_));
192  llama_callback_server_.reset(new ThriftServer("llama-callback", llama_callback_proc,
193  llama_callback_address_.port, NULL, metrics_, 5));
195 
196  // Generate client id for registration with Llama, and register with LLama.
197  random_generator uuid_generator;
198  llama_client_id_ = uuid_generator();
201  return Status::OK;
202 }
203 
205  // Remember the current llama_handle_ to detect if another thread has already
206  // completed the registration successfully.
207  llama::TUniqueId current_llama_handle = llama_handle_;
208 
209  // Start time that this thread attempted registration. Used to limit the time that a
210  // query will wait for re-registration with the Llama to succeed.
211  int64_t start = MonotonicSeconds();
212  lock_guard<mutex> l(llama_registration_lock_);
213  if (llama_handle_ != current_llama_handle) return Status::OK;
214 
215  active_llama_metric_->set_value("none");
216  active_llama_handle_metric_->set_value("none");
217 
218  int llama_addr_idx = (active_llama_addr_idx_ + 1) % llama_addresses_.size();
219  int64_t now = MonotonicSeconds();
220  while (FLAGS_llama_registration_timeout_secs == -1 ||
221  (now - start) < FLAGS_llama_registration_timeout_secs) {
222  // Connect to the Llama at llama_address.
223  const TNetworkAddress& llama_address = llama_addresses_[llama_addr_idx];
224  // Client status will be ok if a Thrift connection could be successfully established
225  // for the returned client at some point in the past. Hence, there is no guarantee
226  // that the connection is still valid now and we must check for broken pipes, etc.
227  Status client_status;
229  llama_address, &client_status);
230  if (client_status.ok()) {
231  // Register this resource broker with Llama.
232  llama::TLlamaAMRegisterRequest request;
233  request.__set_version(llama::TLlamaServiceVersion::V1);
234  llama::TUniqueId llama_uuid;
235  UUIDToTUniqueId(llama_client_id_, &llama_uuid);
236  request.__set_client_id(llama_uuid);
237 
238  llama::TNetworkAddress callback_address;
239  callback_address << llama_callback_address_;
240  request.__set_notification_callback_service(callback_address);
241  llama::TLlamaAMRegisterResponse response;
242  LOG(INFO) << "Registering Resource Broker with Llama at " << llama_address;
243  Status rpc_status =
244  llama_client.DoRpc(&llama::LlamaAMServiceClient::Register, request, &response);
245  if (rpc_status.ok()) {
246  // TODO: Is there a period where an inactive Llama may respond to RPCs?
247  // If so, then we need to keep cycling through Llamas here and not
248  // return an error.
250  response.status, "Failed to register Resource Broker with Llama."));
251  LOG(INFO) << "Received Llama client handle " << response.am_handle
252  << ((response.am_handle == llama_handle_) ? " (same as old)" : "");
253  llama_handle_ = response.am_handle;
254  break;
255  }
256  }
257  // Cycle through the list of Llama addresses for Llama failover.
258  llama_addr_idx = (llama_addr_idx + 1) % llama_addresses_.size();
259  LOG(INFO) << "Failed to connect to Llama at " << llama_address << "." << endl
260  << "Error: " << client_status.GetDetail() << endl
261  << "Retrying to connect to Llama at "
262  << llama_addresses_[llama_addr_idx] << " in "
263  << FLAGS_llama_registration_wait_secs << "s.";
264  // Sleep to give Llama time to recover/failover before the next attempt.
265  SleepForMs(FLAGS_llama_registration_wait_secs * 1000);
266  now = MonotonicSeconds();
267  }
268  DCHECK(FLAGS_llama_registration_timeout_secs != -1);
269  if ((now - start) >= FLAGS_llama_registration_timeout_secs) {
270  return Status("Failed to (re-)register Resource Broker with Llama.");
271  }
272 
273  if (llama_addr_idx != active_llama_addr_idx_) {
274  // TODO: We've switched to a different Llama (failover). Cancel all queries
275  // coordinated by this Impalad to free up physical resources that are not
276  // accounted for anymore by Yarn.
277  }
278 
279  // If we reached this point, (re-)registration was successful.
280  active_llama_addr_idx_ = llama_addr_idx;
281  active_llama_metric_->set_value(lexical_cast<string>(llama_addresses_[llama_addr_idx]));
282  active_llama_handle_metric_->set_value(lexical_cast<string>(llama_handle_));
283  return Status::OK;
284 }
285 
286 bool ResourceBroker::LlamaHasRestarted(const llama::TStatus& status) const {
287  if (status.status_code == llama::TStatusCode::OK || !status.__isset.error_msgs) {
288  return false;
289  }
290  // Check whether one of the error messages contains LLAMA_RESTART_SEARCH_STRING.
291  for (int i = 0; i < status.error_msgs.size(); ++i) {
292  string error_msg = status.error_msgs[i];
293  to_lower(error_msg);
294  if (error_msg.find(LLAMA_RESTART_SEARCH_STRING) != string::npos) {
295  LOG(INFO) << "Assuming Llama restart from error message: " << status.error_msgs[i];
296  return true;
297  }
298  }
299  return false;
300 }
301 
303  // Close connections to all Llama addresses, not just the active one.
304  BOOST_FOREACH(const TNetworkAddress& llama_address, llama_addresses_) {
305  llama_client_cache_->CloseConnections(llama_address);
306  }
307  llama_callback_server_->Join();
308 }
309 
311  const TResourceBrokerReservationRequest& src,
312  llama::TLlamaAMReservationRequest& dest) {
313  dest.version = llama::TLlamaServiceVersion::V1;
314  dest.am_handle = llama_handle_;
315  dest.gang = src.gang;
316  // Queue is optional, so must be explicitly set for all versions of Thrift to work
317  // together.
318  dest.__set_queue(src.queue);
319  dest.user = src.user;
320  dest.resources = src.resources;
321  random_generator uuid_generator;
322  llama::TUniqueId request_id;
323  UUIDToTUniqueId(uuid_generator(), &request_id);
324  dest.__set_reservation_id(request_id);
325 }
326 
327 // Creates a Llama release request from a resource broker release request.
328 void ResourceBroker::CreateLlamaReleaseRequest(const TResourceBrokerReleaseRequest& src,
329  llama::TLlamaAMReleaseRequest& dest) {
330  dest.version = llama::TLlamaServiceVersion::V1;
331  dest.am_handle = llama_handle_;
332  dest.reservation_id << src.reservation_id;
333 }
334 
335 template <typename LlamaReqType, typename LlamaRespType>
336 Status ResourceBroker::LlamaRpc(LlamaReqType* request, LlamaRespType* response,
337  StatsMetric<double>* rpc_time_metric) {
338  int attempts = 0;
340  // Indicates whether to re-register with Llama before the next RPC attempt,
341  // e.g. because Llama has restarted or become unavailable.
342  bool register_with_llama = false;
343  while (attempts < FLAGS_llama_max_request_attempts) {
344  if (register_with_llama) {
345  RETURN_IF_ERROR(ReRegisterWithLlama(*request, response));
346  // Set the new Llama handle received from re-registering.
347  request->__set_am_handle(llama_handle_);
348  VLOG_RPC << "Retrying Llama RPC after re-registration: " << *request;
349  register_with_llama = false;
350  }
351  ++attempts;
352  Status rpc_status;
355  if (!rpc_status.ok()) {
356  register_with_llama = true;
357  continue;
358  }
359 
360  sw.Start();
361  try {
362  SendLlamaRpc(&llama_client, *request, response);
363  } catch (const TException& e) {
364  VLOG_RPC << "Reopening Llama client due to: " << e.what();
365  rpc_status = llama_client.Reopen();
366  if (!rpc_status.ok()) {
367  register_with_llama = true;
368  continue;
369  }
370  VLOG_RPC << "Retrying Llama RPC: " << *request;
371  SendLlamaRpc(&llama_client, *request, response);
372  }
373  if (rpc_time_metric != NULL) {
374  rpc_time_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
375  }
376 
377  // Check whether Llama has been restarted. If so, re-register with it.
378  // Break out of the loop here upon success of the RPC.
379  if (!LlamaHasRestarted(response->status)) break;
380  register_with_llama = true;
381  }
382  if (attempts >= FLAGS_llama_max_request_attempts) {
383  return Status(Substitute(
384  "Request aborted after $0 attempts due to connectivity issues with Llama.",
385  FLAGS_llama_max_request_attempts));
386  }
387  return Status::OK;
388 }
389 
390 template <typename LlamaReqType, typename LlamaRespType>
393  const LlamaReqType& request, LlamaRespType* response) {
394  DCHECK(false) << "SendLlamaRpc template function must be specialized.";
395 }
396 
397 // Template specialization for the Llama GetNodes() RPC.
398 template <>
401  const llama::TLlamaAMGetNodesRequest& request,
402  llama::TLlamaAMGetNodesResponse* response) {
403  DCHECK(response != NULL);
404  (*llama_client)->GetNodes(*response, request);
405 }
406 
407 // Template specialization for the Llama Reserve() RPC.
408 template <>
411  const llama::TLlamaAMReservationRequest& request,
412  llama::TLlamaAMReservationResponse* response) {
413  DCHECK(response != NULL);
414  (*llama_client)->Reserve(*response, request);
415 }
416 
417 // Template specialization for the Llama Expand() RPC.
418 template <>
421  const llama::TLlamaAMReservationExpansionRequest& request,
422  llama::TLlamaAMReservationExpansionResponse* response) {
423  DCHECK(response != NULL);
424  (*llama_client)->Expand(*response, request);
425 }
426 
427 // Template specialization for the Llama Release() RPC.
428 template <>
431  const llama::TLlamaAMReleaseRequest& request,
432  llama::TLlamaAMReleaseResponse* response) {
433  DCHECK(response != NULL);
434  (*llama_client)->Release(*response, request);
435 }
436 
437 template <typename LlamaReqType, typename LlamaRespType>
438 Status ResourceBroker::ReRegisterWithLlama(const LlamaReqType& request,
439  LlamaRespType* response) {
441  return RefreshLlamaNodes();
442 }
443 
444 template <>
445 Status ResourceBroker::ReRegisterWithLlama(const llama::TLlamaAMGetNodesRequest& request,
446  llama::TLlamaAMGetNodesResponse* response) {
447  return RegisterWithLlama();
448 }
449 
451  resources->clear();
452  BOOST_FOREACH(const llama::TAllocatedResource& resource, allocated_resources_) {
453  TNetworkAddress host = MakeNetworkAddress(resource.location);
454  (*resources)[host] = resource;
455  VLOG_QUERY << "Getting allocated resource for reservation id "
456  << reservation_id_ << " and location " << host;
457  }
458 }
459 
461  const vector<llama::TAllocatedResource>& resources) {
462  // TODO: Llama returns a dump of all resources that we need to manually group by
463  // reservation id. Can Llama do the grouping for us?
464  BOOST_FOREACH(const llama::TAllocatedResource& resource, resources) {
465  // Ignore resources that don't belong to the given reservation id.
466  if (resource.reservation_id == request_id()) {
467  allocated_resources_.push_back(resource);
468  }
469  }
470 }
471 
472 bool ResourceBroker::WaitForNotification(int64_t timeout, ResourceMap* resources,
473  bool* timed_out, PendingRequest* pending_request) {
474  bool request_granted = pending_request->promise()->Get(timeout, timed_out);
475 
476  // Remove the promise from the pending-requests map.
477  {
478  lock_guard<mutex> l(pending_requests_lock_);
479  pending_requests_.erase(pending_request->request_id());
480  }
481 
482  if (request_granted && !*timed_out) {
483  pending_request->GetResources(resources);
484  int64_t total_memory_mb = 0L;
485  int32_t total_vcpus = 0;
486  BOOST_FOREACH(const ResourceMap::value_type& resource, *resources) {
487  total_memory_mb += resource.second.memory_mb;
488  total_vcpus += resource.second.v_cpu_cores;
489  }
490  allocated_memory_metric_->Increment(total_memory_mb * 1024L * 1024L);
491  allocated_vcpus_metric_->Increment(total_vcpus);
492  {
493  lock_guard<mutex> l(allocated_requests_lock_);
494  allocated_requests_[pending_request->reservation_id()].push_back(AllocatedRequest(
495  pending_request->reservation_id(), total_memory_mb, total_vcpus,
496  pending_request->is_expansion()));
497  }
498  }
499 
500  return request_granted;
501 }
502 
503 Status ResourceBroker::Expand(const TResourceBrokerExpansionRequest& request,
504  TResourceBrokerExpansionResponse* response) {
505  VLOG_RPC << "Sending expansion request: " << request;
506  llama::TLlamaAMReservationExpansionRequest ll_request;
507  llama::TLlamaAMReservationExpansionResponse ll_response;
508 
509  ll_request.version = llama::TLlamaServiceVersion::V1;
510  ll_request.am_handle = llama_handle_;
511  ll_request.expansion_of << request.reservation_id;
512  random_generator uuid_generator;
513  llama::TUniqueId request_id;
514  UUIDToTUniqueId(uuid_generator(), &request_id);
515  ll_request.__set_expansion_id(request_id);
516  ll_request.resource = request.resource;
517 
518  PendingRequest* pending_request;
519  {
520  lock_guard<mutex> l(pending_requests_lock_);
521  pending_request =
522  new PendingRequest(ll_request.expansion_of, ll_request.expansion_id, true);
523  pending_requests_.insert(make_pair(pending_request->request_id(), pending_request));
524  }
525 
527  sw.Start();
528  Status status = LlamaRpc(&ll_request, &ll_response, expansion_rpc_time_metric_);
529  // Check the status of the response.
530  if (!status.ok()) {
531  expansion_requests_failed_metric_->Increment(1);
532  return status;
533  }
534 
535  Status request_status = LlamaStatusToImpalaStatus(ll_response.status);
536  if (!request_status.ok()) {
537  expansion_requests_failed_metric_->Increment(1);
538  return request_status;
539  }
540 
541  bool timed_out = false;
542  bool request_granted = WaitForNotification(request.request_timeout,
543  &response->allocated_resources, &timed_out, pending_request);
544 
545  if (request_granted) {
546  // Only set the reservation ID for successful requests
547  response->__set_reservation_id(request.reservation_id);
548  }
549 
550  if (timed_out) {
552  return Status(Substitute("Resource expansion request exceeded timeout of $0",
553  PrettyPrinter::Print(request.request_timeout * 1000L * 1000L,
554  TUnit::TIME_NS)));
555  }
557  sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
558 
559  if (!request_granted) {
561  return Status("Resource expansion request was rejected.");
562  }
563 
564  VLOG_QUERY << "Fulfilled expansion for id: " << ll_response.expansion_id;
566  return Status::OK;
567 }
568 
569 Status ResourceBroker::Reserve(const TResourceBrokerReservationRequest& request,
570  TResourceBrokerReservationResponse* response) {
571  VLOG_QUERY << "Sending reservation request: " << request;
573 
574  llama::TLlamaAMReservationRequest ll_request;
575  llama::TLlamaAMReservationResponse ll_response;
576  CreateLlamaReservationRequest(request, ll_request);
577 
578  PendingRequest* pending_request;
579  {
580  lock_guard<mutex> l(pending_requests_lock_);
581  pending_request = new PendingRequest(ll_request.reservation_id,
582  ll_request.reservation_id, false);
583  pending_requests_.insert(make_pair(pending_request->request_id(), pending_request));
584  }
585 
587  sw.Start();
588  Status status = LlamaRpc(&ll_request, &ll_response, reservation_rpc_time_metric_);
589  // Check the status of the response.
590  if (!status.ok()) {
592  return status;
593  }
594  Status request_status = LlamaStatusToImpalaStatus(ll_response.status);
595  if (!request_status.ok()) {
597  return request_status;
598  }
599 
600  VLOG_RPC << "Received reservation response from Llama, waiting for notification on: "
601  << pending_request->request_id();
602 
603  bool timed_out = false;
604  bool request_granted = WaitForNotification(request.request_timeout,
605  &response->allocated_resources, &timed_out, pending_request);
606 
607  if (request_granted || timed_out) {
608  // Set the reservation_id to make sure it eventually gets released - even if when
609  // timed out, since the response may arrive later.
610  response->__set_reservation_id(
611  CastTUniqueId<llama::TUniqueId, TUniqueId>(pending_request->reservation_id()));
612  }
613 
614  if (timed_out) {
616  return Status(Substitute("Resource expansion request exceeded timeout of $0",
617  PrettyPrinter::Print(request.request_timeout * 1000L * 1000L,
618  TUnit::TIME_NS)));
619  }
621  sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
622 
623  if (!request_granted) {
625  return Status("Resource reservation request was rejected.");
626  }
627 
628  TUniqueId reservation_id;
629  reservation_id << pending_request->reservation_id();
630  response->__set_reservation_id(reservation_id);
631  VLOG_QUERY << "Fulfilled reservation with id: " << pending_request->reservation_id();
633  return Status::OK;
634 }
635 
636 void ResourceBroker::ClearRequests(const TUniqueId& reservation_id,
637  bool include_reservation) {
638  int64_t total_memory_bytes = 0L;
639  int32_t total_vcpus = 0L;
640  llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id);
641  {
642  lock_guard<mutex> l(allocated_requests_lock_);
643  AllocatedRequestMap::iterator it = allocated_requests_.find(llama_id);
644  if (it == allocated_requests_.end()) return;
645  vector<AllocatedRequest>::iterator request_it = it->second.begin();
646  while (request_it != it->second.end()) {
647  DCHECK(request_it->reservation_id() == llama_id);
648  if (!request_it->is_expansion() && !include_reservation) {
649  // Leave the original reservation
650  ++request_it;
651  continue;
652  }
653  total_memory_bytes += (request_it->memory_mb() * 1024L * 1024L);
654  total_vcpus += request_it->vcpus();
655  request_it = it->second.erase(request_it);
656  }
657  }
658 
659  VLOG_QUERY << "Releasing "
660  << PrettyPrinter::Print(total_memory_bytes, TUnit::BYTES)
661  << " and " << total_vcpus << " cores for " << llama_id;
662  allocated_memory_metric_->Increment(-total_memory_bytes);
663  allocated_vcpus_metric_->Increment(-total_vcpus);
664 }
665 
666 Status ResourceBroker::Release(const TResourceBrokerReleaseRequest& request,
667  TResourceBrokerReleaseResponse* response) {
668  VLOG_QUERY << "Releasing all resources for reservation: " << request.reservation_id;
669 
670  ClearRequests(request.reservation_id, true);
671 
672  llama::TLlamaAMReleaseRequest llama_request;
673  llama::TLlamaAMReleaseResponse llama_response;
674  CreateLlamaReleaseRequest(request, llama_request);
675 
677  &llama_request, &llama_response,reservation_rpc_time_metric_));
678  RETURN_IF_ERROR(LlamaStatusToImpalaStatus(llama_response.status));
679  requests_released_metric_->Increment(1);
680 
681  {
682  lock_guard<mutex> l(allocated_requests_lock_);
683  llama::TUniqueId reservation_id =
684  CastTUniqueId<TUniqueId, llama::TUniqueId>(request.reservation_id);;
685  allocated_requests_.erase(reservation_id);
686  }
687 
688  return Status::OK;
689 }
690 
691 void ResourceBroker::AMNotification(const llama::TLlamaAMNotificationRequest& request,
692  llama::TLlamaAMNotificationResponse& response) {
693  {
694  // This Impalad may have restarted, so it is possible Llama is sending notifications
695  // while this Impalad is registering with Llama.
696  lock_guard<mutex> l(llama_registration_lock_);
697  if (request.am_handle != llama_handle_) {
698  VLOG_QUERY << "Ignoring Llama AM notification with mismatched AM handle. "
699  << "Known handle: " << llama_handle_ << ". Received handle: "
700  << request.am_handle;
701  // Ignore all notifications with mismatched handles.
702  return;
703  }
704  }
705  // Nothing to be done for heartbeats.
706  if (request.heartbeat) return;
707  VLOG_QUERY << "Received non-heartbeat AM notification";
708 
709  lock_guard<mutex> l(pending_requests_lock_);
710 
711  // Process granted allocations.
712  BOOST_FOREACH(const llama::TUniqueId& res_id, request.allocated_reservation_ids) {
713  // TODO: Garbage collect fulfillments that live for a long time, since they probably
714  // don't correspond to any query.
715  PendingRequestMap::iterator it = pending_requests_.find(res_id);
716  if (it == pending_requests_.end()) {
717  VLOG_RPC << "Allocation for " << res_id << " arrived after timeout";
718  // TODO: Release these allocations
719  continue;
720  }
721  LOG(INFO) << "Received allocated resource for reservation id: " << res_id;
722  it->second->SetResources(request.allocated_resources);
723  it->second->promise()->Set(true);
724  }
725 
726  // Process rejected allocations.
727  BOOST_FOREACH(const llama::TUniqueId& res_id, request.rejected_reservation_ids) {
728  PendingRequestMap::iterator it = pending_requests_.find(res_id);
729  if (it == pending_requests_.end()) {
730  VLOG_RPC << "Rejection for " << res_id << " arrived after timeout";
731  continue;
732  }
733  it->second->promise()->Set(false);
734  }
735 
736  // TODO: We maybe want a thread pool for handling preemptions to avoid
737  // blocking this function on query cancellations.
738  // Process preempted reservations.
739  BOOST_FOREACH(const llama::TUniqueId& res_id, request.preempted_reservation_ids) {
740  TUniqueId impala_res_id;
741  impala_res_id << res_id;
742  scheduler_->HandlePreemptedReservation(impala_res_id);
743  }
744 
745  // Process preempted client resources.
746  BOOST_FOREACH(const llama::TUniqueId& res_id, request.preempted_client_resource_ids) {
747  TUniqueId impala_res_id;
748  impala_res_id << res_id;
749  scheduler_->HandlePreemptedResource(impala_res_id);
750  }
751 
752  // Process lost client resources.
753  BOOST_FOREACH(const llama::TUniqueId& res_id, request.lost_client_resource_ids) {
754  TUniqueId impala_res_id;
755  impala_res_id << res_id;
756  scheduler_->HandlePreemptedResource(impala_res_id);
757  }
758 
759  response.status.__set_status_code(llama::TStatusCode::OK);
760 }
761 
762 void ResourceBroker::NMNotification(const llama::TLlamaNMNotificationRequest& request,
763  llama::TLlamaNMNotificationResponse& response) {
764 }
765 
767  llama::TLlamaAMGetNodesRequest llama_request;
768  llama_request.__set_am_handle(llama_handle_);
769  llama_request.__set_version(llama::TLlamaServiceVersion::V1);
770  llama::TLlamaAMGetNodesResponse llama_response;
771 
772  RETURN_IF_ERROR(LlamaRpc(&llama_request, &llama_response, NULL));
773  RETURN_IF_ERROR(LlamaStatusToImpalaStatus(llama_response.status));
774  llama_nodes_ = llama_response.nodes;
775  LOG(INFO) << "Llama Nodes [" << join(llama_nodes_, ", ") << "]";
776  return Status::OK;
777 }
778 
780  const TUniqueId& reservation_id, const TNetworkAddress& local_resource_address,
781  QueryResourceMgr** mgr) {
782  lock_guard<mutex> l(query_resource_mgrs_lock_);
783  pair<int32_t, QueryResourceMgr*>* entry = &query_resource_mgrs_[query_id];
784  if (entry->second == NULL) {
785  entry->second =
786  new QueryResourceMgr(reservation_id, local_resource_address, query_id);
787  }
788  *mgr = entry->second;
789  // Return true if this is the first reference to this resource mgr.
790  return ++entry->first == 1L;
791 }
792 
794  lock_guard<mutex> l(query_resource_mgrs_lock_);
795  QueryResourceMgrsMap::iterator it = query_resource_mgrs_.find(query_id);
796  DCHECK(it != query_resource_mgrs_.end())
797  << "UnregisterQueryResourceMgr() without corresponding GetQueryResourceMgr()";
798  if (--it->second.first == 0) {
799  it->second.second->Shutdown();
800  delete it->second.second;
801  query_resource_mgrs_.erase(it);
802  }
803 }
804 
805 ostream& operator<<(ostream& os,
806  const map<TNetworkAddress, llama::TAllocatedResource>& resources) {
807  typedef map<TNetworkAddress, llama::TAllocatedResource> ResourceMap;
808  int count = 0;
809  BOOST_FOREACH(const ResourceMap::value_type& resource, resources) {
810  os << "(" << resource.first << "," << resource.second << ")";
811  if (++count != resources.size()) os << ",";
812  }
813  return os;
814 }
815 
816 ostream& operator<<(ostream& os, const TResourceBrokerReservationRequest& request) {
817  os << "Reservation Request("
818  << "queue=" << request.queue << " "
819  << "user=" << request.user << " "
820  << "gang=" << request.gang << " "
821  << "request_timeout=" << request.request_timeout << " "
822  << "resources=[";
823  for (int i = 0; i < request.resources.size(); ++i) {
824  os << request.resources[i];
825  if (i + 1 != request.resources.size()) os << ",";
826  }
827  os << "])";
828  return os;
829 }
830 
831 ostream& operator<<(ostream& os, const TResourceBrokerReservationResponse& reservation) {
832  os << "Granted Reservation("
833  << "reservation id=" << reservation.reservation_id << " "
834  << "resources=[" << reservation.allocated_resources << "])";
835  return os;
836 }
837 
838 ostream& operator<<(ostream& os, const TResourceBrokerExpansionRequest& request) {
839  os << "Expansion Request("
840  << "reservation id=" << request.reservation_id << " "
841  << "resource=" << request.resource << " "
842  << "request_timeout=" << request.request_timeout << ")";
843  return os;
844 }
845 
846 ostream& operator<<(ostream& os, const TResourceBrokerExpansionResponse& expansion) {
847  os << "Expansion Response("
848  << "reservation id=" << expansion.reservation_id << " "
849  << "resources=[" << expansion.allocated_resources << "])";
850  return os;
851 }
852 
853 }
SimpleMetric< T, TMetricKind::COUNTER > * AddCounter(const std::string &key, const T &value, const TUnit::type unit=TUnit::UNIT, const std::string &description="")
Definition: metrics.h:239
const llama::TUniqueId & request_id() const
const llama::TUniqueId & reservation_id() const
static const string LLAMA_KERBEROS_SERVICE_NAME
const std::string GetDetail() const
Definition: status.cc:184
TNetworkAddress llama_callback_address_
AllocatedRequestMap allocated_requests_
void AMNotification(const llama::TLlamaAMNotificationRequest &request, llama::TLlamaAMNotificationResponse &response)
std::vector< llama::TAllocatedResource > allocated_resources_
IntCounter * reservation_requests_total_metric_
Total number of reservation requests.
IntCounter * reservation_requests_timedout_metric_
boost::mutex llama_registration_lock_
UIntGauge * allocated_memory_metric_
Total amount of memory currently allocated by Llama to this node.
const TUniqueId & query_id() const
Definition: coordinator.h:152
Status Reserve(const TResourceBrokerReservationRequest &request, TResourceBrokerReservationResponse *response)
Requests resources from Llama. Blocks until the request has been granted or denied.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
IntCounter * requests_released_metric_
Total number of fulfilled reservation requests that have been released.
M * RegisterMetric(M *metric)
Definition: metrics.h:211
StatsMetric< double > * reservation_rpc_time_metric_
void Close()
Closes the llama_client_cache_ and joins the llama_callback_server_.
IntCounter * expansion_requests_total_metric_
Total number of expansion requests.
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
StatsMetric< double > * expansion_rpc_time_metric_
boost::shared_ptr< llama::LlamaNotificationServiceIf > llama_callback_thrift_iface_
Thrift API implementation which proxies Llama notifications onto this ResourceBroker.
bool WaitForNotification(int64_t timeout, ResourceMap *resources, bool *timed_out, PendingRequest *reservation)
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
Definition: network-util.cc:96
bool GetQueryResourceMgr(const TUniqueId &query_id, const TUniqueId &reservation_id, const TNetworkAddress &local_resource_address, QueryResourceMgr **res_mgr)
UIntGauge * allocated_vcpus_metric_
Total number of vcpu cores currently allocated by Llama to this node.
IntCounter * reservation_requests_failed_metric_
StringProperty * active_llama_metric_
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
Definition: time.cc:21
void SendLlamaRpc(ClientConnection< llama::LlamaAMServiceClient > *llama_client, const LlamaReqType &request, LlamaRespType *response)
static std::string Print(bool value, TUnit::type ignored, bool verbose=false)
void ClearRequests(const TUniqueId &reservation_id, bool include_reservation)
boost::mutex pending_requests_lock_
Protects pending_requests_.
IntCounter * expansion_requests_rejected_metric_
Number of well-formed expansion requests rejected by the central scheduler.
bool LlamaHasRestarted(const llama::TStatus &status) const
Detects Llama restarts from the given return status of a Llama RPC.
IntCounter * expansion_requests_failed_metric_
impala::Status LlamaStatusToImpalaStatus(const TStatus &status, const string &err_prefix)
Definition: llama-util.cc:127
DECLARE_int32(resource_broker_cnxn_attempts)
#define VLOG_QUERY
Definition: logging.h:57
Status Release(const TResourceBrokerReleaseRequest &request, TResourceBrokerReleaseResponse *response)
IntCounter * reservation_requests_fulfilled_metric_
Number of fulfilled reservation requests.
PendingRequestMap pending_requests_
SimpleMetric< T, TMetricKind::PROPERTY > * AddProperty(const std::string &key, const T &value, const std::string &description="")
Definition: metrics.h:231
void CreateLlamaReleaseRequest(const TResourceBrokerReleaseRequest &src, llama::TLlamaAMReleaseRequest &dest)
Creates a Llama release request from a resource broker release request.
void UnregisterQueryResourceMgr(const TUniqueId &query_id)
virtual void NMNotification(llama::TLlamaNMNotificationResponse &response, const llama::TLlamaNMNotificationRequest &request)
StatsMetric< double > * expansion_response_time_metric_
void Update(const T &value)
boost::mutex allocated_requests_lock_
Protects allocated_requests_.
boost::uuids::uuid llama_client_id_
ResourceBroker(const std::vector< TNetworkAddress > &llama_addresses, const TNetworkAddress &llama_callback_address, MetricGroup *metrics)
virtual void HandlePreemptedResource(const TUniqueId &client_resource_id)=0
uint64_t ElapsedTime() const
Returns time in nanosecond.
Definition: stopwatch.h:105
uint64_t count
const T & Get()
Definition: promise.h:59
virtual void HandlePreemptedReservation(const TUniqueId &reservation_id)=0
IntCounter * expansion_requests_fulfilled_metric_
Number of fulfilled expansion requests.
Status RefreshLlamaNodes()
Retrieves the nodes known to Llama and stores them in llama_nodes_.
StringProperty * active_llama_handle_metric_
IntCounter * expansion_requests_timedout_metric_
LlamaNotificationThriftIf(ResourceBroker *resource_broker)
const string LLAMA_RESTART_SEARCH_STRING
DECLARE_int64(llama_registration_timeout_secs)
SimpleMetric< T > * AddGauge(const std::string &key, const T &value, const TUnit::type unit=TUnit::NONE, const std::string &description="")
Create a gauge metric object with given key and initial value (owned by this object) ...
Definition: metrics.h:223
void UUIDToTUniqueId(const boost::uuids::uuid &uuid, T *unique_id)
Definition: uid-util.h:38
llama::TUniqueId llama_handle_
Handle received from Llama during registration. Set in RegisterWithLlama().
std::vector< TNetworkAddress > llama_addresses_
Llama availability group.
static const Status OK
Definition: status.h:87
#define VLOG_RPC
Definition: logging.h:56
Status ReRegisterWithLlama(const LlamaReqType &request, LlamaRespType *response)
int64_t MonotonicSeconds()
Definition: time.h:41
boost::scoped_ptr< ThriftServer > llama_callback_server_
Status Expand(const TResourceBrokerExpansionRequest &request, TResourceBrokerExpansionResponse *response)
QueryResourceMgrsMap query_resource_mgrs_
boost::mutex query_resource_mgrs_lock_
Protects query_resource_mgrs_.
std::vector< std::string > llama_nodes_
List of nodes registered with Llama. Set in RefreshLlamaNodes().
virtual void AMNotification(llama::TLlamaAMNotificationResponse &response, const llama::TLlamaAMNotificationRequest &request)
IntCounter * reservation_requests_rejected_metric_
Number of well-formed reservation requests rejected by the central scheduler.
Only CPU-heavy threads need be managed using this class.
bool ok() const
Definition: status.h:172
ostream & operator<<(ostream &os, const map< TNetworkAddress, llama::TAllocatedResource > &resources)
std::map< TNetworkAddress, llama::TAllocatedResource > ResourceMap
void GetResources(ResourceMap *resources)
void NMNotification(const llama::TLlamaNMNotificationRequest &request, llama::TLlamaNMNotificationResponse &response)
StatsMetric< double > * reservation_response_time_metric_
Status LlamaRpc(LlamaReqType *request, LlamaRespType *response, StatsMetric< double > *rpc_time_metric)
boost::scoped_ptr< ClientCache< llama::LlamaAMServiceClient > > llama_client_cache_
Cache of Llama client connections.
void SetResources(const std::vector< llama::TAllocatedResource > &resources)
void CreateLlamaReservationRequest(const TResourceBrokerReservationRequest &src, llama::TLlamaAMReservationRequest &dest)
Creates a Llama reservation request from a resource broker reservation request.