21 using namespace impala;
39 FILE* file = fopen(filename,
"w");
41 fwrite(data, 1, strlen(data), file);
48 for (
int i = 0; i < rand_len; ++i) {
49 char c = rand() % 26 +
'a';
65 int num_clients,
bool includes_cancellation) :
66 num_clients_(num_clients),
67 includes_cancellation_(includes_cancellation) {
69 time_t rand_seed = time(NULL);
70 LOG(INFO) <<
"Running with rand seed: " << rand_seed;
79 files_.resize(num_clients * 2);
80 for (
int i = 0; i <
files_.size(); ++i) {
82 ss <<
"/tmp/disk_io_mgr_stress_file" << i;
83 files_[i].filename = ss.str();
109 if (range == NULL)
break;
113 status = range->
GetNext(&buffer);
115 if (buffer == NULL)
break;
118 int len = buffer->
len();
119 CHECK_GE(scan_range_offset, 0);
120 CHECK_LT(scan_range_offset, expected.size());
125 int64_t file_offset = scan_range_offset + range->
offset();
128 CHECK_LE(file_offset + len, expected.size());
129 CHECK_EQ(strncmp(buffer->
buffer(), &expected.c_str()[file_offset], len), 0);
132 memcpy(read_buffer + file_offset, buffer->
buffer(), buffer->
len());
137 CHECK_GE(bytes_read, 0);
138 CHECK_LE(bytes_read, expected.size());
147 if (bytes_read == expected.size()) {
150 CHECK_EQ(strncmp(read_buffer, expected.c_str(), bytes_read), 0);
154 unique_lock<mutex> lock(client->
lock);
159 unique_lock<mutex> lock(client->
lock);
170 unique_lock<mutex> lock(
clients_[rand_client].lock);
182 for (
int loop_count = 1; sec == 0 || loop_count <= sec; ++loop_count) {
184 for (
int i = 0; i < iter; ++i) {
188 LOG(ERROR) <<
"Finished iteration: " << loop_count;
195 unique_lock<mutex> lock(
clients_[i].lock);
213 float rand_value = rand() / (float)RAND_MAX;
220 for (
int i = 0; i < client.
scan_ranges.size(); ++i) {
225 int assigned_len = 0;
226 while (assigned_len < file_len) {
228 range_len = min(range_len, file_len - assigned_len);
234 assigned_len += range_len;
static const int MIN_READ_LEN
std::vector< File > files_
static const int CANCEL_READER_PERIOD_MS
DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients, bool includes_cancellation)
static const int MAX_READ_LEN
boost::thread_group readers_
Thread group for reader threads.
int num_clients_
Array of clients.
bool includes_cancellation_
If true, tests cancelling readers.
volatile bool shutdown_
Flag to signal that client reader threads should exit.
boost::scoped_ptr< DiskIoMgr > io_mgr_
io manager
static void CreateTempFile(const char *filename, const char *data)
int64_t scan_range_offset() const
Returns the offset within the scan range that this buffer starts at.
void SleepForMs(const int64_t duration_ms)
Sleeps the current thread for at least duration_ms milliseconds.
static const int MIN_READ_BUFFER_SIZE
DiskIoMgr::RequestContext * reader
vector< DiskIoMgr::ScanRange * > scan_ranges
static const int MIN_FILE_LEN
void ClientThread(int client_id)
MemTracker dummy_tracker_
Dummy mem tracker.
static const int MAX_READ_BUFFER_SIZE
void Reset(hdfsFS fs, const char *file, int64_t len, int64_t offset, int disk_id, bool try_cache, bool expected_local, int64_t mtime, void *metadata=NULL)
string GenerateRandomData()
static const int MAX_BUFFERS
static const int64_t NEVER_CACHE
If the mtime is set to NEVER_CACHE, the file handle should never be cached.
static const float ABORT_CHANCE
Status GetNext(BufferDescriptor **buffer)
void CancelRandomReader()
Possibly cancels a random reader.
static const int MAX_FILE_LEN
void Run(int sec)
Run the test for 'sec'. If 0, run forever.