18 #include <boost/foreach.hpp>
26 using namespace impala;
29 : op_(op), src_(src), op_set_(op_set) {
31 DCHECK(!
src_.empty());
37 DCHECK(!
src_.empty());
38 DCHECK(!
dst_.empty());
43 : op_(op), src_(src), permissions_(permissions), op_set_(op_set) {
45 DCHECK(!
src_.empty());
57 err = hdfsDelete(*hdfs_connection,
src_.c_str(), 1);
61 err = hdfsCreateDirectory(*hdfs_connection,
src_.c_str());
65 err = hdfsRename(*hdfs_connection,
src_.c_str(),
dst_.c_str());
67 <<
" dst_file=" <<
dst_.c_str();
70 err = hdfsDelete(*hdfs_connection,
src_.c_str(), 1);
73 err = hdfsCreateDirectory(*hdfs_connection,
src_.c_str());
89 ss <<
"DELETE " <<
src_;
92 ss <<
"CREATE_DIR " <<
src_;
95 ss <<
"RENAME " << src_ <<
" TO " <<
dst_;
98 ss <<
"DELETE_THEN_CREATE " <<
src_;
104 ss <<
") failed, error was: " << error_msg;
120 uint32_t max_queue_length) {
126 : num_ops_(0L), hdfs_connection_(hdfs_connection) {
130 bool abort_on_error) {
135 int64_t num_ops =
ops_.size();
136 if (num_ops == 0)
return true;
154 ops_.push_back(
HdfsOp(op, src, permissions,
this));
159 errors_.push_back(make_pair(op, err));
T UpdateAndFetch(T delta)
Increments by delta (i.e. += delta) and returns the new value.
ThreadPool< HdfsOp > HdfsOpThreadPool
std::string src_
First operand.
bool abort_on_error_
True if a single error should cause any subsequent operations to become no-ops.
std::string dst_
Second string operand, ignored except for RENAME.
std::vector< HdfsOp > ops_
The set of operations to be submitted to HDFS.
HdfsOp()
Required for ThreadPool.
void Add(HdfsOpType op, const std::string &src)
boost::mutex errors_lock_
Protects errors_ and abort_on_error_ during Execute.
bool Offer(const T &work)
void AddError(const std::string &err, const HdfsOp *op)
Called by HdfsOp to record an error.
Errors errors_
All errors produced during Execute.
HdfsOpType op_
The kind of operation to execute.
AtomicInt< int64_t > num_ops_
void HdfsThreadPoolHelper(int thread_id, const HdfsOp &op)
HdfsOperationSet(hdfsFS *hdfs_connection)
bool Execute(HdfsOpThreadPool *pool, bool abort_on_error)
short permissions_
Permission operand, ignored except for CHMOD.
hdfsFS * hdfs_connection() const
HdfsOpThreadPool * CreateHdfsOpThreadPool(const std::string &name, uint32_t num_threads, uint32_t max_queue_length)
Creates a new HdfsOp-processing thread pool.
HdfsOperationSet * op_set_
Containing operation set, used to record errors and to signal completion.