20 #include <gutil/port.h>
22 #ifndef IMPALA_UDF_SDK_BUILD
31 #if IMPALA_UDF_SDK_BUILD
39 #define VLOG_ROW while(false) std::cout
40 #define VLOG_ROW_IS_ON (false)
56 return reinterpret_cast<uint8_t*
>(malloc(byte_size));
59 uint8_t*
Reallocate(uint8_t* ptr,
int byte_size) {
60 return reinterpret_cast<uint8_t*
>(realloc(ptr, byte_size));
63 void Free(uint8_t* ptr) {
87 bool LogError(
const std::string& error) {
105 using namespace impala;
106 using namespace impala_udf;
109 "class.impala_udf::FunctionContext";
115 const vector<FunctionContext::TypeDesc>& arg_types,
116 int varargs_buffer_size,
bool debug) {
120 invalid_type.
scale = 0;
122 arg_types, varargs_buffer_size, debug);
128 const vector<FunctionContext::TypeDesc>& arg_types,
129 int varargs_buffer_size,
bool debug) {
139 reinterpret_cast<uint8_t*
>(aligned_malloc(varargs_buffer_size, 16));
142 VLOG_ROW <<
"Created FunctionContext: " << ctx
149 CreateContext(state_, pool, intermediate_type_, return_type_, arg_types_,
150 varargs_buffer_size_, debug_);
160 assert(
impl_->
closed_ &&
"FunctionContext wasn't closed!");
166 : varargs_buffer_(NULL),
167 varargs_buffer_size_(0),
176 thread_local_fn_state_(NULL),
177 fragment_local_fn_state_(NULL),
178 external_bytes_tracked_(0),
189 stringstream error_ss;
192 error_ss <<
"Memory leaked via FunctionContext::Allocate()";
194 error_ss <<
"FunctionContext::Free() called on buffer that was already freed or "
195 "was not allocated.";
199 for (map<uint8_t*, int>::iterator i =
allocations_.begin();
203 error_ss << bytes <<
" bytes leaked via FunctionContext::Allocate()";
208 if (!error_ss.str().empty()) error_ss <<
", ";
210 <<
" bytes leaked via FunctionContext::TrackAllocation()";
217 if (!error_ss.str().empty()) {
234 return impl_->version_;
237 const char* FunctionContext::user()
const {
238 if (impl_->state_ == NULL)
return NULL;
239 return impl_->state_->connected_user().c_str();
244 #if IMPALA_UDF_SDK_BUILD
247 id.hi = impl_->state_->query_id().hi;
248 id.lo = impl_->state_->query_id().lo;
253 bool FunctionContext::has_error()
const {
254 return !impl_->error_msg_.empty();
257 const char* FunctionContext::error_msg()
const {
258 if (has_error())
return impl_->error_msg_.c_str();
262 uint8_t* FunctionContext::Allocate(
int byte_size) {
263 assert(!impl_->closed_);
264 if (byte_size == 0)
return NULL;
265 uint8_t* buffer = impl_->pool_->Allocate(byte_size);
267 impl_->allocations_[buffer] = byte_size;
268 memset(buffer, 0xff, byte_size);
270 VLOG_ROW <<
"Allocate: FunctionContext=" <<
this
271 <<
" size=" << byte_size
272 <<
" result=" <<
reinterpret_cast<void*
>(buffer);
276 uint8_t* FunctionContext::Reallocate(uint8_t* ptr,
int byte_size) {
277 assert(!impl_->closed_);
278 VLOG_ROW <<
"Reallocate: FunctionContext=" <<
this
279 <<
" size=" << byte_size
280 <<
" ptr=" <<
reinterpret_cast<void*
>(ptr);
281 uint8_t* new_ptr = impl_->pool_->Reallocate(ptr, byte_size);
283 impl_->allocations_.erase(ptr);
284 impl_->allocations_[new_ptr] = byte_size;
286 VLOG_ROW <<
"FunctionContext=" <<
this
287 <<
" reallocated: " <<
reinterpret_cast<void*
>(new_ptr);
291 void FunctionContext::Free(uint8_t* buffer) {
292 assert(!impl_->closed_);
293 if (buffer == NULL)
return;
294 VLOG_ROW <<
"Free: FunctionContext=" <<
this <<
" "
295 <<
reinterpret_cast<void*
>(buffer);
297 map<uint8_t*, int>::iterator it = impl_->allocations_.find(buffer);
298 if (it != impl_->allocations_.end()) {
300 memset(buffer, 0xff, it->second);
301 impl_->allocations_.erase(it);
302 impl_->pool_->Free(buffer);
304 SetError(
"FunctionContext::Free() called on buffer that is already freed or was "
308 impl_->pool_->Free(buffer);
312 void FunctionContext::TrackAllocation(int64_t bytes) {
313 assert(!impl_->closed_);
314 impl_->external_bytes_tracked_ += bytes;
315 impl_->pool_->mem_tracker()->Consume(bytes);
318 void FunctionContext::Free(int64_t bytes) {
319 assert(!impl_->closed_);
320 if (bytes > impl_->external_bytes_tracked_) {
322 ss <<
"FunctionContext::Free() called with " << bytes <<
" bytes, but only "
323 << impl_->external_bytes_tracked_ <<
" bytes are tracked via "
324 <<
"FunctionContext::TrackAllocation()";
325 SetError(ss.str().c_str());
328 impl_->external_bytes_tracked_ -= bytes;
329 impl_->pool_->mem_tracker()->Release(bytes);
332 void FunctionContext::SetError(
const char* error_msg) {
333 assert(!impl_->closed_);
334 if (impl_->error_msg_.empty()) {
335 impl_->error_msg_ = error_msg;
337 ss <<
"UDF ERROR: " << error_msg;
338 if (impl_->state_ != NULL) impl_->state_->set_query_status(ss.str());
345 bool FunctionContext::AddWarning(
const char* warning_msg) {
346 assert(!impl_->closed_);
347 if (impl_->num_warnings_++ >=
MAX_WARNINGS)
return false;
349 ss <<
"UDF WARNING: " << warning_msg;
350 if (impl_->state_ != NULL) {
351 #ifndef IMPALA_UDF_SDK_BUILD
357 LOG(WARNING) << ss.str();
358 return impl_->state_->LogError(
ErrorMsg(TErrorCode::GENERAL, ss.str()));
361 return impl_->state_->LogError(ss.str());
365 cerr << ss.str() << endl;
371 assert(!impl_->closed_);
374 impl_->thread_local_fn_state_ = ptr;
377 impl_->fragment_local_fn_state_ = ptr;
381 ss <<
"Unknown FunctionStateScope: " << scope;
382 SetError(ss.str().c_str());
388 if (byte_size == 0)
return NULL;
392 <<
" size=" << byte_size
393 <<
" result=" <<
reinterpret_cast<void*
>(buffer);
401 ss <<
"Free local allocations: FunctionContext=" <<
context_
402 <<
" pool=" <<
pool_ << endl;
421 : len(len), ptr(context->impl()->AllocateLocal(len)) {
int precision
Only valid if type == TYPE_DECIMAL.
impala_udf::FunctionContext::TypeDesc intermediate_type_
Type descriptor for the intermediate type of a UDA. Set to INVALID_TYPE for UDFs. ...
std::vector< impala_udf::FunctionContext::TypeDesc > arg_types_
Type descriptors for each argument of the function.
const TUniqueId & query_id() const
void SetConstantArgs(const std::vector< impala_udf::AnyVal * > &constant_args)
Sets constant_args_. The AnyVal* values are owned by the caller.
int64_t net_allocations_
Diagnostic counter that tracks the number of Allocate() calls minus the number of Free() calls.
int64_t external_bytes_tracked_
MemTracker * mem_tracker()
impala_udf::FunctionContext * context_
Parent context object. Not owned.
uint8_t * Reallocate(uint8_t *ptr, int size)
static const int MAX_WARNINGS
FreePool * pool_
Pool to service allocations from.
void set_query_status(const std::string &err_msg)
Sets query_status_ with err_msg if no error has been set yet.
uint8_t * Allocate(int size)
Allocates a buffer of 'size' bytes.
bool AddWarning(const char *warning_msg)
std::map< uint8_t *, int > allocations_
bool LogError(const ErrorMsg &msg)
std::vector< uint8_t * > local_allocations_
Allocations owned by Impala.
impala_udf::FunctionContext::TypeDesc return_type_
Type descriptor for the return type of the function.
const TypeDesc * GetArgType(int arg_idx) const
const std::string & connected_user() const
bool closed_
Indicates whether this context has been closed. Used for verification/debugging.
static const char * LLVM_FUNCTIONCONTEXT_NAME
std::vector< impala_udf::AnyVal * > constant_args_
void Free(uint8_t *buffer)
Frees a buffer returned from Allocate() or Reallocate().
FreePool(MemPool *mem_pool)
This class is thread-safe.
bool debug_
If true, indicates this is a debug context which will do additional validation.
void Release(int64_t bytes)
Decreases consumption of this tracker and its ancestors by 'bytes'.
impala::FunctionContextImpl * impl_
impala_udf::FunctionContext * Clone(MemPool *pool)
static impala_udf::FunctionContext * CreateContext(RuntimeState *state, MemPool *pool, const impala_udf::FunctionContext::TypeDesc &return_type, const std::vector< impala_udf::FunctionContext::TypeDesc > &arg_types, int varargs_buffer_size=0, bool debug=false)
Create a FunctionContext for a UDF. Caller is responsible for deleting it.
void Consume(int64_t bytes)
Increases consumption of this tracker and its ancestors by 'bytes'.
void * fragment_local_fn_state_
bool abort_on_error() const
uint8_t * AllocateLocal(int byte_size)
void SetError(const char *error_msg)
int64_t net_allocations() const
void FreeLocalAllocations()
Frees all allocations returned by AllocateLocal().
uint8_t * varargs_buffer_