16 #ifndef IMPALA_UDA_TEST_HARNESS_IMPL_H
17 #define IMPALA_UDA_TEST_HARNESS_IMPL_H
23 #include <boost/shared_ptr.hpp>
25 namespace impala_udf {
69 memcpy(v, src, byte_size);
// Fragment of the context-check helper (presumably CheckContext; its signature
// line and the condition guarding this code are not shown in this listing).
74 template<
typename RESULT,
typename INTERMEDIATE>
// Prefix the message reported by the FunctionContext and keep the combined
// text in the harness-level error_msg_.
78 ss <<
"UDA set error to: " << context->
error_msg();
79 error_msg_ = ss.str();
// Compares two RESULT values (presumably CheckResult; the line carrying the
// function name is not shown in this listing).
// Returns the comparison outcome for x and y.
85 template<
typename RESULT,
typename INTERMEDIATE>
87 const RESULT& x,
const RESULT& y) {
// No custom comparator installed: fall back to RESULT's operator==.
88 if (result_comparator_fn_ == NULL)
return x == y;
// Otherwise the user-supplied comparator decides equality.
89 return result_comparator_fn_(x, y);
// Fragment of the driver that runs the UDA through the single-node, one-level
// and two-level execution paths and compares each result with `expected`.
// NOTE(review): the signature, the setup of `context`/`result`, and any checks
// on the requested execution mode are not shown in this listing.
93 template<
typename RESULT,
typename INTERMEDIATE>
100 std::vector<FunctionContext::TypeDesc> arg_types;
// --- Single node ---
105 result = ExecuteSingleNode(&context);
// Only report a mismatch when no earlier error has been recorded, so the
// first error message is not overwritten.
106 if (error_msg_.empty() && !CheckResult(result, expected)) {
107 std::stringstream ss;
108 ss <<
"UDA failed running in single node execution." << std::endl
111 error_msg_ = ss.str();
// Any recorded error ends the run with a failure.
114 if (!error_msg_.empty())
return false;
// Node counts exercised by the distributed paths below (ascending order).
117 const int num_nodes[] = { 1, 2, 10, 20, 100 };
// --- One level: once per entry of num_nodes ---
119 for (
int i = 0; i <
sizeof(num_nodes) /
sizeof(
int); ++i) {
122 result = ExecuteOneLevel(num_nodes[i], &context);
123 if (error_msg_.empty() && !CheckResult(result, expected)) {
124 std::stringstream ss;
125 ss <<
"UDA failed running in one level distributed mode with "
126 << num_nodes[i] <<
" nodes." << std::endl
129 error_msg_ = ss.str();
133 if (!error_msg_.empty())
return false;
// --- Two levels: every pair (i, j) with j <= i, so the second level never
// has more nodes than the first ---
137 for (
int i = 0; i <
sizeof(num_nodes) /
sizeof(
int); ++i) {
138 for (
int j = 0; j <= i; ++j) {
141 result = ExecuteTwoLevel(num_nodes[i], num_nodes[j], &context);
142 if (error_msg_.empty() && !CheckResult(result, expected)) {
143 std::stringstream ss;
144 ss <<
"UDA failed running in two level distributed mode with "
145 << num_nodes[i] <<
" nodes in the first level and "
146 << num_nodes[j] <<
" nodes in the second level." << std::endl
149 error_msg_ = ss.str();
154 if (!error_msg_.empty())
return false;
// Fragment of the single-node path (presumably ExecuteSingleNode; the
// signature line is not shown in this listing). Sequence: create + init one
// intermediate, update it with every input value, finalize, free.
// Each CheckContext failure returns RESULT::null().
// NOTE(review): the early returns shown here do not visibly release
// `intermediate` -- confirm whether that is intended.
159 template<
typename RESULT,
typename INTERMEDIATE>
// Create the intermediate, passing fixed_buffer_byte_size_ as the byte size.
162 INTERMEDIATE intermediate =
163 UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
164 context->
get(), fixed_buffer_byte_size_);
166 init_fn_(context->
get(), &intermediate);
167 if (!CheckContext(context->
get()))
return RESULT::null();
// Feed all num_input_values_ inputs, by index, into the one intermediate.
169 for (
int i = 0; i < num_input_values_; ++i) {
170 Update(i, context->
get(), &intermediate);
172 if (!CheckContext(context->
get()))
return RESULT::null();
// Produce the result first, then release the intermediate.
175 RESULT result = finalize_fn_(context->
get(), intermediate);
176 UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(context->
get(), intermediate);
177 if (!CheckContext(context->
get()))
return RESULT::null();
// Fragment of the one-level distributed path (presumably ExecuteOneLevel; the
// signature line is not shown in this listing). Inputs are spread over
// `num_nodes` intermediates, which are then merged into one intermediate on
// `result_context` and finalized.
// Each CheckContext failure returns RESULT::null().
// NOTE(review): the declarations of `cxt` and `copy` are not shown here.
181 template<
typename RESULT,
typename INTERMEDIATE>
// One context slot per simulated node.
184 std::vector<boost::shared_ptr<ScopedFunctionContext> > contexts;
185 std::vector<INTERMEDIATE> intermediates;
186 contexts.resize(num_nodes);
189 std::vector<FunctionContext::TypeDesc> arg_types;
// Create and initialize one intermediate per node.
191 for (
int i = 0; i < num_nodes; ++i) {
194 intermediates.push_back(UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
195 cxt, fixed_buffer_byte_size_));
196 init_fn_(cxt, &intermediates[i]);
197 if (!CheckContext(cxt))
return RESULT::null();
// Intermediate that receives the merged state; it lives on result_context.
200 INTERMEDIATE merged =
201 UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
202 result_context->
get(), fixed_buffer_byte_size_);
203 init_fn_(result_context->
get(), &merged);
204 if (!CheckContext(result_context->
get()))
return RESULT::null();
// Assign input i to node (i % num_nodes) and update that node's intermediate.
207 for (
int i = 0; i < num_input_values_; ++i) {
208 int target = i % num_nodes;
209 Update(i, contexts[target].
get()->
get(), &intermediates[target]);
// Merge every node's intermediate into `merged`.
213 for (
int i = 0; i < num_nodes; ++i) {
214 if (!CheckContext(contexts[i].
get()->
get()))
return RESULT::null();
// Serialization is optional: without a serialize function the intermediate
// itself is used as the serialized form.
215 INTERMEDIATE serialized = intermediates[i];
216 if (serialize_fn_ != NULL) {
217 serialized = serialize_fn_(contexts[i].
get()->
get(), intermediates[i]);
// Copy the serialized value using result_context, then free the node's own
// intermediate before merging the copy.
220 UdaTestHarnessUtil::CopyIntermediate<INTERMEDIATE>(
221 result_context->
get(), fixed_buffer_byte_size_, serialized);
222 UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
223 contexts[i].get()->get(), intermediates[i]);
224 merge_fn_(result_context->
get(), copy, &merged);
225 UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(result_context->
get(), copy);
226 if (!CheckContext(contexts[i].
get()->
get()))
return RESULT::null();
// Errors raised on result_context during merging are checked here.
229 if (!CheckContext(result_context->
get()))
return RESULT::null();
// Finalize the merged state, then release it.
231 RESULT result = finalize_fn_(result_context->
get(), merged);
232 UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(result_context->
get(), merged);
233 if (!CheckContext(result_context->
get()))
return RESULT::null();
// Fragment of the two-level distributed path (presumably ExecuteTwoLevel; the
// signature line is not shown in this listing). Inputs are spread over `num1`
// first-level intermediates, those are merged into `num2` second-level
// intermediates, and the second level is merged into one final intermediate
// on `result_context`, which is then finalized.
// Each CheckContext failure returns RESULT::null().
// NOTE(review): the declarations of `cxt` and `copy` are not shown here.
237 template<
typename RESULT,
typename INTERMEDIATE>
240 std::vector<boost::shared_ptr<ScopedFunctionContext> > level1_contexts, level2_contexts;
241 std::vector<INTERMEDIATE> level1_intermediates, level2_intermediates;
242 level1_contexts.resize(num1);
243 level2_contexts.resize(num2);
246 std::vector<FunctionContext::TypeDesc> arg_types;
// Create and initialize the first-level intermediates.
249 for (
int i = 0; i < num1; ++i) {
252 level1_intermediates.push_back(
253 UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
254 cxt, fixed_buffer_byte_size_));
255 init_fn_(cxt, &level1_intermediates[i]);
256 if (!CheckContext(cxt))
return RESULT::null();
// Create and initialize the second-level intermediates.
258 for (
int i = 0; i < num2; ++i) {
261 level2_intermediates.push_back(
262 UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
263 cxt, fixed_buffer_byte_size_));
264 init_fn_(cxt, &level2_intermediates[i]);
265 if (!CheckContext(cxt))
return RESULT::null();
// Intermediate that receives the fully merged state; lives on result_context.
269 INTERMEDIATE final_intermediate =
270 UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
271 result_context->
get(), fixed_buffer_byte_size_);
272 init_fn_(result_context->
get(), &final_intermediate);
273 if (!CheckContext(result_context->
get()))
return RESULT::null();
// Assign input i to first-level node (i % num1).
276 for (
int i = 0; i < num_input_values_; ++i) {
277 int target = i % num1;
278 Update(i, level1_contexts[target].
get()->
get(), &level1_intermediates[target]);
// First merge step: first-level node i feeds second-level node (i % num2).
282 for (
int i = 0; i < num1; ++i) {
283 if (!CheckContext(level1_contexts[i].
get()->
get()))
return RESULT::null();
284 int target = i % num2;
// Serialization is optional: without a serialize function the intermediate
// itself is used as the serialized form.
285 INTERMEDIATE serialized = level1_intermediates[i];
286 if (serialize_fn_ != NULL) {
287 serialized = serialize_fn_(level1_contexts[i].
get()->
get(), level1_intermediates[i]);
// Copy using the target second-level context, free the first-level
// intermediate, merge the copy, then free the copy.
290 UdaTestHarnessUtil::CopyIntermediate<INTERMEDIATE>(
291 level2_contexts[target].get()->get(), fixed_buffer_byte_size_, serialized);
292 UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
293 level1_contexts[i].get()->get(), level1_intermediates[i]);
294 merge_fn_(level2_contexts[target].
get()->
get(),
295 copy, &level2_intermediates[target]);
296 UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
297 level2_contexts[target].get()->get(), copy);
298 if (!CheckContext(level1_contexts[i].
get()->
get()))
return RESULT::null();
// This first-level context is no longer needed; drop the shared_ptr's
// reference to it.
299 level1_contexts[i].reset();
// Second merge step: every second-level node feeds the final intermediate.
303 for (
int i = 0; i < num2; ++i) {
304 if (!CheckContext(level2_contexts[i].
get()->
get()))
return RESULT::null();
305 INTERMEDIATE serialized = level2_intermediates[i];
306 if (serialize_fn_ != NULL) {
307 serialized = serialize_fn_(level2_contexts[i].
get()->
get(), level2_intermediates[i]);
310 UdaTestHarnessUtil::CopyIntermediate<INTERMEDIATE>(
311 result_context->
get(), fixed_buffer_byte_size_, serialized);
312 UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
313 level2_contexts[i].get()->get(), level2_intermediates[i]);
314 merge_fn_(result_context->
get(), copy, &final_intermediate);
315 UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
316 result_context->
get(), copy);
317 if (!CheckContext(level2_contexts[i].
get()->
get()))
return RESULT::null();
318 level2_contexts[i].reset();
// Errors raised on result_context during the final merges are checked here.
320 if (!CheckContext(result_context->
get()))
return RESULT::null();
// Finalize the fully merged state, then release it.
322 RESULT result = finalize_fn_(result_context->
get(), final_intermediate);
323 UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
324 result_context->
get(), final_intermediate);
325 if (!CheckContext(result_context->
get()))
return RESULT::null();
// Fragment of the single-input Execute overload (the line carrying the
// function name and the `mode` parameter is not shown in this listing).
// Registers the inputs and delegates to BaseClass::Execute(expected, mode),
// whose result is returned.
329 template<
typename RESULT,
typename INTERMEDIATE,
typename INPUT>
331 const std::vector<INPUT>& values,
const RESULT& expected,
333 input_.resize(values.size());
334 BaseClass::num_input_values_ = values.size();
// input_ stores pointers to the caller's elements rather than copies, so
// `values` has to stay alive for as long as those pointers are used.
// NOTE(review): `int i` is compared against the unsigned values.size().
335 for (
int i = 0; i < values.size(); ++i) {
336 input_[i] = &values[i];
338 return BaseClass::Execute(expected, mode);
// Fragment of the single-input Update override (signature line not shown in
// this listing): applies the update function to the idx-th registered input,
// dereferencing the pointer stored in input_, with `dst` as the destination.
341 template<
typename RESULT,
typename INTERMEDIATE,
typename INPUT>
344 update_fn_(context, *input_[idx], dst);
// Fragment of the two-input Execute overload (function-name line, remaining
// parameters and the assignments to input1_/input2_ are not shown in this
// listing).
348 template<
typename RESULT,
typename INTERMEDIATE,
typename INPUT1,
typename INPUT2>
350 const std::vector<INPUT1>& values1,
const std::vector<INPUT2>& values2,
// Both input columns must have the same number of rows; a mismatch is
// recorded in the harness error message.
// NOTE(review): what happens after the message is set is not shown here.
352 if (values1.size() != values2.size()) {
353 BaseClass::error_msg_ =
354 "UdaTestHarness::Execute: values1 and values2 must be the same size.";
// The row count is taken from the first input vector.
359 BaseClass::num_input_values_ = input1_->size();
360 return BaseClass::Execute(expected, mode);
// Fragment of the two-input Update override (signature line not shown in this
// listing): applies the update function to row `idx` of both input vectors,
// with `dst` as the destination.
364 template<
typename RESULT,
typename INTERMEDIATE,
typename INPUT1,
typename INPUT2>
367 update_fn_(context, (*input1_)[idx], (*input2_)[idx], dst);
// Fragment of the three-input Execute overload (the end of the template
// parameter list, the function-name line and the assignments to the input
// members are not shown in this listing).
370 template<
typename RESULT,
typename INTERMEDIATE,
typename INPUT1,
typename INPUT2,
373 const std::vector<INPUT1>& values1,
const std::vector<INPUT2>& values2,
374 const std::vector<INPUT3>& values3,
const RESULT& expected,
UdaExecutionMode mode) {
// All three input columns must have the same number of rows; a mismatch is
// recorded in the harness error message.
// NOTE(review): what happens after the message is set is not shown here.
375 if (values1.size() != values2.size() || values1.size() != values3.size()) {
376 BaseClass::error_msg_ =
377 "UdaTestHarness::Execute: input values vectors must be the same size.";
// The row count is taken from the first input vector.
383 BaseClass::num_input_values_ = input1_->size();
384 return BaseClass::Execute(expected, mode);
// Fragment of the three-input Update override (the end of the template
// parameter list and the signature line are not shown in this listing):
// applies the update function to row `idx` of all three input vectors, with
// `dst` as the destination.
387 template<
typename RESULT,
typename INTERMEDIATE,
typename INPUT1,
typename INPUT2,
391 update_fn_(context, (*input1_)[idx], (*input2_)[idx], (*input3_)[idx], dst);
// Fragment of the four-input Execute overload (function-name line, remaining
// parameters and the assignments to the input members are not shown in this
// listing).
394 template<
typename RESULT,
typename INTERMEDIATE,
typename INPUT1,
typename INPUT2,
395 typename INPUT3,
typename INPUT4>
397 const std::vector<INPUT1>& values1,
const std::vector<INPUT2>& values2,
398 const std::vector<INPUT3>& values3,
const std::vector<INPUT4>& values4,
// Every input column is compared against values1; any size mismatch is
// recorded in the harness error message.
// NOTE(review): what happens after the message is set is not shown here.
400 if (values1.size() != values2.size() || values1.size() != values3.size() ||
401 values1.size() != values4.size()) {
402 BaseClass::error_msg_ =
403 "UdaTestHarness::Execute: input values vectors must be the same size.";
// The row count is taken from the first input vector.
410 BaseClass::num_input_values_ = input1_->size();
411 return BaseClass::Execute(expected, mode);
// Fragment of the four-input Update override (signature line not shown in
// this listing): passes row `idx` of all four input vectors to the update
// function.
// NOTE(review): the call is cut off after the fourth input; its remaining
// argument(s) are not shown here.
414 template<
typename RESULT,
typename INTERMEDIATE,
typename INPUT1,
typename INPUT2,
415 typename INPUT3,
typename INPUT4>
418 update_fn_(context, (*input1_)[idx], (*input2_)[idx], (*input3_)[idx], (*input4_)[idx],
bool Execute(const std::vector< INPUT1 > &values1, const std::vector< INPUT2 > &values2, const std::vector< INPUT3 > &values3, const std::vector< INPUT4 > &values4, const RESULT &expected, UdaExecutionMode mode=ALL)
Runs the UDA in all the modes, validating the result is 'expected' each time.
static FunctionContext * CreateTestContext(const FunctionContext::TypeDesc &return_type, const std::vector< FunctionContext::TypeDesc > &arg_types)
bool Execute(const std::vector< INPUT1 > &values1, const std::vector< INPUT2 > &values2, const RESULT &expected, UdaExecutionMode mode=ALL)
Runs the UDA in all the modes, validating the result is 'expected' each time.
virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)
static T CreateIntermediate(FunctionContext *context, int byte_size)
bool has_error() const
Returns true if there's been an error set.
RESULT ExecuteOneLevel(int num_nodes, ScopedFunctionContext *result_context)
std::string DebugString(const T &val)
bool Execute(const std::vector< INPUT1 > &values1, const std::vector< INPUT2 > &values2, const std::vector< INPUT3 > &values3, const RESULT &expected, UdaExecutionMode mode=ALL)
Runs the UDA in all the modes, validating the result is 'expected' each time.
virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)
void Free(uint8_t *buffer)
Frees a buffer returned from Allocate() or Reallocate()
bool CheckResult(const RESULT &x, const RESULT &y)
Verifies x == y, using the custom comparator if set.
bool CheckContext(FunctionContext *context)
Returns false if there is an error set in the context.
const char * error_msg() const
Returns the current error message. Returns NULL if there is no error.
virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)
RESULT ExecuteTwoLevel(int num1, int num2, ScopedFunctionContext *result_context)
bool Execute(const RESULT &expected, UdaExecutionMode mode)
Runs the UDA in all the modes, validating the result is 'expected' each time.
static T CopyIntermediate(FunctionContext *context, int byte_size, const T &src)
uint8_t * Allocate(int byte_size)
bool Execute(const std::vector< INPUT > &values, const RESULT &expected, UdaExecutionMode mode=ALL)
Runs the UDA in all the modes, validating the result is 'expected' each time.
virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)
RESULT ExecuteSingleNode(ScopedFunctionContext *result_context)
static void FreeIntermediate(FunctionContext *context, const T &v)