Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
data-stream-mgr.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_RUNTIME_DATA_STREAM_MGR_H
17 #define IMPALA_RUNTIME_DATA_STREAM_MGR_H
18 
19 #include <list>
20 #include <set>
21 #include <boost/thread/mutex.hpp>
22 #include <boost/shared_ptr.hpp>
23 #include <boost/thread/condition_variable.hpp>
24 #include <boost/unordered_map.hpp>
25 
26 #include "common/status.h"
27 #include "common/object-pool.h"
28 #include "runtime/descriptors.h" // for PlanNodeId
29 #include "runtime/mem-tracker.h"
30 #include "util/promise.h"
31 #include "util/runtime-profile.h"
32 #include "gen-cpp/Types_types.h" // for TUniqueId
33 
34 namespace impala {
35 
36 class DescriptorTbl;
37 class DataStreamRecvr;
38 class RowBatch;
39 class RuntimeState;
40 class TRowBatch;
41 
49 //
53 //
57  public:
59 
66  boost::shared_ptr<DataStreamRecvr> CreateRecvr(
67  RuntimeState* state, const RowDescriptor& row_desc,
68  const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
69  int num_senders, int buffer_size, RuntimeProfile* profile,
70  bool is_merging);
71 
81  Status AddData(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
82  const TRowBatch& thrift_batch, int sender_id);
83 
87  Status CloseSender(const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
88  int sender_id);
89 
91  void Cancel(const TUniqueId& fragment_instance_id);  // Closes all receivers registered for fragment_instance_id immediately.
92 
93  private:
94  friend class DataStreamRecvr;
95 
97  boost::mutex lock_;  // protects all fields below
98 
104  typedef boost::unordered_multimap<uint32_t,
105  boost::shared_ptr<DataStreamRecvr> > StreamMap;
107 
108  // Less-than ordering for pair<TUniqueId, PlanNodeId>: compares first.hi, then first.lo, then second.
109  struct ComparisonOp {
110  bool operator()(const std::pair<impala::TUniqueId, PlanNodeId>& a,
111  const std::pair<impala::TUniqueId, PlanNodeId>& b) const {  // const so a const std::set (e.g. find() const) can invoke it
112  if (a.first.hi < b.first.hi) {
113  return true;
114  } else if (a.first.hi > b.first.hi) {
115  return false;
116  } else if (a.first.lo < b.first.lo) {
117  return true;
118  } else if (a.first.lo > b.first.lo) {
119  return false;
120  }
121  return a.second < b.second;  // both id words are equal: fall back to the node id
122  }
123  };
124 
126  typedef std::set<std::pair<TUniqueId, PlanNodeId>, ComparisonOp > FragmentStreamSet;  // ordered set of registered streams' fragment instance id/node id
128 
132  boost::shared_ptr<DataStreamRecvr> FindRecvr(
133  const TUniqueId& fragment_instance_id, PlanNodeId node_id,
134  bool acquire_lock = true);
135 
137  Status DeregisterRecvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id);  // Remove receiver block for fragment_instance_id/node_id from the map.
138 
139  inline uint32_t GetHashValue(const TUniqueId& fragment_instance_id, PlanNodeId node_id);
140 };
141 
142 }
143 
144 #endif
Status DeregisterRecvr(const TUniqueId &fragment_instance_id, PlanNodeId node_id)
Remove receiver block for fragment_instance_id/node_id from the map.
int PlanNodeId
Definition: global-types.h:26
boost::shared_ptr< DataStreamRecvr > CreateRecvr(RuntimeState *state, const RowDescriptor &row_desc, const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile *profile, bool is_merging)
FragmentStreamSet fragment_stream_set_
Status CloseSender(const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, int sender_id)
bool operator()(const std::pair< impala::TUniqueId, PlanNodeId > &a, const std::pair< impala::TUniqueId, PlanNodeId > &b)
boost::mutex lock_
protects all fields below
boost::shared_ptr< DataStreamRecvr > FindRecvr(const TUniqueId &fragment_instance_id, PlanNodeId node_id, bool acquire_lock=true)
uint32_t GetHashValue(const TUniqueId &fragment_instance_id, PlanNodeId node_id)
less-than ordering for pair<TUniqueId, PlanNodeId>
boost::unordered_multimap< uint32_t, boost::shared_ptr< DataStreamRecvr > > StreamMap
std::set< std::pair< TUniqueId, PlanNodeId >, ComparisonOp > FragmentStreamSet
ordered set of registered streams' fragment instance id/node id
const RowDescriptor & row_desc() const
Status AddData(const TUniqueId &fragment_instance_id, PlanNodeId dest_node_id, const TRowBatch &thrift_batch, int sender_id)
void Cancel(const TUniqueId &fragment_instance_id)
Closes all receivers registered for fragment_instance_id immediately.