Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
data-provider.cc
Go to the documentation of this file.
2 #include <stdlib.h>
3 #include <math.h>
4 #include <iostream>
5 
6 #include "common/names.h"
7 
8 using boost::minstd_rand;
9 using boost::uniform_real;
10 using boost::variate_generator;
11 using namespace impala;
12 
14  pool_(pool),
15  profile_(profile),
16  num_rows_(0),
17  batch_size_(0),
18  rows_returned_(0),
19  data_(NULL),
20  row_size_(0) {
21  SetSeed(0);
22 
23  bytes_generated_ = ADD_COUNTER(profile, "BytesGenerated", TUnit::BYTES);
24 }
25 
// Reconfigures the provider for a new run: records the total row count, the batch
// size and the column layout, rewinds rows_returned_ to 0, recomputes row_size_ as
// the sum of every column's `bytes`, and allocates a fresh buffer large enough for
// one batch (row_size_ * batch_size_ bytes).
//
// NOTE(review): data_ is declared as boost::scoped_ptr<char>, but the buffer below
// comes from new char[]. scoped_ptr releases with `delete`, not `delete[]`; an
// array-aware owner (e.g. boost::scoped_array<char>) looks like the intended type --
// confirm against data-provider.h.
// NOTE(review): this listing jumps from line 35 to line 37, so one statement just
// before the closing brace is not visible here.
26 void DataProvider::Reset(int num_rows, int batch_size, const vector<DataProvider::ColDesc>& cols) {
27  num_rows_ = num_rows;
28  batch_size_ = batch_size;
29  rows_returned_ = 0;
30  row_size_ = 0;
31  cols_ = cols;
  // Row width is the sum of the per-column byte widths.
32  for (int i = 0; i < cols_.size(); ++i) {
33  row_size_ += cols[i].bytes;
34  }
  // One batch worth of rows; the size is computed in int arithmetic.
35  data_.reset(new char[row_size_ * batch_size_]);
37 }
38 
39 void DataProvider::SetSeed(int seed) {
40  rand_generator_.seed(seed);
41 }
42 
44  const StringValue& min, const StringValue& max, double r,
45  variate_generator<minstd_rand&, uniform_real<> >& rand) {
46  int min_len = min.len;
47  int max_len = max.len;
48  int len = r * (max_len - min_len) + min_len;
49  char* ptr = reinterpret_cast<char*>(pool->Allocate(len));
50  result->len = len;
51  result->ptr = ptr;
52 
53  for (int i = 0; i < len; ++i) {
54  int min_char = i < min_len ? min.ptr[i] : 'a';
55  int max_char = (i < max_len ? max.ptr[i] : 'z') + 1;
56  ptr[i] = rand() * (max_char - min_char) + min_char;
57  }
58 }
59 
// Generates the next batch of rows into the internal buffer and returns a pointer to
// its start.
//
// `*rows_returned` is set to the number of rows produced: the smaller of batch_size_
// and the rows still outstanding (num_rows_ - rows_returned_). When that is 0 the
// function returns NULL without touching the buffer.
//
// The returned pointer is data_.get(), i.e. the same buffer on every call, so each
// call overwrites the rows handed out by the previous one.
//
// NOTE(review): this listing jumps from line 63 to line 65, so one statement after
// the early return is not visible here.
60 void* DataProvider::NextBatch(int* rows_returned) {
61  int num_rows = min(batch_size_, num_rows_ - rows_returned_);
62  *rows_returned = num_rows;
63  if (num_rows == 0) return NULL;
65 
  // Generator of doubles from a uniform_real distribution with bounds 0 and 1,
  // driven by the member engine so SetSeed() controls the sequence.
66  uniform_real<> dist(0,1);
67  variate_generator<minstd_rand&, uniform_real<> > rand_double(rand_generator_, dist);
68 
  // `data` is the write cursor; `row_idx` is the row's index across all batches.
69  char* data = data_.get();
70  for (int i = 0, row_idx = rows_returned_; i < num_rows; ++i, ++row_idx) {
71  for (int j = 0; j < cols_.size(); ++j) {
  // One draw per column per row, taken before the type is inspected.
72  double r = rand_double();
73  const ColDesc& col = cols_[j];
  // Each case stores the value produced by col.Generate<T>(r, row_idx) at the cursor.
74  switch (col.type) {
75  case TYPE_BOOLEAN:
76  *reinterpret_cast<bool*>(data) = col.Generate<bool>(r, row_idx);
77  break;
78  case TYPE_TINYINT:
79  *reinterpret_cast<int8_t*>(data) = col.Generate<int8_t>(r, row_idx);
80  break;
81  case TYPE_SMALLINT:
82  *reinterpret_cast<int16_t*>(data) = col.Generate<int16_t>(r, row_idx);
83  break;
84  case TYPE_INT:
85  *reinterpret_cast<int32_t*>(data) = col.Generate<int32_t>(r, row_idx);
86  break;
87  case TYPE_BIGINT:
88  *reinterpret_cast<int64_t*>(data) = col.Generate<int64_t>(r, row_idx);
89  break;
90  case TYPE_FLOAT:
91  *reinterpret_cast<float*>(data) = col.Generate<float>(r, row_idx);
92  break;
93  case TYPE_DOUBLE:
94  *reinterpret_cast<double*>(data) = col.Generate<double>(r, row_idx);
95  break;
96  case TYPE_VARCHAR:
97  case TYPE_STRING: {
98  // TODO: generate sequential strings
  // The slot holds a StringValue; RandString fills in its len/ptr using memory
  // from pool_ and takes further draws from rand_double for the characters.
99  StringValue* str = reinterpret_cast<StringValue*>(data);
100  RandString(pool_, str, col.min.s, col.max.s, r, rand_double);
101  break;
102  }
  // Any other type: nothing is written, but the cursor still advances below.
103  default:
104  break;
105  }
106  data += col.bytes;
107  }
108  }
109  rows_returned_ += num_rows;
110  return reinterpret_cast<void*>(data_.get());
111 }
112 
113 void DataProvider::Print(ostream* stream, char* data, int rows) const {
114  char* next_col = reinterpret_cast<char*>(data);
115  for (int i = 0; i < rows; ++i) {
116  for (int j = 0; j < cols_.size(); ++j) {
117  switch (cols_[j].type) {
118  case TYPE_BOOLEAN:
119  *stream << (*reinterpret_cast<int8_t*>(next_col) ? "true" : "false");
120  break;
121  case TYPE_TINYINT:
122  *stream << (int)*reinterpret_cast<int8_t*>(next_col);
123  break;
124  case TYPE_SMALLINT:
125  *stream << *reinterpret_cast<int16_t*>(next_col);
126  break;
127  case TYPE_INT:
128  *stream << *reinterpret_cast<int32_t*>(next_col);
129  break;
130  case TYPE_BIGINT:
131  *stream << *reinterpret_cast<int64_t*>(next_col);
132  break;
133  case TYPE_FLOAT:
134  *stream << *reinterpret_cast<float*>(next_col);
135  break;
136  case TYPE_DOUBLE:
137  *stream << *reinterpret_cast<double*>(next_col);
138  break;
139  case TYPE_STRING:
140  case TYPE_VARCHAR:
141  *stream << *reinterpret_cast<StringValue*>(next_col);
142  break;
143  default:
144  *stream << "BAD" << endl;
145  return;
146  }
147  if (j != cols_.size() - 1) *stream << ", ";
148  next_col += cols_[j].bytes;
149  }
150  *stream << endl;
151  }
152 }
void SetSeed(int seed)
void Reset(int num_rows, int batch_size, const std::vector< ColDesc > &columns)
void * NextBatch(int *rows_returned)
#define COUNTER_ADD(c, v)
impala::StringValue s
Definition: data-provider.h:45
impala::RuntimeProfile::Counter * bytes_generated_
boost::minstd_rand rand_generator_
T Generate(double d, int i) const
std::vector< ColDesc > cols_
ObjectPool pool
#define ADD_COUNTER(profile, name, unit)
impala::PrimitiveType type
Definition: data-provider.h:86
#define COUNTER_SET(c, v)
boost::scoped_ptr< char > data_
void RandString(MemPool *pool, StringValue *result, const StringValue &min, const StringValue &max, double r, variate_generator< minstd_rand &, uniform_real<> > &rand)
impala::MemPool * pool_
DataProvider(impala::MemPool *pool, impala::RuntimeProfile *profile)
void Print(std::ostream *, char *data, int num_rows) const
Print the row data in csv format.
uint8_t * Allocate(int size)
Definition: mem-pool.h:92