Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
data-stream-mgr.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include <iostream>
18 #include <boost/functional/hash.hpp>
19 #include <boost/thread/locks.hpp>
20 #include <boost/thread/thread.hpp>
21 
22 #include "runtime/row-batch.h"
24 #include "runtime/raw-value.h"
25 #include "runtime/runtime-state.h"
26 #include "util/debug-util.h"
28 
29 #include "gen-cpp/ImpalaInternalService.h"
30 #include "gen-cpp/ImpalaInternalService_types.h"
31 
32 #include "common/names.h"
33 
34 using namespace apache::thrift;
35 
36 namespace impala {
37 
38 inline uint32_t DataStreamMgr::GetHashValue(
39  const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
40  uint32_t value = RawValue::GetHashValue(&fragment_instance_id.lo, TYPE_BIGINT, 0);
41  value = RawValue::GetHashValue(&fragment_instance_id.hi, TYPE_BIGINT, value);
42  value = RawValue::GetHashValue(&node_id, TYPE_INT, value);
43  return value;
44 }
45 
46 shared_ptr<DataStreamRecvr> DataStreamMgr::CreateRecvr(RuntimeState* state,
47  const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
48  PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
49  bool is_merging) {
50  DCHECK(profile != NULL);
51  VLOG_FILE << "creating receiver for fragment="
52  << fragment_instance_id << ", node=" << dest_node_id;
53  shared_ptr<DataStreamRecvr> recvr(
54  new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
55  fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size,
56  profile));
57  size_t hash_value = GetHashValue(fragment_instance_id, dest_node_id);
58  lock_guard<mutex> l(lock_);
59  fragment_stream_set_.insert(make_pair(fragment_instance_id, dest_node_id));
60  receiver_map_.insert(make_pair(hash_value, recvr));
61  return recvr;
62 }
63 
64 shared_ptr<DataStreamRecvr> DataStreamMgr::FindRecvr(
65  const TUniqueId& fragment_instance_id, PlanNodeId node_id, bool acquire_lock) {
66  VLOG_ROW << "looking up fragment_instance_id=" << fragment_instance_id
67  << ", node=" << node_id;
68  size_t hash_value = GetHashValue(fragment_instance_id, node_id);
69  if (acquire_lock) lock_.lock();
70  pair<StreamMap::iterator, StreamMap::iterator> range =
71  receiver_map_.equal_range(hash_value);
72  while (range.first != range.second) {
73  shared_ptr<DataStreamRecvr> recvr = range.first->second;
74  if (recvr->fragment_instance_id() == fragment_instance_id
75  && recvr->dest_node_id() == node_id) {
76  if (acquire_lock) lock_.unlock();
77  return recvr;
78  }
79  ++range.first;
80  }
81  if (acquire_lock) lock_.unlock();
82  return shared_ptr<DataStreamRecvr>();
83 }
84 
85 Status DataStreamMgr::AddData(
86  const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
87  const TRowBatch& thrift_batch, int sender_id) {
88  VLOG_ROW << "AddData(): fragment_instance_id=" << fragment_instance_id
89  << " node=" << dest_node_id
90  << " size=" << RowBatch::GetBatchSize(thrift_batch);
91  shared_ptr<DataStreamRecvr> recvr =
92  FindRecvr(fragment_instance_id, dest_node_id);
93  if (recvr == NULL) {
94  // The receiver may remove itself from the receiver map via DeregisterRecvr()
95  // at any time without considering the remaining number of senders.
96  // As a consequence, FindRecvr() may return an innocuous NULL if a thread
97  // calling DeregisterRecvr() beat the thread calling FindRecvr()
98  // in acquiring lock_.
99  // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
100  // errors from receiver-initiated teardowns.
101  return Status::OK;
102  }
103  recvr->AddBatch(thrift_batch, sender_id);
104  return Status::OK;
105 }
106 
107 Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
108  PlanNodeId dest_node_id, int sender_id) {
109  VLOG_FILE << "CloseSender(): fragment_instance_id=" << fragment_instance_id
110  << ", node=" << dest_node_id;
111  shared_ptr<DataStreamRecvr> recvr = FindRecvr(fragment_instance_id, dest_node_id);
112  if (recvr == NULL) {
113  // The receiver may remove itself from the receiver map via DeregisterRecvr()
114  // at any time without considering the remaining number of senders.
115  // As a consequence, FindRecvr() may return an innocuous NULL if a thread
116  // calling DeregisterRecvr() beat the thread calling FindRecvr()
117  // in acquiring lock_.
118  // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
119  // errors from receiver-initiated teardowns.
120  return Status::OK;
121  }
122  recvr->RemoveSender(sender_id);
123  return Status::OK;
124 }
125 
126 Status DataStreamMgr::DeregisterRecvr(
127  const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
128  VLOG_QUERY << "DeregisterRecvr(): fragment_instance_id=" << fragment_instance_id
129  << ", node=" << node_id;
130  size_t hash_value = GetHashValue(fragment_instance_id, node_id);
131  lock_guard<mutex> l(lock_);
132  pair<StreamMap::iterator, StreamMap::iterator> range =
133  receiver_map_.equal_range(hash_value);
134  while (range.first != range.second) {
135  const shared_ptr<DataStreamRecvr>& recvr = range.first->second;
136  if (recvr->fragment_instance_id() == fragment_instance_id
137  && recvr->dest_node_id() == node_id) {
138  // Notify concurrent AddData() requests that the stream has been terminated.
139  recvr->CancelStream();
140  fragment_stream_set_.erase(make_pair(recvr->fragment_instance_id(),
141  recvr->dest_node_id()));
142  receiver_map_.erase(range.first);
143  return Status::OK;
144  }
145  ++range.first;
146  }
147 
148  stringstream err;
149  err << "unknown row receiver id: fragment_instance_id=" << fragment_instance_id
150  << " node_id=" << node_id;
151  LOG(ERROR) << err.str();
152  return Status(err.str());
153 }
154 
155 void DataStreamMgr::Cancel(const TUniqueId& fragment_instance_id) {
156  VLOG_QUERY << "cancelling all streams for fragment=" << fragment_instance_id;
157  lock_guard<mutex> l(lock_);
158  FragmentStreamSet::iterator i =
159  fragment_stream_set_.lower_bound(make_pair(fragment_instance_id, 0));
160  while (i != fragment_stream_set_.end() && i->first == fragment_instance_id) {
161  shared_ptr<DataStreamRecvr> recvr = FindRecvr(i->first, i->second, false);
162  if (recvr == NULL) {
163  // keep going but at least log it
164  stringstream err;
165  err << "Cancel(): missing in stream_map: fragment=" << i->first
166  << " node=" << i->second;
167  LOG(ERROR) << err.str();
168  } else {
169  recvr->CancelStream();
170  }
171  ++i;
172  }
173 }
174 
175 }
int PlanNodeId
Definition: global-types.h:26
void Cancel(const Status *cause=NULL)
std::size_t hash_value(const Decimal4Value &v)
This function must be called 'hash_value' to be picked up by boost.
#define VLOG_QUERY
Definition: logging.h:57
mutex lock_
#define VLOG_ROW
Definition: logging.h:59
const RowDescriptor & row_desc() const
MemTracker * instance_mem_tracker()
#define VLOG_FILE
Definition: logging.h:58