mirror of https://github.com/rclone/rclone
chunkedreader: make parallel chunk reader FIXME WIP
This commit is contained in:
parent
eac8996d84
commit
c4d4c656a4
|
@ -5,10 +5,8 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
"github.com/rclone/rclone/fs/hash"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// io related errors returned by ChunkedReader
|
// io related errors returned by ChunkedReader
|
||||||
|
@ -17,23 +15,13 @@ var (
|
||||||
ErrorInvalidSeek = errors.New("invalid seek position")
|
ErrorInvalidSeek = errors.New("invalid seek position")
|
||||||
)
|
)
|
||||||
|
|
||||||
// ChunkedReader is a reader for an Object with the possibility
|
// ChunkedReader describes what a chunked reader can do.
|
||||||
// of reading the source in chunks of given size
|
type ChunkedReader interface {
|
||||||
//
|
io.Reader
|
||||||
// An initialChunkSize of <= 0 will disable chunked reading.
|
io.Seeker
|
||||||
type ChunkedReader struct {
|
io.Closer
|
||||||
ctx context.Context
|
fs.RangeSeeker
|
||||||
mu sync.Mutex // protects following fields
|
Open() (ChunkedReader, error)
|
||||||
o fs.Object // source to read from
|
|
||||||
rc io.ReadCloser // reader for the current open chunk
|
|
||||||
offset int64 // offset the next Read will start. -1 forces a reopen of o
|
|
||||||
chunkOffset int64 // beginning of the current or next chunk
|
|
||||||
chunkSize int64 // length of the current or next chunk. -1 will open o from chunkOffset to the end
|
|
||||||
initialChunkSize int64 // default chunkSize after the chunk specified by RangeSeek is complete
|
|
||||||
maxChunkSize int64 // consecutive read chunks will double in size until reached. -1 means no limit
|
|
||||||
streams int // number of streams to use
|
|
||||||
customChunkSize bool // is the current chunkSize set by RangeSeek?
|
|
||||||
closed bool // has Close been called?
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a ChunkedReader for the Object.
|
// New returns a ChunkedReader for the Object.
|
||||||
|
@ -42,211 +30,18 @@ type ChunkedReader struct {
|
||||||
// If maxChunkSize is greater than initialChunkSize, the chunk size will be
|
// If maxChunkSize is greater than initialChunkSize, the chunk size will be
|
||||||
// doubled after each chunk read with a maximum of maxChunkSize.
|
// doubled after each chunk read with a maximum of maxChunkSize.
|
||||||
// A Seek or RangeSeek will reset the chunk size to it's initial value
|
// A Seek or RangeSeek will reset the chunk size to it's initial value
|
||||||
func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64, streams int) *ChunkedReader {
|
func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64, streams int) ChunkedReader {
|
||||||
if initialChunkSize <= 0 {
|
if initialChunkSize <= 0 {
|
||||||
initialChunkSize = -1
|
initialChunkSize = -1
|
||||||
}
|
}
|
||||||
if maxChunkSize != -1 && maxChunkSize < initialChunkSize {
|
if maxChunkSize != -1 && maxChunkSize < initialChunkSize {
|
||||||
maxChunkSize = initialChunkSize
|
maxChunkSize = initialChunkSize
|
||||||
}
|
}
|
||||||
return &ChunkedReader{
|
if streams < 0 {
|
||||||
ctx: ctx,
|
streams = 0
|
||||||
o: o,
|
|
||||||
offset: -1,
|
|
||||||
chunkSize: initialChunkSize,
|
|
||||||
initialChunkSize: initialChunkSize,
|
|
||||||
maxChunkSize: maxChunkSize,
|
|
||||||
streams: streams,
|
|
||||||
}
|
}
|
||||||
|
if streams == 0 || o.Size() < 0 {
|
||||||
|
return newSequential(ctx, o, initialChunkSize, maxChunkSize)
|
||||||
|
}
|
||||||
|
return newParallel(ctx, o, initialChunkSize, streams)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read from the file - for details see io.Reader
|
|
||||||
func (cr *ChunkedReader) Read(p []byte) (n int, err error) {
|
|
||||||
cr.mu.Lock()
|
|
||||||
defer cr.mu.Unlock()
|
|
||||||
|
|
||||||
if cr.closed {
|
|
||||||
return 0, ErrorFileClosed
|
|
||||||
}
|
|
||||||
|
|
||||||
for reqSize := int64(len(p)); reqSize > 0; reqSize = int64(len(p)) {
|
|
||||||
// the current chunk boundary. valid only when chunkSize > 0
|
|
||||||
chunkEnd := cr.chunkOffset + cr.chunkSize
|
|
||||||
|
|
||||||
fs.Debugf(cr.o, "ChunkedReader.Read at %d length %d chunkOffset %d chunkSize %d", cr.offset, reqSize, cr.chunkOffset, cr.chunkSize)
|
|
||||||
|
|
||||||
switch {
|
|
||||||
case cr.chunkSize > 0 && cr.offset == chunkEnd: // last chunk read completely
|
|
||||||
cr.chunkOffset = cr.offset
|
|
||||||
if cr.customChunkSize { // last chunkSize was set by RangeSeek
|
|
||||||
cr.customChunkSize = false
|
|
||||||
cr.chunkSize = cr.initialChunkSize
|
|
||||||
} else {
|
|
||||||
cr.chunkSize *= 2
|
|
||||||
if cr.chunkSize > cr.maxChunkSize && cr.maxChunkSize != -1 {
|
|
||||||
cr.chunkSize = cr.maxChunkSize
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// recalculate the chunk boundary. valid only when chunkSize > 0
|
|
||||||
chunkEnd = cr.chunkOffset + cr.chunkSize
|
|
||||||
fallthrough
|
|
||||||
case cr.offset == -1: // first Read or Read after RangeSeek
|
|
||||||
err = cr.openRange()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var buf []byte
|
|
||||||
chunkRest := chunkEnd - cr.offset
|
|
||||||
// limit read to chunk boundaries if chunkSize > 0
|
|
||||||
if reqSize > chunkRest && cr.chunkSize > 0 {
|
|
||||||
buf, p = p[0:chunkRest], p[chunkRest:]
|
|
||||||
} else {
|
|
||||||
buf, p = p, nil
|
|
||||||
}
|
|
||||||
var rn int
|
|
||||||
rn, err = io.ReadFull(cr.rc, buf)
|
|
||||||
n += rn
|
|
||||||
cr.offset += int64(rn)
|
|
||||||
if err != nil {
|
|
||||||
if err == io.ErrUnexpectedEOF {
|
|
||||||
err = io.EOF
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return n, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close the file - for details see io.Closer
|
|
||||||
//
|
|
||||||
// All methods on ChunkedReader will return ErrorFileClosed afterwards
|
|
||||||
func (cr *ChunkedReader) Close() error {
|
|
||||||
cr.mu.Lock()
|
|
||||||
defer cr.mu.Unlock()
|
|
||||||
|
|
||||||
if cr.closed {
|
|
||||||
return ErrorFileClosed
|
|
||||||
}
|
|
||||||
cr.closed = true
|
|
||||||
|
|
||||||
return cr.resetReader(nil, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Seek the file - for details see io.Seeker
|
|
||||||
func (cr *ChunkedReader) Seek(offset int64, whence int) (int64, error) {
|
|
||||||
return cr.RangeSeek(context.TODO(), offset, whence, -1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RangeSeek the file - for details see RangeSeeker
|
|
||||||
//
|
|
||||||
// The specified length will only apply to the next chunk opened.
|
|
||||||
// RangeSeek will not reopen the source until Read is called.
|
|
||||||
func (cr *ChunkedReader) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
|
|
||||||
cr.mu.Lock()
|
|
||||||
defer cr.mu.Unlock()
|
|
||||||
|
|
||||||
fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d length %d", cr.offset, offset, length)
|
|
||||||
|
|
||||||
if cr.closed {
|
|
||||||
return 0, ErrorFileClosed
|
|
||||||
}
|
|
||||||
|
|
||||||
size := cr.o.Size()
|
|
||||||
switch whence {
|
|
||||||
case io.SeekStart:
|
|
||||||
cr.offset = 0
|
|
||||||
case io.SeekEnd:
|
|
||||||
cr.offset = size
|
|
||||||
}
|
|
||||||
// set the new chunk start
|
|
||||||
cr.chunkOffset = cr.offset + offset
|
|
||||||
// force reopen on next Read
|
|
||||||
cr.offset = -1
|
|
||||||
if length > 0 {
|
|
||||||
cr.customChunkSize = true
|
|
||||||
cr.chunkSize = length
|
|
||||||
} else {
|
|
||||||
cr.chunkSize = cr.initialChunkSize
|
|
||||||
}
|
|
||||||
if cr.chunkOffset < 0 || cr.chunkOffset >= size {
|
|
||||||
cr.chunkOffset = 0
|
|
||||||
return 0, ErrorInvalidSeek
|
|
||||||
}
|
|
||||||
return cr.chunkOffset, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open forces the connection to be opened
|
|
||||||
func (cr *ChunkedReader) Open() (*ChunkedReader, error) {
|
|
||||||
cr.mu.Lock()
|
|
||||||
defer cr.mu.Unlock()
|
|
||||||
|
|
||||||
if cr.rc != nil && cr.offset != -1 {
|
|
||||||
return cr, nil
|
|
||||||
}
|
|
||||||
return cr, cr.openRange()
|
|
||||||
}
|
|
||||||
|
|
||||||
// openRange will open the source Object with the current chunk range
|
|
||||||
//
|
|
||||||
// If the current open reader implements RangeSeeker, it is tried first.
|
|
||||||
// When RangeSeek fails, o.Open with a RangeOption is used.
|
|
||||||
//
|
|
||||||
// A length <= 0 will request till the end of the file
|
|
||||||
func (cr *ChunkedReader) openRange() error {
|
|
||||||
offset, length := cr.chunkOffset, cr.chunkSize
|
|
||||||
fs.Debugf(cr.o, "ChunkedReader.openRange at %d length %d", offset, length)
|
|
||||||
|
|
||||||
if cr.closed {
|
|
||||||
return ErrorFileClosed
|
|
||||||
}
|
|
||||||
|
|
||||||
if rs, ok := cr.rc.(fs.RangeSeeker); ok {
|
|
||||||
n, err := rs.RangeSeek(cr.ctx, offset, io.SeekStart, length)
|
|
||||||
if err == nil && n == offset {
|
|
||||||
cr.offset = offset
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
fs.Debugf(cr.o, "ChunkedReader.openRange seek failed (%s). Trying Open", err)
|
|
||||||
} else {
|
|
||||||
fs.Debugf(cr.o, "ChunkedReader.openRange seeked to wrong offset. Wanted %d, got %d. Trying Open", offset, n)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var rc io.ReadCloser
|
|
||||||
var err error
|
|
||||||
if length <= 0 {
|
|
||||||
if offset == 0 {
|
|
||||||
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)})
|
|
||||||
} else {
|
|
||||||
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: -1})
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: offset + length - 1})
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return cr.resetReader(rc, offset)
|
|
||||||
}
|
|
||||||
|
|
||||||
// resetReader switches the current reader to the given reader.
|
|
||||||
// The old reader will be Close'd before setting the new reader.
|
|
||||||
func (cr *ChunkedReader) resetReader(rc io.ReadCloser, offset int64) error {
|
|
||||||
if cr.rc != nil {
|
|
||||||
if err := cr.rc.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
cr.rc = rc
|
|
||||||
cr.offset = offset
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
_ io.ReadCloser = (*ChunkedReader)(nil)
|
|
||||||
_ io.Seeker = (*ChunkedReader)(nil)
|
|
||||||
_ fs.RangeSeeker = (*ChunkedReader)(nil)
|
|
||||||
)
|
|
||||||
|
|
|
@ -0,0 +1,377 @@
|
||||||
|
package chunkedreader
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/fs/hash"
|
||||||
|
"github.com/rclone/rclone/fs/log"
|
||||||
|
"github.com/rclone/rclone/fs/operations"
|
||||||
|
"github.com/rclone/rclone/lib/multipart"
|
||||||
|
"github.com/rclone/rclone/lib/pool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// parallel reads Object in chunks of a given size in parallel.
|
||||||
|
type parallel struct {
|
||||||
|
ctx context.Context
|
||||||
|
o fs.Object // source to read from
|
||||||
|
mu sync.Mutex // protects following fields
|
||||||
|
endStream int64 // offset we have started streams for
|
||||||
|
offset int64 // offset the read file pointer is at
|
||||||
|
chunkSize int64 // length of the chunks to read
|
||||||
|
nstreams int // number of streams to use
|
||||||
|
streams []*stream // the opened streams in offset order - the current one is first
|
||||||
|
closed bool // has Close been called?
|
||||||
|
}
|
||||||
|
|
||||||
|
// stream holds the info about a single download
|
||||||
|
type stream struct {
|
||||||
|
cr *parallel // parent reader
|
||||||
|
ctx context.Context // ctx to cancel if needed
|
||||||
|
cancel func() // cancel the stream
|
||||||
|
rc io.ReadCloser // reader that it is reading from
|
||||||
|
offset int64 // where the stream is reading from
|
||||||
|
size int64 // and the size it is reading
|
||||||
|
readBytes int64 // bytes read from the stream
|
||||||
|
rw *pool.RW // buffer for read
|
||||||
|
err chan error // error returned from the read
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a stream reading (offset, offset+size)
|
||||||
|
func (cr *parallel) newStream(ctx context.Context, offset, size int64) (s *stream, err error) {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
|
||||||
|
// Create the stream
|
||||||
|
rw := multipart.NewRW()
|
||||||
|
s = &stream{
|
||||||
|
cr: cr,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
offset: offset,
|
||||||
|
size: size,
|
||||||
|
rw: rw,
|
||||||
|
err: make(chan error, 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the background read into the buffer
|
||||||
|
go s.readFrom(ctx)
|
||||||
|
|
||||||
|
// Return the stream to the caller
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// read the file into the buffer
|
||||||
|
func (s *stream) readFrom(ctx context.Context) {
|
||||||
|
// Open the object at the correct range
|
||||||
|
fs.Debugf(s.cr.o, "stream(%d,%d): open", s.offset, s.size)
|
||||||
|
rc, err := operations.Open(ctx, s.cr.o,
|
||||||
|
&fs.HashesOption{Hashes: hash.Set(hash.None)},
|
||||||
|
&fs.RangeOption{Start: s.offset, End: s.offset + s.size - 1})
|
||||||
|
if err != nil {
|
||||||
|
s.err <- fmt.Errorf("parallel chunked reader: failed to open stream at %d size %d: %w", s.offset, s.size, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.rc = rc
|
||||||
|
|
||||||
|
fs.Debugf(s.cr.o, "stream(%d,%d): readfrom started", s.offset, s.size)
|
||||||
|
_, err = s.rw.ReadFrom(s.rc)
|
||||||
|
fs.Debugf(s.cr.o, "stream(%d,%d): readfrom finished (%d bytes): %v", s.offset, s.size, s.rw.Size(), err)
|
||||||
|
s.err <- err
|
||||||
|
}
|
||||||
|
|
||||||
|
// eof is true when we've read all the data we are expecting
|
||||||
|
func (s *stream) eof() bool {
|
||||||
|
return s.readBytes >= s.size
|
||||||
|
}
|
||||||
|
|
||||||
|
// read reads up to len(p) bytes into p. It returns the number of
|
||||||
|
// bytes read (0 <= n <= len(p)) and any error encountered. If some
|
||||||
|
// data is available but not len(p) bytes, read returns what is
|
||||||
|
// available instead of waiting for more.
|
||||||
|
func (s *stream) read(p []byte) (n int, err error) {
|
||||||
|
defer log.Trace(s.cr.o, "stream(%d,%d): Read len(p)=%d", s.offset, s.size, len(p))("n=%d, err=%v", &n, &err)
|
||||||
|
if len(p) == 0 {
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
var nn int
|
||||||
|
nn, err = s.rw.Read(p[n:])
|
||||||
|
fs.Debugf(s.cr.o, "stream(%d,%d): rw.Read nn=%d, err=%v", s.offset, s.size, nn, err)
|
||||||
|
s.readBytes += int64(nn)
|
||||||
|
n += nn
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
if s.eof() {
|
||||||
|
return n, io.EOF
|
||||||
|
}
|
||||||
|
// Received a faux io.EOF because we haven't read all the data yet
|
||||||
|
if n >= len(p) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Wait for a write to happen to read more
|
||||||
|
s.rw.WaitWrite(s.ctx)
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// orErr records newErr into *perr, but only if *perr does not already
// hold an error — the first error wins.
func orErr(perr *error, newErr error) {
	if *perr != nil {
		return
	}
	*perr = newErr
}
||||||
|
|
||||||
|
// Close a stream
|
||||||
|
func (s *stream) close() (err error) {
|
||||||
|
fs.Debugf(s.cr.o, "stream(%d,%d): close", s.offset, s.size)
|
||||||
|
s.cancel()
|
||||||
|
err = <-s.err // wait for readFrom to stop and return error
|
||||||
|
orErr(&err, s.rw.Close())
|
||||||
|
orErr(&err, s.rc.Close())
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return fmt.Errorf("parallel chunked reader: failed to read stream at %d size %d: %w", s.offset, s.size, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a new parallel chunked reader
|
||||||
|
//
|
||||||
|
// Mustn't be called for an unknown size object
|
||||||
|
func newParallel(ctx context.Context, o fs.Object, chunkSize int64, streams int) ChunkedReader {
|
||||||
|
// Make sure chunkSize is a multiple of multipart.BufferSize
|
||||||
|
if chunkSize < 0 {
|
||||||
|
chunkSize = multipart.BufferSize
|
||||||
|
}
|
||||||
|
newChunkSize := multipart.BufferSize * (chunkSize / multipart.BufferSize)
|
||||||
|
if newChunkSize < chunkSize {
|
||||||
|
newChunkSize += multipart.BufferSize
|
||||||
|
}
|
||||||
|
|
||||||
|
fs.Debugf(o, "newParallel chunkSize=%d, streams=%d", chunkSize, streams)
|
||||||
|
|
||||||
|
return ¶llel{
|
||||||
|
ctx: ctx,
|
||||||
|
o: o,
|
||||||
|
offset: 0,
|
||||||
|
chunkSize: newChunkSize,
|
||||||
|
nstreams: streams,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// _open starts the file transferring at offset
|
||||||
|
//
|
||||||
|
// Call with the lock held
|
||||||
|
func (cr *parallel) _open() (err error) {
|
||||||
|
size := cr.o.Size()
|
||||||
|
if size < 0 {
|
||||||
|
return fmt.Errorf("parallel chunked reader: can't use multiple threads for unknown sized object %q", cr.o)
|
||||||
|
}
|
||||||
|
// Launched enough streams already
|
||||||
|
if cr.endStream >= size {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure cr.nstreams are running
|
||||||
|
for i := len(cr.streams); i < cr.nstreams; i++ {
|
||||||
|
// clip to length of file
|
||||||
|
chunkSize := cr.chunkSize
|
||||||
|
newEndStream := cr.endStream + chunkSize
|
||||||
|
if newEndStream > size {
|
||||||
|
chunkSize = size - cr.endStream
|
||||||
|
newEndStream = cr.endStream + chunkSize
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := cr.newStream(cr.ctx, cr.endStream, chunkSize)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cr.streams = append(cr.streams, s)
|
||||||
|
cr.endStream = newEndStream
|
||||||
|
|
||||||
|
if cr.endStream >= size {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finished reading the current stream so pop it off and destroy it
|
||||||
|
//
|
||||||
|
// Call with lock held
|
||||||
|
func (cr *parallel) _popStream() (err error) {
|
||||||
|
if len(cr.streams) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
stream := cr.streams[0]
|
||||||
|
err = stream.close()
|
||||||
|
cr.streams[0] = nil
|
||||||
|
cr.streams = cr.streams[1:]
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get rid of all the streams
|
||||||
|
//
|
||||||
|
// Call with lock held
|
||||||
|
func (cr *parallel) _popStreams() (err error) {
|
||||||
|
for len(cr.streams) != 0 {
|
||||||
|
orErr(&err, cr._popStream())
|
||||||
|
}
|
||||||
|
cr.streams = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read from the file - for details see io.Reader
|
||||||
|
func (cr *parallel) Read(p []byte) (n int, err error) {
|
||||||
|
defer log.Trace(cr.o, "Read len(p)=%d", len(p))("n=%d, err=%v", &n, &err)
|
||||||
|
cr.mu.Lock()
|
||||||
|
defer cr.mu.Unlock()
|
||||||
|
|
||||||
|
if cr.closed {
|
||||||
|
return 0, ErrorFileClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
for n < len(p) {
|
||||||
|
// Make sure we have the correct number of streams open
|
||||||
|
err = cr._open()
|
||||||
|
if err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// No streams left means EOF
|
||||||
|
if len(cr.streams) == 0 {
|
||||||
|
return n, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read from the stream
|
||||||
|
stream := cr.streams[0]
|
||||||
|
nn, err := stream.read(p[n:])
|
||||||
|
n += nn
|
||||||
|
cr.offset += int64(nn)
|
||||||
|
if err == io.EOF {
|
||||||
|
err = cr._popStream()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the file - for details see io.Closer
|
||||||
|
//
|
||||||
|
// All methods on ChunkedReader will return ErrorFileClosed afterwards
|
||||||
|
func (cr *parallel) Close() error {
|
||||||
|
cr.mu.Lock()
|
||||||
|
defer cr.mu.Unlock()
|
||||||
|
|
||||||
|
if cr.closed {
|
||||||
|
return ErrorFileClosed
|
||||||
|
}
|
||||||
|
cr.closed = true
|
||||||
|
|
||||||
|
// Close all the streams
|
||||||
|
return cr._popStreams()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek the file - for details see io.Seeker
|
||||||
|
func (cr *parallel) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
cr.mu.Lock()
|
||||||
|
defer cr.mu.Unlock()
|
||||||
|
|
||||||
|
fs.Debugf(cr.o, "parallel chunked reader: seek from %d to %d whence %d", cr.offset, offset, whence)
|
||||||
|
|
||||||
|
if cr.closed {
|
||||||
|
return 0, ErrorFileClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
size := cr.o.Size()
|
||||||
|
currentOffset := cr.offset
|
||||||
|
switch whence {
|
||||||
|
case io.SeekStart:
|
||||||
|
currentOffset = 0
|
||||||
|
case io.SeekEnd:
|
||||||
|
currentOffset = size
|
||||||
|
}
|
||||||
|
// set the new chunk start
|
||||||
|
newOffset := currentOffset + offset
|
||||||
|
if newOffset < 0 || newOffset >= size {
|
||||||
|
return 0, ErrorInvalidSeek
|
||||||
|
}
|
||||||
|
|
||||||
|
// If seek pointer didn't move, return now
|
||||||
|
if newOffset == cr.offset {
|
||||||
|
fs.Debugf(cr.o, "parallel chunked reader: seek pointer didn't move")
|
||||||
|
return cr.offset, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cr.offset = newOffset
|
||||||
|
|
||||||
|
// Ditch out of range streams
|
||||||
|
for len(cr.streams) > 0 {
|
||||||
|
stream := cr.streams[0]
|
||||||
|
if newOffset >= stream.offset+stream.size {
|
||||||
|
_ = cr._popStream()
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no streams remain we can just restart
|
||||||
|
if len(cr.streams) == 0 {
|
||||||
|
fs.Debugf(cr.o, "parallel chunked reader: no streams remain")
|
||||||
|
cr.endStream = cr.offset
|
||||||
|
return cr.offset, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Current stream
|
||||||
|
stream := cr.streams[0]
|
||||||
|
|
||||||
|
// If new offset is before current stream then ditch all the streams
|
||||||
|
if newOffset < stream.offset {
|
||||||
|
_ = cr._popStreams()
|
||||||
|
fs.Debugf(cr.o, "parallel chunked reader: new offset is before current stream - ditch all")
|
||||||
|
cr.endStream = cr.offset
|
||||||
|
return cr.offset, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek the current stream
|
||||||
|
streamOffset := newOffset - stream.offset
|
||||||
|
stream.readBytes = streamOffset // correct read value
|
||||||
|
fs.Debugf(cr.o, "parallel chunked reader: seek the current stream to %d", streamOffset)
|
||||||
|
// Wait for the read to the correct part of the data
|
||||||
|
for stream.rw.Size() < streamOffset {
|
||||||
|
stream.rw.WaitWrite(cr.ctx)
|
||||||
|
}
|
||||||
|
_, err := stream.rw.Seek(streamOffset, io.SeekStart)
|
||||||
|
if err != nil {
|
||||||
|
return cr.offset, fmt.Errorf("parallel chunked reader: failed to seek stream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cr.offset, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RangeSeek the file - for details see RangeSeeker
|
||||||
|
//
|
||||||
|
// In the parallel chunked reader this just acts like Seek
|
||||||
|
func (cr *parallel) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
|
||||||
|
return cr.Seek(offset, whence)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open forces the connection to be opened
|
||||||
|
func (cr *parallel) Open() (ChunkedReader, error) {
|
||||||
|
cr.mu.Lock()
|
||||||
|
defer cr.mu.Unlock()
|
||||||
|
|
||||||
|
return cr, cr._open()
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ ChunkedReader = (*parallel)(nil)
|
||||||
|
)
|
|
@ -0,0 +1,102 @@
|
||||||
|
package chunkedreader
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/fstest/mockobject"
|
||||||
|
"github.com/rclone/rclone/lib/multipart"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParallel(t *testing.T) {
|
||||||
|
content := makeContent(t, 1024)
|
||||||
|
|
||||||
|
for _, mode := range mockobject.SeekModes {
|
||||||
|
t.Run(mode.String(), testRead(content, mode, 3))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParallelErrorAfterClose(t *testing.T) {
|
||||||
|
testErrorAfterClose(t, 3)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParallelLarge(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
const streams = 3
|
||||||
|
const chunkSize = multipart.BufferSize
|
||||||
|
const size = (2*streams+1)*chunkSize + 255
|
||||||
|
content := makeContent(t, size)
|
||||||
|
o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone)
|
||||||
|
|
||||||
|
cr := New(ctx, o, chunkSize, 0, streams)
|
||||||
|
|
||||||
|
for _, test := range []struct {
|
||||||
|
name string
|
||||||
|
offset int64
|
||||||
|
seekMode int
|
||||||
|
}{
|
||||||
|
{name: "Straight", offset: 0, seekMode: -1},
|
||||||
|
{name: "Rewind", offset: 0, seekMode: io.SeekStart},
|
||||||
|
{name: "NearStart", offset: 1, seekMode: io.SeekStart},
|
||||||
|
{name: "NearEnd", offset: size - 2*chunkSize - 127, seekMode: io.SeekEnd},
|
||||||
|
} {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
if test.seekMode >= 0 {
|
||||||
|
var n int64
|
||||||
|
var err error
|
||||||
|
if test.seekMode == io.SeekEnd {
|
||||||
|
n, err = cr.Seek(test.offset-size, test.seekMode)
|
||||||
|
} else {
|
||||||
|
n, err = cr.Seek(test.offset, test.seekMode)
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, test.offset, n)
|
||||||
|
}
|
||||||
|
got, err := io.ReadAll(cr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, len(content[test.offset:]), len(got))
|
||||||
|
assert.Equal(t, content[test.offset:], got)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, cr.Close())
|
||||||
|
|
||||||
|
t.Run("Seeky", func(t *testing.T) {
|
||||||
|
cr := New(ctx, o, chunkSize, 0, streams)
|
||||||
|
offset := 0
|
||||||
|
buf := make([]byte, 1024)
|
||||||
|
|
||||||
|
for {
|
||||||
|
// Read and check a random read
|
||||||
|
readSize := rand.Intn(1024)
|
||||||
|
readBuf := buf[:readSize]
|
||||||
|
n, err := cr.Read(readBuf)
|
||||||
|
|
||||||
|
require.Equal(t, content[offset:offset+n], readBuf[:n])
|
||||||
|
offset += n
|
||||||
|
|
||||||
|
if err == io.EOF {
|
||||||
|
assert.Equal(t, size, offset)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Now do a smaller random seek backwards
|
||||||
|
seekSize := rand.Intn(512)
|
||||||
|
if offset-seekSize < 0 {
|
||||||
|
seekSize = offset
|
||||||
|
}
|
||||||
|
nn, err := cr.Seek(-int64(seekSize), io.SeekCurrent)
|
||||||
|
offset -= seekSize
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, nn, int64(offset))
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, cr.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,232 @@
|
||||||
|
package chunkedreader
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/fs/hash"
|
||||||
|
)
|
||||||
|
|
||||||
|
// sequential is a reader for an Object with the possibility
|
||||||
|
// of reading the source in chunks of given size
|
||||||
|
//
|
||||||
|
// An initialChunkSize of <= 0 will disable chunked reading.
|
||||||
|
type sequential struct {
|
||||||
|
ctx context.Context
|
||||||
|
mu sync.Mutex // protects following fields
|
||||||
|
o fs.Object // source to read from
|
||||||
|
rc io.ReadCloser // reader for the current open chunk
|
||||||
|
offset int64 // offset the next Read will start. -1 forces a reopen of o
|
||||||
|
chunkOffset int64 // beginning of the current or next chunk
|
||||||
|
chunkSize int64 // length of the current or next chunk. -1 will open o from chunkOffset to the end
|
||||||
|
initialChunkSize int64 // default chunkSize after the chunk specified by RangeSeek is complete
|
||||||
|
maxChunkSize int64 // consecutive read chunks will double in size until reached. -1 means no limit
|
||||||
|
customChunkSize bool // is the current chunkSize set by RangeSeek?
|
||||||
|
closed bool // has Close been called?
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a new sequential chunked reader
|
||||||
|
func newSequential(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64) ChunkedReader {
|
||||||
|
return &sequential{
|
||||||
|
ctx: ctx,
|
||||||
|
o: o,
|
||||||
|
offset: -1,
|
||||||
|
chunkSize: initialChunkSize,
|
||||||
|
initialChunkSize: initialChunkSize,
|
||||||
|
maxChunkSize: maxChunkSize,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read from the file - for details see io.Reader
|
||||||
|
func (cr *sequential) Read(p []byte) (n int, err error) {
|
||||||
|
cr.mu.Lock()
|
||||||
|
defer cr.mu.Unlock()
|
||||||
|
|
||||||
|
if cr.closed {
|
||||||
|
return 0, ErrorFileClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
for reqSize := int64(len(p)); reqSize > 0; reqSize = int64(len(p)) {
|
||||||
|
// the current chunk boundary. valid only when chunkSize > 0
|
||||||
|
chunkEnd := cr.chunkOffset + cr.chunkSize
|
||||||
|
|
||||||
|
fs.Debugf(cr.o, "ChunkedReader.Read at %d length %d chunkOffset %d chunkSize %d", cr.offset, reqSize, cr.chunkOffset, cr.chunkSize)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case cr.chunkSize > 0 && cr.offset == chunkEnd: // last chunk read completely
|
||||||
|
cr.chunkOffset = cr.offset
|
||||||
|
if cr.customChunkSize { // last chunkSize was set by RangeSeek
|
||||||
|
cr.customChunkSize = false
|
||||||
|
cr.chunkSize = cr.initialChunkSize
|
||||||
|
} else {
|
||||||
|
cr.chunkSize *= 2
|
||||||
|
if cr.chunkSize > cr.maxChunkSize && cr.maxChunkSize != -1 {
|
||||||
|
cr.chunkSize = cr.maxChunkSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// recalculate the chunk boundary. valid only when chunkSize > 0
|
||||||
|
chunkEnd = cr.chunkOffset + cr.chunkSize
|
||||||
|
fallthrough
|
||||||
|
case cr.offset == -1: // first Read or Read after RangeSeek
|
||||||
|
err = cr.openRange()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf []byte
|
||||||
|
chunkRest := chunkEnd - cr.offset
|
||||||
|
// limit read to chunk boundaries if chunkSize > 0
|
||||||
|
if reqSize > chunkRest && cr.chunkSize > 0 {
|
||||||
|
buf, p = p[0:chunkRest], p[chunkRest:]
|
||||||
|
} else {
|
||||||
|
buf, p = p, nil
|
||||||
|
}
|
||||||
|
var rn int
|
||||||
|
rn, err = io.ReadFull(cr.rc, buf)
|
||||||
|
n += rn
|
||||||
|
cr.offset += int64(rn)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.ErrUnexpectedEOF {
|
||||||
|
err = io.EOF
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the file - for details see io.Closer
|
||||||
|
//
|
||||||
|
// All methods on ChunkedReader will return ErrorFileClosed afterwards
|
||||||
|
func (cr *sequential) Close() error {
|
||||||
|
cr.mu.Lock()
|
||||||
|
defer cr.mu.Unlock()
|
||||||
|
|
||||||
|
if cr.closed {
|
||||||
|
return ErrorFileClosed
|
||||||
|
}
|
||||||
|
cr.closed = true
|
||||||
|
|
||||||
|
return cr.resetReader(nil, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Seek the file - for details see io.Seeker
|
||||||
|
func (cr *sequential) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
return cr.RangeSeek(context.TODO(), offset, whence, -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RangeSeek the file - for details see RangeSeeker
//
// The specified length will only apply to the next chunk opened.
// RangeSeek will not reopen the source until Read is called.
//
// ctx is accepted to satisfy fs.RangeSeeker but is not used here;
// the subsequent open/seek uses cr.ctx instead.
func (cr *sequential) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
	cr.mu.Lock()
	defer cr.mu.Unlock()

	fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d length %d", cr.offset, offset, length)

	if cr.closed {
		return 0, ErrorFileClosed
	}

	size := cr.o.Size()
	// Translate whence into a base stored in cr.offset. io.SeekCurrent
	// (and any other whence value) falls through with cr.offset
	// unchanged, i.e. it seeks relative to the current read position.
	// NOTE(review): cr.offset may be -1 here if the previous RangeSeek
	// was not followed by a Read - confirm SeekCurrent is valid then.
	switch whence {
	case io.SeekStart:
		cr.offset = 0
	case io.SeekEnd:
		if size < 0 {
			return 0, ErrorInvalidSeek // Can't seek from end for unknown size
		}
		cr.offset = size
	}
	// set the new chunk start
	cr.chunkOffset = cr.offset + offset
	// force reopen on next Read
	cr.offset = -1
	if length > 0 {
		// A positive length overrides the chunk size for the next
		// chunk only (customChunkSize is cleared after that chunk).
		cr.customChunkSize = true
		cr.chunkSize = length
	} else {
		cr.chunkSize = cr.initialChunkSize
	}
	// NOTE(review): this rejects a seek to exactly EOF (chunkOffset ==
	// size), and rejects every seek on an unknown-size object (size ==
	// -1) unless whence was io.SeekEnd - confirm both are intended.
	if cr.chunkOffset < 0 || cr.chunkOffset >= size {
		cr.chunkOffset = 0
		return 0, ErrorInvalidSeek
	}
	return cr.chunkOffset, nil
}
|
||||||
|
|
||||||
|
// Open forces the connection to be opened
|
||||||
|
func (cr *sequential) Open() (ChunkedReader, error) {
|
||||||
|
cr.mu.Lock()
|
||||||
|
defer cr.mu.Unlock()
|
||||||
|
|
||||||
|
if cr.rc != nil && cr.offset != -1 {
|
||||||
|
return cr, nil
|
||||||
|
}
|
||||||
|
return cr, cr.openRange()
|
||||||
|
}
|
||||||
|
|
||||||
|
// openRange will open the source Object with the current chunk range
|
||||||
|
//
|
||||||
|
// If the current open reader implements RangeSeeker, it is tried first.
|
||||||
|
// When RangeSeek fails, o.Open with a RangeOption is used.
|
||||||
|
//
|
||||||
|
// A length <= 0 will request till the end of the file
|
||||||
|
func (cr *sequential) openRange() error {
|
||||||
|
offset, length := cr.chunkOffset, cr.chunkSize
|
||||||
|
fs.Debugf(cr.o, "ChunkedReader.openRange at %d length %d", offset, length)
|
||||||
|
|
||||||
|
if cr.closed {
|
||||||
|
return ErrorFileClosed
|
||||||
|
}
|
||||||
|
|
||||||
|
if rs, ok := cr.rc.(fs.RangeSeeker); ok {
|
||||||
|
n, err := rs.RangeSeek(cr.ctx, offset, io.SeekStart, length)
|
||||||
|
if err == nil && n == offset {
|
||||||
|
cr.offset = offset
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf(cr.o, "ChunkedReader.openRange seek failed (%s). Trying Open", err)
|
||||||
|
} else {
|
||||||
|
fs.Debugf(cr.o, "ChunkedReader.openRange seeked to wrong offset. Wanted %d, got %d. Trying Open", offset, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var rc io.ReadCloser
|
||||||
|
var err error
|
||||||
|
if length <= 0 {
|
||||||
|
if offset == 0 {
|
||||||
|
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)})
|
||||||
|
} else {
|
||||||
|
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: -1})
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rc, err = cr.o.Open(cr.ctx, &fs.HashesOption{Hashes: hash.Set(hash.None)}, &fs.RangeOption{Start: offset, End: offset + length - 1})
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return cr.resetReader(rc, offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
// resetReader switches the current reader to the given reader.
|
||||||
|
// The old reader will be Close'd before setting the new reader.
|
||||||
|
func (cr *sequential) resetReader(rc io.ReadCloser, offset int64) error {
|
||||||
|
if cr.rc != nil {
|
||||||
|
if err := cr.rc.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cr.rc = rc
|
||||||
|
cr.offset = offset
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
	// Compile-time check that *sequential satisfies the ChunkedReader
	// interface.
	_ ChunkedReader = (*sequential)(nil)
)
|
|
@ -7,21 +7,33 @@ import (
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
_ "github.com/rclone/rclone/backend/local"
|
||||||
|
"github.com/rclone/rclone/fstest"
|
||||||
"github.com/rclone/rclone/fstest/mockobject"
|
"github.com/rclone/rclone/fstest/mockobject"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestChunkedReader(t *testing.T) {
|
// TestMain drives the tests
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
fstest.TestMain(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSequential(t *testing.T) {
|
||||||
content := makeContent(t, 1024)
|
content := makeContent(t, 1024)
|
||||||
|
|
||||||
for _, mode := range mockobject.SeekModes {
|
for _, mode := range mockobject.SeekModes {
|
||||||
t.Run(mode.String(), testRead(content, mode))
|
t.Run(mode.String(), testRead(content, mode, 0))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) {
|
func TestSequentialErrorAfterClose(t *testing.T) {
|
||||||
|
testErrorAfterClose(t, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testRead(content []byte, mode mockobject.SeekMode, streams int) func(*testing.T) {
|
||||||
return func(t *testing.T) {
|
return func(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
chunkSizes := []int64{-1, 0, 1, 15, 16, 17, 1023, 1024, 1025, 2000}
|
chunkSizes := []int64{-1, 0, 1, 15, 16, 17, 1023, 1024, 1025, 2000}
|
||||||
offsets := []int64{0, 1, 2, 3, 4, 5, 7, 8, 9, 15, 16, 17, 31, 32, 33,
|
offsets := []int64{0, 1, 2, 3, 4, 5, 7, 8, 9, 15, 16, 17, 31, 32, 33,
|
||||||
63, 64, 65, 511, 512, 513, 1023, 1024, 1025}
|
63, 64, 65, 511, 512, 513, 1023, 1024, 1025}
|
||||||
|
@ -39,13 +51,13 @@ func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run(fmt.Sprintf("Chunksize_%d_%d", cs, csMax), func(t *testing.T) {
|
t.Run(fmt.Sprintf("Chunksize_%d_%d", cs, csMax), func(t *testing.T) {
|
||||||
cr := New(context.Background(), o, cs, csMax, 0)
|
cr := New(ctx, o, cs, csMax, streams)
|
||||||
|
|
||||||
for _, offset := range offsets {
|
for _, offset := range offsets {
|
||||||
for _, limit := range limits {
|
for _, limit := range limits {
|
||||||
what := fmt.Sprintf("offset %d, limit %d", offset, limit)
|
what := fmt.Sprintf("offset %d, limit %d", offset, limit)
|
||||||
|
|
||||||
p, err := cr.RangeSeek(context.Background(), offset, io.SeekStart, limit)
|
p, err := cr.RangeSeek(ctx, offset, io.SeekStart, limit)
|
||||||
if offset >= cl {
|
if offset >= cl {
|
||||||
require.Error(t, err, what)
|
require.Error(t, err, what)
|
||||||
return
|
return
|
||||||
|
@ -74,32 +86,33 @@ func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestErrorAfterClose(t *testing.T) {
|
func testErrorAfterClose(t *testing.T, streams int) {
|
||||||
|
ctx := context.Background()
|
||||||
content := makeContent(t, 1024)
|
content := makeContent(t, 1024)
|
||||||
o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone)
|
o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone)
|
||||||
|
|
||||||
// Close
|
// Close
|
||||||
cr := New(context.Background(), o, 0, 0, 0)
|
cr := New(ctx, o, 0, 0, streams)
|
||||||
require.NoError(t, cr.Close())
|
require.NoError(t, cr.Close())
|
||||||
require.Error(t, cr.Close())
|
require.Error(t, cr.Close())
|
||||||
|
|
||||||
// Read
|
// Read
|
||||||
cr = New(context.Background(), o, 0, 0, 0)
|
cr = New(ctx, o, 0, 0, streams)
|
||||||
require.NoError(t, cr.Close())
|
require.NoError(t, cr.Close())
|
||||||
var buf [1]byte
|
var buf [1]byte
|
||||||
_, err := cr.Read(buf[:])
|
_, err := cr.Read(buf[:])
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
||||||
// Seek
|
// Seek
|
||||||
cr = New(context.Background(), o, 0, 0, 0)
|
cr = New(ctx, o, 0, 0, streams)
|
||||||
require.NoError(t, cr.Close())
|
require.NoError(t, cr.Close())
|
||||||
_, err = cr.Seek(1, io.SeekCurrent)
|
_, err = cr.Seek(1, io.SeekCurrent)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
||||||
// RangeSeek
|
// RangeSeek
|
||||||
cr = New(context.Background(), o, 0, 0, 0)
|
cr = New(ctx, o, 0, 0, streams)
|
||||||
require.NoError(t, cr.Close())
|
require.NoError(t, cr.Close())
|
||||||
_, err = cr.RangeSeek(context.Background(), 1, io.SeekCurrent, 0)
|
_, err = cr.RangeSeek(ctx, 1, io.SeekCurrent, 0)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue