Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
udf.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "udf/udf.h"
16 
17 #include <iostream>
18 #include <sstream>
19 #include <assert.h>
20 #include <gutil/port.h> // for aligned_malloc
21 
22 #ifndef IMPALA_UDF_SDK_BUILD
23 #include "util/error-util.h"
24 #endif
25 
26 // Be careful what this includes since this needs to be linked into the UDF's
27 // binary. For example, it would be unfortunate if they had a random dependency
28 // on libhdfs.
29 #include "udf/udf-internal.h"
30 
31 #if IMPALA_UDF_SDK_BUILD
32 // For the SDK build, we are building the .lib that the developers would use to
33 // write UDFs. They want to link against this to run their UDFs in a test environment.
34 // Pulling in free-pool is very undesirable since it pulls in many other libraries.
35 // Instead, we'll implement a dummy version that is not used.
36 // When they build their library to a .so, they'd use the version of FunctionContext
37 // in the main binary, which does include FreePool.
38 
39 #define VLOG_ROW while(false) std::cout
40 #define VLOG_ROW_IS_ON (false)
41 
42 namespace impala {
43 
// No-op stand-in for impala::MemTracker used by the UDF SDK build. Memory accounting
// calls from FunctionContext are accepted and ignored.
class MemTracker {
 public:
  void Consume(int64_t /* bytes */) {}
  void Release(int64_t /* bytes */) {}
};
49 
50 class FreePool {
51  public:
53 
54  uint8_t* Allocate(int byte_size) {
56  return reinterpret_cast<uint8_t*>(malloc(byte_size));
57  }
58 
59  uint8_t* Reallocate(uint8_t* ptr, int byte_size) {
60  return reinterpret_cast<uint8_t*>(realloc(ptr, byte_size));
61  }
62 
63  void Free(uint8_t* ptr) {
65  free(ptr);
66  }
67 
68  MemTracker* mem_tracker() { return &mem_tracker_; }
69  int64_t net_allocations() const { return net_allocations_; }
70 
71  private:
72  MemTracker mem_tracker_;
73  int64_t net_allocations_;
74 };
75 
// Minimal stand-in for impala::RuntimeState used by the UDF SDK build. Only
// connected_user() is expected to be reached; the other methods assert if called.
class RuntimeState {
 public:
  void set_query_status(const std::string& /* error_msg */) {
    assert(false);
  }

  // const to match the signature of the real RuntimeState::abort_on_error().
  bool abort_on_error() const {
    assert(false);
    return false;
  }

  bool LogError(const std::string& /* error */) {
    assert(false);
    return false;
  }

  // Returns a reference to storage owned by this object (always empty here).
  // FunctionContext::user() returns connected_user().c_str() to the caller. The
  // previous by-value return made that pointer refer to a destroyed temporary.
  const std::string& connected_user() const { return connected_user_; }

 private:
  std::string connected_user_;
};
94 
95 }
96 
97 #else
98 #include "runtime/free-pool.h"
99 #include "runtime/mem-tracker.h"
100 #include "runtime/runtime-state.h"
101 #endif
102 
103 #include "common/names.h"
104 
105 using namespace impala;
106 using namespace impala_udf;
107 
109  "class.impala_udf::FunctionContext";
110 
// Upper bound on the warnings one FunctionContext reports through AddWarning().
// Calls beyond this limit are dropped and AddWarning() returns false.
static const int MAX_WARNINGS = 1000;
112 
114  const FunctionContext::TypeDesc& return_type,
115  const vector<FunctionContext::TypeDesc>& arg_types,
116  int varargs_buffer_size, bool debug) {
117  FunctionContext::TypeDesc invalid_type;
118  invalid_type.type = FunctionContext::INVALID_TYPE;
119  invalid_type.precision = 0;
120  invalid_type.scale = 0;
121  return FunctionContextImpl::CreateContext(state, pool, invalid_type, return_type,
122  arg_types, varargs_buffer_size, debug);
123 }
124 
126  const FunctionContext::TypeDesc& intermediate_type,
127  const FunctionContext::TypeDesc& return_type,
128  const vector<FunctionContext::TypeDesc>& arg_types,
129  int varargs_buffer_size, bool debug) {
131  ctx->impl_->state_ = state;
132  ctx->impl_->pool_ = new FreePool(pool);
133  ctx->impl_->intermediate_type_ = intermediate_type;
134  ctx->impl_->return_type_ = return_type;
135  ctx->impl_->arg_types_ = arg_types;
136  // UDFs may manipulate DecimalVal arguments via SIMD instructions such as 'movaps'
137  // that require 16-byte memory alignment.
138  ctx->impl_->varargs_buffer_ =
139  reinterpret_cast<uint8_t*>(aligned_malloc(varargs_buffer_size, 16));
140  ctx->impl_->varargs_buffer_size_ = varargs_buffer_size;
141  ctx->impl_->debug_ = debug;
142  VLOG_ROW << "Created FunctionContext: " << ctx
143  << " with pool " << ctx->impl_->pool_;
144  return ctx;
145 }
146 
148  impala_udf::FunctionContext* new_context =
149  CreateContext(state_, pool, intermediate_type_, return_type_, arg_types_,
150  varargs_buffer_size_, debug_);
151  new_context->impl_->constant_args_ = constant_args_;
152  new_context->impl_->fragment_local_fn_state_ = fragment_local_fn_state_;
153  return new_context;
154 }
155 
156 FunctionContext::FunctionContext() : impl_(new FunctionContextImpl(this)) {
157 }
158 
160  assert(impl_->closed_ && "FunctionContext wasn't closed!");
161  delete impl_->pool_;
162  delete impl_;
163 }
164 
165 FunctionContextImpl::FunctionContextImpl(FunctionContext* parent)
166  : varargs_buffer_(NULL),
167  varargs_buffer_size_(0),
168  context_(parent),
169  pool_(NULL),
170  state_(NULL),
171  debug_(false),
172  version_(FunctionContext::v1_3),
173  num_warnings_(0),
174  num_updates_(0),
175  num_removes_(0),
176  thread_local_fn_state_(NULL),
177  fragment_local_fn_state_(NULL),
178  external_bytes_tracked_(0),
179  closed_(false) {
180 }
181 
183  if (closed_) return;
184 
185  // Free local allocations first so we can detect leaks through any remaining allocations
186  // (local allocations cannot be leaked, at least not by the UDF)
188 
189  stringstream error_ss;
190  if (!debug_) {
191  if (pool_->net_allocations() > 0) {
192  error_ss << "Memory leaked via FunctionContext::Allocate()";
193  } else if (pool_->net_allocations() < 0) {
194  error_ss << "FunctionContext::Free() called on buffer that was already freed or "
195  "was not allocated.";
196  }
197  } else if (!allocations_.empty()) {
198  int bytes = 0;
199  for (map<uint8_t*, int>::iterator i = allocations_.begin();
200  i != allocations_.end(); ++i) {
201  bytes += i->second;
202  }
203  error_ss << bytes << " bytes leaked via FunctionContext::Allocate()";
204  allocations_.clear();
205  }
206 
207  if (external_bytes_tracked_ > 0) {
208  if (!error_ss.str().empty()) error_ss << ", ";
209  error_ss << external_bytes_tracked_
210  << " bytes leaked via FunctionContext::TrackAllocation()";
211  // This isn't ideal because the memory is still leaked, but don't track it so our
212  // accounting stays sane.
213  // TODO: we need to modify the memtrackers to allow leaked user-allocated memory.
214  context_->Free(external_bytes_tracked_);
215  }
216 
217  if (!error_ss.str().empty()) {
218  // Treat memory leaks as errors in the SDK build so they trigger test failures, but
219  // don't blow up actual queries due to leaks (unless abort_on_error is true).
220  // TODO: revisit abort_on_error case. Setting the error won't do anything in close.
221  if (state_ == NULL || state_->abort_on_error()) {
222  context_->SetError(error_ss.str().c_str());
223  } else {
224  context_->AddWarning(error_ss.str().c_str());
225  }
226  }
227 
228  free(varargs_buffer_);
229  varargs_buffer_ = NULL;
230  closed_ = true;
231 }
232 
233 FunctionContext::ImpalaVersion FunctionContext::version() const {
234  return impl_->version_;
235 }
236 
237 const char* FunctionContext::user() const {
238  if (impl_->state_ == NULL) return NULL;
239  return impl_->state_->connected_user().c_str();
240 }
241 
243  UniqueId id;
244 #if IMPALA_UDF_SDK_BUILD
245  id.hi = id.lo = 0;
246 #else
247  id.hi = impl_->state_->query_id().hi;
248  id.lo = impl_->state_->query_id().lo;
249 #endif
250  return id;
251 }
252 
253 bool FunctionContext::has_error() const {
254  return !impl_->error_msg_.empty();
255 }
256 
257 const char* FunctionContext::error_msg() const {
258  if (has_error()) return impl_->error_msg_.c_str();
259  return NULL;
260 }
261 
262 uint8_t* FunctionContext::Allocate(int byte_size) {
263  assert(!impl_->closed_);
264  if (byte_size == 0) return NULL;
265  uint8_t* buffer = impl_->pool_->Allocate(byte_size);
266  if (impl_->debug_) {
267  impl_->allocations_[buffer] = byte_size;
268  memset(buffer, 0xff, byte_size);
269  }
270  VLOG_ROW << "Allocate: FunctionContext=" << this
271  << " size=" << byte_size
272  << " result=" << reinterpret_cast<void*>(buffer);
273  return buffer;
274 }
275 
276 uint8_t* FunctionContext::Reallocate(uint8_t* ptr, int byte_size) {
277  assert(!impl_->closed_);
278  VLOG_ROW << "Reallocate: FunctionContext=" << this
279  << " size=" << byte_size
280  << " ptr=" << reinterpret_cast<void*>(ptr);
281  uint8_t* new_ptr = impl_->pool_->Reallocate(ptr, byte_size);
282  if (impl_->debug_) {
283  impl_->allocations_.erase(ptr);
284  impl_->allocations_[new_ptr] = byte_size;
285  }
286  VLOG_ROW << "FunctionContext=" << this
287  << " reallocated: " << reinterpret_cast<void*>(new_ptr);
288  return new_ptr;
289 }
290 
291 void FunctionContext::Free(uint8_t* buffer) {
292  assert(!impl_->closed_);
293  if (buffer == NULL) return;
294  VLOG_ROW << "Free: FunctionContext=" << this << " "
295  << reinterpret_cast<void*>(buffer);
296  if (impl_->debug_) {
297  map<uint8_t*, int>::iterator it = impl_->allocations_.find(buffer);
298  if (it != impl_->allocations_.end()) {
299  // fill in garbage value into the buffer to increase the chance of detecting misuse
300  memset(buffer, 0xff, it->second);
301  impl_->allocations_.erase(it);
302  impl_->pool_->Free(buffer);
303  } else {
304  SetError("FunctionContext::Free() called on buffer that is already freed or was "
305  "not allocated.");
306  }
307  } else {
308  impl_->pool_->Free(buffer);
309  }
310 }
311 
312 void FunctionContext::TrackAllocation(int64_t bytes) {
313  assert(!impl_->closed_);
314  impl_->external_bytes_tracked_ += bytes;
315  impl_->pool_->mem_tracker()->Consume(bytes);
316 }
317 
318 void FunctionContext::Free(int64_t bytes) {
319  assert(!impl_->closed_);
320  if (bytes > impl_->external_bytes_tracked_) {
321  stringstream ss;
322  ss << "FunctionContext::Free() called with " << bytes << " bytes, but only "
323  << impl_->external_bytes_tracked_ << " bytes are tracked via "
324  << "FunctionContext::TrackAllocation()";
325  SetError(ss.str().c_str());
326  return;
327  }
328  impl_->external_bytes_tracked_ -= bytes;
329  impl_->pool_->mem_tracker()->Release(bytes);
330 }
331 
332 void FunctionContext::SetError(const char* error_msg) {
333  assert(!impl_->closed_);
334  if (impl_->error_msg_.empty()) {
335  impl_->error_msg_ = error_msg;
336  stringstream ss;
337  ss << "UDF ERROR: " << error_msg;
338  if (impl_->state_ != NULL) impl_->state_->set_query_status(ss.str());
339  }
340 }
341 
342 // TODO: is there a way to tell the user the expr in a reasonable way?
343 // Plumb the ToSql() from the FE?
344 // TODO: de-dup warnings
345 bool FunctionContext::AddWarning(const char* warning_msg) {
346  assert(!impl_->closed_);
347  if (impl_->num_warnings_++ >= MAX_WARNINGS) return false;
348  stringstream ss;
349  ss << "UDF WARNING: " << warning_msg;
350  if (impl_->state_ != NULL) {
351 #ifndef IMPALA_UDF_SDK_BUILD
352  // If this is called while the query is being closed, the runtime state log will have
353  // already been displayed to the user. Also log the warning so there's some chance
354  // the user will actually see it.
355  // TODO: somehow print the full error log in the shell? This is a problem for any
356  // function using LogError() during close.
357  LOG(WARNING) << ss.str();
358  return impl_->state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
359 #else
360  // In case of the SDK build, we simply, forward this call to a dummy method
361  return impl_->state_->LogError(ss.str());
362 #endif
363 
364  } else {
365  cerr << ss.str() << endl;
366  return true;
367  }
368 }
369 
370 void FunctionContext::SetFunctionState(FunctionStateScope scope, void* ptr) {
371  assert(!impl_->closed_);
372  switch (scope) {
373  case THREAD_LOCAL:
374  impl_->thread_local_fn_state_ = ptr;
375  break;
376  case FRAGMENT_LOCAL:
377  impl_->fragment_local_fn_state_ = ptr;
378  break;
379  default:
380  stringstream ss;
381  ss << "Unknown FunctionStateScope: " << scope;
382  SetError(ss.str().c_str());
383  }
384 }
385 
386 uint8_t* FunctionContextImpl::AllocateLocal(int byte_size) {
387  assert(!closed_);
388  if (byte_size == 0) return NULL;
389  uint8_t* buffer = pool_->Allocate(byte_size);
390  local_allocations_.push_back(buffer);
391  VLOG_ROW << "Allocate Local: FunctionContext=" << this->context_
392  << " size=" << byte_size
393  << " result=" << reinterpret_cast<void*>(buffer);
394  return buffer;
395 }
396 
398  assert(!closed_);
399  if (VLOG_ROW_IS_ON) {
400  stringstream ss;
401  ss << "Free local allocations: FunctionContext=" << context_
402  << " pool=" << pool_ << endl;
403  for (int i = 0; i < local_allocations_.size(); ++i) {
404  ss << " " << reinterpret_cast<void*>(local_allocations_[i]) << endl;
405  }
406  VLOG_ROW << ss.str();
407  }
408  for (int i = 0; i < local_allocations_.size(); ++i) {
410  }
411  local_allocations_.clear();
412 }
413 
414 void FunctionContextImpl::SetConstantArgs(const vector<AnyVal*>& constant_args) {
415  constant_args_ = constant_args;
416 }
417 
418 // Note: this function crashes LLVM's JIT in expr-test if it's xcompiled. Do not move to
419 // expr-ir.cc. This could probably use further investigation.
420 StringVal::StringVal(FunctionContext* context, int len)
421  : len(len), ptr(context->impl()->AllocateLocal(len)) {
422 }
423 
424 // TODO: why doesn't libudasample.so build if this is in udf-ir.cc?
426  if (arg_idx < 0 || arg_idx >= impl_->arg_types_.size()) return NULL;
427  return &impl_->arg_types_[arg_idx];
428 }
int precision
Only valid if type == TYPE_DECIMAL.
Definition: udf.h:75
impala_udf::FunctionContext::TypeDesc intermediate_type_
Type descriptor for the intermediate type of a UDA. Set to INVALID_TYPE for UDFs. ...
Definition: udf-internal.h:150
std::vector< impala_udf::FunctionContext::TypeDesc > arg_types_
Type descriptors for each argument of the function.
Definition: udf-internal.h:156
const TUniqueId & query_id() const
Definition: coordinator.h:152
void SetConstantArgs(const std::vector< impala_udf::AnyVal * > &constant_args)
Sets constant_args_. The AnyVal* values are owned by the caller.
Definition: udf.cc:414
int64_t net_allocations_
Diagnostic counter that tracks (# Allocates - # Frees)
Definition: free-pool.h:181
MemTracker * mem_tracker()
Definition: free-pool.h:127
impala_udf::FunctionContext * context_
Parent context object. Not owned.
Definition: udf-internal.h:109
uint8_t * Reallocate(uint8_t *ptr, int size)
Definition: free-pool.h:102
static const int MAX_WARNINGS
Definition: udf.cc:111
FreePool * pool_
Pool to service allocations from.
Definition: udf-internal.h:112
void set_query_status(const std::string &err_msg)
Sets query_status_ with err_msg if no error has been set yet.
uint8_t * Allocate(int size)
Allocates a buffer of size.
Definition: free-pool.h:54
bool AddWarning(const char *warning_msg)
Definition: udf.cc:345
std::map< uint8_t *, int > allocations_
Definition: udf-internal.h:135
bool LogError(const ErrorMsg &msg)
void Free(uint8_t *ptr)
Definition: free-pool.h:82
std::vector< uint8_t * > local_allocations_
Allocations owned by Impala.
Definition: udf-internal.h:137
impala_udf::FunctionContext::TypeDesc return_type_
Type descriptor for the return type of the function.
Definition: udf-internal.h:153
const TypeDesc * GetArgType(int arg_idx) const
Definition: udf.cc:425
const std::string & connected_user() const
bool closed_
Indicates whether this context has been closed. Used for verification/debugging.
Definition: udf-internal.h:170
static const char * LLVM_FUNCTIONCONTEXT_NAME
Definition: udf-internal.h:93
std::vector< impala_udf::AnyVal * > constant_args_
Definition: udf-internal.h:161
ObjectPool pool
void Free(uint8_t *buffer)
Frees a buffer returned from Allocate() or Reallocate()
Definition: udf.cc:291
FreePool(MemPool *mem_pool)
Definition: free-pool.h:47
#define VLOG_ROW
Definition: logging.h:59
This class is thread-safe.
Definition: mem-tracker.h:61
bool debug_
If true, indicates this is a debug context which will do additional validation.
Definition: udf-internal.h:119
void Release(int64_t bytes)
Decreases consumption of this tracker and its ancestors by 'bytes'.
Definition: mem-tracker.h:209
impala::FunctionContextImpl * impl_
Definition: udf.h:214
impala_udf::FunctionContext * Clone(MemPool *pool)
Definition: udf.cc:147
static impala_udf::FunctionContext * CreateContext(RuntimeState *state, MemPool *pool, const impala_udf::FunctionContext::TypeDesc &return_type, const std::vector< impala_udf::FunctionContext::TypeDesc > &arg_types, int varargs_buffer_size=0, bool debug=false)
Create a FunctionContext for a UDF. Caller is responsible for deleting it.
void Consume(int64_t bytes)
Increases consumption of this tracker and its ancestors by 'bytes'.
Definition: mem-tracker.h:118
#define VLOG_ROW_IS_ON
Definition: logging.h:66
bool abort_on_error() const
Definition: runtime-state.h:99
uint8_t * AllocateLocal(int byte_size)
Definition: udf.cc:386
void SetError(const char *error_msg)
Definition: udf.cc:332
int64_t net_allocations() const
Definition: free-pool.h:128
void FreeLocalAllocations()
Frees all allocations returned by AllocateLocal().
Definition: udf.cc:397