Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
catalog-server.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "catalog/catalog-server.h"
16 
17 #include <gutil/strings/substitute.h>
18 #include <thrift/protocol/TDebugProtocol.h>
19 
20 #include "catalog/catalog-util.h"
22 #include "util/debug-util.h"
23 #include "gen-cpp/CatalogInternalService_types.h"
24 #include "gen-cpp/CatalogObjects_types.h"
25 #include "gen-cpp/CatalogService_types.h"
26 
27 #include "common/names.h"
28 
29 using boost::bind;
30 using boost::mem_fn;
31 using namespace apache::thrift;
32 using namespace impala;
33 using namespace rapidjson;
34 using namespace strings;
35 
// Port of this process's CatalogService thrift endpoint (used to build the
// server address that identifies this catalog server to the statestore).
36 DEFINE_int32(catalog_service_port, 26000, "port where the CatalogService is running");
// Flags defined elsewhere. The statestore/hostname flags are used to build the
// subscriber and statestore network addresses; compact_catalog_topic selects the
// serialization mode of thrift_serializer_.
37 DECLARE_string(state_store_host);
38 DECLARE_int32(state_store_subscriber_port);
39 DECLARE_int32(state_store_port);
40 DECLARE_string(hostname);
41 DECLARE_bool(compact_catalog_topic);
42 
// Statestore topic to which the catalog server publishes catalog objects.
43 string CatalogServer::IMPALA_CATALOG_TOPIC = "catalog-update";
44 
// NOTE(review): the declaration line (original line 45) is missing from this
// listing; the symbol index lists it as `const string
// CATALOG_SERVER_TOPIC_PROCESSING_TIMES`. Presumably this is the metric key for
// topic_processing_time_metric_; confirm.
46  "catalog-server.topic-processing-time-s";
47 
// Debug webpage paths and the templates used to render them.
48 const string CATALOG_WEB_PAGE = "/catalog";
49 const string CATALOG_TEMPLATE = "catalog.tmpl";
50 const string CATALOG_OBJECT_WEB_PAGE = "/catalog_object";
51 const string CATALOG_OBJECT_TEMPLATE = "catalog_object.tmpl";
52 
53 // Implementation for the CatalogService thrift interface.
// Every handler below follows the same pattern:
// - log the request at VLOG_RPC and forward it to the Catalog owned by catalog_server_;
// - log a failed Status at ERROR;
// - copy the Status into the response (GetCatalogObject is the exception, see the
//   note there);
// - log the response.
// NOTE(review): the class header (original line 54) and the constructor signature
// (original line 56) are missing from this listing. The symbol index lists the
// constructor as CatalogServiceThriftIf(CatalogServer* catalog_server).
 public:
 : catalog_server_(catalog_server) {
58  }
59 
60  // Executes a TDdlExecRequest and returns details on the result of the operation.
61  virtual void ExecDdl(TDdlExecResponse& resp, const TDdlExecRequest& req) {
62  VLOG_RPC << "ExecDdl(): request=" << ThriftDebugString(req);
63  Status status = catalog_server_->catalog()->ExecDdl(req, &resp);
64  if (!status.ok()) LOG(ERROR) << status.GetDetail();
65  TStatus thrift_status;
66  status.ToThrift(&thrift_status);
67  resp.result.__set_status(thrift_status);
68  VLOG_RPC << "ExecDdl(): response=" << ThriftDebugString(resp);
69  }
70 
71  // Executes a TResetMetadataRequest and returns details on the result of the operation.
72  virtual void ResetMetadata(TResetMetadataResponse& resp,
73  const TResetMetadataRequest& req) {
74  VLOG_RPC << "ResetMetadata(): request=" << ThriftDebugString(req);
75  Status status = catalog_server_->catalog()->ResetMetadata(req, &resp);
76  if (!status.ok()) LOG(ERROR) << status.GetDetail();
77  TStatus thrift_status;
78  status.ToThrift(&thrift_status);
79  resp.result.__set_status(thrift_status);
80  VLOG_RPC << "ResetMetadata(): response=" << ThriftDebugString(resp);
81  }
82 
83  // Executes a TUpdateCatalogRequest and returns details on the result of the
84  // operation.
85  virtual void UpdateCatalog(TUpdateCatalogResponse& resp,
86  const TUpdateCatalogRequest& req) {
87  VLOG_RPC << "UpdateCatalog(): request=" << ThriftDebugString(req);
88  Status status = catalog_server_->catalog()->UpdateCatalog(req, &resp);
89  if (!status.ok()) LOG(ERROR) << status.GetDetail();
90  TStatus thrift_status;
91  status.ToThrift(&thrift_status);
92  resp.result.__set_status(thrift_status);
93  VLOG_RPC << "UpdateCatalog(): response=" << ThriftDebugString(resp);
94  }
95 
96  // Gets functions in the Catalog based on the parameters of the
97  // TGetFunctionsRequest.
98  virtual void GetFunctions(TGetFunctionsResponse& resp,
99  const TGetFunctionsRequest& req) {
100  VLOG_RPC << "GetFunctions(): request=" << ThriftDebugString(req);
101  Status status = catalog_server_->catalog()->GetFunctions(req, &resp);
102  if (!status.ok()) LOG(ERROR) << status.GetDetail();
103  TStatus thrift_status;
104  status.ToThrift(&thrift_status);
105  resp.__set_status(thrift_status);
106  VLOG_RPC << "GetFunctions(): response=" << ThriftDebugString(resp);
107  }
108 
109  // Gets a TCatalogObject based on the parameters of the TGetCatalogObjectRequest.
110  virtual void GetCatalogObject(TGetCatalogObjectResponse& resp,
111  const TGetCatalogObjectRequest& req) {
112  VLOG_RPC << "GetCatalogObject(): request=" << ThriftDebugString(req);
113  Status status = catalog_server_->catalog()->GetCatalogObject(req.object_desc,
114  &resp.catalog_object);
115  if (!status.ok()) LOG(ERROR) << status.GetDetail();
// NOTE(review): unlike the other handlers, a failed Status is only logged here; it is
// not copied into 'resp', so the caller cannot see the failure reason.
116  VLOG_RPC << "GetCatalogObject(): response=" << ThriftDebugString(resp);
117  }
118 
119  // Prioritizes the loading of metadata for one or more catalog objects. Currently only
120  // used for loading tables/views because they are the only type of object that is loaded
121  // lazily.
122  virtual void PrioritizeLoad(TPrioritizeLoadResponse& resp,
123  const TPrioritizeLoadRequest& req) {
124  VLOG_RPC << "PrioritizeLoad(): request=" << ThriftDebugString(req);
125  Status status = catalog_server_->catalog()->PrioritizeLoad(req);
126  if (!status.ok()) LOG(ERROR) << status.GetDetail();
127  TStatus thrift_status;
128  status.ToThrift(&thrift_status);
129  resp.__set_status(thrift_status);
130  VLOG_RPC << "PrioritizeLoad(): response=" << ThriftDebugString(resp);
131  }
132 
// Forwards a TSentryAdminCheckRequest to the Catalog and reports the resulting
// Status in 'resp'.
133  virtual void SentryAdminCheck(TSentryAdminCheckResponse& resp,
134  const TSentryAdminCheckRequest& req) {
135  VLOG_RPC << "SentryAdminCheck(): request=" << ThriftDebugString(req);
136  Status status = catalog_server_->catalog()->SentryAdminCheck(req);
137  if (!status.ok()) LOG(ERROR) << status.GetDetail();
138  TStatus thrift_status;
139  status.ToThrift(&thrift_status);
140  resp.__set_status(thrift_status);
141  VLOG_RPC << "SentryAdminCheck(): response=" << ThriftDebugString(resp);
142  }
143 
144  private:
// NOTE(review): original line 145 is missing from this listing. The symbol index
// lists it as the `CatalogServer* catalog_server_` member used by the handlers above.
146 };
147 
// Creates the thrift service implementation (bound to this server) and the topic
// serializer, whose compact mode follows --compact_catalog_topic. All catalog-version
// bookkeeping starts at 0, and no topic update is marked ready yet.
// 'metrics' is stored as a raw pointer; presumably it must outlive this server. Confirm.
148 CatalogServer::CatalogServer(MetricGroup* metrics)
149  : thrift_iface_(new CatalogServiceThriftIf(this)),
150  thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
151  topic_updates_ready_(false), last_sent_catalog_version_(0L),
152  catalog_objects_min_version_(0L), catalog_objects_max_version_(0L) {
// NOTE(review): original lines 153-154 are missing from this listing. The fragment
// below closes a call that takes TUnit::TIME_S, presumably the registration of
// topic_processing_time_metric_. Confirm.
155  TUnit::TIME_S));
156 }
157 
158 
// Start-up sequence visible in this fragment:
// 1. Build the network addresses for the statestore subscriber, the statestore and
//    this CatalogService.
// 2. Create the Catalog and the catalog-update-gathering thread.
// 3. Construct the statestore subscriber (its constructor call is only partially
//    visible).
// 4. Register UpdateCatalogTopicCallback for IMPALA_CATALOG_TOPIC.
// 5. Wake the gathering thread.
// If the topic registration fails, returns that Status with "CatalogService failed to
// start" appended.
// NOTE(review): original lines 159 (signature), 171, 173, 177 and 184 are missing from
// this listing. As a result, the thread entry point, the start of the subscriber
// construction, the declaration of 'cb' and whatever line 184 did are not visible here.
160  TNetworkAddress subscriber_address =
161  MakeNetworkAddress(FLAGS_hostname, FLAGS_state_store_subscriber_port);
162  TNetworkAddress statestore_address =
163  MakeNetworkAddress(FLAGS_state_store_host, FLAGS_state_store_port);
164  TNetworkAddress server_address = MakeNetworkAddress(FLAGS_hostname,
165  FLAGS_catalog_service_port);
166 
167  // This will trigger a full Catalog metadata load.
168  catalog_.reset(new Catalog());
169  catalog_update_gathering_thread_.reset(new Thread("catalog-server",
170  "catalog-update-gathering-thread",
172 
174  Substitute("catalog-server@$0", TNetworkAddressToString(server_address)),
175  subscriber_address, statestore_address, metrics_));
176 
178  bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, _2);
179  Status status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, cb);
180  if (!status.ok()) {
181  status.AddDetail("CatalogService failed to start");
182  return status;
183  }
185 
186  // Notify the thread to start for the first time.
// catalog_lock_ is the mutex the gathering thread holds while it checks
// topic_updates_ready_ and waits on catalog_update_cv_.
187  {
188  lock_guard<mutex> l(catalog_lock_);
189  catalog_update_cv_.notify_one();
190  }
191  return Status::OK;
192 }
193 
// Registers the two debug webpage callbacks of this class: the catalog listing
// (CatalogUrlCallback) and the single-object dump (CatalogObjectsUrlCallback).
// NOTE(review): original lines 194 (signature), 197 and 202 are missing from this
// listing. The symbol index lists RegisterWebpages(Webserver* webserver) and
// Webserver::RegisterUrlCallback(path, template_filename, callback, is_on_nav_bar=true).
195  Webserver::UrlCallback catalog_callback =
196  bind<void>(mem_fn(&CatalogServer::CatalogUrlCallback), this, _1, _2);
198  catalog_callback);
199 
// The trailing 'false' below is is_on_nav_bar, so this page is not linked from
// the webserver's navigation bar.
200  Webserver::UrlCallback catalog_objects_callback =
201  bind<void>(mem_fn(&CatalogServer::CatalogObjectsUrlCallback), this, _1, _2);
203  catalog_objects_callback, false);
204 }
205 
207  const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
208  vector<TTopicDelta>* subscriber_topic_updates) {
// Statestore topic callback. It runs only when the incoming deltas contain
// IMPALA_CATALOG_TOPIC and the gathering thread has a topic update ready. In that
// case it appends the pending topic items to 'subscriber_topic_updates', updates
// last_sent_catalog_version_, and then hands control back to the gathering thread by
// clearing topic_updates_ready_ and signalling catalog_update_cv_.
// A try-lock is used so this callback never waits for a gathering pass that is still
// in progress; it returns without publishing, and a later callback picks the update up.
// NOTE(review): original lines 206 (signature start) and 226-228 (the rest of the
// full-update condition and its branch) are missing from this listing.
209  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
210  incoming_topic_deltas.find(CatalogServer::IMPALA_CATALOG_TOPIC);
211  if (topic == incoming_topic_deltas.end()) return;
212 
213  try_mutex::scoped_try_lock l(catalog_lock_);
214  // Return if unable to acquire the catalog_lock_ or if the topic update data is
215  // not yet ready for processing. This indicates the catalog_update_gathering_thread_
216  // is still building a topic update.
217  if (!l || !topic_updates_ready_) return;
218 
219  const TTopicDelta& delta = topic->second;
220 
221  // If this is not a delta update, clear all catalog objects and request an update
222  // from version 0 from the local catalog. There is an optimization that checks if
223  // pending_topic_updates_ was just reloaded from version 0, if they have then skip this
224  // step and use that data.
225  if (delta.from_version == 0 && delta.to_version == 0 &&
229  } else {
230  // Process the pending topic update.
231  LOG_EVERY_N(INFO, 300) << "Catalog Version: " << catalog_objects_max_version_
232  << " Last Catalog Version: " << last_sent_catalog_version_;
233 
// A catalog-topic delta is created only when the output vector is empty; otherwise
// items are appended to whatever delta is last in the vector.
// NOTE(review): this assumes the vector arrives empty. Confirm against the subscriber.
234  BOOST_FOREACH(const TTopicItem& catalog_object, pending_topic_updates_) {
235  if (subscriber_topic_updates->size() == 0) {
236  subscriber_topic_updates->push_back(TTopicDelta());
237  subscriber_topic_updates->back().topic_name = IMPALA_CATALOG_TOPIC;
238  }
239  TTopicDelta& update = subscriber_topic_updates->back();
240  update.topic_entries.push_back(catalog_object);
241  }
242 
243  // Record the newest catalog version that has now been sent. (The set of known entry keys is maintained by BuildTopicUpdates().)
244  last_sent_catalog_version_ = catalog_objects_max_version_;
245  }
246 
247  // Signal the catalog update gathering thread to start.
248  topic_updates_ready_ = false;
249  catalog_update_cv_.notify_one();
250 }
251 
// Loop that produces topic updates; presumably the entry point of
// catalog_update_gathering_thread_ (the Thread's function argument is not visible).
// It never exits. Each iteration does the following:
// - Wait until topic_updates_ready_ is false.
// - If the Catalog's version differs from last_sent_catalog_version_, rebuild
//   pending_topic_updates_ from the Catalog.
// - Record the elapsed time in seconds.
// - Set topic_updates_ready_ so UpdateCatalogTopicCallback can publish the result.
// catalog_lock_ is held for the whole iteration, including the Catalog calls.
// If either Catalog call fails, pending_topic_updates_ stays empty but
// topic_updates_ready_ is still set, so nothing is published for that round.
// NOTE(review): original lines 252 (signature), 263 (presumably the declaration of
// stopwatch 'sw') and 285 are missing from this listing.
253  while (1) {
254  unique_lock<mutex> unique_lock(catalog_lock_);
255  // Protect against spurious wakeups by checking the value of topic_updates_ready_.
256  // It is only safe to continue on and update the shared pending_topic_updates_
257  // when topic_updates_ready_ is false, otherwise we may be in the middle of
258  // processing a heartbeat.
259  while (topic_updates_ready_) {
260  catalog_update_cv_.wait(unique_lock);
261  }
262 
264  sw.Start();
265 
266  // Clear any pending topic updates. They will have been processed by the heartbeat
267  // thread by the time we make it here.
268  pending_topic_updates_.clear();
269 
270  long current_catalog_version;
271  Status status = catalog_->GetCatalogVersion(&current_catalog_version);
272  if (!status.ok()) {
273  LOG(ERROR) << status.GetDetail();
274  } else if (current_catalog_version != last_sent_catalog_version_) {
275  // If there has been a change since the last time the catalog was queried,
276  // call into the Catalog to find out what has changed.
277  TGetAllCatalogObjectsResponse catalog_objects;
278  status = catalog_->GetAllCatalogObjects(last_sent_catalog_version_,
279  &catalog_objects);
280  if (!status.ok()) {
281  LOG(ERROR) << status.GetDetail();
282  } else {
283  // Use the catalog objects to build a topic update list.
284  BuildTopicUpdates(catalog_objects.objects);
286  catalog_objects_max_version_ = catalog_objects.max_catalog_version;
287  }
288  }
289 
// ElapsedTime() is in nanoseconds; divide by 1e9 to report seconds (TIME_S metric).
290  topic_processing_time_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
291  topic_updates_ready_ = true;
292  }
293 }
294 
295 void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_objects) {
296  unordered_set<string> current_entry_keys;
297 
298  // Add any new/updated catalog objects to the topic.
299  BOOST_FOREACH(const TCatalogObject& catalog_object, catalog_objects) {
300  const string& entry_key = TCatalogObjectToEntryKey(catalog_object);
301  if (entry_key.empty()) {
302  LOG_EVERY_N(WARNING, 60) << "Unable to build topic entry key for TCatalogObject: "
303  << ThriftDebugString(catalog_object);
304  }
305 
306  current_entry_keys.insert(entry_key);
307  // Remove this entry from catalog_topic_entry_keys_. At the end of this loop, we will
308  // be left with the set of keys that were in the last update, but not in this
309  // update, indicating which objects have been removed/dropped.
310  catalog_topic_entry_keys_.erase(entry_key);
311 
312  // This isn't a new or an updated item, skip it.
313  if (catalog_object.catalog_version <= last_sent_catalog_version_) continue;
314 
315  VLOG(1) << "Publishing update: " << entry_key << "@"
316  << catalog_object.catalog_version;
317 
318  pending_topic_updates_.push_back(TTopicItem());
319  TTopicItem& item = pending_topic_updates_.back();
320  item.key = entry_key;
321  Status status = thrift_serializer_.Serialize(&catalog_object, &item.value);
322  if (!status.ok()) {
323  LOG(ERROR) << "Error serializing topic value: " << status.GetDetail();
324  pending_topic_updates_.pop_back();
325  }
326  }
327 
328  // Any remaining items in catalog_topic_entry_keys_ indicate the object was removed
329  // since the last update.
330  BOOST_FOREACH(const string& key, catalog_topic_entry_keys_) {
331  pending_topic_updates_.push_back(TTopicItem());
332  TTopicItem& item = pending_topic_updates_.back();
333  item.key = key;
334  VLOG(1) << "Publishing deletion: " << key;
335  // Don't set a value to mark this item as deleted.
336  }
337  catalog_topic_entry_keys_.swap(current_entry_keys);
338 }
339 
341  Document* document) {
// Fills 'document' for the catalog debug page (presumably served at CATALOG_WEB_PAGE).
// The output is a "databases" array with one object per database: "name",
// "num_tables", and a "tables" array of {"fqtn", "name"} entries. If the database list
// cannot be fetched, only a top-level "error" member is added.
// NOTE(review): original line 340 (signature start) is missing from this listing. The
// symbol index lists CatalogUrlCallback(const Webserver::ArgumentMap& args,
// rapidjson::Document* document). The NULL arguments to GetDbNames/GetTableNames
// presumably mean "no name filter". Confirm against Catalog.
342  TGetDbsResult get_dbs_result;
343  Status status = catalog_->GetDbNames(NULL, &get_dbs_result);
344  if (!status.ok()) {
345  Value error(status.GetDetail().c_str(), document->GetAllocator());
346  document->AddMember("error", error, document->GetAllocator());
347  return;
348  }
349  Value databases(kArrayType);
350  BOOST_FOREACH(const string& db, get_dbs_result.dbs) {
351  Value database(kObjectType);
352  Value str(db.c_str(), document->GetAllocator());
353  database.AddMember("name", str, document->GetAllocator());
354 
355  TGetTablesResult get_table_results;
// NOTE(review): this 'status' shadows the outer one. The 'continue' below also skips
// databases.PushBack(), so the per-database "error" member never reaches the output.
// Pushing 'database' before continuing would surface it.
356  Status status = catalog_->GetTableNames(db, NULL, &get_table_results);
357  if (!status.ok()) {
358  Value error(status.GetDetail().c_str(), document->GetAllocator());
359  database.AddMember("error", error, document->GetAllocator());
360  continue;
361  }
362 
363  Value table_array(kArrayType);
364  BOOST_FOREACH(const string& table, get_table_results.tables) {
365  Value table_obj(kObjectType);
366  Value fq_name(Substitute("$0.$1", db, table).c_str(), document->GetAllocator());
367  table_obj.AddMember("fqtn", fq_name, document->GetAllocator());
368  Value table_name(table.c_str(), document->GetAllocator());
369  table_obj.AddMember("name", table_name, document->GetAllocator());
370  table_array.PushBack(table_obj, document->GetAllocator());
371  }
// "num_tables" is read before "tables" is added, because AddMember() takes
// table_array's contents.
372  database.AddMember("num_tables", table_array.Size(), document->GetAllocator());
373  database.AddMember("tables", table_array, document->GetAllocator());
374  databases.PushBack(database, document->GetAllocator());
375  }
376  document->AddMember("databases", databases, document->GetAllocator());
377 }
378 
379 
381  Document* document) {
// Fills 'document' for the single-object debug page (presumably served at
// CATALOG_OBJECT_WEB_PAGE). Requires both the "object_type" and "object_name" URL
// arguments. On success, adds "thrift_string" (the ThriftDebugString of the object);
// otherwise adds an "error" member.
// NOTE(review): original line 380 (signature start) is missing from this listing. The
// symbol index lists CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args,
// rapidjson::Document* document).
382  Webserver::ArgumentMap::const_iterator object_type_arg = args.find("object_type");
383  Webserver::ArgumentMap::const_iterator object_name_arg = args.find("object_name");
384  if (object_type_arg != args.end() && object_name_arg != args.end()) {
385  TCatalogObjectType::type object_type =
386  TCatalogObjectTypeFromName(object_type_arg->second);
387 
388  // Build the lookup request from the object_type and object_name URL arguments.
// NOTE(review): per the symbol index, TCatalogObjectFromObjectName returns a Status,
// which is ignored here. A malformed object_name is passed on to GetCatalogObject
// unchecked.
389  TCatalogObject request;
390  TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
391 
392  // Get the object and dump its contents.
393  TCatalogObject result;
394  Status status = catalog_->GetCatalogObject(request, &result);
395  if (status.ok()) {
396  Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator());
397  document->AddMember("thrift_string", debug_string, document->GetAllocator());
398  } else {
399  Value error(status.GetDetail().c_str(), document->GetAllocator());
400  document->AddMember("error", error, document->GetAllocator());
401  }
402  } else {
403  Value error("Please specify values for the object_type and object_name parameters.",
404  document->GetAllocator());
405  document->AddMember("error", error, document->GetAllocator());
406  }
407 }
Status TCatalogObjectFromObjectName(const TCatalogObjectType::type &object_type, const string &object_name, TCatalogObject *catalog_object)
Definition: catalog-util.cc:68
const string CATALOG_TEMPLATE
virtual void PrioritizeLoad(TPrioritizeLoadResponse &resp, const TPrioritizeLoadRequest &req)
boost::mutex catalog_lock_
string TCatalogObjectToEntryKey(const TCatalogObject &catalog_object)
virtual void UpdateCatalog(TUpdateCatalogResponse &resp, const TUpdateCatalogRequest &req)
const std::string GetDetail() const
Definition: status.cc:184
string TNetworkAddressToString(const TNetworkAddress &address)
Utility method to print address as address:port.
DEFINE_int32(catalog_service_port, 26000,"port where the CatalogService is running")
TODO: Consider allowing fragment IDs as category parameters.
Definition: thread.h:45
boost::function< void(const ArgumentMap &args, rapidjson::Document *json)> UrlCallback
Definition: webserver.h:38
boost::condition_variable catalog_update_cv_
void CatalogObjectsUrlCallback(const Webserver::ArgumentMap &args, rapidjson::Document *document)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
M * RegisterMetric(M *metric)
Definition: metrics.h:211
MetricGroups may be organised hierarchically as a tree.
Definition: metrics.h:200
void RegisterUrlCallback(const std::string &path, const std::string &template_filename, const UrlCallback &callback, bool is_on_nav_bar=true)
Only one callback may be registered per URL.
Definition: webserver.cc:412
boost::unordered_set< std::string > catalog_topic_entry_keys_
void BuildTopicUpdates(const std::vector< TCatalogObject > &catalog_objects)
static std::string IMPALA_CATALOG_TOPIC
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
Definition: network-util.cc:96
boost::scoped_ptr< StatestoreSubscriber > statestore_subscriber_
virtual void ExecDdl(TDdlExecResponse &resp, const TDdlExecRequest &req)
TCatalogObjectType::type TCatalogObjectTypeFromName(const string &name)
Definition: catalog-util.cc:29
Status Serialize(T *obj, std::vector< uint8_t > *result)
Serializes obj into result. Result will contain a copy of the memory.
Definition: thrift-util.h:48
const string CATALOG_WEB_PAGE
std::map< std::string, std::string > ArgumentMap
Definition: webserver.h:36
virtual void ResetMetadata(TResetMetadataResponse &resp, const TResetMetadataRequest &req)
boost::scoped_ptr< Thread > catalog_update_gathering_thread_
Thread that polls the catalog for any updates.
const string CATALOG_OBJECT_TEMPLATE
boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
boost::scoped_ptr< Catalog > catalog_
int64_t catalog_objects_min_version_
const string CATALOG_SERVER_TOPIC_PROCESSING_TIMES
virtual void SentryAdminCheck(TSentryAdminCheckResponse &resp, const TSentryAdminCheckRequest &req)
void ToThrift(TStatus *status) const
Convert into TStatus.
Definition: status.cc:188
void RegisterWebpages(Webserver *webserver)
void Update(const T &value)
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
uint64_t ElapsedTime() const
Returns time in nanosecond.
Definition: stopwatch.h:105
ThriftSerializer thrift_serializer_
DECLARE_int32(state_store_subscriber_port)
void CatalogUrlCallback(const Webserver::ArgumentMap &args, rapidjson::Document *document)
DECLARE_string(state_store_host)
StatsMetric< double > * topic_processing_time_metric_
Metric that tracks the amount of time taken preparing a catalog update.
virtual void GetCatalogObject(TGetCatalogObjectResponse &resp, const TGetCatalogObjectRequest &req)
void UpdateCatalogTopicCallback(const StatestoreSubscriber::TopicDeltaMap &incoming_topic_deltas, std::vector< TTopicDelta > *subscriber_topic_updates)
int64_t catalog_objects_max_version_
static const Status OK
Definition: status.h:87
#define VLOG_RPC
Definition: logging.h:56
virtual void GetFunctions(TGetFunctionsResponse &resp, const TGetFunctionsRequest &req)
CatalogServiceThriftIf(CatalogServer *catalog_server)
MetricGroup * metrics_
bool ok() const
Definition: status.h:172
CatalogServer * catalog_server_
const string CATALOG_OBJECT_WEB_PAGE
std::vector< TTopicItem > pending_topic_updates_
DECLARE_bool(compact_catalog_topic)
int64_t last_sent_catalog_version_