Impala
Impala is the open source, native analytic database for Apache Hadoop.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
scanner-context.h
Go to the documentation of this file.
1 // Copyright 2012 Cloudera Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 
16 #ifndef IMPALA_EXEC_SCANNER_CONTEXT_H
17 #define IMPALA_EXEC_SCANNER_CONTEXT_H
18 
19 #include <boost/cstdint.hpp>
20 #include <boost/scoped_ptr.hpp>
21 
22 #include "common/compiler-util.h"
23 #include "common/status.h"
24 #include "runtime/disk-io-mgr.h"
25 #include "runtime/row-batch.h"
26 
27 namespace impala {
28 
29 struct HdfsFileDesc;
30 class HdfsPartitionDescriptor;
31 class HdfsScanNode;
32 class MemPool;
33 class RowBatch;
34 class RuntimeState;
35 class StringBuffer;
36 class Tuple;
37 class TupleRow;
38 
46 //
56  public:
61  DiskIoMgr::ScanRange* scan_range);
62 
66  class Stream {
67  public:
84  bool GetBytes(int64_t requested_len, uint8_t** buffer, int64_t* out_len,
85  Status* status, bool peek = false);
86 
90  Status GetBuffer(bool peek, uint8_t** buffer, int64_t* out_len);
91 
98 
105  typedef boost::function<int (int64_t)> ReadPastSizeCallback;
107 
109  int64_t bytes_left() { return scan_range_->len() - total_bytes_returned_; }
110 
113  bool eosr() const { return total_bytes_returned_ >= scan_range_->len() || eof(); }
114 
116  bool eof() const { return file_offset() == file_len_; }
117 
118  const char* filename() { return scan_range_->file(); }
120  const HdfsFileDesc* file_desc() { return file_desc_; }
121 
123  int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; }
124 
127 
130  bool ReadBoolean(bool* boolean, Status*);
131 
134  bool ReadInt(int32_t* val, Status*, bool peek = false);
135 
138  bool ReadVLong(int64_t* val, Status*);
139 
142  bool ReadVInt(int32_t* val, Status*);
143 
145  bool ReadZLong(int64_t* val, Status*);
146 
148  bool SkipBytes(int64_t length, Status*);
149 
152  bool ReadBytes(int64_t length, uint8_t** buf, Status*, bool peek = false);
153 
157  bool ReadText(uint8_t** buf, int64_t* length, Status*);
158 
160  bool SkipText(Status*);
161 
162  private:
163  friend class ScannerContext;
167 
171 
174 
177  int64_t file_len_;
178 
180 
183 
185  uint8_t* io_buffer_pos_;
186 
189 
197  boost::scoped_ptr<MemPool> boundary_pool_;
198  boost::scoped_ptr<StringBuffer> boundary_buffer_;
201 
205 
209 
214  std::list<DiskIoMgr::BufferDescriptor*> completed_io_buffers_;
215 
216  Stream(ScannerContext* parent);
217 
220  Status GetBytesInternal(int64_t requested_len, uint8_t** buffer, bool peek,
221  int64_t* out_len);
222 
225  //
229  //
234  Status GetNextBuffer(int64_t read_past_size = 0);
235 
239  void ReleaseCompletedResources(RowBatch* batch, bool done);
240 
242  Status ReportIncompleteRead(int64_t length, int64_t bytes_read);
243  Status ReportInvalidRead(int64_t length);
244  };
245 
246  Stream* GetStream(int idx = 0) {
247  DCHECK_GE(idx, 0);
248  DCHECK_LT(idx, streams_.size());
249  return streams_[idx];
250  }
251 
259  //
263  //
268  void ReleaseCompletedResources(RowBatch* batch, bool done);
269 
273 
275  bool cancelled() const;
276 
279 
280  private:
281  friend class Stream;
282 
285 
287 
289  std::vector<Stream*> streams_;
290 
293 };
294 
295 }
296 
297 #endif
void set_read_past_size_cb(ReadPastSizeCallback cb)
bool SkipText(Status *)
Skip this text object.
Status GetBuffer(bool peek, uint8_t **buffer, int64_t *out_len)
int64_t total_bytes_returned()
Returns the total number of bytes returned.
int64_t bytes_left()
Return the number of bytes left in the range for this stream.
bool ReadInt(int32_t *val, Status *, bool peek=false)
int num_completed_io_buffers() const
void ReleaseCompletedResources(RowBatch *batch, bool done)
std::vector< Stream * > streams_
Vector of streams. Non-columnar formats will always have one stream per context.
void ReleaseCompletedResources(RowBatch *batch, bool done)
Status GetNextBuffer(int64_t read_past_size=0)
HdfsScanNode * scan_node_
ScannerContext(RuntimeState *, HdfsScanNode *, HdfsPartitionDescriptor *, DiskIoMgr::ScanRange *scan_range)
const char * file() const
Definition: disk-io-mgr.h:266
HdfsPartitionDescriptor * partition_desc_
bool ReadBoolean(bool *boolean, Status *)
bool ReadVLong(int64_t *val, Status *)
bool cancelled() const
If true, the ScanNode has been cancelled and the scanner thread should finish up. ...
bool ReadZLong(int64_t *val, Status *)
Read a zigzag encoded long.
bool ReadText(uint8_t **buf, int64_t *length, Status *)
int64_t file_offset() const
Returns the buffer's current offset in the file.
Status GetBytesInternal(int64_t requested_len, uint8_t **buffer, bool peek, int64_t *out_len)
bool GetBytes(int64_t requested_len, uint8_t **buffer, int64_t *out_len, Status *status, bool peek=false)
int64_t io_buffer_bytes_left_
Bytes left in io_buffer_.
const HdfsFileDesc * file_desc()
bool eof() const
If true, the stream has reached the end of the file.
DiskIoMgr::ScanRange * scan_range_
bool ReadVInt(int32_t *val, Status *)
bool SkipBytes(int64_t length, Status *)
Skip over the next `length` bytes in the specified HDFS file.
boost::scoped_ptr< MemPool > boundary_pool_
Stream(ScannerContext *parent)
int64_t total_bytes_returned_
Total number of bytes returned from GetBytes()
uint8_t * io_buffer_pos_
Next byte to read in io_buffer_.
ReadPastSizeCallback read_past_size_cb_
std::list< DiskIoMgr::BufferDescriptor * > completed_io_buffers_
Status ReportIncompleteRead(int64_t length, int64_t bytes_read)
Error-reporting functions.
Metadata for a single partition inside an Hdfs table.
Definition: descriptors.h:177
Status ReportInvalidRead(int64_t length)
const DiskIoMgr::ScanRange * scan_range()
Stream * AddStream(DiskIoMgr::ScanRange *range)
DiskIoMgr::BufferDescriptor * io_buffer_
The current io buffer. This starts as NULL before we've read any bytes.
Stream * GetStream(int idx=0)
boost::scoped_ptr< StringBuffer > boundary_buffer_
const HdfsFileDesc * file_desc_
bool ReadBytes(int64_t length, uint8_t **buf, Status *, bool peek=false)
int num_completed_io_buffers_
Always equal to the sum of completed_io_buffers_.size() across all streams.
HdfsPartitionDescriptor * partition_descriptor()
boost::function< int(int64_t)> ReadPastSizeCallback