Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
hdfs-fs-cache.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/hdfs-fs-cache.h"
16 
17 #include <boost/thread/locks.hpp>
18 #include <gutil/strings/substitute.h>
19 
20 #include "common/logging.h"
21 #include "util/debug-util.h"
22 #include "util/error-util.h"
23 #include "util/hdfs-util.h"
24 #include "util/test-info.h"
25 
26 #include "common/names.h"
27 
28 using namespace strings;
29 
30 namespace impala {
31 
// Process-wide singleton cache; created exactly once via HdfsFsCache::Init().
scoped_ptr<HdfsFsCache> HdfsFsCache::instance_;
33 
34 void HdfsFsCache::Init() {
35  DCHECK(HdfsFsCache::instance_.get() == NULL);
36  HdfsFsCache::instance_.reset(new HdfsFsCache());
37 }
38 
39 Status HdfsFsCache::GetConnection(const string& path, hdfsFS* fs,
40  HdfsFsMap* local_cache) {
41  string err;
42  const string& namenode = GetNameNodeFromPath(path, &err);
43  if (!err.empty()) return Status(err);
44  DCHECK(!namenode.empty());
45 
46  // First, check the local cache to avoid taking the global lock.
47  if (local_cache != NULL) {
48  HdfsFsMap::iterator local_iter = local_cache->find(namenode);
49  if (local_iter != local_cache->end()) {
50  *fs = local_iter->second;
51  return Status::OK;
52  }
53  }
54  // Otherwise, check the global cache.
55  {
56  lock_guard<mutex> l(lock_);
57  HdfsFsMap::iterator i = fs_map_.find(namenode);
58  if (i == fs_map_.end()) {
59  hdfsBuilder* hdfs_builder = hdfsNewBuilder();
60  hdfsBuilderSetNameNode(hdfs_builder, namenode.c_str());
61  *fs = hdfsBuilderConnect(hdfs_builder);
62  if (*fs == NULL) {
63  return Status(GetHdfsErrorMsg("Failed to connect to FS: ", namenode));
64  }
65  fs_map_.insert(make_pair(namenode, *fs));
66  } else {
67  *fs = i->second;
68  }
69  }
70  DCHECK_NOTNULL(*fs);
71  // Populate the local cache for the next lookup.
72  if (local_cache != NULL) {
73  local_cache->insert(make_pair(namenode, *fs));
74  }
75  return Status::OK;
76 }
77 
78 Status HdfsFsCache::GetLocalConnection(hdfsFS* fs) {
79  return GetConnection("file:///", fs);
80 }
81 
82 string HdfsFsCache::GetNameNodeFromPath(const string& path, string* err) {
83  string namenode;
84  const string local_fs("file:/");
85  size_t n = path.find("://");
86 
87  err->clear();
88  if (n == string::npos) {
89  if (path.compare(0, local_fs.length(), local_fs) == 0) {
90  // Hadoop Path routines strip out consecutive /'s, so recognize 'file:/blah'.
91  namenode = "file:///";
92  } else {
93  // Path is not qualified, so use the default FS.
94  namenode = "default";
95  }
96  } else if (n == 0) {
97  *err = Substitute("Path missing scheme: $0", path);
98  } else {
99  // Path is qualified, i.e. "scheme://authority/path/to/file". Extract
100  // "scheme://authority/".
101  n = path.find('/', n + 3);
102  if (n == string::npos) {
103  *err = Substitute("Path missing '/' after authority: $0", path);
104  } else {
105  // Include the trailing '/' for local filesystem case, i.e. "file:///".
106  namenode = path.substr(0, n + 1);
107  }
108  }
109  return namenode;
110 }
111 
112 }
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
boost::unordered_map< std::string, hdfsFS > HdfsFsMap
Definition: hdfs-fs-cache.h:41
mutex lock_
string GetHdfsErrorMsg(const string &prefix, const string &file)
Definition: hdfs-util.cc:26