26 using namespace impala;
39 scan_node_(scan_node),
40 partition_desc_(partition_desc),
41 num_completed_io_buffers_(0) {
46 for (
int i = 0; i <
streams_.size(); ++i) {
47 streams_[i]->ReleaseCompletedResources(batch, done);
54 boundary_pool_(new
MemPool(parent->scan_node_->mem_tracker())),
55 boundary_buffer_(new
StringBuffer(boundary_pool_.get())) {
77 DCHECK((batch != NULL) || (batch == NULL && !contains_tuple_data_));
80 if (io_buffer_ != NULL) {
81 ++parent_->num_completed_io_buffers_;
82 completed_io_buffers_.push_back(io_buffer_);
86 io_buffer_pos_ = NULL;
87 io_buffer_bytes_left_ = 0;
92 for (list<DiskIoMgr::BufferDescriptor*>::iterator it = completed_io_buffers_.begin();
93 it != completed_io_buffers_.end(); ++it) {
94 if (contains_tuple_data_) {
101 --parent_->scan_node_->num_owned_io_buffers_;
104 parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
105 completed_io_buffers_.clear();
107 if (contains_tuple_data_) {
112 if (done) boundary_pool_->FreeAll();
119 DCHECK(io_buffer_ != NULL ||
120 (total_bytes_returned_ == 0 && completed_io_buffers_.empty()));
125 if (io_buffer_ != NULL) {
126 eosr = io_buffer_->eosr();
127 ++parent_->num_completed_io_buffers_;
128 completed_io_buffers_.push_back(io_buffer_);
133 SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
136 SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
137 int64_t
offset = file_offset() + boundary_buffer_bytes_left_;
139 int64_t read_past_buffer_size = read_past_size_cb_.empty() ?
141 int64_t file_bytes_remaining = file_desc()->file_length -
offset;
142 read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
143 read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
145 DCHECK_GE(read_past_buffer_size, 0);
146 if (read_past_buffer_size == 0) {
147 io_buffer_bytes_left_ = 0;
152 scan_range_->fs(), filename(), read_past_buffer_size,
offset, -1,
153 scan_range_->disk_id(),
false,
false, scan_range_->mtime());
155 parent_->scan_node_->reader_context(), range, &io_buffer_));
158 DCHECK(io_buffer_ != NULL);
159 ++parent_->scan_node_->num_owned_io_buffers_;
160 io_buffer_pos_ =
reinterpret_cast<uint8_t*
>(io_buffer_->buffer());
161 io_buffer_bytes_left_ = io_buffer_->len();
162 if (io_buffer_->len() == 0) {
163 file_len_ = file_offset() + boundary_buffer_bytes_left_;
164 VLOG_FILE <<
"Unexpectedly read 0 bytes from file=" << filename() <<
" table="
165 << parent_->scan_node_->hdfs_table()->name()
166 <<
". Setting expected file length=" << file_len_;
176 if (parent_->cancelled()) {
177 DCHECK(*out_buffer == NULL);
181 if (boundary_buffer_bytes_left_ > 0) {
182 DCHECK_EQ(output_buffer_pos_, &boundary_buffer_pos_);
183 DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
184 *out_buffer = boundary_buffer_pos_;
186 *len = min(boundary_buffer_bytes_left_, bytes_left());
189 boundary_buffer_pos_ += *len;
190 boundary_buffer_bytes_left_ -= *len;
191 total_bytes_returned_ += *len;
196 if (io_buffer_bytes_left_ == 0) {
200 output_buffer_pos_ = &io_buffer_pos_;
201 output_buffer_bytes_left_ = &io_buffer_bytes_left_;
203 DCHECK(io_buffer_ != NULL);
205 *out_buffer = io_buffer_pos_;
206 *len = io_buffer_bytes_left_;
208 io_buffer_bytes_left_ = 0;
209 io_buffer_pos_ += *len;
210 total_bytes_returned_ += *len;
212 DCHECK_GE(bytes_left(), 0);
217 uint8_t** out_buffer,
bool peek, int64_t* out_len) {
218 DCHECK_GT(requested_len, boundary_buffer_bytes_left_);
221 if (boundary_buffer_bytes_left_ == 0) {
222 if (contains_tuple_data_) {
223 boundary_buffer_->Reset();
225 boundary_buffer_->Clear();
229 while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) {
232 boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_);
233 boundary_buffer_bytes_left_ += io_buffer_bytes_left_;
238 if (io_buffer_bytes_left_ == 0) {
245 int64_t requested_bytes_left = requested_len - boundary_buffer_bytes_left_;
246 DCHECK_GE(requested_len, 0);
247 int64_t num_bytes = min(io_buffer_bytes_left_, requested_bytes_left);
248 *out_len = boundary_buffer_bytes_left_ + num_bytes;
249 DCHECK_LE(*out_len, requested_len);
251 if (boundary_buffer_bytes_left_ == 0) {
253 output_buffer_pos_ = &io_buffer_pos_;
254 output_buffer_bytes_left_ = &io_buffer_bytes_left_;
256 boundary_buffer_->Append(io_buffer_pos_, num_bytes);
257 boundary_buffer_bytes_left_ += num_bytes;
258 boundary_buffer_pos_ =
reinterpret_cast<uint8_t*
>(boundary_buffer_->str().ptr) +
259 boundary_buffer_->Size() - boundary_buffer_bytes_left_;
260 io_buffer_bytes_left_ -= num_bytes;
261 io_buffer_pos_ += num_bytes;
263 output_buffer_pos_ = &boundary_buffer_pos_;
264 output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
266 *out_buffer = *output_buffer_pos_;
269 total_bytes_returned_ += *out_len;
270 if (boundary_buffer_bytes_left_ == 0) {
271 io_buffer_bytes_left_ -= num_bytes;
272 io_buffer_pos_ += num_bytes;
274 DCHECK_EQ(boundary_buffer_bytes_left_, *out_len);
275 boundary_buffer_bytes_left_ = 0;
288 ss <<
"Tried to read " << length <<
" bytes but could only read "
289 << bytes_read <<
" bytes. This may indicate data file corruption. "
290 <<
"(file " << filename() <<
", byte offset: " << file_offset() <<
")";
296 ss <<
"Invalid read of " << length <<
" bytes. This may indicate data file corruption. "
297 <<
"(file " << filename() <<
", byte offset: " << file_offset() <<
")";
bool contains_tuple_data_
uint8_t ** output_buffer_pos_
Status GetBuffer(bool peek, uint8_t **buffer, int64_t *out_len)
static const int64_t DEFAULT_READ_PAST_SIZE
void ReleaseCompletedResources(RowBatch *batch, bool done)
std::vector< Stream * > streams_
Vector of streams. Non-columnar formats will always have one stream per context.
#define RETURN_IF_ERROR(stmt)
some generally useful macros
void ReleaseCompletedResources(RowBatch *batch, bool done)
Status GetNextBuffer(int64_t read_past_size=0)
int64_t boundary_buffer_bytes_left_
HdfsScanNode * scan_node_
ScannerContext(RuntimeState *, HdfsScanNode *, HdfsPartitionDescriptor *, DiskIoMgr::ScanRange *scan_range)
void AcquireData(MemPool *src, bool keep_current)
bool cancelled() const
If true, the ScanNode has been cancelled and the scanner thread should finish up. ...
int64_t * output_buffer_bytes_left_
const std::vector< SlotDescriptor * > & string_slots() const
Status GetBytesInternal(int64_t requested_len, uint8_t **buffer, bool peek, int64_t *out_len)
int64_t io_buffer_bytes_left_
Bytes left in io_buffer_.
HdfsFileDesc * GetFileDesc(const std::string &filename)
Returns the file desc for 'filename'. Returns NULL if filename is invalid.
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT
void AddIoBuffer(DiskIoMgr::BufferDescriptor *buffer)
Add io buffer to this row batch.
ObjectPool * obj_pool() const
DiskIoMgr::ScanRange * scan_range_
Stream(ScannerContext *parent)
int64_t total_bytes_returned_
Total number of bytes returned from GetBytes()
static const Status CANCELLED
MemPool * tuple_data_pool()
uint8_t * io_buffer_pos_
Next byte to read in io_buffer_.
Status ReportIncompleteRead(int64_t length, int64_t bytes_read)
Error-reporting functions.
uint8_t offset[7 *64-sizeof(uint64_t)]
Metadata for a single partition inside an Hdfs table.
Status ReportInvalidRead(int64_t length)
Stream * AddStream(DiskIoMgr::ScanRange *range)
DiskIoMgr::BufferDescriptor * io_buffer_
The current io buffer. This starts as NULL before we've read any bytes.
const HdfsFileDesc * file_desc_
const TupleDescriptor * tuple_desc()