Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
lib-cache.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "runtime/lib-cache.h"
16 
17 #include <boost/filesystem.hpp>
18 #include <boost/foreach.hpp>
19 #include <boost/thread/locks.hpp>
20 
21 #include "codegen/llvm-codegen.h"
22 #include "runtime/hdfs-fs-cache.h"
23 #include "runtime/runtime-state.h"
24 #include "util/dynamic-util.h"
25 #include "util/hash-util.h"
26 #include "util/hdfs-util.h"
27 #include "util/path-builder.h"
28 #include "util/test-info.h"
29 
30 #include "common/names.h"
31 
32 namespace filesystem = boost::filesystem;
33 using namespace impala;
34 
35 DEFINE_string(local_library_dir, "/tmp",
36  "Local directory to copy UDF libraries from HDFS into");
37 
38 scoped_ptr<LibCache> LibCache::instance_;
39 
41  // Lock protecting all fields in this entry
42  boost::mutex lock;
43 
44  // The number of users that are using this cache entry. If this is
45  // a .so, we can't dlclose unless the use_count goes to 0.
46  int use_count;
47 
48  // If true, this cache entry should be removed from lib_cache_ when
49  // the use_count goes to 0.
51 
52  // If true, we need to check if there is a newer version of the cached library in HDFS
53  // on next access. Should hold lock_ to read/write.
55 
56  // The type of this file.
58 
59  // The path on the local file system for this library.
60  std::string local_path;
61 
62  // Status returned from copying this file from HDFS.
64 
65  // The last modification time of the HDFS file in seconds.
66  time_t last_mod_time;
67 
68  // Handle from dlopen.
70 
71  // mapping from symbol => address of loaded symbol.
72  // Only used if the type is TYPE_SO.
73  typedef boost::unordered_map<std::string, void*> SymbolMap;
75 
76  // Set of symbols in this entry. This is populated once on load and read
77  // only. This is only used if it is a llvm module.
78  // TODO: it would be nice to be able to do this for .so's as well but it's
79  // not trivial to walk an .so for the symbol table.
80  boost::unordered_set<std::string> symbols;
81 
82  // Set if an error occurs loading the cache entry before the cache entry
83  // can be evicted. This allows other threads that attempt to use the entry
84  // before it is removed to return the same error.
86 
88  shared_object_handle(NULL) {}
90 };
91 
92 LibCache::LibCache() : current_process_handle_(NULL) {
93 }
94 
96  DropCache();
98 }
99 
101  DCHECK(LibCache::instance_.get() == NULL);
102  LibCache::instance_.reset(new LibCache());
103  return LibCache::instance_->InitInternal();
104 }
105 
107  if (TestInfo::is_fe_test()) {
108  // In the FE tests, NULL gives the handle to the java process.
109  // Explicitly load the fe-support shared object.
110  string fe_support_path;
111  PathBuilder::GetFullBuildPath("service/libfesupport.so", &fe_support_path);
112  RETURN_IF_ERROR(DynamicOpen(fe_support_path.c_str(), &current_process_handle_));
113  } else {
115  }
116  DCHECK(current_process_handle_ != NULL)
117  << "We should always be able to get current process handle.";
118  return Status::OK;
119 }
120 
122  if (shared_object_handle != NULL) {
123  DCHECK_EQ(use_count, 0);
124  DCHECK(should_remove);
126  }
127  unlink(local_path.c_str());
128 }
129 
130 Status LibCache::GetSoFunctionPtr(const string& hdfs_lib_file, const string& symbol,
131  void** fn_ptr, LibCacheEntry** ent, bool quiet) {
132  if (hdfs_lib_file.empty()) {
133  // Just loading a function ptr in the current process. No need to take any locks.
134  DCHECK(current_process_handle_ != NULL);
135  RETURN_IF_ERROR(DynamicLookup(current_process_handle_, symbol.c_str(), fn_ptr, quiet));
136  return Status::OK;
137  }
138 
139  LibCacheEntry* entry = NULL;
140  unique_lock<mutex> lock;
141  if (ent != NULL && *ent != NULL) {
142  // Reuse already-cached entry provided by user
143  entry = *ent;
144  unique_lock<mutex> l(entry->lock);
145  lock.swap(l);
146  } else {
147  RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, TYPE_SO, &lock, &entry));
148  }
149  DCHECK(entry != NULL);
150  DCHECK_EQ(entry->type, TYPE_SO);
151 
152  LibCacheEntry::SymbolMap::iterator it = entry->symbol_cache.find(symbol);
153  if (it != entry->symbol_cache.end()) {
154  *fn_ptr = it->second;
155  } else {
157  DynamicLookup(entry->shared_object_handle, symbol.c_str(), fn_ptr, quiet));
158  entry->symbol_cache[symbol] = *fn_ptr;
159  }
160 
161  DCHECK(*fn_ptr != NULL);
162  if (ent != NULL && *ent == NULL) {
163  // Only set and increment user's entry if it wasn't already cached
164  *ent = entry;
165  ++(*ent)->use_count;
166  }
167  return Status::OK;
168 }
169 
170 void LibCache::DecrementUseCount(LibCacheEntry* entry) {
171  if (entry == NULL) return;
172  bool can_delete = false;
173  {
174  unique_lock<mutex> lock(entry->lock);;
175  --entry->use_count;
176  can_delete = (entry->use_count == 0 && entry->should_remove);
177  }
178  if (can_delete) delete entry;
179 }
180 
181 Status LibCache::GetLocalLibPath(const string& hdfs_lib_file, LibType type,
182  string* local_path) {
183  unique_lock<mutex> lock;
184  LibCacheEntry* entry = NULL;
185  RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
186  DCHECK(entry != NULL);
187  DCHECK_EQ(entry->type, type);
188  *local_path = entry->local_path;
189  return Status::OK;
190 }
191 
192 Status LibCache::CheckSymbolExists(const string& hdfs_lib_file, LibType type,
193  const string& symbol, bool quiet) {
194  if (type == TYPE_SO) {
195  void* dummy_ptr = NULL;
196  return GetSoFunctionPtr(hdfs_lib_file, symbol, &dummy_ptr, NULL, quiet);
197  } else if (type == TYPE_IR) {
198  unique_lock<mutex> lock;
199  LibCacheEntry* entry = NULL;
200  RETURN_IF_ERROR(GetCacheEntry(hdfs_lib_file, type, &lock, &entry));
201  DCHECK(entry != NULL);
202  DCHECK_EQ(entry->type, TYPE_IR);
203  if (entry->symbols.find(symbol) == entry->symbols.end()) {
204  stringstream ss;
205  ss << "Symbol '" << symbol << "' does not exist in module: " << hdfs_lib_file
206  << " (local path: " << entry->local_path << ")";
207  return quiet ? Status::Expected(ss.str()) : Status(ss.str());
208  }
209  return Status::OK;
210  } else if (type == TYPE_JAR) {
211  // TODO: figure out how to inspect contents of jars
212  unique_lock<mutex> lock;
213  LibCacheEntry* dummy_entry = NULL;
214  return GetCacheEntry(hdfs_lib_file, type, &lock, &dummy_entry);
215  } else {
216  DCHECK(false);
217  return Status("Shouldn't get here.");
218  }
219 }
220 
221 void LibCache::SetNeedsRefresh(const string& hdfs_lib_file) {
222  unique_lock<mutex> lib_cache_lock(lock_);
223  LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
224  if (it == lib_cache_.end()) return;
225  LibCacheEntry* entry = it->second;
226 
227  unique_lock<mutex> entry_lock(entry->lock);
228  // Need to hold lock_ before setting check_needs_refresh.
229  entry->check_needs_refresh = true;
230 }
231 
232 void LibCache::RemoveEntry(const string& hdfs_lib_file) {
233  unique_lock<mutex> lib_cache_lock(lock_);
234  LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
235  if (it == lib_cache_.end()) return;
236  RemoveEntryInternal(hdfs_lib_file, it);
237 }
238 
239 void LibCache::RemoveEntryInternal(const string& hdfs_lib_file,
240  const LibMap::iterator& entry_iter) {
241  LibCacheEntry* entry = entry_iter->second;
242  VLOG(1) << "Removing lib cache entry: " << hdfs_lib_file
243  << ", local path: " << entry->local_path;
244  unique_lock<mutex> entry_lock(entry->lock);
245 
246  // We have both locks so no other thread can be updating lib_cache_ or trying to get
247  // the entry.
248  lib_cache_.erase(entry_iter);
249 
250  entry->should_remove = true;
251  DCHECK_GE(entry->use_count, 0);
252  bool can_delete = entry->use_count == 0;
253 
254  // Now that the entry is removed from the map, it means no future threads
255  // can find it->second (the entry), so it is safe to unlock.
256  entry_lock.unlock();
257 
258  // Now that we've unlocked, we can delete this entry if no one is using it.
259  if (can_delete) delete entry;
260 }
261 
262 void LibCache::DropCache() {
263  unique_lock<mutex> lib_cache_lock(lock_);
264  BOOST_FOREACH(LibMap::value_type& v, lib_cache_) {
265  bool can_delete = false;
266  {
267  // Lock to wait for any threads currently processing the entry.
268  unique_lock<mutex> entry_lock(v.second->lock);
269  v.second->should_remove = true;
270  DCHECK_GE(v.second->use_count, 0);
271  can_delete = v.second->use_count == 0;
272  }
273  VLOG(1) << "Removed lib cache entry: " << v.first;
274  if (can_delete) delete v.second;
275  }
276  lib_cache_.clear();
277 }
278 
279 Status LibCache::GetCacheEntry(const string& hdfs_lib_file, LibType type,
280  unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
281  Status status;
282  {
283  // If an error occurs, local_entry_lock is released before calling RemoveEntry()
284  // below because it takes the global lock_ which must be acquired before taking entry
285  // locks.
286  unique_lock<mutex> local_entry_lock;
287  status = GetCacheEntryInternal(hdfs_lib_file, type, &local_entry_lock, entry);
288  if (status.ok()) {
289  entry_lock->swap(local_entry_lock);
290  return status;
291  }
292  if (*entry == NULL) return status;
293 
294  // Set loading_status on the entry so that if another thread calls
295  // GetCacheEntry() for this lib before this thread is able to acquire lock_ in
296  // RemoveEntry(), it is able to return the same error.
297  (*entry)->loading_status = status;
298  }
299  // Takes lock_
300  RemoveEntry(hdfs_lib_file);
301  return status;
302 }
303 
304 Status LibCache::GetCacheEntryInternal(const string& hdfs_lib_file, LibType type,
305  unique_lock<mutex>* entry_lock, LibCacheEntry** entry) {
306  DCHECK(!hdfs_lib_file.empty());
307  *entry = NULL;
308 
309  // Check if this file is already cached or an error occured on another thread while
310  // loading the library.
311  unique_lock<mutex> lib_cache_lock(lock_);
312  LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
313  if (it != lib_cache_.end()) {
314  {
315  unique_lock<mutex> local_entry_lock((it->second)->lock);
316  if (!(it->second)->loading_status.ok()) {
317  // If loading_status is already set, the returned *entry should be NULL.
318  DCHECK(*entry == NULL);
319  return (it->second)->loading_status;
320  }
321  }
322 
323  *entry = it->second;
324  if ((*entry)->check_needs_refresh) {
325  // Check if file has been modified since loading the cached copy. If so, remove the
326  // cached entry and create a new one.
327  (*entry)->check_needs_refresh = false;
328  time_t last_mod_time;
329  hdfsFS hdfs_conn;
330  Status status = HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn);
331  if (!status.ok()) {
332  RemoveEntryInternal(hdfs_lib_file, it);
333  *entry = NULL;
334  return status;
335  }
336  status = GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), &last_mod_time);
337  if (!status.ok() || (*entry)->last_mod_time < last_mod_time) {
338  RemoveEntryInternal(hdfs_lib_file, it);
339  *entry = NULL;
340  }
341  RETURN_IF_ERROR(status);
342  }
343  }
344 
345  if (*entry != NULL) {
346  // Release the lib_cache_ lock. This guarantees other threads looking at other
347  // libs can continue.
348  lib_cache_lock.unlock();
349  unique_lock<mutex> local_entry_lock((*entry)->lock);
350  entry_lock->swap(local_entry_lock);
351 
352  RETURN_IF_ERROR((*entry)->copy_file_status);
353  DCHECK_EQ((*entry)->type, type);
354  DCHECK(!(*entry)->local_path.empty());
355  return Status::OK;
356  }
357 
358  // Entry didn't exist. Add the entry then release lock_ (so other libraries
359  // can be accessed).
360  *entry = new LibCacheEntry();
361 
362  // Grab the entry lock before adding it to lib_cache_. We still need to do more
363  // work to initialize *entry and we don't want another thread to pick up
364  // the uninitialized entry.
365  unique_lock<mutex> local_entry_lock((*entry)->lock);
366  entry_lock->swap(local_entry_lock);
367  lib_cache_[hdfs_lib_file] = *entry;
368  lib_cache_lock.unlock();
369 
370  // At this point we have the entry lock but not the lib cache lock.
371  DCHECK(*entry != NULL);
372  (*entry)->type = type;
373 
374  // Copy the file
375  (*entry)->local_path = MakeLocalPath(hdfs_lib_file, FLAGS_local_library_dir);
376  VLOG(1) << "Adding lib cache entry: " << hdfs_lib_file
377  << ", local path: " << (*entry)->local_path;
378 
379  hdfsFS hdfs_conn, local_conn;
380  RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn));
381  RETURN_IF_ERROR(HdfsFsCache::instance()->GetLocalConnection(&local_conn));
382 
383  // Note: the file can be updated between getting last_mod_time and copying the file to
384  // local_path. This can only result in the file unnecessarily being refreshed, and does
385  // not affect correctness.
386  (*entry)->copy_file_status = GetLastModificationTime(
387  hdfs_conn, hdfs_lib_file.c_str(), &(*entry)->last_mod_time);
388  RETURN_IF_ERROR((*entry)->copy_file_status);
389 
390  (*entry)->copy_file_status = CopyHdfsFile(
391  hdfs_conn, hdfs_lib_file, local_conn, (*entry)->local_path);
392  RETURN_IF_ERROR((*entry)->copy_file_status);
393 
394  if (type == TYPE_SO) {
395  // dlopen the local library
397  DynamicOpen((*entry)->local_path.c_str(), &(*entry)->shared_object_handle));
398  } else if (type == TYPE_IR) {
399  // Load the module and populate all symbols.
401  scoped_ptr<LlvmCodeGen> codegen;
402  string module_id = filesystem::path((*entry)->local_path).stem().string();
403  RETURN_IF_ERROR(LlvmCodeGen::LoadFromFile(
404  &pool, (*entry)->local_path, module_id, &codegen));
405  codegen->GetSymbols(&(*entry)->symbols);
406  } else {
407  DCHECK_EQ(type, TYPE_JAR);
408  // Nothing to do.
409  }
410 
411  return Status::OK;
412 }
413 
414 string LibCache::MakeLocalPath(const string& hdfs_path, const string& local_dir) {
415  // Append the pid and library number to the local directory.
416  filesystem::path src(hdfs_path);
417  stringstream dst;
418  dst << local_dir << "/" << src.stem().native() << "." << getpid() << "."
419  << (num_libs_copied_++) << src.extension().native();
420  return dst.str();
421 }
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
#define RETURN_IF_ERROR(stmt)
some generally useful macros
Definition: status.h:242
void * current_process_handle_
dlopen() handle for the current process (i.e. impalad).
Definition: lib-cache.h:116
boost::unordered_map< std::string, void * > SymbolMap
Definition: lib-cache.cc:73
Status DynamicOpen(const char *library, void **handle)
Definition: dynamic-util.cc:37
static void GetFullBuildPath(const std::string &path, std::string *full_path)
Sets full_path to <IMPALA_HOME>/<build><debug or release>/path.
Definition: path-builder.cc:38
Status DynamicLookup(void *handle, const char *symbol, void **fn_ptr, bool quiet)
Definition: dynamic-util.cc:26
static boost::scoped_ptr< LibCache > instance_
Singleton instance. Instantiated in Init().
Definition: lib-cache.h:113
~LibCache()
Calls dlclose on all cached handles.
Definition: lib-cache.cc:95
static bool is_fe_test()
Definition: test-info.h:33
boost::unordered_set< std::string > symbols
Definition: lib-cache.cc:80
ObjectPool pool
DEFINE_string(local_library_dir,"/tmp","Local directory to copy UDF libraries from HDFS into")
mutex lock_
Status InitInternal()
Definition: lib-cache.cc:106
Status GetLastModificationTime(const hdfsFS &connection, const char *filename, time_t *last_mod_time)
Definition: hdfs-util.cc:41
void DynamicClose(void *handle)
Closes the handle.
Definition: dynamic-util.cc:58
Status CopyHdfsFile(const hdfsFS &src_conn, const string &src_path, const hdfsFS &dst_conn, const string &dst_path)
Definition: hdfs-util.cc:54
void DropCache()
Removes all cached entries.
Definition: lib-cache.cc:262
static const Status OK
Definition: status.h:87
static Status Init()
Initializes the libcache. Must be called before any other APIs.
Definition: lib-cache.cc:100
bool ok() const
Definition: status.h:172