18 #include <boost/functional/hash.hpp>
19 #include <boost/thread/locks.hpp>
20 #include <boost/thread/thread.hpp>
29 #include "gen-cpp/ImpalaInternalService.h"
30 #include "gen-cpp/ImpalaInternalService_types.h"
34 using namespace apache::thrift;
// Computes the hash key for a (fragment instance id, plan node id) pair.
// The hash is chained: fragment_instance_id.lo is hashed with seed 0, then
// fragment_instance_id.hi and finally node_id are hashed with the previous
// result passed as the seed.
// NOTE(review): the return statement and closing brace are not shown here;
// presumably `value` is returned -- confirm against the full file.
38 inline uint32_t DataStreamMgr::GetHashValue(
39 const TUniqueId& fragment_instance_id,
PlanNodeId node_id) {
// Start the chain with the `lo` field, hashed as TYPE_BIGINT with seed 0.
40 uint32_t value = RawValue::GetHashValue(&fragment_instance_id.lo,
TYPE_BIGINT, 0);
// Fold in the `hi` field, seeded with the hash so far.
41 value = RawValue::GetHashValue(&fragment_instance_id.hi,
TYPE_BIGINT, value);
// Fold in the node id (hashed as TYPE_INT), again seeded with the hash so far.
42 value = RawValue::GetHashValue(&node_id,
TYPE_INT, value);
// Creates a DataStreamRecvr for (fragment_instance_id, dest_node_id) and
// registers it in both fragment_stream_set_ and receiver_map_.
// NOTE(review): most of the parameter list, part of the receiver construction
// and the end of the function are not shown here -- confirm against the full
// file before relying on this description.
46 shared_ptr<DataStreamRecvr> DataStreamMgr::CreateRecvr(
RuntimeState* state,
// A non-NULL profile is required.
50 DCHECK(profile != NULL);
51 VLOG_FILE <<
"creating receiver for fragment="
52 << fragment_instance_id <<
", node=" << dest_node_id;
53 shared_ptr<DataStreamRecvr> recvr(
55 fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size,
// Hash of (fragment_instance_id, dest_node_id); used as the key in receiver_map_.
57 size_t hash_value = GetHashValue(fragment_instance_id, dest_node_id);
// Hold lock_ while both registries are updated.
58 lock_guard<mutex> l(
lock_);
// Record the (fragment instance, node) pair; Cancel-style lookups walk this set.
59 fragment_stream_set_.insert(make_pair(fragment_instance_id, dest_node_id));
// Store the receiver under its hash value.
60 receiver_map_.insert(make_pair(hash_value, recvr));
// Looks up the receiver registered for (fragment_instance_id, node_id).
// Returns an empty shared_ptr when no entry stored under the pair's hash
// matches both ids.
// If acquire_lock is true, lock_ is locked for the lookup and unlocked on each
// visible exit path. If it is false, lock_ is not touched here; presumably the
// caller already holds it -- confirm against callers.
// NOTE(review): the statements that follow the inner unlock (the return on a
// match and the iterator advance) and the closing brace are not shown here.
64 shared_ptr<DataStreamRecvr> DataStreamMgr::FindRecvr(
65 const TUniqueId& fragment_instance_id,
PlanNodeId node_id,
bool acquire_lock) {
66 VLOG_ROW <<
"looking up fragment_instance_id=" << fragment_instance_id
67 <<
", node=" << node_id;
68 size_t hash_value = GetHashValue(fragment_instance_id, node_id);
// Manual lock/unlock (not a scoped guard) because locking is conditional.
69 if (acquire_lock)
lock_.lock();
// All entries stored under this hash value.
70 pair<StreamMap::iterator, StreamMap::iterator> range =
71 receiver_map_.equal_range(hash_value);
72 while (range.first != range.second) {
73 shared_ptr<DataStreamRecvr> recvr = range.first->second;
// Entries sharing a hash are told apart by comparing the actual ids.
74 if (recvr->fragment_instance_id() == fragment_instance_id
75 && recvr->dest_node_id() == node_id) {
// Match found: release the lock before leaving.
76 if (acquire_lock)
lock_.unlock();
// No match: release the lock and return an empty pointer.
81 if (acquire_lock)
lock_.unlock();
82 return shared_ptr<DataStreamRecvr>();
// Parameters and body of the routine that logs itself as "AddData()":
// finds the receiver for (fragment_instance_id, dest_node_id) and passes it
// thrift_batch together with sender_id via AddBatch().
// NOTE(review): the function header and the lines between the lookup and the
// AddBatch() call are not shown here; presumably they handle a receiver that
// was not found -- confirm against the full file.
86 const TUniqueId& fragment_instance_id,
PlanNodeId dest_node_id,
87 const TRowBatch& thrift_batch,
int sender_id) {
88 VLOG_ROW <<
"AddData(): fragment_instance_id=" << fragment_instance_id
89 <<
" node=" << dest_node_id
90 <<
" size=" << RowBatch::GetBatchSize(thrift_batch);
// Called without the third argument, so FindRecvr's default for acquire_lock applies.
91 shared_ptr<DataStreamRecvr> recvr =
92 FindRecvr(fragment_instance_id, dest_node_id);
103 recvr->AddBatch(thrift_batch, sender_id);
// Looks up the receiver for (fragment_instance_id, dest_node_id) and calls
// RemoveSender(sender_id) on it.
// NOTE(review): the remaining parameters and the lines between the lookup and
// the RemoveSender() call are not shown here; presumably they handle a
// receiver that was not found -- confirm against the full file.
107 Status DataStreamMgr::CloseSender(
const TUniqueId& fragment_instance_id,
109 VLOG_FILE <<
"CloseSender(): fragment_instance_id=" << fragment_instance_id
110 <<
", node=" << dest_node_id;
// Called without the third argument, so FindRecvr's default for acquire_lock applies.
111 shared_ptr<DataStreamRecvr> recvr = FindRecvr(fragment_instance_id, dest_node_id);
122 recvr->RemoveSender(sender_id);
// Parameters and body of the routine that logs itself as "DeregisterRecvr()":
// under lock_, finds the receiver matching (fragment_instance_id, node_id),
// cancels its stream and removes it from fragment_stream_set_ and
// receiver_map_. An error naming the ids is logged on the path where the
// receiver is unknown.
// NOTE(review): the function header, the exit after a successful erase, the
// iterator advance and the return values are not shown here.
127 const TUniqueId& fragment_instance_id,
PlanNodeId node_id) {
128 VLOG_QUERY <<
"DeregisterRecvr(): fragment_instance_id=" << fragment_instance_id
129 <<
", node=" << node_id;
130 size_t hash_value = GetHashValue(fragment_instance_id, node_id);
// Scoped lock: held until the function exits.
131 lock_guard<mutex> l(
lock_);
// All entries stored under this hash value.
132 pair<StreamMap::iterator, StreamMap::iterator> range =
133 receiver_map_.equal_range(hash_value);
134 while (range.first != range.second) {
// `recvr` is a reference into the map entry, not a copy; it must not be used
// after the receiver_map_.erase() below.
135 const shared_ptr<DataStreamRecvr>& recvr = range.first->second;
136 if (recvr->fragment_instance_id() == fragment_instance_id
137 && recvr->dest_node_id() == node_id) {
139 recvr->CancelStream();
// Remove the set entry first, while `recvr` is still valid.
140 fragment_stream_set_.erase(make_pair(recvr->fragment_instance_id(),
141 recvr->dest_node_id()));
142 receiver_map_.erase(range.first);
// Build and log the error for an unknown receiver.
149 err <<
"unknown row receiver id: fragment_instance_id=" << fragment_instance_id
150 <<
" node_id=" << node_id;
151 LOG(ERROR) << err.str();
// Body of the routine whose error message identifies it as "Cancel()":
// under lock_, walks the fragment_stream_set_ entries whose first element
// equals fragment_instance_id, looks up each entry's receiver and calls
// CancelStream() on it. An error is logged on the path where the set entry
// has no receiver in the map.
// NOTE(review): the function header, the check that selects the error path
// and the iterator advance are not shown here.
156 VLOG_QUERY <<
"cancelling all streams for fragment=" << fragment_instance_id;
// Scoped lock: held for the whole walk.
157 lock_guard<mutex> l(
lock_);
// Start at the first entry not ordered before (fragment_instance_id, 0).
// Assumes node ids are non-negative so no entry for this fragment is skipped
// -- TODO confirm.
158 FragmentStreamSet::iterator i =
159 fragment_stream_set_.lower_bound(make_pair(fragment_instance_id, 0));
160 while (i != fragment_stream_set_.end() && i->first == fragment_instance_id) {
// lock_ is already held above, so FindRecvr is told not to acquire it.
161 shared_ptr<DataStreamRecvr> recvr = FindRecvr(i->first, i->second,
false);
165 err <<
"Cancel(): missing in stream_map: fragment=" << i->first
166 <<
" node=" << i->second;
167 LOG(ERROR) << err.str();
169 recvr->CancelStream();
void Cancel(const Status *cause=NULL)
std::size_t hash_value(const Decimal4Value &v)
This function must be called 'hash_value' to be picked up by boost.
const RowDescriptor & row_desc() const
MemTracker * instance_mem_tracker()