Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
uda-test-harness-impl.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_UDA_TEST_HARNESS_IMPL_H
17 #define IMPALA_UDA_TEST_HARNESS_IMPL_H
18 
19 #include <string>
20 #include <sstream>
21 #include <vector>
22 
23 #include <boost/shared_ptr.hpp>
24 
25 namespace impala_udf {
26 
33  public:
// Generic CreateIntermediate: returns a value-initialized T.
// Neither 'context' nor 'byte_size' is used by this generic version.
34  template<typename T>
35  static T CreateIntermediate(FunctionContext* context, int byte_size) {
36  return T();
37  }
38 
// Generic FreeIntermediate: does nothing; both arguments are ignored.
39  template<typename T>
40  static void FreeIntermediate(FunctionContext* context, const T& v) {
42  return;
43  }
44 
// Generic CopyIntermediate: returns 'src' by value.
// 'context' and 'byte_size' are not used by this generic version.
47  template<typename T>
48  static T CopyIntermediate(FunctionContext* context, int byte_size, const T& src) {
49  return src;
50  }
51 };
52 
// Explicit specialization that returns the result of
// context->Allocate(byte_size), cast to BufferVal.
// NOTE(review): the declarator line (listing line 54) is absent from this
// extract; presumably this is CreateIntermediate specialized for BufferVal --
// confirm against the real header.
53 template<>
55  FunctionContext* context, int byte_size) {
56  return reinterpret_cast<BufferVal>(context->Allocate(byte_size));
57 }
58 
// Explicit specialization that hands 'v' to context->Free().
// NOTE(review): the declarator line (listing line 60) is absent from this
// extract; presumably this is FreeIntermediate specialized for BufferVal --
// confirm against the real header.
59 template<>
61  FunctionContext* context, const BufferVal& v) {
62  context->Free(v);
63 }
64 
// Explicit specialization that obtains a new buffer from
// context->Allocate(byte_size), copies 'byte_size' bytes from 'src' into it
// with memcpy, and returns the new buffer.
// NOTE(review): the declarator line (listing line 66) is absent from this
// extract; presumably this is CopyIntermediate specialized for BufferVal --
// confirm against the real header.
// NOTE(review): the result of Allocate() is not checked before the memcpy.
// NOTE(review): memcpy is used but no <cstring>/<string.h> include is visible
// in this listing -- confirm it is provided by another header.
65 template<>
67  FunctionContext* context, int byte_size, const BufferVal& src) {
68  BufferVal v = reinterpret_cast<BufferVal>(context->Allocate(byte_size));
69  memcpy(v, src, byte_size);
70  return v;
71 }
72 
// Returns false if 'context' has an error set, true otherwise. When an error
// is set, error_msg_ is overwritten with "UDA set error to: " followed by the
// context's error message.
// NOTE(review): the declarator line (listing line 75) is absent from this
// extract; the index at the end of the page lists
// 'bool CheckContext(FunctionContext *context)'.
74 template<typename RESULT, typename INTERMEDIATE>
76  if (context->has_error()) {
77  std::stringstream ss;
78  ss << "UDA set error to: " << context->error_msg();
79  error_msg_ = ss.str();
80  return false;
81  }
82  return true;
83 }
84 
// Compares two results. Uses operator== when result_comparator_fn_ is NULL;
// otherwise returns whatever result_comparator_fn_(x, y) returns.
// NOTE(review): the declarator line (listing line 86) is absent from this
// extract; the index at the end of the page lists
// 'bool CheckResult(const RESULT &x, const RESULT &y)'.
85 template<typename RESULT, typename INTERMEDIATE>
87  const RESULT& x, const RESULT& y) {
88  if (result_comparator_fn_ == NULL) return x == y;
89  return result_comparator_fn_(x, y);
90 }
91 
// Runs the UDA in the mode(s) selected by 'mode' (SINGLE_NODE, ONE_LEVEL,
// TWO_LEVEL, or ALL for every one of them) and compares each result against
// 'expected' with CheckResult(). Returns true only if every selected run
// finished with error_msg_ still empty; otherwise error_msg_ describes the
// failure and false is returned.
// NOTE(review): the declarator line (listing line 94) is absent from this
// extract; the index at the end of the page lists
// 'bool Execute(const RESULT &expected, UdaExecutionMode mode)'.
93 template<typename RESULT, typename INTERMEDIATE>
95  const RESULT& expected, UdaExecutionMode mode) {
  // Start from a clean slate; error_msg_ doubles as the failure flag below.
96  error_msg_ = "";
97  RESULT result;
98 
  // Both type descriptors are left default-constructed / empty (see TODO).
99  FunctionContext::TypeDesc return_type;
100  std::vector<FunctionContext::TypeDesc> arg_types; // TODO
101  if (mode == ALL || mode == SINGLE_NODE) {
  // The extra braces end 'context's lifetime before error_msg_ is inspected.
  // Presumably ScopedFunctionContext's destructor can also set error_msg_
  // (it is handed 'this') -- confirm.
102  {
103  ScopedFunctionContext context(
104  UdfTestHarness::CreateTestContext(return_type, arg_types), this);
105  result = ExecuteSingleNode(&context);
106  if (error_msg_.empty() && !CheckResult(result, expected)) {
107  std::stringstream ss;
108  ss << "UDA failed running in single node execution." << std::endl
109  << "Expected: " << DebugString(expected)
110  << " Actual: " << DebugString(result);
111  error_msg_ = ss.str();
112  }
113  }
114  if (!error_msg_.empty()) return false;
115  }
116 
  // Node counts tried by the distributed modes.
117  const int num_nodes[] = { 1, 2, 10, 20, 100 };
118  if (mode == ALL || mode == ONE_LEVEL) {
  // NOTE(review): 'i' is a signed int compared against an unsigned
  // sizeof-expression.
  // NOTE(review): if ExecuteOneLevel() leaves error_msg_ non-empty, the loop
  // is not exited here; the remaining node counts still run and the failure
  // is only reported by the check after the loop.
119  for (int i = 0; i < sizeof(num_nodes) / sizeof(int); ++i) {
120  ScopedFunctionContext context(
121  UdfTestHarness::CreateTestContext(return_type, arg_types), this);
122  result = ExecuteOneLevel(num_nodes[i], &context);
123  if (error_msg_.empty() && !CheckResult(result, expected)) {
124  std::stringstream ss;
125  ss << "UDA failed running in one level distributed mode with "
126  << num_nodes[i] << " nodes." << std::endl
127  << "Expected: " << DebugString(expected)
128  << " Actual: " << DebugString(result);
129  error_msg_ = ss.str();
130  return false;
131  }
132  }
133  if (!error_msg_.empty()) return false;
134  }
135 
136  if (mode == ALL || mode == TWO_LEVEL) {
  // Tries every pair (num_nodes[i], num_nodes[j]) with j <= i, i.e. the
  // second-level node count is never larger than the first-level one.
  // NOTE(review): same signed/unsigned comparison and same
  // "keep looping after a context error" behavior as the one-level loop.
137  for (int i = 0; i < sizeof(num_nodes) / sizeof(int); ++i) {
138  for (int j = 0; j <= i; ++j) {
139  ScopedFunctionContext context(
140  UdfTestHarness::CreateTestContext(return_type, arg_types), this);
141  result = ExecuteTwoLevel(num_nodes[i], num_nodes[j], &context);
142  if (error_msg_.empty() && !CheckResult(result, expected)) {
143  std::stringstream ss;
144  ss << "UDA failed running in two level distributed mode with "
145  << num_nodes[i] << " nodes in the first level and "
146  << num_nodes[j] << " nodes in the second level." << std::endl
147  << "Expected: " << DebugString(expected)
148  << " Actual: " << DebugString(result);
149  error_msg_ = ss.str();
150  return false;
151  }
152  }
153  }
154  if (!error_msg_.empty()) return false;
155  }
156  return true;
157 }
158 
// Runs init -> Update for every input value -> finalize, all on the one
// supplied context, and returns the finalized result. Returns RESULT::null()
// as soon as CheckContext() reports an error on the context.
// NOTE(review): the declarator line (listing line 160) is absent from this
// extract; the index at the end of the page lists
// 'RESULT ExecuteSingleNode(ScopedFunctionContext *result_context)'.
// NOTE(review): the first two early returns leave 'intermediate' without a
// matching FreeIntermediate() call.
159 template<typename RESULT, typename INTERMEDIATE>
161  ScopedFunctionContext* context) {
162  INTERMEDIATE intermediate =
163  UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
164  context->get(), fixed_buffer_byte_size_);
165 
166  init_fn_(context->get(), &intermediate);
167  if (!CheckContext(context->get())) return RESULT::null();
168 
  // Errors raised by individual Update() calls are only checked once, after
  // all input values have been fed in.
169  for (int i = 0; i < num_input_values_; ++i) {
170  Update(i, context->get(), &intermediate);
171  }
172  if (!CheckContext(context->get())) return RESULT::null();
173 
174  // Single node doesn't need merge or serialize
175  RESULT result = finalize_fn_(context->get(), intermediate);
176  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(context->get(), intermediate);
177  if (!CheckContext(context->get())) return RESULT::null();
178  return result;
179 }
180 
// Runs the UDA with 'num_nodes' separate contexts, each with its own
// intermediate: input value i is fed to context (i % num_nodes), then every
// per-node intermediate is (optionally) serialized, copied into
// 'result_context', and merged into one intermediate that is finalized there.
// Returns RESULT::null() as soon as CheckContext() reports an error.
// NOTE(review): the declarator line (listing line 182) is absent from this
// extract; the index at the end of the page lists
// 'RESULT ExecuteOneLevel(int num_nodes, ScopedFunctionContext *result_context)'.
// NOTE(review): the early returns leave already-created intermediates without
// a matching FreeIntermediate() call.
181 template<typename RESULT, typename INTERMEDIATE>
183  ScopedFunctionContext* result_context) {
184  std::vector<boost::shared_ptr<ScopedFunctionContext> > contexts;
185  std::vector<INTERMEDIATE> intermediates;
186  contexts.resize(num_nodes);
187 
188  FunctionContext::TypeDesc return_type;
189  std::vector<FunctionContext::TypeDesc> arg_types; // TODO
190 
  // Create and initialize one context + intermediate per simulated node.
191  for (int i = 0; i < num_nodes; ++i) {
192  FunctionContext* cxt = UdfTestHarness::CreateTestContext(return_type, arg_types);
193  contexts[i].reset(new ScopedFunctionContext(cxt, this));
194  intermediates.push_back(UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
195  cxt, fixed_buffer_byte_size_));
196  init_fn_(cxt, &intermediates[i]);
197  if (!CheckContext(cxt)) return RESULT::null();
198  }
199 
  // The merge target lives in the caller-supplied result context.
200  INTERMEDIATE merged =
201  UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
202  result_context->get(), fixed_buffer_byte_size_);
203  init_fn_(result_context->get(), &merged);
204  if (!CheckContext(result_context->get())) return RESULT::null();
205 
206  // Process all the values in the single level num_nodes contexts
207  for (int i = 0; i < num_input_values_; ++i) {
208  int target = i % num_nodes;
209  Update(i, contexts[target].get()->get(), &intermediates[target]);
210  }
211 
212  // Merge them all into the final
213  for (int i = 0; i < num_nodes; ++i) {
  // This check is the first point at which errors from the Update() calls
  // above are noticed.
214  if (!CheckContext(contexts[i].get()->get())) return RESULT::null();
  // Without a serialize function the intermediate itself is what gets copied.
215  INTERMEDIATE serialized = intermediates[i];
216  if (serialize_fn_ != NULL) {
217  serialized = serialize_fn_(contexts[i].get()->get(), intermediates[i]);
218  }
  // The copy is made in the result context before the node's own
  // intermediate is freed, then merged and freed in turn.
219  INTERMEDIATE copy =
220  UdaTestHarnessUtil::CopyIntermediate<INTERMEDIATE>(
221  result_context->get(), fixed_buffer_byte_size_, serialized);
222  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
223  contexts[i].get()->get(), intermediates[i]);
224  merge_fn_(result_context->get(), copy, &merged);
225  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(result_context->get(), copy);
226  if (!CheckContext(contexts[i].get()->get())) return RESULT::null();
  // Drop this node's ScopedFunctionContext now that it has been merged.
227  contexts[i].reset();
228  }
229  if (!CheckContext(result_context->get())) return RESULT::null();
230 
231  RESULT result = finalize_fn_(result_context->get(), merged);
232  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(result_context->get(), merged);
233  if (!CheckContext(result_context->get())) return RESULT::null();
234  return result;
235 }
236 
// Runs the UDA with two tiers of contexts: 'num1' level-1 contexts receive
// the input values (value i goes to i % num1), each level-1 intermediate is
// merged into level-2 context (i % num2), and every level-2 intermediate is
// then merged into 'result_context', where the result is finalized.
// Returns RESULT::null() as soon as CheckContext() reports an error.
// NOTE(review): the declarator line (listing line 238) is absent from this
// extract; the index at the end of the page lists
// 'RESULT ExecuteTwoLevel(int num1, int num2, ScopedFunctionContext *result_context)'.
// NOTE(review): the early returns leave already-created intermediates without
// a matching FreeIntermediate() call.
237 template<typename RESULT, typename INTERMEDIATE>
239  int num1, int num2, ScopedFunctionContext* result_context) {
240  std::vector<boost::shared_ptr<ScopedFunctionContext> > level1_contexts, level2_contexts;
241  std::vector<INTERMEDIATE> level1_intermediates, level2_intermediates;
242  level1_contexts.resize(num1);
243  level2_contexts.resize(num2);
244 
245  FunctionContext::TypeDesc return_type;
246  std::vector<FunctionContext::TypeDesc> arg_types; // TODO
247 
248  // Initialize the intermediate contexts and intermediate result buffers
249  for (int i = 0; i < num1; ++i) {
250  FunctionContext* cxt = UdfTestHarness::CreateTestContext(return_type, arg_types);
251  level1_contexts[i].reset(new ScopedFunctionContext(cxt, this));
252  level1_intermediates.push_back(
253  UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
254  cxt, fixed_buffer_byte_size_));
255  init_fn_(cxt, &level1_intermediates[i]);
256  if (!CheckContext(cxt)) return RESULT::null();
257  }
258  for (int i = 0; i < num2; ++i) {
259  FunctionContext* cxt = UdfTestHarness::CreateTestContext(return_type, arg_types);
260  level2_contexts[i].reset(new ScopedFunctionContext(cxt, this));
261  level2_intermediates.push_back(
262  UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
263  cxt, fixed_buffer_byte_size_));
264  init_fn_(cxt, &level2_intermediates[i]);
265  if (!CheckContext(cxt)) return RESULT::null();
266  }
267 
268  // Initialize the final context and final intermediate buffer
269  INTERMEDIATE final_intermediate =
270  UdaTestHarnessUtil::CreateIntermediate<INTERMEDIATE>(
271  result_context->get(), fixed_buffer_byte_size_);
272  init_fn_(result_context->get(), &final_intermediate);
273  if (!CheckContext(result_context->get())) return RESULT::null();
274 
275  // Assign all the input values to level 1 updates
276  for (int i = 0; i < num_input_values_; ++i) {
277  int target = i % num1;
278  Update(i, level1_contexts[target].get()->get(), &level1_intermediates[target]);
279  }
280 
281  // Serialize the level 1 intermediates and merge them with a level 2 intermediate
282  for (int i = 0; i < num1; ++i) {
  // This check is the first point at which errors from the Update() calls
  // above are noticed.
283  if (!CheckContext(level1_contexts[i].get()->get())) return RESULT::null();
284  int target = i % num2;
  // Without a serialize function the intermediate itself is what gets copied.
285  INTERMEDIATE serialized = level1_intermediates[i];
286  if (serialize_fn_ != NULL) {
287  serialized = serialize_fn_(level1_contexts[i].get()->get(), level1_intermediates[i]);
288  }
  // Copy into the receiving level-2 context, free the level-1 source,
  // merge, then free the copy.
289  INTERMEDIATE copy =
290  UdaTestHarnessUtil::CopyIntermediate<INTERMEDIATE>(
291  level2_contexts[target].get()->get(), fixed_buffer_byte_size_, serialized);
292  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
293  level1_contexts[i].get()->get(), level1_intermediates[i]);
294  merge_fn_(level2_contexts[target].get()->get(),
295  copy, &level2_intermediates[target]);
296  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
297  level2_contexts[target].get()->get(), copy);
  // NOTE(review): only the level-1 context is checked here; an error set on
  // the level-2 context by merge_fn_ is noticed later, in the next loop.
298  if (!CheckContext(level1_contexts[i].get()->get())) return RESULT::null();
299  level1_contexts[i].reset();
300  }
301 
302  // Merge all the level twos into the final
303  for (int i = 0; i < num2; ++i) {
304  if (!CheckContext(level2_contexts[i].get()->get())) return RESULT::null();
305  INTERMEDIATE serialized = level2_intermediates[i];
306  if (serialize_fn_ != NULL) {
307  serialized = serialize_fn_(level2_contexts[i].get()->get(), level2_intermediates[i]);
308  }
309  INTERMEDIATE copy =
310  UdaTestHarnessUtil::CopyIntermediate<INTERMEDIATE>(
311  result_context->get(), fixed_buffer_byte_size_, serialized);
312  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
313  level2_contexts[i].get()->get(), level2_intermediates[i]);
314  merge_fn_(result_context->get(), copy, &final_intermediate);
315  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
316  result_context->get(), copy);
317  if (!CheckContext(level2_contexts[i].get()->get())) return RESULT::null();
318  level2_contexts[i].reset();
319  }
320  if (!CheckContext(result_context->get())) return RESULT::null();
321 
322  RESULT result = finalize_fn_(result_context->get(), final_intermediate);
323  UdaTestHarnessUtil::FreeIntermediate<INTERMEDIATE>(
324  result_context->get(), final_intermediate);
325  if (!CheckContext(result_context->get())) return RESULT::null();
326  return result;
327 }
328 
// Single-input Execute: records the address of every element of 'values' in
// input_, sets the base class's num_input_values_ to values.size(), and then
// delegates to BaseClass::Execute(expected, mode).
// NOTE(review): the declarator line (listing line 330) is absent from this
// extract; the index at the end of the page lists
// 'bool Execute(const std::vector< INPUT > &values, const RESULT &expected,
// UdaExecutionMode mode=ALL)'.
// NOTE(review): input_ is not cleared afterwards, so it keeps pointing into
// 'values' after this call returns.
// NOTE(review): 'i' is a signed int compared against the unsigned
// values.size().
329 template<typename RESULT, typename INTERMEDIATE, typename INPUT>
331  const std::vector<INPUT>& values, const RESULT& expected,
332  UdaExecutionMode mode) {
333  input_.resize(values.size());
334  BaseClass::num_input_values_ = values.size();
335  for (int i = 0; i < values.size(); ++i) {
336  input_[i] = &values[i];
337  }
338  return BaseClass::Execute(expected, mode);
339 }
340 
// Single-input Update: passes the value that input_[idx] points at, together
// with 'context' and 'dst', to update_fn_. 'idx' is not range-checked here.
// NOTE(review): the declarator line (listing line 342) is absent from this
// extract; the index at the end of the page lists
// 'virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)'.
341 template<typename RESULT, typename INTERMEDIATE, typename INPUT>
343  int idx, FunctionContext* context, INTERMEDIATE* dst) {
344  update_fn_(context, *input_[idx], dst);
345 }
346 
// Two-input Execute: requires 'values1' and 'values2' to have the same size;
// otherwise sets the base class's error_msg_ and returns false without
// running anything. On success it stores the addresses of both vectors, sets
// num_input_values_ to their common size, and delegates to
// BaseClass::Execute(expected, mode).
// NOTE(review): the declarator line (listing line 349) is absent from this
// extract; see the matching two-vector 'bool Execute(...)' entry in the index
// at the end of the page.
// NOTE(review): input1_/input2_ keep pointing at the caller's vectors after
// this call returns.
348 template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2>
350  const std::vector<INPUT1>& values1, const std::vector<INPUT2>& values2,
351  const RESULT& expected, UdaExecutionMode mode) {
352  if (values1.size() != values2.size()) {
353  BaseClass::error_msg_ =
354  "UdaTestHarness::Execute: values1 and values2 must be the same size.";
355  return false;
356  }
357  input1_ = &values1;
358  input2_ = &values2;
359  BaseClass::num_input_values_ = input1_->size();
360  return BaseClass::Execute(expected, mode);
361 }
362 
363 
// Two-input Update: passes element 'idx' of each stored input vector,
// together with 'context' and 'dst', to update_fn_. 'idx' is not
// range-checked here.
// NOTE(review): the declarator line (listing line 365) is absent from this
// extract; the index at the end of the page lists
// 'virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)'.
364 template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2>
366  int idx, FunctionContext* context, INTERMEDIATE* dst) {
367  update_fn_(context, (*input1_)[idx], (*input2_)[idx], dst);
368 }
369 
// Three-input Execute: requires all three input vectors to have the same
// size; otherwise sets the base class's error_msg_ and returns false without
// running anything. On success it stores the addresses of the three vectors,
// sets num_input_values_ to their common size, and delegates to
// BaseClass::Execute(expected, mode).
// NOTE(review): the declarator line (listing line 372) is absent from this
// extract; see the matching three-vector 'bool Execute(...)' entry in the
// index at the end of the page.
// NOTE(review): input1_..input3_ keep pointing at the caller's vectors after
// this call returns.
370 template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2,
371  typename INPUT3>
373  const std::vector<INPUT1>& values1, const std::vector<INPUT2>& values2,
374  const std::vector<INPUT3>& values3, const RESULT& expected, UdaExecutionMode mode) {
375  if (values1.size() != values2.size() || values1.size() != values3.size()) {
376  BaseClass::error_msg_ =
377  "UdaTestHarness::Execute: input values vectors must be the same size.";
378  return false;
379  }
380  input1_ = &values1;
381  input2_ = &values2;
382  input3_ = &values3;
383  BaseClass::num_input_values_ = input1_->size();
384  return BaseClass::Execute(expected, mode);
385 }
386 
// Three-input Update: passes element 'idx' of each stored input vector,
// together with 'context' and 'dst', to update_fn_. 'idx' is not
// range-checked here.
// NOTE(review): the declarator line (listing line 389) is absent from this
// extract; the index at the end of the page lists
// 'virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)'.
387 template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2,
388  typename INPUT3>
390  int idx, FunctionContext* context, INTERMEDIATE* dst) {
391  update_fn_(context, (*input1_)[idx], (*input2_)[idx], (*input3_)[idx], dst);
392 }
393 
// Four-input Execute: requires all four input vectors to have the same size;
// otherwise sets the base class's error_msg_ and returns false without
// running anything. On success it stores the addresses of the four vectors,
// sets num_input_values_ to their common size, and delegates to
// BaseClass::Execute(expected, mode).
// NOTE(review): the declarator line (listing line 396) is absent from this
// extract; see the matching four-vector 'bool Execute(...)' entry in the
// index at the end of the page.
// NOTE(review): input1_..input4_ keep pointing at the caller's vectors after
// this call returns.
394 template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2,
395  typename INPUT3, typename INPUT4>
397  const std::vector<INPUT1>& values1, const std::vector<INPUT2>& values2,
398  const std::vector<INPUT3>& values3, const std::vector<INPUT4>& values4,
399  const RESULT& expected, UdaExecutionMode mode) {
400  if (values1.size() != values2.size() || values1.size() != values3.size() ||
401  values1.size() != values4.size()) {
402  BaseClass::error_msg_ =
403  "UdaTestHarness::Execute: input values vectors must be the same size.";
404  return false;
405  }
406  input1_ = &values1;
407  input2_ = &values2;
408  input3_ = &values3;
409  input4_ = &values4;
410  BaseClass::num_input_values_ = input1_->size();
411  return BaseClass::Execute(expected, mode);
412 }
413 
// Four-input Update: passes element 'idx' of each stored input vector,
// together with 'context' and 'dst', to update_fn_. 'idx' is not
// range-checked here.
// NOTE(review): the declarator line (listing line 416) is absent from this
// extract; the index at the end of the page lists
// 'virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)'.
414 template<typename RESULT, typename INTERMEDIATE, typename INPUT1, typename INPUT2,
415  typename INPUT3, typename INPUT4>
417  int idx, FunctionContext* context, INTERMEDIATE* dst) {
418  update_fn_(context, (*input1_)[idx], (*input2_)[idx], (*input3_)[idx], (*input4_)[idx],
419  dst);
420 }
421 
422 }
423 
424 #endif
bool Execute(const std::vector< INPUT1 > &values1, const std::vector< INPUT2 > &values2, const std::vector< INPUT3 > &values3, const std::vector< INPUT4 > &values4, const RESULT &expected, UdaExecutionMode mode=ALL)
Runs the UDA in all the modes, validating the result is 'expected' each time.
static FunctionContext * CreateTestContext(const FunctionContext::TypeDesc &return_type, const std::vector< FunctionContext::TypeDesc > &arg_types)
bool Execute(const std::vector< INPUT1 > &values1, const std::vector< INPUT2 > &values2, const RESULT &expected, UdaExecutionMode mode=ALL)
Runs the UDA in all the modes, validating the result is 'expected' each time.
virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)
static T CreateIntermediate(FunctionContext *context, int byte_size)
bool has_error() const
Returns true if there's been an error set.
Definition: udf.cc:253
uint8_t * BufferVal
Definition: udf.h:600
RESULT ExecuteOneLevel(int num_nodes, ScopedFunctionContext *result_context)
std::string DebugString(const T &val)
Definition: udf-debug.h:27
bool Execute(const std::vector< INPUT1 > &values1, const std::vector< INPUT2 > &values2, const std::vector< INPUT3 > &values3, const RESULT &expected, UdaExecutionMode mode=ALL)
Runs the UDA in all the modes, validating the result is 'expected' each time.
virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)
void Free(uint8_t *buffer)
Frees a buffer returned from Allocate() or Reallocate()
Definition: udf.cc:291
bool CheckResult(const RESULT &x, const RESULT &y)
Verifies x == y, using the custom comparator if set.
bool CheckContext(FunctionContext *context)
Returns false if there is an error set in the context.
const char * error_msg() const
Returns the current error message. Returns NULL if there is no error.
Definition: udf.cc:257
virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)
RESULT ExecuteTwoLevel(int num1, int num2, ScopedFunctionContext *result_context)
bool Execute(const RESULT &expected, UdaExecutionMode mode)
Runs the UDA in all the modes, validating the result is 'expected' each time.
static T CopyIntermediate(FunctionContext *context, int byte_size, const T &src)
uint8_t * Allocate(int byte_size)
Definition: udf.cc:262
bool Execute(const std::vector< INPUT > &values, const RESULT &expected, UdaExecutionMode mode=ALL)
Runs the UDA in all the modes, validating the result is 'expected' each time.
virtual void Update(int idx, FunctionContext *context, INTERMEDIATE *dst)
RESULT ExecuteSingleNode(ScopedFunctionContext *result_context)
static void FreeIntermediate(FunctionContext *context, const T &v)