Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
thrift-util.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "rpc/thrift-util.h"
16 
17 #include <boost/shared_ptr.hpp>
18 #include <boost/thread.hpp>
19 
20 #include "util/hash-util.h"
21 #include "util/time.h"
22 #include "rpc/thrift-server.h"
23 #include "gen-cpp/Types_types.h"
24 #include "gen-cpp/Data_types.h"
25 
26 // TCompactProtocol requires some #defines to work right. They also define UNLIKELY
27 // so we need to undef this.
28 // TODO: is there a better include to use?
29 #ifdef UNLIKELY
30 #undef UNLIKELY
31 #endif
32 #define SIGNED_RIGHT_SHIFT_IS 1
33 #define ARITHMETIC_RIGHT_SHIFT 1
34 
35 // Thrift does things like throw exception("some string " + int) which just returns
36 // garbage.
37 // TODO: get thrift to fix this.
38 #pragma clang diagnostic push
39 #pragma clang diagnostic ignored "-Wstring-plus-int"
40 #include <thrift/Thrift.h>
41 #include <thrift/transport/TSocket.h>
42 #include <thrift/transport/TServerSocket.h>
43 #include <thrift/concurrency/ThreadManager.h>
44 #include <thrift/concurrency/PosixThreadFactory.h>
45 #include <thrift/protocol/TCompactProtocol.h>
46 #pragma clang diagnostic pop
47 
48 #include "common/names.h"
49 
50 using namespace apache::thrift;
51 using namespace apache::thrift::transport;
52 using namespace apache::thrift::server;
53 using namespace apache::thrift::protocol;
54 using namespace apache::thrift::concurrency;
55 
56 // Thrift defines operator< but does not implement it. This is a stub
57 // implementation so we can link.
58 bool Apache::Hadoop::Hive::Partition::operator<(
59  const Apache::Hadoop::Hive::Partition& x) const {
60  DCHECK(false) << "This should not get called.";
61  return false;
62 }
63 
64 namespace impala {
65 
66 ThriftSerializer::ThriftSerializer(bool compact, int initial_buffer_size) :
67  mem_buffer_(new TMemoryBuffer(initial_buffer_size)) {
68  if (compact) {
69  TCompactProtocolFactoryT<TMemoryBuffer> factory;
70  protocol_ = factory.getProtocol(mem_buffer_);
71  } else {
72  TBinaryProtocolFactoryT<TMemoryBuffer> factory;
73  protocol_ = factory.getProtocol(mem_buffer_);
74  }
75 }
76 
77 shared_ptr<TProtocol> CreateDeserializeProtocol(
78  shared_ptr<TMemoryBuffer> mem, bool compact) {
79  if (compact) {
80  TCompactProtocolFactoryT<TMemoryBuffer> tproto_factory;
81  return tproto_factory.getProtocol(mem);
82  } else {
83  TBinaryProtocolFactoryT<TMemoryBuffer> tproto_factory;
84  return tproto_factory.getProtocol(mem);
85  }
86 }
87 
88 // Comparator for THostPorts. Thrift declares this (in gen-cpp/Types_types.h) but
89 // never defines it.
90 bool TNetworkAddress::operator<(const TNetworkAddress& that) const {
91  if (this->hostname < that.hostname) {
92  return true;
93  } else if ((this->hostname == that.hostname) && (this->port < that.port)) {
94  return true;
95  }
96  return false;
97 };
98 
99 // Comparator for TUniqueIds
100 bool TUniqueId::operator<(const TUniqueId& that) const {
101  return (hi < that.hi) || (hi == that.hi && lo < that.lo);
102 }
103 
104 bool TAccessEvent::operator<(const TAccessEvent& that) const {
105  return this->name < that.name;
106 }
107 
108 static void ThriftOutputFunction(const char* output) {
109  VLOG_QUERY << output;
110 }
111 
113  GlobalOutput.setOutputFunction(ThriftOutputFunction);
114 }
115 
116 Status WaitForLocalServer(const ThriftServer& server, int num_retries,
117  int retry_interval_ms) {
118  return WaitForServer("localhost", server.port(), num_retries, retry_interval_ms);
119 }
120 
121 Status WaitForServer(const string& host, int port, int num_retries,
122  int retry_interval_ms) {
123  int retry_count = 0;
124  while (retry_count < num_retries) {
125  try {
126  TSocket socket(host, port);
127  // Timeout is in ms
128  socket.setConnTimeout(500);
129  socket.open();
130  socket.close();
131  return Status::OK;
132  } catch (const TException& e) {
133  VLOG_QUERY << "Connection failed: " << e.what();
134  }
135  ++retry_count;
136  VLOG_QUERY << "Waiting " << retry_interval_ms << "ms for Thrift server at "
137  << host << ":" << port
138  << " to come up, failed attempt " << retry_count
139  << " of " << num_retries;
140  SleepForMs(retry_interval_ms);
141  }
142  return Status("Server did not come up");
143 }
144 
145 std::ostream& operator<<(std::ostream& out, const TColumnValue& colval) {
146  if (colval.__isset.bool_val) {
147  out << ((colval.bool_val) ? "true" : "false");
148  } else if (colval.__isset.double_val) {
149  out << colval.double_val;
150  } else if (colval.__isset.byte_val) {
151  out << colval.byte_val;
152  } else if (colval.__isset.short_val) {
153  out << colval.short_val;
154  } else if (colval.__isset.int_val) {
155  out << colval.int_val;
156  } else if (colval.__isset.long_val) {
157  out << colval.long_val;
158  } else if (colval.__isset.string_val) {
159  out << colval.string_val;
160  } else if (colval.__isset.binary_val) {
161  out << colval.binary_val; // Stored as a std::string
162  } else {
163  out << "NULL";
164  }
165  return out;
166 }
167 
168 bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress& b) {
169  int cmp = a.hostname.compare(b.hostname);
170  if (cmp < 0) return true;
171  if (cmp == 0) return a.port < b.port;
172  return false;
173 }
174 
175 bool IsTimeoutTException(const TException& e) {
176  // String taken from Thrift's TSocket.cpp
177  return strstr(e.what(), "EAGAIN (timed out)") != NULL;
178 }
179 
180 }
void InitThriftLogging()
Redirects all Thrift logging to VLOG(1)
Definition: thrift-util.cc:112
const StringSearch UrlParser::protocol_search & protocol
Definition: url-parser.cc:36
boost::shared_ptr< apache::thrift::protocol::TProtocol > protocol_
Definition: thrift-util.h:90
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
Definition: time.cc:21
bool TNetworkAddressComparator(const TNetworkAddress &a, const TNetworkAddress &b)
Definition: thrift-util.cc:168
#define VLOG_QUERY
Definition: logging.h:57
bool IsTimeoutTException(const TException &e)
Definition: thrift-util.cc:175
Status WaitForServer(const string &host, int port, int num_retries, int retry_interval_ms)
Definition: thrift-util.cc:121
static void ThriftOutputFunction(const char *output)
Definition: thrift-util.cc:108
static const Status OK
Definition: status.h:87
Status WaitForLocalServer(const ThriftServer &server, int num_retries, int retry_interval_ms)
Definition: thrift-util.cc:116
boost::shared_ptr< apache::thrift::transport::TMemoryBuffer > mem_buffer_
Definition: thrift-util.h:89
shared_ptr< TProtocol > CreateDeserializeProtocol(shared_ptr< TMemoryBuffer > mem, bool compact)
Definition: thrift-util.cc:77
ostream & operator<<(ostream &os, const map< TNetworkAddress, llama::TAllocatedResource > &resources)
string name
Definition: cpu-info.cc:50