17 #include <gutil/strings/substitute.h>
18 #include <thrift/protocol/TDebugProtocol.h>
23 #include "gen-cpp/CatalogInternalService_types.h"
24 #include "gen-cpp/CatalogObjects_types.h"
25 #include "gen-cpp/CatalogService_types.h"
31 using namespace apache::thrift;
32 using namespace impala;
33 using namespace rapidjson;
34 using namespace strings;
36 DEFINE_int32(catalog_service_port, 26000,
"port where the CatalogService is running");
43 string CatalogServer::IMPALA_CATALOG_TOPIC =
"catalog-update";
46 "catalog-server.topic-processing-time-s";
57 : catalog_server_(catalog_server) {
61 virtual void ExecDdl(TDdlExecResponse& resp,
const TDdlExecRequest& req) {
62 VLOG_RPC <<
"ExecDdl(): request=" << ThriftDebugString(req);
63 Status status = catalog_server_->catalog()->ExecDdl(req, &resp);
65 TStatus thrift_status;
67 resp.result.__set_status(thrift_status);
68 VLOG_RPC <<
"ExecDdl(): response=" << ThriftDebugString(resp);
73 const TResetMetadataRequest& req) {
74 VLOG_RPC <<
"ResetMetadata(): request=" << ThriftDebugString(req);
75 Status status = catalog_server_->catalog()->ResetMetadata(req, &resp);
77 TStatus thrift_status;
79 resp.result.__set_status(thrift_status);
80 VLOG_RPC <<
"ResetMetadata(): response=" << ThriftDebugString(resp);
86 const TUpdateCatalogRequest& req) {
87 VLOG_RPC <<
"UpdateCatalog(): request=" << ThriftDebugString(req);
88 Status status = catalog_server_->catalog()->UpdateCatalog(req, &resp);
90 TStatus thrift_status;
92 resp.result.__set_status(thrift_status);
93 VLOG_RPC <<
"UpdateCatalog(): response=" << ThriftDebugString(resp);
99 const TGetFunctionsRequest& req) {
100 VLOG_RPC <<
"GetFunctions(): request=" << ThriftDebugString(req);
101 Status status = catalog_server_->catalog()->GetFunctions(req, &resp);
102 if (!status.
ok()) LOG(ERROR) << status.
GetDetail();
103 TStatus thrift_status;
105 resp.__set_status(thrift_status);
106 VLOG_RPC <<
"GetFunctions(): response=" << ThriftDebugString(resp);
111 const TGetCatalogObjectRequest& req) {
112 VLOG_RPC <<
"GetCatalogObject(): request=" << ThriftDebugString(req);
113 Status status = catalog_server_->catalog()->GetCatalogObject(req.object_desc,
114 &resp.catalog_object);
115 if (!status.
ok()) LOG(ERROR) << status.
GetDetail();
116 VLOG_RPC <<
"GetCatalogObject(): response=" << ThriftDebugString(resp);
123 const TPrioritizeLoadRequest& req) {
124 VLOG_RPC <<
"PrioritizeLoad(): request=" << ThriftDebugString(req);
125 Status status = catalog_server_->catalog()->PrioritizeLoad(req);
126 if (!status.
ok()) LOG(ERROR) << status.
GetDetail();
127 TStatus thrift_status;
129 resp.__set_status(thrift_status);
130 VLOG_RPC <<
"PrioritizeLoad(): response=" << ThriftDebugString(resp);
134 const TSentryAdminCheckRequest& req) {
135 VLOG_RPC <<
"SentryAdminCheck(): request=" << ThriftDebugString(req);
136 Status status = catalog_server_->catalog()->SentryAdminCheck(req);
137 if (!status.
ok()) LOG(ERROR) << status.
GetDetail();
138 TStatus thrift_status;
140 resp.__set_status(thrift_status);
141 VLOG_RPC <<
"SentryAdminCheck(): response=" << ThriftDebugString(resp);
150 thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
151 topic_updates_ready_(false), last_sent_catalog_version_(0L),
152 catalog_objects_min_version_(0L), catalog_objects_max_version_(0L) {
160 TNetworkAddress subscriber_address =
162 TNetworkAddress statestore_address =
165 FLAGS_catalog_service_port);
170 "catalog-update-gathering-thread",
175 subscriber_address, statestore_address,
metrics_));
181 status.AddDetail(
"CatalogService failed to start");
203 catalog_objects_callback,
false);
208 vector<TTopicDelta>* subscriber_topic_updates) {
209 StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
211 if (topic == incoming_topic_deltas.end())
return;
219 const TTopicDelta& delta = topic->second;
225 if (delta.from_version == 0 && delta.to_version == 0 &&
235 if (subscriber_topic_updates->size() == 0) {
236 subscriber_topic_updates->push_back(TTopicDelta());
239 TTopicDelta& update = subscriber_topic_updates->back();
240 update.topic_entries.push_back(catalog_object);
270 long current_catalog_version;
271 Status status =
catalog_->GetCatalogVersion(&current_catalog_version);
277 TGetAllCatalogObjectsResponse catalog_objects;
296 unordered_set<string> current_entry_keys;
299 BOOST_FOREACH(
const TCatalogObject& catalog_object, catalog_objects) {
301 if (entry_key.empty()) {
302 LOG_EVERY_N(WARNING, 60) <<
"Unable to build topic entry key for TCatalogObject: "
303 << ThriftDebugString(catalog_object);
306 current_entry_keys.insert(entry_key);
315 VLOG(1) <<
"Publishing update: " << entry_key <<
"@"
316 << catalog_object.catalog_version;
320 item.key = entry_key;
323 LOG(ERROR) <<
"Error serializing topic value: " << status.
GetDetail();
334 VLOG(1) <<
"Publishing deletion: " << key;
341 Document* document) {
342 TGetDbsResult get_dbs_result;
345 Value error(status.
GetDetail().c_str(), document->GetAllocator());
346 document->AddMember(
"error", error, document->GetAllocator());
349 Value databases(kArrayType);
350 BOOST_FOREACH(
const string& db, get_dbs_result.dbs) {
351 Value database(kObjectType);
352 Value str(db.c_str(), document->GetAllocator());
353 database.AddMember(
"name", str, document->GetAllocator());
355 TGetTablesResult get_table_results;
356 Status status =
catalog_->GetTableNames(db, NULL, &get_table_results);
358 Value error(status.
GetDetail().c_str(), document->GetAllocator());
359 database.AddMember(
"error", error, document->GetAllocator());
363 Value table_array(kArrayType);
364 BOOST_FOREACH(
const string& table, get_table_results.tables) {
365 Value table_obj(kObjectType);
366 Value fq_name(Substitute(
"$0.$1", db, table).c_str(), document->GetAllocator());
367 table_obj.AddMember(
"fqtn", fq_name, document->GetAllocator());
368 Value table_name(table.c_str(), document->GetAllocator());
369 table_obj.AddMember(
"name", table_name, document->GetAllocator());
370 table_array.PushBack(table_obj, document->GetAllocator());
372 database.AddMember(
"num_tables", table_array.Size(), document->GetAllocator());
373 database.AddMember(
"tables", table_array, document->GetAllocator());
374 databases.PushBack(database, document->GetAllocator());
376 document->AddMember(
"databases", databases, document->GetAllocator());
381 Document* document) {
382 Webserver::ArgumentMap::const_iterator object_type_arg = args.find(
"object_type");
383 Webserver::ArgumentMap::const_iterator object_name_arg = args.find(
"object_name");
384 if (object_type_arg != args.end() && object_name_arg != args.end()) {
385 TCatalogObjectType::type object_type =
389 TCatalogObject request;
393 TCatalogObject result;
396 Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator());
397 document->AddMember(
"thrift_string", debug_string, document->GetAllocator());
399 Value error(status.
GetDetail().c_str(), document->GetAllocator());
400 document->AddMember(
"error", error, document->GetAllocator());
403 Value error(
"Please specify values for the object_type and object_name parameters.",
404 document->GetAllocator());
405 document->AddMember(
"error", error, document->GetAllocator());
Status TCatalogObjectFromObjectName(const TCatalogObjectType::type &object_type, const string &object_name, TCatalogObject *catalog_object)
const string CATALOG_TEMPLATE
virtual void PrioritizeLoad(TPrioritizeLoadResponse &resp, const TPrioritizeLoadRequest &req)
boost::mutex catalog_lock_
void GatherCatalogUpdatesThread()
string TCatalogObjectToEntryKey(const TCatalogObject &catalog_object)
virtual void UpdateCatalog(TUpdateCatalogResponse &resp, const TUpdateCatalogRequest &req)
const std::string GetDetail() const
string TNetworkAddressToString(const TNetworkAddress &address)
Utility method to print address as address:port.
DEFINE_int32(catalog_service_port, 26000,"port where the CatalogService is running")
TODO: Consider allowing fragment IDs as category parameters.
boost::function< void(const ArgumentMap &args, rapidjson::Document *json)> UrlCallback
boost::condition_variable catalog_update_cv_
void CatalogObjectsUrlCallback(const Webserver::ArgumentMap &args, rapidjson::Document *document)
#define RETURN_IF_ERROR(stmt)
some generally useful macros
M * RegisterMetric(M *metric)
MetricGroups may be organised hierarchically as a tree.
void RegisterUrlCallback(const std::string &path, const std::string &template_filename, const UrlCallback &callback, bool is_on_nav_bar=true)
Only one callback may be registered per URL.
boost::unordered_set< std::string > catalog_topic_entry_keys_
void BuildTopicUpdates(const std::vector< TCatalogObject > &catalog_objects)
static std::string IMPALA_CATALOG_TOPIC
TNetworkAddress MakeNetworkAddress(const string &hostname, int port)
boost::scoped_ptr< StatestoreSubscriber > statestore_subscriber_
virtual void ExecDdl(TDdlExecResponse &resp, const TDdlExecRequest &req)
TCatalogObjectType::type TCatalogObjectTypeFromName(const string &name)
Status Serialize(T *obj, std::vector< uint8_t > *result)
Serializes obj into result. Result will contain a copy of the memory.
const string CATALOG_WEB_PAGE
std::map< std::string, std::string > ArgumentMap
virtual void ResetMetadata(TResetMetadataResponse &resp, const TResetMetadataRequest &req)
boost::scoped_ptr< Thread > catalog_update_gathering_thread_
Thread that polls the catalog for any updates.
const string CATALOG_OBJECT_TEMPLATE
boost::function< void(const TopicDeltaMap &state, std::vector< TTopicDelta > *topic_updates)> UpdateCallback
boost::scoped_ptr< Catalog > catalog_
bool topic_updates_ready_
int64_t catalog_objects_min_version_
const string CATALOG_SERVER_TOPIC_PROCESSING_TIMES
virtual void SentryAdminCheck(TSentryAdminCheckResponse &resp, const TSentryAdminCheckRequest &req)
void ToThrift(TStatus *status) const
Convert into TStatus.
void RegisterWebpages(Webserver *webserver)
void Update(const T &value)
std::map< Statestore::TopicId, TTopicDelta > TopicDeltaMap
A TopicDeltaMap is passed to each callback. See UpdateCallback for more details.
uint64_t ElapsedTime() const
Returns time in nanoseconds.
ThriftSerializer thrift_serializer_
DECLARE_int32(state_store_subscriber_port)
void CatalogUrlCallback(const Webserver::ArgumentMap &args, rapidjson::Document *document)
DECLARE_string(state_store_host)
StatsMetric< double > * topic_processing_time_metric_
Metric that tracks the amount of time taken preparing a catalog update.
virtual void GetCatalogObject(TGetCatalogObjectResponse &resp, const TGetCatalogObjectRequest &req)
void UpdateCatalogTopicCallback(const StatestoreSubscriber::TopicDeltaMap &incoming_topic_deltas, std::vector< TTopicDelta > *subscriber_topic_updates)
int64_t catalog_objects_max_version_
virtual void GetFunctions(TGetFunctionsResponse &resp, const TGetFunctionsRequest &req)
CatalogServiceThriftIf(CatalogServer *catalog_server)
CatalogServer * catalog_server_
const string CATALOG_OBJECT_WEB_PAGE
std::vector< TTopicItem > pending_topic_updates_
DECLARE_bool(compact_catalog_topic)
int64_t last_sent_catalog_version_