Impala
Impala is the open source, native analytic database for Apache Hadoop.
scanner-context.cc
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/scanner-context.h"

#include "exec/hdfs-scan-node.h"
#include "runtime/row-batch.h"
#include "runtime/mem-pool.h"
#include "runtime/runtime-state.h"
#include "runtime/string-buffer.h"
#include "util/debug-util.h"

#include "common/names.h"

using namespace impala;

static const int64_t DEFAULT_READ_PAST_SIZE = 1024; // in bytes

// We always want output_buffer_bytes_left_ to be non-NULL, so we can avoid a NULL check
// in GetBytes(). We use this variable, which is set to 0, to initialize
// output_buffer_bytes_left_. After the first successful call to GetBytes(),
// output_buffer_bytes_left_ will be set to something else.
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;

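// A ScannerContext wraps the stream(s) a single scanner reads from. The scan range
// passed to the constructor becomes the first stream; non-columnar formats only ever
// use this one stream.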
ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNode* scan_node,
    HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range)
  : state_(state),
    scan_node_(scan_node),
    partition_desc_(partition_desc),
    num_completed_io_buffers_(0) {
  AddStream(scan_range);
}

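// Propagates the release to every stream; once the scan is done the streams themselves
// are discarded.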
void ScannerContext::ReleaseCompletedResources(RowBatch* batch, bool done) {
  for (int i = 0; i < streams_.size(); ++i) {
    streams_[i]->ReleaseCompletedResources(batch, done);
  }
  if (done) streams_.clear();
}

ScannerContext::Stream::Stream(ScannerContext* parent)
  : parent_(parent),
    boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
    boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
}

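// Creates a Stream for 'range' in the runtime state's object pool, initializes its
// io/boundary buffer bookkeeping and registers it with this context.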
ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
  Stream* stream = state_->obj_pool()->Add(new Stream(this));
  stream->scan_range_ = range;
  stream->file_desc_ = scan_node_->GetFileDesc(stream->filename());
  stream->file_len_ = stream->file_desc_->file_length;
  stream->total_bytes_returned_ = 0;
  stream->io_buffer_pos_ = NULL;
  stream->io_buffer_ = NULL;
  stream->io_buffer_bytes_left_ = 0;
  stream->boundary_buffer_bytes_left_ = 0;
  stream->output_buffer_pos_ = NULL;
  stream->output_buffer_bytes_left_ =
      const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT);
  stream->contains_tuple_data_ = !scan_node_->tuple_desc()->string_slots().empty();
  streams_.push_back(stream);
  return stream;
}

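// Hands completed io buffers to 'batch' when they may still back tuple data, otherwise
// returns them to the io mgr. When 'done', the underlying scan range is cancelled and
// the boundary pool is freed.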
void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool done) {
  DCHECK((batch != NULL) || (batch == NULL && !contains_tuple_data_));
  if (done) {
    // Mark any pending resources as completed
    if (io_buffer_ != NULL) {
      ++parent_->num_completed_io_buffers_;
      completed_io_buffers_.push_back(io_buffer_);
    }
    // Set variables to NULL to make sure streams are not used again
    io_buffer_ = NULL;
    io_buffer_pos_ = NULL;
    io_buffer_bytes_left_ = 0;
    // Cancel the underlying scan range to clean up any queued buffers there
    scan_range_->Cancel(Status::CANCELLED);
  }

  for (list<DiskIoMgr::BufferDescriptor*>::iterator it = completed_io_buffers_.begin();
       it != completed_io_buffers_.end(); ++it) {
    if (contains_tuple_data_) {
      batch->AddIoBuffer(*it);
      // TODO: We can do row batch compaction here. This is the only place io buffers are
      // queued. A good heuristic is to check the number of io buffers queued and if
      // there are too many, we should compact.
    } else {
      (*it)->Return();
      --parent_->scan_node_->num_owned_io_buffers_;
    }
  }
  parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
  completed_io_buffers_.clear();

  if (contains_tuple_data_) {
    // If we're not done, keep using the last chunk allocated in boundary_pool_ so we
    // don't have to reallocate. If we are done, transfer it to the row batch.
    batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), /* keep_current */ !done);
  }
  if (done) boundary_pool_->FreeAll();
}

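// Retires the current io buffer (if any) to completed_io_buffers_ and fetches the next
// one from the scan range. Once the range is exhausted, issues a synchronous read past
// its end, bounded by 'read_past_size', the read-past callback and the end of the file,
// so a scanner can finish a row or block that straddles the range boundary.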
Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
  if (parent_->cancelled()) return Status::CANCELLED;

  // io_buffer_ should only be null the first time this is called
  DCHECK(io_buffer_ != NULL ||
      (total_bytes_returned_ == 0 && completed_io_buffers_.empty()));

  // We can't use the eosr() function because it reflects how many bytes have been
  // returned, not whether we've fetched all the buffers in the scan range
  bool eosr = false;
  if (io_buffer_ != NULL) {
    eosr = io_buffer_->eosr();
    ++parent_->num_completed_io_buffers_;
    completed_io_buffers_.push_back(io_buffer_);
    io_buffer_ = NULL;
  }

  if (!eosr) {
    SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
    RETURN_IF_ERROR(scan_range_->GetNext(&io_buffer_));
  } else {
    SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
    int64_t offset = file_offset() + boundary_buffer_bytes_left_;

    int64_t read_past_buffer_size = read_past_size_cb_.empty() ?
        DEFAULT_READ_PAST_SIZE : read_past_size_cb_(offset);
    int64_t file_bytes_remaining = file_desc()->file_length - offset;
    read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
    read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
    // We're reading past the scan range. Be careful not to read past the end of file.
    DCHECK_GE(read_past_buffer_size, 0);
    if (read_past_buffer_size == 0) {
      io_buffer_bytes_left_ = 0;
      // TODO: We are leaving io_buffer_ = NULL, revisit.
      return Status::OK;
    }
    DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange(
        scan_range_->fs(), filename(), read_past_buffer_size, offset, -1,
        scan_range_->disk_id(), false, false, scan_range_->mtime());
    RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
        parent_->scan_node_->reader_context(), range, &io_buffer_));
  }

  DCHECK(io_buffer_ != NULL);
  ++parent_->scan_node_->num_owned_io_buffers_;
  io_buffer_pos_ = reinterpret_cast<uint8_t*>(io_buffer_->buffer());
  io_buffer_bytes_left_ = io_buffer_->len();
  if (io_buffer_->len() == 0) {
    file_len_ = file_offset() + boundary_buffer_bytes_left_;
    VLOG_FILE << "Unexpectedly read 0 bytes from file=" << filename() << " table="
        << parent_->scan_node_->hdfs_table()->name()
        << ". Setting expected file length=" << file_len_;
  }
  return Status::OK;
}

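// Returns a pointer to the next chunk of readable bytes without copying: bytes left in
// the boundary buffer are served first, then the current io buffer, fetching a new one
// when it is exhausted. With 'peek' set, the returned bytes are not consumed.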
Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int64_t* len) {
  *out_buffer = NULL;
  *len = 0;
  if (eosr()) return Status::OK;

  if (parent_->cancelled()) {
    DCHECK(*out_buffer == NULL);
    return Status::CANCELLED;
  }

  if (boundary_buffer_bytes_left_ > 0) {
    DCHECK_EQ(output_buffer_pos_, &boundary_buffer_pos_);
    DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
    *out_buffer = boundary_buffer_pos_;
    // Don't return more bytes past eosr
    *len = min(boundary_buffer_bytes_left_, bytes_left());
    DCHECK_GE(*len, 0);
    if (!peek) {
      boundary_buffer_pos_ += *len;
      boundary_buffer_bytes_left_ -= *len;
      total_bytes_returned_ += *len;
    }
    return Status::OK;
  }

  if (io_buffer_bytes_left_ == 0) {
    // We're at the end of the boundary buffer and the current IO buffer. Get a new IO
    // buffer and set the current buffer to it.
    RETURN_IF_ERROR(GetNextBuffer());
    output_buffer_pos_ = &io_buffer_pos_;
    output_buffer_bytes_left_ = &io_buffer_bytes_left_;
  }
  DCHECK(io_buffer_ != NULL);

  *out_buffer = io_buffer_pos_;
  *len = io_buffer_bytes_left_;
  if (!peek) {
    io_buffer_bytes_left_ = 0;
    io_buffer_pos_ += *len;
    total_bytes_returned_ += *len;
  }
  DCHECK_GE(bytes_left(), 0);
  return Status::OK;
}

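// Assembles 'requested_len' bytes that may span multiple io buffers: the tail of the
// current buffer is appended to boundary_buffer_ and further buffers are fetched until
// enough bytes are available or the stream hits EOF. *out_buffer then points either
// directly into the io buffer or into the stitched boundary buffer.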
Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
    uint8_t** out_buffer, bool peek, int64_t* out_len) {
  DCHECK_GT(requested_len, boundary_buffer_bytes_left_);
  *out_buffer = NULL;

  if (boundary_buffer_bytes_left_ == 0) {
    if (contains_tuple_data_) {
      boundary_buffer_->Reset();
    } else {
      boundary_buffer_->Clear();
    }
  }

  while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
    // We need to fetch more bytes. Copy the end of the current buffer and fetch the next
    // one.
    boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_);
    boundary_buffer_bytes_left_ += io_buffer_bytes_left_;

    RETURN_IF_ERROR(GetNextBuffer());
    if (parent_->cancelled()) return Status::CANCELLED;

    if (io_buffer_bytes_left_ == 0) {
      // No more bytes (i.e. EOF)
      break;
    }
  }

  // We have enough bytes in io_buffer_ or couldn't read more bytes
  int64_t requested_bytes_left = requested_len - boundary_buffer_bytes_left_;
  DCHECK_GE(requested_len, 0);
  int64_t num_bytes = min(io_buffer_bytes_left_, requested_bytes_left);
  *out_len = boundary_buffer_bytes_left_ + num_bytes;
  DCHECK_LE(*out_len, requested_len);

  if (boundary_buffer_bytes_left_ == 0) {
    // No stitching, just return the memory
    output_buffer_pos_ = &io_buffer_pos_;
    output_buffer_bytes_left_ = &io_buffer_bytes_left_;
  } else {
    boundary_buffer_->Append(io_buffer_pos_, num_bytes);
    boundary_buffer_bytes_left_ += num_bytes;
    boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) +
        boundary_buffer_->Size() - boundary_buffer_bytes_left_;
    io_buffer_bytes_left_ -= num_bytes;
    io_buffer_pos_ += num_bytes;

    output_buffer_pos_ = &boundary_buffer_pos_;
    output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
  }
  *out_buffer = *output_buffer_pos_;

  if (!peek) {
    total_bytes_returned_ += *out_len;
    if (boundary_buffer_bytes_left_ == 0) {
      io_buffer_bytes_left_ -= num_bytes;
      io_buffer_pos_ += num_bytes;
    } else {
      DCHECK_EQ(boundary_buffer_bytes_left_, *out_len);
      boundary_buffer_bytes_left_ = 0;
    }
  }

  return Status::OK;
}

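// True once the scan node has been cancelled (or otherwise marked done); the scanner
// thread should finish up.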
bool ScannerContext::cancelled() const {
  return scan_node_->done_;
}

Status ScannerContext::Stream::ReportIncompleteRead(int64_t length, int64_t bytes_read) {
  stringstream ss;
  ss << "Tried to read " << length << " bytes but could only read "
     << bytes_read << " bytes. This may indicate data file corruption. "
     << "(file " << filename() << ", byte offset: " << file_offset() << ")";
  return Status(ss.str());
}

Status ScannerContext::Stream::ReportInvalidRead(int64_t length) {
  stringstream ss;
  ss << "Invalid read of " << length << " bytes. This may indicate data file corruption. "
     << "(file " << filename() << ", byte offset: " << file_offset() << ")";
  return Status(ss.str());
}