Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-table-sink.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXEC_HDFS_TABLE_SINK_H
17 #define IMPALA_EXEC_HDFS_TABLE_SINK_H
18 
19 #include <hdfs.h>
20 #include <boost/unordered_map.hpp>
21 #include <boost/scoped_ptr.hpp>
22 
24 #include "common/object-pool.h"
25 #include "exec/data-sink.h"
26 #include "runtime/descriptors.h"
27 #include "util/runtime-profile.h"
28 
29 namespace impala {
30 
31 class Expr;
32 class TupleDescriptor;
33 class TupleRow;
34 class RuntimeState;
35 class HdfsTableWriter;
36 class MemTracker;
37 
43 
50 
55  std::string current_file_name;
56 
60  std::string tmp_hdfs_dir_name;
61 
66 
68  std::string partition_name;
69 
72 
74  hdfsFile tmp_hdfs_file;
75 
77  int64_t num_rows;
78 
80  int32_t num_files;
81 
83  boost::scoped_ptr<HdfsTableWriter> writer;
84 
87 
89 };
90 
94 //
98 //
109 //
122 class HdfsTableSink : public DataSink {
123  public:
125  const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink);
126 
128  virtual Status Prepare(RuntimeState* state);
129 
132  virtual Status Open(RuntimeState* state);
133 
135  virtual Status Send(RuntimeState* state, RowBatch* batch, bool eos);
136 
140  virtual void Close(RuntimeState* state);
141 
145  static Status GetFileBlockSize(OutputPartition* output_partition, int64_t* size);
146 
147  virtual RuntimeProfile* profile() { return runtime_profile_; }
149  MemTracker* mem_tracker() { return mem_tracker_.get(); }
150 
156 
157  std::string DebugString() const;
158 
159  private:
162  const HdfsPartitionDescriptor& partition_descriptor,
163  OutputPartition* output_partition);
164 
170  Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition);
171 
176  typedef std::pair<OutputPartition*, std::vector<int32_t> > PartitionPair;
177  typedef boost::unordered_map<std::string, PartitionPair> PartitionMap;
178 
179 
183  void GetHashTblKey(const std::vector<ExprContext*>& ctxs, std::string* key);
184 
187  Status GetOutputPartition(RuntimeState* state, const std::string& key,
188  PartitionPair** partition_pair);
189 
192 
197  void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor,
198  OutputPartition* output);
199 
203 
205  void ClosePartitionFile(RuntimeState* state, OutputPartition* partition);
206 
209 
212 
214  std::vector<ExprContext*> output_expr_ctxs_;
215 
218 
221 
224 
227 
230  const std::vector<TExpr>& select_list_texprs_;
231 
234  const std::vector<TExpr>& partition_key_texprs_;
235 
237  std::vector<ExprContext*> partition_key_expr_ctxs_;
238 
241 
244  std::string staging_dir_;
245 
248  std::string unique_id_str_;
249 
256 
259  std::vector<ExprContext*> dynamic_partition_key_expr_ctxs_;
260 
265  typedef boost::unordered_map<std::string, HdfsPartitionDescriptor*>
268 
269  boost::scoped_ptr<MemTracker> mem_tracker_;
270 
277 
284 
288 };
289 }
290 #endif
static Status GetFileBlockSize(OutputPartition *output_partition, int64_t *size)
TableId table_id_
Table id resolved in Prepare() to set tuple_desc_.
std::vector< ExprContext * > output_expr_ctxs_
Exprs that materialize output values.
std::string final_hdfs_file_name_prefix
virtual RuntimeProfile * profile()
Returns the runtime profile for the sink.
void BuildHdfsFileNames(const HdfsPartitionDescriptor &partition_descriptor, OutputPartition *output)
int TableId
Definition: global-types.h:25
Status PrepareExprs(RuntimeState *state)
Initialise and prepare select and partition key expressions.
PartitionDescriptorMap partition_descriptor_map_
virtual void Close(RuntimeState *state)
HdfsTableSink(const RowDescriptor &row_desc, const std::vector< TExpr > &select_list_texprs, const TDataSink &tsink)
RuntimeProfile::Counter * partitions_created_counter_
Status GetOutputPartition(RuntimeState *state, const std::string &key, PartitionPair **partition_pair)
int32_t num_files
Number of files created in this partition.
MemTracker * mem_tracker()
std::string partition_name
key1=val1/key2=val2/ etc. Used to identify partitions to the metastore.
std::string DebugString() const
const HdfsPartitionDescriptor * partition_descriptor
The descriptor for this partition.
boost::unordered_map< std::string, PartitionPair > PartitionMap
Superclass of all data sinks.
Definition: data-sink.h:39
RuntimeProfile::Counter * rows_inserted_counter_
const HdfsPartitionDescriptor * default_partition_
Currently this is the default partition since we don't support multi-format sinks.
hdfsFS hdfs_connection
Connection to HDFS.
hdfsFS hdfs_connection_
Connection to HDFS, established in Open() and closed in Close().
const HdfsTableDescriptor * table_desc_
Descriptor of target table. Set in Prepare().
void ClosePartitionFile(RuntimeState *state, OutputPartition *partition)
Closes the hdfs file for this partition as well as the writer.
Status CreateNewTmpFile(RuntimeState *state, OutputPartition *output_partition)
std::vector< ExprContext * > partition_key_expr_ctxs_
Exprs of partition keys.
RuntimeProfile::Counter * encode_timer_
Time spent converting tuples to the on-disk format.
virtual Status Prepare(RuntimeState *state)
Prepares output_exprs and partition_key_exprs, and connects to HDFS.
RuntimeProfile::Counter * bytes_written_counter()
hdfsFile tmp_hdfs_file
Hdfs file at tmp_hdfs_file_name.
bool overwrite_
Indicates whether the existing partitions should be overwritten.
RuntimeProfile::Counter * files_created_counter_
RuntimeProfile * runtime_profile_
Allocated from runtime state's pool.
std::pair< OutputPartition *, std::vector< int32_t > > PartitionPair
This class is thread-safe.
Definition: mem-tracker.h:61
void GetHashTblKey(const std::vector< ExprContext * > &ctxs, std::string *key)
int64_t num_rows
Records the number of rows appended to the current file in this partition.
boost::scoped_ptr< HdfsTableWriter > writer
Table format specific writer functions.
TupleRow * current_row_
Current row from the current RowBatch to output.
const RowDescriptor & row_desc() const
const std::vector< TExpr > & select_list_texprs_
RuntimeProfile::Counter * compress_timer_
Time spent compressing data.
RuntimeProfile::Counter * hdfs_write_timer_
Time spent writing to HDFS.
RuntimeProfile::Counter * hdfs_write_timer()
virtual Status Send(RuntimeState *state, RowBatch *batch, bool eos)
Append all rows in batch to the temporary Hdfs files corresponding to partitions. ...
RuntimeProfile::Counter * rows_inserted_counter()
const HdfsTableDescriptor & TableDesc()
RuntimeProfile::Counter * encode_timer()
Status FinalizePartitionFile(RuntimeState *state, OutputPartition *partition)
std::vector< ExprContext * > dynamic_partition_key_expr_ctxs_
Metadata for a single partition inside an Hdfs table.
Definition: descriptors.h:177
boost::scoped_ptr< MemTracker > mem_tracker_
boost::unordered_map< std::string, HdfsPartitionDescriptor * > PartitionDescriptorMap
Status InitOutputPartition(RuntimeState *state, const HdfsPartitionDescriptor &partition_descriptor, OutputPartition *output_partition)
Initialises the filenames of a given output partition, and opens the temporary file.
RuntimeProfile::Counter * compress_timer()
RuntimeProfile::Counter * bytes_written_counter_
std::string tmp_hdfs_file_name_prefix
virtual Status Open(RuntimeState *state)
PartitionMap partition_keys_to_output_partitions_
const std::vector< TExpr > & partition_key_texprs_
const RowDescriptor & row_desc_
Row descriptor of row batches passed in Send(). Set in c'tor.