Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
disk-io-mgr-stress.cc
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
16 
17 #include "util/time.h"
18 
19 #include "common/names.h"
20 
21 using namespace impala;
22 
namespace {

// Chance (between 0 and 1) that NewClient() gives a client a random early
// abort point instead of reading its whole file.
const float ABORT_CHANCE = 0.10f;

// Scan range lengths are drawn from [MIN_READ_LEN, MAX_READ_LEN).
const int MIN_READ_LEN = 1;
const int MAX_READ_LEN = 20;

// Generated data files have a length in [MIN_FILE_LEN, MAX_FILE_LEN);
// MAX_FILE_LEN also sizes each reader thread's validation buffer.
const int MIN_FILE_LEN = 10;
const int MAX_FILE_LEN = 1024;

const int MAX_BUFFERS = 12;

// Buffer sizes handed to the DiskIoMgr constructor. Keep them between
// MIN_FILE_LEN and MAX_FILE_LEN so that more cases get exercised.
const int MIN_READ_BUFFER_SIZE = 64;
const int MAX_READ_BUFFER_SIZE = 128;

// Run() splits each outer iteration into 1000 / CANCEL_READER_PERIOD_MS
// steps; the value is in milliseconds.
const int CANCEL_READER_PERIOD_MS = 20;

}  // namespace
37 
38 static void CreateTempFile(const char* filename, const char* data) {
39  FILE* file = fopen(filename, "w");
40  CHECK(file != NULL);
41  fwrite(data, 1, strlen(data), file);
42  fclose(file);
43 }
44 
// GenerateRandomData() body: returns a string of random lowercase ASCII
// letters ('a' through 'z'), driven by rand() as seeded in the constructor.
// rand() % (MAX_FILE_LEN - MIN_FILE_LEN) lies in [0, MAX_FILE_LEN - MIN_FILE_LEN - 1],
// so the length is in [MIN_FILE_LEN, MAX_FILE_LEN - 1]. ClientThread() relies
// on this to copy a whole file into its MAX_FILE_LEN-byte read_buffer.
46  int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
47  stringstream ss;
48  for (int i = 0; i < rand_len; ++i) {
49  char c = rand() % 26 + 'a';
50  ss << c;
51  }
52  return ss.str();
53 }
54 
// Per-client state for DiskIoMgrStress. The members 'reader', 'abort_at_byte'
// and 'files_processed', used elsewhere in this file, are declared on lines
// not present in this listing.
// Held while this client's 'reader' is unregistered and replaced
// (ClientThread) or cancelled (CancelRandomReader, Run).
56  boost::mutex lock;
// Index into files_ of the file this client is currently reading; chosen at
// random by NewClient().
58  int file_idx;
// Scan ranges covering the current file, owned by this client: NewClient()
// deletes the previous file's ranges before building the ones for the next file.
59  vector<DiskIoMgr::ScanRange*> scan_ranges;
62 };
63 
// Creates the DiskIoMgr under test, writes num_clients * 2 files of random
// data under /tmp, and calls NewClient() for every client index. rand() is
// seeded with the current time. The seed is logged, presumably so that a
// failing run's seed is known.
64 DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
65  int num_clients, bool includes_cancellation) :
66  num_clients_(num_clients),
67  includes_cancellation_(includes_cancellation) {
68 
69  time_t rand_seed = time(NULL);
70  LOG(INFO) << "Running with rand seed: " << rand_seed;
71  srand(rand_seed);
72 
73  io_mgr_.reset(new DiskIoMgr(
74  num_disks, num_threads_per_disk, MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
75  Status status = io_mgr_->Init(&dummy_tracker_);
76  CHECK(status.ok());
77 
78  // Initialize some data files. It doesn't really matter how many there are.
79  files_.resize(num_clients * 2);
80  for (int i = 0; i < files_.size(); ++i) {
81  stringstream ss;
82  ss << "/tmp/disk_io_mgr_stress_file" << i;
83  files_[i].filename = ss.str();
84  files_[i].data = GenerateRandomData();
85  CreateTempFile(files_[i].filename.c_str(), files_[i].data.c_str());
86  }
87 
// Give every client an initial file and a registered reader context.
// NOTE(review): NewClient() indexes clients_[i]; the statement allocating
// clients_ is not present in this listing.
89  for (int i = 0; i < num_clients_; ++i) {
90  NewClient(i);
91  }
92 }
93 
// Body of one reader thread (started by Run()). Until shutdown_ is set, it
// drains every scan range of its client's current context and checks each
// returned buffer against the in-memory copy of the file. It then, holding
// client->lock, unregisters the context and calls NewClient() to start on
// another file. On exit it unregisters the last context and sets
// client->reader to NULL, so Run() skips cancelling it.
94 void DiskIoMgrStress::ClientThread(int client_id) {
95  Client* client = &clients_[client_id];
96  Status status;
// Large enough for any generated file: GenerateRandomData() returns fewer
// than MAX_FILE_LEN bytes.
97  char read_buffer[MAX_FILE_LEN];
98 
99  while (!shutdown_) {
100  bool eos = false;
101  int bytes_read = 0;
102 
103  const string& expected = files_[client->file_idx].data;
104 
105  while (!eos) {
106  DiskIoMgr::ScanRange* range;
// NOTE(review): this declaration shadows the function-scope 'status'. The
// GetNextRange()/GetNext() results below only update this inner variable, so
// the CHECK(status.ok()) after this loop inspects the never-assigned outer
// 'status'. Presumably that CHECK was meant to cover the last I/O status.
// Confirm before changing it, since a cancellation racing with a fully-read
// file could then trip that CHECK.
107  Status status = io_mgr_->GetNextRange(client->reader, &range);
108  CHECK(status.ok() || status.IsCancelled());
// NOTE(review): 'range' is not initialized above. This assumes GetNextRange()
// always sets it, to NULL when nothing is left to read or after cancellation.
// Confirm.
109  if (range == NULL) break;
110 
111  while (true) {
// NOTE(review): the declaration of 'buffer' is not present in this listing.
113  status = range->GetNext(&buffer);
114  CHECK(status.ok() || status.IsCancelled());
115  if (buffer == NULL) break;
116 
117  int64_t scan_range_offset = buffer->scan_range_offset();
118  int len = buffer->len();
119  CHECK_GE(scan_range_offset, 0);
120  CHECK_LT(scan_range_offset, expected.size());
121  CHECK_GT(len, 0);
122 
123  // Scan ranges come back in arbitrary order, so convert the buffer's offset
124  // within its scan range into an offset within the file.
125  int64_t file_offset = scan_range_offset + range->offset();
126 
127  // Validate the bytes read
128  CHECK_LE(file_offset + len, expected.size());
129  CHECK_EQ(strncmp(buffer->buffer(), &expected.c_str()[file_offset], len), 0);
130 
131  // Copy the bytes from this read into the result buffer.
// In bounds: file_offset + len <= expected.size() < MAX_FILE_LEN was checked above.
132  memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
133  buffer->Return();
134  buffer = NULL;
135  bytes_read += len;
136 
137  CHECK_GE(bytes_read, 0);
138  CHECK_LE(bytes_read, expected.size());
139 
// abort_at_byte defaults to the file length (see NewClient()), and bytes_read
// never exceeds it (CHECK_LE above). So this only fires when NewClient()
// picked an early abort point; the client then abandons the current file.
140  if (bytes_read > client->abort_at_byte) {
141  eos = true;
142  break;
143  }
144  } // End of buffer
145  } // End of scan range
146 
147  if (bytes_read == expected.size()) {
148  // This entire file was read without being cancelled, validate the entire result
149  CHECK(status.ok());
150  CHECK_EQ(strncmp(read_buffer, expected.c_str(), bytes_read), 0);
151  }
152 
153  // Unregister the old client and get a new one
// client->lock serializes this swap with CancelRandomReader() and Run(),
// which cancel client->reader under the same lock.
154  unique_lock<mutex> lock(client->lock);
155  io_mgr_->UnregisterContext(client->reader);
156  NewClient(client_id);
157  }
158 
159  unique_lock<mutex> lock(client->lock);
160  io_mgr_->UnregisterContext(client->reader);
161  client->reader = NULL;
162 }
163 
164 // Cancel a random reader
// Cancels the current reader context of one client picked with rand(). It
// does nothing unless the test was constructed with includes_cancellation.
// NOTE(review): rand() % num_clients_ assumes num_clients_ > 0.
166  if (!includes_cancellation_) return;
167 
168  int rand_client = rand() % num_clients_;
169 
// Same lock that ClientThread() holds while replacing this client's reader.
// NOTE(review): unlike Run(), this does not check reader != NULL.
// ClientThread() only clears it after shutdown_ is set, so confirm this is
// never called after that point.
170  unique_lock<mutex> lock(clients_[rand_client].lock);
171  io_mgr_->CancelContext(clients_[rand_client].reader);
172 }
173 
// Starts one ClientThread() per client and keeps the outer loop below running
// for 'sec' iterations, forever if sec == 0. It then sets shutdown_, cancels
// every still-registered reader context and joins all reader threads. The
// cancellation is presumably so that readers blocked in GetNextRange() or
// GetNext() can return.
174 void DiskIoMgrStress::Run(int sec) {
175  shutdown_ = false;
176  for (int i = 0; i < num_clients_; ++i) {
177  readers_.add_thread(
178  new thread(&DiskIoMgrStress::ClientThread, this, i));
179  }
180 
181  // Sleep and let the clients do their thing for 'sec'
// With sec == 0 the condition is always true, so Run() never returns.
182  for (int loop_count = 1; sec == 0 || loop_count <= sec; ++loop_count) {
// Each outer iteration is split into 1000 / CANCEL_READER_PERIOD_MS steps.
// NOTE(review): the step body is not present in this listing. Presumably it
// sleeps CANCEL_READER_PERIOD_MS ms and calls CancelRandomReader(); confirm.
183  int iter = (1000) / CANCEL_READER_PERIOD_MS;
184  for (int i = 0; i < iter; ++i) {
187  }
// NOTE(review): routine progress is logged at ERROR severity; INFO may be intended.
188  LOG(ERROR) << "Finished iteration: " << loop_count;
189  }
190 
191  // Signal shutdown for the client threads
// NOTE(review): shutdown_ is declared 'volatile bool'. In C++, volatile does
// not synchronize between threads, so consider an atomic flag.
192  shutdown_ = true;
193 
// ClientThread() sets reader to NULL under the same lock once it has
// unregistered its final context, hence the NULL check.
194  for (int i = 0; i < num_clients_; ++i) {
195  unique_lock<mutex> lock(clients_[i].lock);
196  if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader);
197  }
198 
199  readers_.join_all();
200 }
201 
202 // Initialize a client to read one of the files at random. The scan ranges are
203 // assigned randomly.
// The file is split into consecutive scan ranges whose lengths are drawn from
// [MIN_READ_LEN, MAX_READ_LEN - 1]; the last range is clipped to the end of
// the file. The previous file's scan ranges are deleted first. A new reader
// context is then registered and all ranges are added to it.
205  Client& client = clients_[i];
206  ++client.files_processed;
207  client.file_idx = rand() % files_.size();
208  int file_len = files_[client.file_idx].data.size();
209 
// By default read the whole file: ClientThread() stops early only when
// bytes_read exceeds abort_at_byte, which cannot happen at this value.
210  client.abort_at_byte = file_len;
211 
// NOTE(review): the condition that opens this block is not present in this listing.
213  float rand_value = rand() / (float)RAND_MAX;
214  if (rand_value < ABORT_CHANCE) {
215  // Abort at a random byte inside the file
// file_len >= MIN_FILE_LEN > 0 (see GenerateRandomData), so the modulo is safe.
216  client.abort_at_byte = rand() % file_len;
217  }
218  }
219 
// NOTE(review): this loop's 'i' shadows the client index used above. This is
// harmless here because 'client' is already bound.
220  for (int i = 0; i < client.scan_ranges.size(); ++i) {
221  delete client.scan_ranges[i];
222  }
223  client.scan_ranges.clear();
224 
225  int assigned_len = 0;
226  while (assigned_len < file_len) {
227  int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
228  range_len = min(range_len, file_len - assigned_len);
229 
// Reset() arguments after the length are: offset = assigned_len, disk_id = 0,
// try_cache = false, expected_local = false, and mtime = NEVER_CACHE, so the
// file handle is never cached. The NULL hdfsFS presumably selects a local file.
// NOTE(review): the statement creating 'range' is not present in this listing.
231  range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len,
232  assigned_len, 0, false, false, DiskIoMgr::ScanRange::NEVER_CACHE);
233  client.scan_ranges.push_back(range);
234  assigned_len += range_len;
235  }
236  Status status = io_mgr_->RegisterContext(&client.reader, NULL);
237  CHECK(status.ok());
238  status = io_mgr_->AddScanRanges(client.reader, client.scan_ranges);
239  CHECK(status.ok());
240 }
static const int MIN_READ_LEN
std::vector< File > files_
static const int CANCEL_READER_PERIOD_MS
DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients, bool includes_cancellation)
static const int MAX_READ_LEN
boost::thread_group readers_
Thread group for reader threads.
int num_clients_
Number of clients (one reader thread and one entry in clients_ per client).
bool includes_cancellation_
If true, tests cancelling readers.
volatile bool shutdown_
Flag to signal that client reader threads should exit.
boost::scoped_ptr< DiskIoMgr > io_mgr_
The DiskIoMgr instance under test.
static void CreateTempFile(const char *filename, const char *data)
int64_t scan_range_offset() const
Returns the offset within the scan range that this buffer starts at.
Definition: disk-io-mgr.h:205
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
Definition: time.cc:21
static const int MIN_READ_BUFFER_SIZE
DiskIoMgr::RequestContext * reader
vector< DiskIoMgr::ScanRange * > scan_ranges
bool IsCancelled() const
Definition: status.h:174
static const int MIN_FILE_LEN
void ClientThread(int client_id)
MemTracker dummy_tracker_
Dummy mem tracker.
static const int MAX_READ_BUFFER_SIZE
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
string GenerateRandomData()
static const int MAX_BUFFERS
static const int64_t NEVER_CACHE
If the mtime is set to NEVER_CACHE, the file handle should never be cached.
Definition: disk-io-mgr.h:299
static const float ABORT_CHANCE
Status GetNext(BufferDescriptor **buffer)
void CancelRandomReader()
Possibly cancels a random reader.
static const int MAX_FILE_LEN
bool ok() const
Definition: status.h:172
void Run(int sec)
Run the test for 'sec'. If 0, run forever.