17 #include <boost/filesystem.hpp>
18 #include <boost/foreach.hpp>
19 #include <boost/thread/locks.hpp>
32 namespace filesystem = boost::filesystem;
33 using namespace impala;
36 "Local directory to copy UDF libraries from HDFS into");
73 typedef boost::unordered_map<std::string, void*>
SymbolMap;
80 boost::unordered_set<std::string>
symbols;
110 string fe_support_path;
117 <<
"We should always be able to get current process handle.";
// Resolves `symbol` for the library `hdfs_lib_file` and stores the resulting
// function pointer in *fn_ptr.
// NOTE(review): this listing is an excerpt. The rest of the parameter list and
// several statements are missing, so the comments below describe only the
// statements that are visible.
130 Status LibCache::GetSoFunctionPtr(
const string& hdfs_lib_file,
const string& symbol,
// An empty library path takes its own branch, which requires the handle of the
// current process to be set. NOTE(review): presumably the symbol is then looked
// up in the running binary - the lookup call is not visible here; confirm.
132 if (hdfs_lib_file.empty()) {
134 DCHECK(current_process_handle_ != NULL);
// Default-constructed: this lock is not associated with any mutex yet.
140 unique_lock<mutex> lock;
// The caller supplied an already resolved entry through `ent`; the entry mutex
// is taken directly on that path.
141 if (ent != NULL && *ent != NULL) {
144 unique_lock<mutex> l(entry->
lock);
// Invariants at this point: an entry is held and it describes a shared object.
149 DCHECK(entry != NULL);
150 DCHECK_EQ(entry->
type, TYPE_SO);
// Look the symbol up in the per-entry symbol cache; the visible assignment
// below reads the cached pointer out of the iterator.
152 LibCacheEntry::SymbolMap::iterator it = entry->
symbol_cache.find(symbol);
154 *fn_ptr = it->second;
// Whatever path was taken, a pointer must have been produced.
161 DCHECK(*fn_ptr != NULL);
// `ent` was supplied but still holds NULL. NOTE(review): presumably the entry
// is handed back to the caller here - the body of this branch is not visible.
162 if (ent != NULL && *ent == NULL) {
171 if (entry == NULL)
return;
172 bool can_delete =
false;
174 unique_lock<mutex> lock(entry->
lock);;
178 if (can_delete)
delete entry;
182 string* local_path) {
183 unique_lock<mutex> lock;
186 DCHECK(entry != NULL);
187 DCHECK_EQ(entry->
type, type);
// Checks that `symbol` is available in the library `hdfs_lib_file`, dispatching
// on the library `type`. Returns a Status describing the outcome.
// NOTE(review): excerpt - the statements that fetch the IR entry and test its
// symbol set are not visible in this listing.
192 Status LibCache::CheckSymbolExists(
const string& hdfs_lib_file,
LibType type,
193 const string& symbol,
bool quiet) {
// Shared objects: delegate to GetSoFunctionPtr with a throwaway pointer and a
// NULL entry out-parameter; its Status is returned unchanged.
194 if (type == TYPE_SO) {
195 void* dummy_ptr = NULL;
196 return GetSoFunctionPtr(hdfs_lib_file, symbol, &dummy_ptr, NULL, quiet);
197 }
else if (type == TYPE_IR) {
// Default-constructed lock, not yet associated with a mutex.
198 unique_lock<mutex> lock;
201 DCHECK(entry != NULL);
202 DCHECK_EQ(entry->
type, TYPE_IR);
// Failure message names both the HDFS path and the local copy of the module.
205 ss <<
"Symbol '" << symbol <<
"' does not exist in module: " << hdfs_lib_file
206 <<
" (local path: " << entry->
local_path <<
")";
// With `quiet` set the failure is reported through Status::Expected, otherwise
// through the plain Status constructor.
207 return quiet ? Status::Expected(ss.str()) :
Status(ss.str());
210 }
else if (type == TYPE_JAR) {
// Jars: only the cache entry is fetched and the Status of GetCacheEntry is
// returned as is; no per-symbol check is visible for this type.
212 unique_lock<mutex> lock;
214 return GetCacheEntry(hdfs_lib_file, type, &lock, &dummy_entry);
// Reached only when none of the branches above returned.
217 return Status(
"Shouldn't get here.");
// Looks up the cache entry for `hdfs_lib_file` and locks it; does nothing when
// the path is not cached.
// NOTE(review): the statement that actually marks the entry for refresh is not
// visible in this excerpt.
221 void LibCache::SetNeedsRefresh(
const string& hdfs_lib_file) {
// Cache-wide lock guards the lookup in lib_cache_.
222 unique_lock<mutex> lib_cache_lock(
lock_);
223 LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
// Unknown library: nothing to mark.
224 if (it == lib_cache_.end())
return;
// Per-entry lock, acquired after the cache-wide lock.
227 unique_lock<mutex> entry_lock(entry->
lock);
// Removes the cache entry for `hdfs_lib_file`, if one exists.
// Acquires the cache-wide lock_, looks the path up in lib_cache_ and passes the
// iterator to RemoveEntryInternal. A path that is not cached is a no-op.
232 void LibCache::RemoveEntry(
const string& hdfs_lib_file) {
233 unique_lock<mutex> lib_cache_lock(
lock_);
234 LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
// Not cached: nothing to remove.
235 if (it == lib_cache_.end())
return;
236 RemoveEntryInternal(hdfs_lib_file, it);
// Erases the entry referenced by `entry_iter` from lib_cache_ and deletes the
// entry object when `can_delete` is set.
// The callers visible in this listing (RemoveEntry, GetCacheEntryInternal)
// take lock_ before calling this function.
// NOTE(review): excerpt - how `entry` and `can_delete` are computed is not
// visible here; presumably deletion is deferred while the entry is in use.
239 void LibCache::RemoveEntryInternal(
const string& hdfs_lib_file,
240 const LibMap::iterator& entry_iter) {
242 VLOG(1) <<
"Removing lib cache entry: " << hdfs_lib_file
// Per-entry lock is taken before the map is modified.
244 unique_lock<mutex> entry_lock(entry->
lock);
// Drop the map slot; `entry_iter` must not be used after this line.
248 lib_cache_.erase(entry_iter);
259 if (can_delete)
delete entry;
// Marks every entry in lib_cache_ for removal and deletes the entries that have
// no users. Runs with the cache-wide lock_ held.
// NOTE(review): excerpt - the inner scope braces and the statement that empties
// the map itself are not visible in this listing.
262 void LibCache::DropCache() {
263 unique_lock<mutex> lib_cache_lock(
lock_);
264 BOOST_FOREACH(LibMap::value_type& v, lib_cache_) {
265 bool can_delete =
false;
// Under the entry lock: flag the entry for removal and record whether its use
// count has dropped to zero. A negative use count is an invariant violation.
268 unique_lock<mutex> entry_lock(v.second->lock);
269 v.second->should_remove =
true;
270 DCHECK_GE(v.second->use_count, 0);
271 can_delete = v.second->use_count == 0;
273 VLOG(1) <<
"Removed lib cache entry: " << v.first;
// Entries with a non-zero use count are only flagged, not deleted here.
274 if (can_delete)
delete v.second;
286 unique_lock<mutex> local_entry_lock;
287 status = GetCacheEntryInternal(hdfs_lib_file, type, &local_entry_lock, entry);
289 entry_lock->swap(local_entry_lock);
292 if (*entry == NULL)
return status;
297 (*entry)->loading_status = status;
300 RemoveEntry(hdfs_lib_file);
// Returns, through `entry`, the cache entry for `hdfs_lib_file`, reusing an
// existing entry or creating and loading a new one, and hands the entry lock to
// the caller through `entry_lock`.
// NOTE(review): this listing is an excerpt with many statements missing
// (allocation of the new entry, several closing braces, the names of two
// called helpers). Comments describe only the statements that are visible.
304 Status LibCache::GetCacheEntryInternal(
const string& hdfs_lib_file,
LibType type,
// An empty path is a caller error.
306 DCHECK(!hdfs_lib_file.empty());
// Cache-wide lock guards the lookup in lib_cache_.
311 unique_lock<mutex> lib_cache_lock(
lock_);
312 LibMap::iterator it = lib_cache_.find(hdfs_lib_file);
313 if (it != lib_cache_.end()) {
// Existing entry: lock it, and if an earlier load failed, return that recorded
// failure without handing out the entry (*entry stays NULL).
315 unique_lock<mutex> local_entry_lock((it->second)->lock);
316 if (!(it->second)->loading_status.ok()) {
318 DCHECK(*entry == NULL);
319 return (it->second)->loading_status;
// Refresh check: the flag is cleared first, then an HDFS connection is
// obtained. The entry is removed on the visible failure path and when the
// modification time recorded in the entry is older than `last_mod_time`.
324 if ((*entry)->check_needs_refresh) {
327 (*entry)->check_needs_refresh =
false;
328 time_t last_mod_time;
330 Status status = HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn);
332 RemoveEntryInternal(hdfs_lib_file, it);
337 if (!status.
ok() || (*entry)->last_mod_time < last_mod_time) {
338 RemoveEntryInternal(hdfs_lib_file, it);
// Entry is still usable: release the cache-wide lock before taking the entry
// lock, then pass the entry lock to the caller by swapping.
345 if (*entry != NULL) {
348 lib_cache_lock.unlock();
349 unique_lock<mutex> local_entry_lock((*entry)->lock);
350 entry_lock->swap(local_entry_lock);
// A cached entry must match the requested type and have a local copy.
353 DCHECK_EQ((*entry)->type, type);
354 DCHECK(!(*entry)->local_path.empty());
// New entry: lock it and pass the lock to the caller before the entry is
// stored in the map; the cache-wide lock is released only after that.
365 unique_lock<mutex> local_entry_lock((*entry)->lock);
366 entry_lock->swap(local_entry_lock);
367 lib_cache_[hdfs_lib_file] = *entry;
368 lib_cache_lock.unlock();
// Fill in the new entry: type and a local path under FLAGS_local_library_dir.
371 DCHECK(*entry != NULL);
372 (*entry)->type = type;
375 (*entry)->local_path = MakeLocalPath(hdfs_lib_file, FLAGS_local_library_dir);
376 VLOG(1) <<
"Adding lib cache entry: " << hdfs_lib_file
377 <<
", local path: " << (*entry)->local_path;
// Connections to the source filesystem and the local filesystem; a failure of
// either call is returned through RETURN_IF_ERROR.
379 hdfsFS hdfs_conn, local_conn;
380 RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn));
381 RETURN_IF_ERROR(HdfsFsCache::instance()->GetLocalConnection(&local_conn));
// NOTE(review): the callee names for the next two argument lists are not
// visible; presumably GetLastModificationTime and CopyHdfsFile - confirm.
387 hdfs_conn, hdfs_lib_file.c_str(), &(*entry)->last_mod_time);
391 hdfs_conn, hdfs_lib_file, local_conn, (*entry)->local_path);
// Type-specific loading. Shared objects are opened with DynamicOpen into the
// entry handle; IR modules are loaded through an LlvmCodeGen instance whose
// symbols are copied into the entry; any remaining type must be TYPE_JAR.
394 if (type == TYPE_SO) {
397 DynamicOpen((*entry)->local_path.c_str(), &(*entry)->shared_object_handle));
398 }
else if (type == TYPE_IR) {
401 scoped_ptr<LlvmCodeGen> codegen;
404 &pool, (*entry)->local_path, module_id, &codegen));
405 codegen->GetSymbols(&(*entry)->symbols);
407 DCHECK_EQ(type, TYPE_JAR);
// Builds the local file name used for a copy of the library at `hdfs_path`.
// The visible stream expression produces
//   <local_dir>/<stem>.<pid>.<counter><extension>
// where <pid> is getpid() and <counter> is num_libs_copied_, post-incremented
// on every call.
// NOTE(review): excerpt - the declarations of `src` and `dst` are not visible;
// presumably `src` is a filesystem path built from `hdfs_path`. Confirm.
414 string LibCache::MakeLocalPath(
const string& hdfs_path,
const string& local_dir) {
418 dst << local_dir <<
"/" << src.stem().native() <<
"." << getpid() <<
"."
419 << (num_libs_copied_++) << src.extension().native();
string path("/usr/lib/sasl2:/usr/lib64/sasl2:/usr/local/lib/sasl2:/usr/lib/x86_64-linux-gnu/sasl2")
#define RETURN_IF_ERROR(stmt)
some generally useful macros
void * current_process_handle_
dlopen() handle for the current process (i.e. impalad).
boost::unordered_map< std::string, void * > SymbolMap
Status DynamicOpen(const char *library, void **handle)
static void GetFullBuildPath(const std::string &path, std::string *full_path)
Sets full_path to <IMPALA_HOME>/<build><debug OR release>/path.
Status DynamicLookup(void *handle, const char *symbol, void **fn_ptr, bool quiet)
static boost::scoped_ptr< LibCache > instance_
Singleton instance. Instantiated in Init().
~LibCache()
Calls dlclose on all cached handles.
boost::unordered_set< std::string > symbols
DEFINE_string(local_library_dir,"/tmp","Local directory to copy UDF libraries from HDFS into")
Status GetLastModificationTime(const hdfsFS &connection, const char *filename, time_t *last_mod_time)
void DynamicClose(void *handle)
Closes the handle.
Status CopyHdfsFile(const hdfsFS &src_conn, const string &src_path, const hdfsFS &dst_conn, const string &dst_path)
void DropCache()
Removes all cached entries.
static Status Init()
Initializes the libcache. Must be called before any other APIs.
void * shared_object_handle