Impala
Impala is the open source, native analytic database for Apache Hadoop.
hdfs-table-sink.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/hdfs-table-sink.h"
#include "exec/hdfs-text-table-writer.h"
#include "exec/hdfs-sequence-table-writer.h"
#include "exec/hdfs-avro-table-writer.h"
#include "exec/hdfs-parquet-table-writer.h"
#include "exec/exec-node.h"
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "util/hdfs-util.h"
#include "exprs/expr.h"
#include "exprs/expr-context.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "util/impalad-metrics.h"
#include "runtime/mem-tracker.h"
#include "util/url-coding.h"

#include <vector>
#include <sstream>
#include <gutil/strings/substitute.h>
#include <hdfs.h>
#include <boost/scoped_ptr.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <stdlib.h>

#include "common/names.h"

using boost::posix_time::microsec_clock;
using boost::posix_time::ptime;
using namespace strings;

namespace impala {

const static string& ROOT_PARTITION_KEY =
    g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;

HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc,
    const vector<TExpr>& select_list_texprs,
    const TDataSink& tsink)
    : row_desc_(row_desc),
      table_id_(tsink.table_sink.target_table_id),
      select_list_texprs_(select_list_texprs),
      partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs),
      overwrite_(tsink.table_sink.hdfs_table_sink.overwrite),
      has_empty_input_batch_(false) {
  DCHECK(tsink.__isset.table_sink);
}

OutputPartition::OutputPartition()
    : hdfs_connection(NULL), tmp_hdfs_file(NULL), num_rows(0), num_files(0),
      partition_descriptor(NULL) {
}

Status HdfsTableSink::PrepareExprs(RuntimeState* state) {
  // Prepare select list expressions.
  // Disable codegen for these - they would be unused anyway.
  // TODO: codegen table sink
  RETURN_IF_ERROR(Expr::Prepare(
      output_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
  RETURN_IF_ERROR(Expr::Prepare(
      partition_key_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));

  // Prepare partition key exprs and gather dynamic partition key exprs.
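  // For example, in INSERT INTO t PARTITION(year=2009, month) SELECT ... the key
  // 'year' is static (constant) while 'month' is dynamic, i.e. evaluated per row.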
  for (size_t i = 0; i < partition_key_expr_ctxs_.size(); ++i) {
    // Remember non-constant partition key exprs for building hash table of Hdfs files.
    if (!partition_key_expr_ctxs_[i]->root()->IsConstant()) {
      dynamic_partition_key_expr_ctxs_.push_back(partition_key_expr_ctxs_[i]);
    }
  }
  // Sanity check.
  DCHECK_LE(partition_key_expr_ctxs_.size(), table_desc_->col_names().size())
      << DebugString();
  DCHECK_LE(dynamic_partition_key_expr_ctxs_.size(), partition_key_expr_ctxs_.size())
      << DebugString();
  DCHECK_GE(output_expr_ctxs_.size(),
      table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString();

  // Prepare literal partition key exprs
  BOOST_FOREACH(
      const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc,
      table_desc_->partition_descriptors()) {
    HdfsPartitionDescriptor* partition = id_to_desc.second;
    RETURN_IF_ERROR(partition->PrepareExprs(state));
  }

  return Status::OK;
}

Status HdfsTableSink::Prepare(RuntimeState* state) {
  RETURN_IF_ERROR(DataSink::Prepare(state));
  unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
  runtime_profile_ = state->obj_pool()->Add(
      new RuntimeProfile(state->obj_pool(), "HdfsTableSink"));
  SCOPED_TIMER(runtime_profile_->total_time_counter());

  // TODO: Consider a system-wide random number generator, initialised in a single place.
  ptime now = microsec_clock::local_time();
  long seed = (now.time_of_day().seconds() * 1000)
      + (now.time_of_day().total_microseconds() / 1000);
  VLOG_QUERY << "Random seed: " << seed;
  srand(seed);
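  // The seed feeds rand(), which BuildHdfsFileNames() uses below to generate unique
  // staging directory and file names.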

  RETURN_IF_ERROR(Expr::CreateExprTrees(state->obj_pool(), partition_key_texprs_,
      &partition_key_expr_ctxs_));
  RETURN_IF_ERROR(Expr::CreateExprTrees(state->obj_pool(), select_list_texprs_,
      &output_expr_ctxs_));

  // Resolve table id and set input tuple descriptor.
  table_desc_ = static_cast<const HdfsTableDescriptor*>(
      state->desc_tbl().GetTableDescriptor(table_id_));

  if (table_desc_ == NULL) {
    stringstream error_msg("Failed to get table descriptor for table id: ");
    error_msg << table_id_;
    return Status(error_msg.str());
  }

  staging_dir_ = Substitute("$0/_impala_insert_staging/$1/", table_desc_->hdfs_base_dir(),
      PrintId(state->query_id(), "_"));

  RETURN_IF_ERROR(PrepareExprs(state));
  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
      staging_dir_, &hdfs_connection_));
  mem_tracker_.reset(new MemTracker(profile(), -1, -1, profile()->name(),
      state->instance_mem_tracker()));

  partitions_created_counter_ =
      ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
  files_created_counter_ =
      ADD_COUNTER(profile(), "FilesCreated", TUnit::UNIT);
  rows_inserted_counter_ =
      ADD_COUNTER(profile(), "RowsInserted", TUnit::UNIT);
  bytes_written_counter_ =
      ADD_COUNTER(profile(), "BytesWritten", TUnit::BYTES);
  encode_timer_ = ADD_TIMER(profile(), "EncodeTimer");
  hdfs_write_timer_ = ADD_TIMER(profile(), "HdfsWriteTimer");
  compress_timer_ = ADD_TIMER(profile(), "CompressTimer");

  return Status::OK;
}

Status HdfsTableSink::Open(RuntimeState* state) {
  RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
  RETURN_IF_ERROR(Expr::Open(partition_key_expr_ctxs_, state));
  // Open literal partition key exprs
  BOOST_FOREACH(
      const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc,
      table_desc_->partition_descriptors()) {
    HdfsPartitionDescriptor* partition = id_to_desc.second;
    RETURN_IF_ERROR(partition->OpenExprs(state));
  }

  // Get file format for default partition in table descriptor, and build a map from
  // partition key values to partition descriptor for multiple output format support. The
  // map is keyed on the concatenation of the non-constant keys of the PARTITION clause of
  // the INSERT statement.
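  // E.g. with dynamic keys (year, month), each partition's map key is the raw-byte
  // representation of its year and month values, each followed by '/'
  // (see GetHashTblKey()).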
  BOOST_FOREACH(
      const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc,
      table_desc_->partition_descriptors()) {
    if (id_to_desc.first == g_ImpalaInternalService_constants.DEFAULT_PARTITION_ID) {
      default_partition_ = id_to_desc.second;
    } else {
      // Build a map whose key is computed from the value of dynamic partition keys for a
      // particular partition, and whose value is the descriptor for that partition.

      // True if this partition might be written to, false otherwise.
      // A partition may be written to iff:
      // For all partition key exprs e, either:
      //   1. e is not constant
      //   2. The value supplied by the query for this partition key is equal to e's
      //      constant value.
      // Only relevant partitions are remembered in partition_descriptor_map_.
      bool relevant_partition = true;
      HdfsPartitionDescriptor* partition = id_to_desc.second;
      DCHECK_EQ(partition->partition_key_value_ctxs().size(),
          partition_key_expr_ctxs_.size());
      vector<ExprContext*> dynamic_partition_key_value_ctxs;
      for (size_t i = 0; i < partition_key_expr_ctxs_.size(); ++i) {
        // Remember non-constant partition key exprs for building hash table of Hdfs
        // files.
        if (!partition_key_expr_ctxs_[i]->root()->IsConstant()) {
          dynamic_partition_key_value_ctxs.push_back(
              partition->partition_key_value_ctxs()[i]);
        } else {
          // Deal with the following: one partition has (year=2009, month=3); another
          // has (year=2010, month=3).
          // A query like: INSERT INTO TABLE... PARTITION(year=2009) SELECT month FROM...
          // would lead to both partitions having the same key modulo ignored constant
          // partition keys. So only keep a reference to the partition which matches
          // partition_key_values for constant values, since only that is written to.
          void* table_partition_key_value =
              partition->partition_key_value_ctxs()[i]->GetValue(NULL);
          void* target_partition_key_value = partition_key_expr_ctxs_[i]->GetValue(NULL);
          if (table_partition_key_value == NULL && target_partition_key_value == NULL) {
            continue;
          }
          if (table_partition_key_value == NULL || target_partition_key_value == NULL
              || !RawValue::Eq(table_partition_key_value, target_partition_key_value,
                  partition_key_expr_ctxs_[i]->root()->type())) {
            relevant_partition = false;
            break;
          }
        }
      }
      if (relevant_partition) {
        string key;
        // It's ok if current_row_ is NULL (which it should be here), since all of these
        // expressions are constant, and can therefore be evaluated without a valid row
        // context.
        GetHashTblKey(dynamic_partition_key_value_ctxs, &key);
        DCHECK(partition_descriptor_map_.find(key) == partition_descriptor_map_.end())
            << "Partitions with duplicate 'static' keys found during INSERT";
        partition_descriptor_map_[key] = partition;
      }
    }
  }
  if (default_partition_ == NULL) {
    return Status("No default partition found for HdfsTextTableSink");
  }
  return Status::OK;
}

void HdfsTableSink::BuildHdfsFileNames(
    const HdfsPartitionDescriptor& partition_descriptor,
    OutputPartition* output_partition) {

  // Create final_hdfs_file_name_prefix and tmp_hdfs_file_name_prefix.
  // Path: <hdfs_base_dir>/<partition_values>/<unique_id_str>

  // Temporary files are written under the following path which is unique to this sink:
  // <table_dir>/_impala_insert_staging/<query_id>/<per_fragment_unique_id>_dir/
  // Both the temporary directory and the file name, when moved to the real partition
  // directory, must be unique.
  // Prefix the directory name with "." to make it hidden and append "_dir" at the end
  // of the directory to avoid name clashes for unpartitioned tables.
  // The files are located in <partition_values>/<random_value>_data under
  // tmp_hdfs_file_name_prefix.
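  // For example (values illustrative only), a temporary Parquet file for partition
  // year=2009/month=3 could be:
  //   <table_dir>/_impala_insert_staging/<query_id>/.<fragment_id>_123_dir/
  //       year=2009/month=3/<fragment_id>_456_data.0.parq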

  // Use the query id as filename.
  const string& query_suffix = Substitute("$0_$1_data", unique_id_str_, rand());

  output_partition->tmp_hdfs_dir_name =
      Substitute("$0/.$1_$2_dir/", staging_dir_, unique_id_str_, rand());
  output_partition->tmp_hdfs_file_name_prefix = Substitute("$0$1$2",
      output_partition->tmp_hdfs_dir_name, output_partition->partition_name,
      query_suffix);

  if (partition_descriptor.location().empty()) {
    output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1$2",
        table_desc_->hdfs_base_dir(), output_partition->partition_name, query_suffix);
  } else {
    // If the partition descriptor has a location (as set by alter table add partition
    // with a location clause), that provides the complete directory path for this
    // partition. No partition key suffix ("p=1/j=foo/") should be added.
    output_partition->final_hdfs_file_name_prefix =
        Substitute("$0/$1", partition_descriptor.location(), query_suffix);
  }

  output_partition->num_files = 0;
}

Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
    OutputPartition* output_partition) {
  SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer"));
  stringstream filename;
  filename << output_partition->tmp_hdfs_file_name_prefix
      << "." << output_partition->num_files
      << "." << output_partition->writer->file_extension();
  output_partition->current_file_name = filename.str();
  // Check if tmp_hdfs_file_name exists.
  const char* tmp_hdfs_file_name_cstr =
      output_partition->current_file_name.c_str();
  if (hdfsExists(hdfs_connection_, tmp_hdfs_file_name_cstr) == 0) {
    return Status(GetHdfsErrorMsg("Temporary HDFS file already exists: ",
        output_partition->current_file_name));
  }
  uint64_t block_size = output_partition->partition_descriptor->block_size();
  if (block_size == 0) block_size = output_partition->writer->default_block_size();

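  // For hdfsOpenFile(), a bufferSize or replication argument of 0 means "use the
  // configured default"; only the block size is overridden here.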
  output_partition->tmp_hdfs_file = hdfsOpenFile(hdfs_connection_,
      tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size);
  VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr;
  if (output_partition->tmp_hdfs_file == NULL) {
    return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ",
        output_partition->current_file_name));
  }

  ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(1L);
  COUNTER_ADD(files_created_counter_, 1);

  // Save the ultimate destination for this file (it will be moved by the coordinator).
  stringstream dest;
  dest << output_partition->final_hdfs_file_name_prefix
      << "." << output_partition->num_files
      << "." << output_partition->writer->file_extension();
  (*state->hdfs_files_to_move())[output_partition->current_file_name] = dest.str();

  ++output_partition->num_files;
  output_partition->num_rows = 0;
  Status status = output_partition->writer->InitNewFile();
  if (!status.ok()) {
    ClosePartitionFile(state, output_partition);
    hdfsDelete(hdfs_connection_, output_partition->current_file_name.c_str(), 0);
  }
  return status;
}

Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
    const HdfsPartitionDescriptor& partition_descriptor,
    OutputPartition* output_partition) {
  // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
  // etc.
  stringstream partition_name_ss;
  for (int j = 0; j < partition_key_expr_ctxs_.size(); ++j) {
    partition_name_ss << table_desc_->col_names()[j] << "=";
    void* value = partition_key_expr_ctxs_[j]->GetValue(current_row_);
    // NULL partition keys get a special value to be compatible with Hive.
    if (value == NULL) {
      partition_name_ss << table_desc_->null_partition_key_value();
    } else {
      string value_str;
      partition_key_expr_ctxs_[j]->PrintValue(value, &value_str);
      // Directory names containing partition-key values need to be UrlEncoded, in
      // particular to avoid problems when '/' is part of the key value (which might
      // occur, for example, with date strings). Hive will URL decode the value
      // transparently when Impala's frontend asks the metastore for partition key
      // values, which makes it particularly important that we use the same encoding
      // as Hive. It's also not necessary to encode the values when writing partition
      // metadata. You can check this with 'show partitions <tbl>' in Hive, followed
      // by a select from a decoded partition key value.
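      // For example, a (hypothetical) key value "2009-03-01/PDT" is written as the
      // directory name "2009-03-01%2FPDT".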
      string encoded_str;
      UrlEncode(value_str, &encoded_str, true);
      // If the string is empty, map it to NULL (mimicking Hive's behaviour)
      partition_name_ss << (encoded_str.empty() ?
          table_desc_->null_partition_key_value() : encoded_str);
    }
    partition_name_ss << "/";
  }

  // partition_name_ss now holds the unique descriptor for this partition.
  output_partition->partition_name = partition_name_ss.str();
  BuildHdfsFileNames(partition_descriptor, output_partition);

  output_partition->hdfs_connection = hdfs_connection_;
  output_partition->partition_descriptor = &partition_descriptor;

  bool allow_unsupported_formats =
      state->query_options().__isset.allow_unsupported_formats &&
      state->query_options().allow_unsupported_formats;
  if (!allow_unsupported_formats) {
    if (partition_descriptor.file_format() == THdfsFileFormat::SEQUENCE_FILE ||
        partition_descriptor.file_format() == THdfsFileFormat::AVRO) {
      stringstream error_msg;
      map<int, const char*>::const_iterator i =
          _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
      error_msg << "Writing to table format " << i->second
          << " is not supported. Use query option ALLOW_UNSUPPORTED_FORMATS"
             " to override.";
      return Status(error_msg.str());
    }
    if (partition_descriptor.file_format() == THdfsFileFormat::TEXT &&
        state->query_options().__isset.compression_codec &&
        state->query_options().compression_codec != THdfsCompression::NONE) {
      stringstream error_msg;
      error_msg << "Writing to compressed text table is not supported. "
                   "Use query option ALLOW_UNSUPPORTED_FORMATS to override.";
      return Status(error_msg.str());
    }
  }

  // It is incorrect to initialize a writer if there are no rows to feed it. The writer
  // could incorrectly create an empty file or empty partition.
  if (has_empty_input_batch_) return Status::OK;

  switch (partition_descriptor.file_format()) {
    case THdfsFileFormat::TEXT:
      output_partition->writer.reset(new HdfsTextTableWriter(
          this, state, output_partition, &partition_descriptor, table_desc_,
          output_expr_ctxs_));
      break;
    case THdfsFileFormat::PARQUET:
      output_partition->writer.reset(new HdfsParquetTableWriter(
          this, state, output_partition, &partition_descriptor, table_desc_,
          output_expr_ctxs_));
      break;
    case THdfsFileFormat::SEQUENCE_FILE:
      output_partition->writer.reset(new HdfsSequenceTableWriter(
          this, state, output_partition, &partition_descriptor, table_desc_,
          output_expr_ctxs_));
      break;
    case THdfsFileFormat::AVRO:
      output_partition->writer.reset(new HdfsAvroTableWriter(
          this, state, output_partition, &partition_descriptor, table_desc_,
          output_expr_ctxs_));
      break;
    default:
      stringstream error_msg;
      map<int, const char*>::const_iterator i =
          _THdfsFileFormat_VALUES_TO_NAMES.find(partition_descriptor.file_format());
      if (i != _THdfsFileFormat_VALUES_TO_NAMES.end()) {
        error_msg << "Cannot write to table with format " << i->second << ". "
            << "Impala only supports writing to TEXT and PARQUET.";
      } else {
        error_msg << "Cannot write to table. Impala only supports writing to TEXT"
            << " and PARQUET tables. (Unknown file format: "
            << partition_descriptor.file_format() << ")";
      }
      return Status(error_msg.str());
  }
  RETURN_IF_ERROR(output_partition->writer->Init());
  COUNTER_ADD(partitions_created_counter_, 1);
  return CreateNewTmpFile(state, output_partition);
}

void HdfsTableSink::GetHashTblKey(const vector<ExprContext*>& ctxs, string* key) {
  stringstream hash_table_key;
  for (int i = 0; i < ctxs.size(); ++i) {
    RawValue::PrintValueAsBytes(
        ctxs[i]->GetValue(current_row_), ctxs[i]->root()->type(), &hash_table_key);
    // Additionally append "/" to avoid accidental key collisions.
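    // Without the separator, key values ("ab", "c") and ("a", "bc") would both yield
    // "abc"; with it they yield the distinct keys "ab/c/" and "a/bc/".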
    hash_table_key << "/";
  }
  *key = hash_table_key.str();
}

Status HdfsTableSink::GetOutputPartition(
    RuntimeState* state, const string& key, PartitionPair** partition_pair) {
  PartitionMap::iterator existing_partition;
  existing_partition = partition_keys_to_output_partitions_.find(key);
  if (existing_partition == partition_keys_to_output_partitions_.end()) {
    // Create a new OutputPartition, and add it to
    // partition_keys_to_output_partitions.
    const HdfsPartitionDescriptor* partition_descriptor = default_partition_;
    PartitionDescriptorMap::const_iterator it = partition_descriptor_map_.find(key);
    if (it != partition_descriptor_map_.end()) {
      partition_descriptor = it->second;
    }

    OutputPartition* partition = state->obj_pool()->Add(new OutputPartition());
    Status status = InitOutputPartition(state, *partition_descriptor, partition);
    if (!status.ok()) {
      // We failed to create the output partition successfully. Clean it up now
      // as it is not added to partition_keys_to_output_partitions_ so won't be
      // cleaned up in Close().
      if (partition->writer.get() != NULL) partition->writer->Close();
      return status;
    }

    // Save the partition name so that the coordinator can create the partition
    // directory structure if needed.
    DCHECK(state->per_partition_status()->find(partition->partition_name) ==
        state->per_partition_status()->end());
    TInsertPartitionStatus partition_status;
    partition_status.__set_num_appended_rows(0L);
    partition_status.__set_id(partition_descriptor->id());
    partition_status.__set_stats(TInsertStats());
    state->per_partition_status()->insert(
        make_pair(partition->partition_name, partition_status));

    if (!has_empty_input_batch_) {
      // Indicate that the temporary directory is to be deleted after execution.
      (*state->hdfs_files_to_move())[partition->tmp_hdfs_dir_name] = "";
    }

    partition_keys_to_output_partitions_[key].first = partition;
    *partition_pair = &partition_keys_to_output_partitions_[key];
  } else {
    // Use the existing output partition.
    *partition_pair = &existing_partition->second;
  }
  return Status::OK;
}

Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch, bool eos) {
  SCOPED_TIMER(runtime_profile_->total_time_counter());
  ExprContext::FreeLocalAllocations(output_expr_ctxs_);
  ExprContext::FreeLocalAllocations(partition_key_expr_ctxs_);
  RETURN_IF_ERROR(state->CheckQueryState());
  DCHECK(eos || batch->num_rows() > 0);
  has_empty_input_batch_ = batch->num_rows() == 0 && eos;

  // If there are no partition keys then just pass the whole batch to one partition.
  if (dynamic_partition_key_expr_ctxs_.empty()) {
    // If there are no dynamic keys just use an empty key.
    PartitionPair* partition_pair;
    // Populate the partition_pair even if the input is empty because we need it to
    // delete the existing data for 'insert overwrite'.
    RETURN_IF_ERROR(GetOutputPartition(state, ROOT_PARTITION_KEY, &partition_pair));
    if (!has_empty_input_batch_) {
      // Pass the row batch to the writer. If new_file is returned true then the current
      // file is finalized and a new file is opened.
      // The writer tracks where it is in the batch when it returns with new_file set.
      OutputPartition* output_partition = partition_pair->first;
      bool new_file;
      do {
        RETURN_IF_ERROR(output_partition->writer->AppendRowBatch(
            batch, partition_pair->second, &new_file));
        if (new_file) {
          RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
          RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
        }
      } while (new_file);
    }
  } else {
    for (int i = 0; i < batch->num_rows(); ++i) {
      current_row_ = batch->GetRow(i);

      string key;
      GetHashTblKey(dynamic_partition_key_expr_ctxs_, &key);
      PartitionPair* partition_pair = NULL;
      RETURN_IF_ERROR(GetOutputPartition(state, key, &partition_pair));
      partition_pair->second.push_back(i);
    }
    for (PartitionMap::iterator partition = partition_keys_to_output_partitions_.begin();
        partition != partition_keys_to_output_partitions_.end(); ++partition) {
      OutputPartition* output_partition = partition->second.first;
      if (partition->second.second.empty()) continue;

      bool new_file;
      do {
        RETURN_IF_ERROR(output_partition->writer->AppendRowBatch(
            batch, partition->second.second, &new_file));
        if (new_file) {
          RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
          RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
        }
      } while (new_file);
      partition->second.second.clear();
    }
  }

  if (eos) {
    // Close Hdfs files, and update stats in runtime state.
    for (PartitionMap::iterator cur_partition =
             partition_keys_to_output_partitions_.begin();
         cur_partition != partition_keys_to_output_partitions_.end();
         ++cur_partition) {
      RETURN_IF_ERROR(FinalizePartitionFile(state, cur_partition->second.first));
    }
  }
  return Status::OK;
}

Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
    OutputPartition* partition) {
  if (partition->tmp_hdfs_file == NULL && !overwrite_) return Status::OK;
  SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer"));

  // OutputPartition writer could be NULL if there is no row to output.
  if (partition->writer.get() != NULL) {
    RETURN_IF_ERROR(partition->writer->Finalize());

    // Track the total number of appended rows per partition in the runtime state;
    // partition->num_rows counts the number of rows appended per file.
    PartitionStatusMap::iterator it =
        state->per_partition_status()->find(partition->partition_name);

    // Should have been created in GetOutputPartition() when the partition was
    // initialised.
    DCHECK(it != state->per_partition_status()->end());
    it->second.num_appended_rows += partition->num_rows;
    DataSink::MergeInsertStats(partition->writer->stats(), &it->second.stats);
  }

  ClosePartitionFile(state, partition);
  return Status::OK;
}

void HdfsTableSink::ClosePartitionFile(RuntimeState* state, OutputPartition* partition) {
  if (partition->tmp_hdfs_file == NULL) return;
  int hdfs_ret = hdfsCloseFile(hdfs_connection_, partition->tmp_hdfs_file);
  VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name;
  if (hdfs_ret != 0) {
    state->LogError(ErrorMsg(TErrorCode::GENERAL,
        GetHdfsErrorMsg("Failed to close HDFS file: ",
            partition->current_file_name)));
  }
  partition->tmp_hdfs_file = NULL;
  ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1L);
}

void HdfsTableSink::Close(RuntimeState* state) {
  if (closed_) return;
  for (PartitionMap::iterator cur_partition =
           partition_keys_to_output_partitions_.begin();
       cur_partition != partition_keys_to_output_partitions_.end();
       ++cur_partition) {
    if (cur_partition->second.first->writer.get() != NULL) {
      cur_partition->second.first->writer->Close();
    }
    ClosePartitionFile(state, cur_partition->second.first);
  }
  partition_keys_to_output_partitions_.clear();

  // Close literal partition key exprs
  BOOST_FOREACH(
      const HdfsTableDescriptor::PartitionIdToDescriptorMap::value_type& id_to_desc,
      table_desc_->partition_descriptors()) {
    HdfsPartitionDescriptor* partition = id_to_desc.second;
    partition->CloseExprs(state);
  }
  Expr::Close(output_expr_ctxs_, state);
  Expr::Close(partition_key_expr_ctxs_, state);
  closed_ = true;
}

Status HdfsTableSink::GetFileBlockSize(OutputPartition* output_partition, int64_t* size) {
  hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection,
      output_partition->current_file_name.c_str());

  if (info == NULL) {
    return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS file: ",
        output_partition->current_file_name));
  }

  *size = info->mBlockSize;
  hdfsFreeFileInfo(info, 1);

  return Status::OK;
}

string HdfsTableSink::DebugString() const {
  stringstream out;
  out << "HdfsTableSink(overwrite=" << (overwrite_ ? "true" : "false")
      << " table_desc=" << table_desc_->DebugString()
      << " partition_key_exprs=" << Expr::DebugString(partition_key_expr_ctxs_)
      << " output_exprs=" << Expr::DebugString(output_expr_ctxs_)
      << ")";
  return out.str();
}

}  // namespace impala