Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-bulk-ops.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "util/hdfs-bulk-ops.h"
16 
17 #include <vector>
18 #include <boost/foreach.hpp>
19 
20 #include "util/debug-util.h"
21 #include "util/error-util.h"
22 #include "util/hdfs-util.h"
23 
24 #include "common/names.h"
25 
26 using namespace impala;
27 
28 HdfsOp::HdfsOp(HdfsOpType op, const std::string& src, HdfsOperationSet* op_set)
29  : op_(op), src_(src), op_set_(op_set) {
30  DCHECK(op != RENAME);
31  DCHECK(!src_.empty());
32 }
33 
34 HdfsOp::HdfsOp(HdfsOpType op, const std::string& src, const std::string& dst,
35  HdfsOperationSet* op_set) : op_(op), src_(src), dst_(dst), op_set_(op_set) {
36  DCHECK(op == RENAME);
37  DCHECK(!src_.empty());
38  DCHECK(!dst_.empty());
39 }
40 
41 HdfsOp::HdfsOp(HdfsOpType op, const string& src, short permissions,
42  HdfsOperationSet* op_set)
43  : op_(op), src_(src), permissions_(permissions), op_set_(op_set) {
44  DCHECK(op == CHMOD);
45  DCHECK(!src_.empty());
46 }
47 
48 // Required for ThreadPool
50 
// Performs this single HDFS operation using the owning operation set's connection.
// Does nothing if the owning set reports ShouldAbort(). Otherwise dispatches on op_
// to the matching libhdfs call and, if that call returns -1, builds a message that
// names the operation and its operands and records it on the owning set via
// AddError().
// NOTE(review): the embedded listing numbers jump from 108 to 110 just before the
// closing brace, so one line of the original function is not visible in this extract.
51 void HdfsOp::Execute() const {
52  if (op_set_->ShouldAbort()) return;
 // Result of the last libhdfs call made below; -1 is treated as failure.
53  int err = 0;
54  hdfsFS* hdfs_connection = op_set_->hdfs_connection();
 // Each case logs the call it made via VLOG_FILE whether or not the call succeeded.
55  switch (op_) {
56  case DELETE:
 // NOTE(review): the trailing 1 is presumably libhdfs's 'recursive' flag - confirm.
57  err = hdfsDelete(*hdfs_connection, src_.c_str(), 1);
58  VLOG_FILE << "hdfsDelete() file=" << src_.c_str();
59  break;
60  case CREATE_DIR:
61  err = hdfsCreateDirectory(*hdfs_connection, src_.c_str());
62  VLOG_FILE << "hdfsCreateDirectory() file=" << src_.c_str();
63  break;
64  case RENAME:
65  err = hdfsRename(*hdfs_connection, src_.c_str(), dst_.c_str());
66  VLOG_FILE << "hdfsRename() src_file=" << src_.c_str()
67  << " dst_file=" << dst_.c_str();
68  break;
69  case DELETE_THEN_CREATE:
70  err = hdfsDelete(*hdfs_connection, src_.c_str(), 1);
71  VLOG_FILE << "hdfsDelete() file=" << src_.c_str();
 // Only try to recreate the directory when the delete did not fail; if it did,
 // err stays -1 and the failure is reported below.
72  if (err != -1) {
73  err = hdfsCreateDirectory(*hdfs_connection, src_.c_str());
74  VLOG_FILE << "hdfsCreateDirectory() file=" << src_.c_str();
75  }
76  break;
77  case CHMOD:
78  err = hdfsChmod(*hdfs_connection, src_.c_str(), permissions_);
79  VLOG_FILE << "hdfsChmod() file=" << src_.c_str();
80  break;
81  }
82 
83  if (err == -1) {
 // NOTE(review): the error text is fetched after the VLOG_FILE statement above has
 // run. If GetStrErrMsg() reads errno, confirm that logging cannot overwrite it.
84  string error_msg = GetStrErrMsg();
85  stringstream ss;
86  ss << "Hdfs op (";
87  switch (op_) {
88  case DELETE:
89  ss << "DELETE " << src_;
90  break;
91  case CREATE_DIR:
92  ss << "CREATE_DIR " << src_;
93  break;
94  case RENAME:
95  ss << "RENAME " << src_ << " TO " << dst_;
96  break;
97  case DELETE_THEN_CREATE:
98  ss << "DELETE_THEN_CREATE " << src_;
99  break;
100  case CHMOD:
 // Permissions are printed in octal.
101  ss << "CHMOD " << src_ << " " << oct << permissions_;
102  break;
103  }
104  ss << ") failed, error was: " << error_msg;
105 
 // Hand the message and a pointer to this op to the owning set's error list.
106  op_set_->AddError(ss.str(), this);
107  }
108 
110 }
111 
112 // Utility method to convert from a thread-pool signature to HdfsOp::Execute
113 void HdfsThreadPoolHelper(int thread_id, const HdfsOp& op) {
114  op.Execute();
115 }
116 
117 // Factory method to create a new thread pool (required because only this file has access
118 // to the definition of HdfsThreadPoolHelper).
119 HdfsOpThreadPool* impala::CreateHdfsOpThreadPool(const string& name, uint32_t num_threads,
120  uint32_t max_queue_length) {
121  return new HdfsOpThreadPool(name, "hdfs-worker", num_threads,
122  max_queue_length, &HdfsThreadPoolHelper);
123 }
124 
125 HdfsOperationSet::HdfsOperationSet(hdfsFS* hdfs_connection)
126  : num_ops_(0L), hdfs_connection_(hdfs_connection) {
127 }
128 
130  bool abort_on_error) {
 // NOTE(review): the first line of this definition's signature (listing line 129) is
 // missing from this extract; the symbol index at the end of the listing gives it as
 // 'bool Execute(HdfsOpThreadPool *pool, bool abort_on_error)'.
 //
 // Submits every queued operation in ops_ to 'pool' and returns the value delivered
 // through promise_. Returns true straight away when there is nothing queued.
131  {
 // abort_on_error_ is written under errors_lock_, the same lock taken where it is
 // read alongside errors_.
132  lock_guard<mutex> l(errors_lock_);
133  abort_on_error_ = abort_on_error;
134  }
135  int64_t num_ops = ops_.size();
136  if (num_ops == 0) return true;
 // Raise the outstanding-op count by the full batch size before any op is offered
 // to the pool.
137  num_ops_ += num_ops;
138 
139  BOOST_FOREACH(const HdfsOp& op, ops_) {
 // NOTE(review): the result of Offer() is ignored. If an op can be rejected it
 // would presumably never run, and the wait below might then never finish - confirm.
140  pool->Offer(op);
141  }
 // Presumably blocks until the promise has been set - confirm against promise.h.
142  return promise_.Get();
143 }
144 
145 void HdfsOperationSet::Add(HdfsOpType op, const string& src) {
146  ops_.push_back(HdfsOp(op, src, this));
147 }
148 
149 void HdfsOperationSet::Add(HdfsOpType op, const string& src, const string& dst) {
150  ops_.push_back(HdfsOp(op, src, dst, this));
151 }
152 
153 void HdfsOperationSet::Add(HdfsOpType op, const string& src, short permissions) {
154  ops_.push_back(HdfsOp(op, src, permissions, this));
155 }
156 
157 void HdfsOperationSet::AddError(const string& err, const HdfsOp* op) {
158  lock_guard<mutex> l(errors_lock_);
159  errors_.push_back(make_pair(op, err));
160 }
161 
// NOTE(review): the signature line of this definition (listing line 162) is missing
// from this extract.
// Lowers the outstanding-operation count by one. Once the new count is zero, sets
// promise_ to true if no errors have been recorded and to false otherwise.
163  if (num_ops_.UpdateAndFetch(-1) == 0) {
164  promise_.Set(errors().size() == 0);
165  }
166 }
167 
// NOTE(review): the signature line of this definition (listing line 168) is missing
// from this extract.
// Returns true only when abort_on_error_ is set and at least one error has been
// recorded. Both are read while holding errors_lock_.
169  lock_guard<mutex> l(errors_lock_);
170  return abort_on_error_ && !errors_.empty();
171 }
T UpdateAndFetch(T delta)
Increments by delta (i.e. += delta) and returns the new val.
Definition: atomic.h:105
void Set(const T &val)
Definition: promise.h:38
ThreadPool< HdfsOp > HdfsOpThreadPool
Definition: hdfs-bulk-ops.h:78
std::string src_
First operand.
Definition: hdfs-bulk-ops.h:66
void Execute() const
bool abort_on_error_
True if a single error should cause any subsequent operations to become no-ops.
std::string dst_
Second string operand, ignored except for RENAME.
Definition: hdfs-bulk-ops.h:69
std::vector< HdfsOp > ops_
The set of operations to be submitted to HDFS.
const Errors & errors()
HdfsOp()
Required for ThreadPool.
void Add(HdfsOpType op, const std::string &src)
boost::mutex errors_lock_
Protects errors_ and abort_on_error_ during Execute.
bool Offer(const T &work)
Definition: thread-pool.h:74
void AddError(const std::string &err, const HdfsOp *op)
Called by HdfsOp to record an error.
Errors errors_
All errors produced during Execute.
string GetStrErrMsg()
Definition: error-util.cc:30
ObjectPool pool
HdfsOpType op_
The kind of operation to execute.
Definition: hdfs-bulk-ops.h:63
AtomicInt< int64_t > num_ops_
void HdfsThreadPoolHelper(int thread_id, const HdfsOp &op)
const T & Get()
Definition: promise.h:59
HdfsOperationSet(hdfsFS *hdfs_connection)
bool Execute(HdfsOpThreadPool *pool, bool abort_on_error)
short permissions_
Permission operand, ignored except for CHMOD.
Definition: hdfs-bulk-ops.h:72
Promise< bool > promise_
#define VLOG_FILE
Definition: logging.h:58
hdfsFS * hdfs_connection() const
HdfsOpThreadPool * CreateHdfsOpThreadPool(const std::string &name, uint32_t num_threads, uint32_t max_queue_length)
Creates a new HdfsOp-processing thread pool.
string name
Definition: cpu-info.cc:50
HdfsOperationSet * op_set_
Containing operation set, used to record errors and to signal completion.
Definition: hdfs-bulk-ops.h:75