From 9fdf273614c014b08f1fe39191f15c9e389f2916 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=B6ller?= Date: Sun, 18 Feb 2018 15:10:15 +0100 Subject: [PATCH] fs: improve ChunkedReader - make Close permanent and return errors afterwards - use RangeSeek from the wrapped reader if present - add a limit to chunk growth - correct RangeSeek interface behavior - add tests --- fs/chunkedreader/chunkedreader.go | 129 +++++++++++++++++++------ fs/chunkedreader/chunkedreader_test.go | 111 +++++++++++++++++++++ fstest/mockobject/mockobject.go | 109 +++++++++++++++++++++ 3 files changed, 320 insertions(+), 29 deletions(-) create mode 100644 fs/chunkedreader/chunkedreader_test.go diff --git a/fs/chunkedreader/chunkedreader.go b/fs/chunkedreader/chunkedreader.go index fb135b83d..4a4a62cf3 100644 --- a/fs/chunkedreader/chunkedreader.go +++ b/fs/chunkedreader/chunkedreader.go @@ -1,43 +1,55 @@ package chunkedreader import ( + "errors" "io" "sync" "github.com/ncw/rclone/fs" ) +// io related errors returned by ChunkedReader +var ( + ErrorFileClosed = errors.New("file already closed") + ErrorInvalidSeek = errors.New("invalid seek position") +) + // ChunkedReader is a reader for a Object with the possibility // of reading the source in chunks of given size // -// A initialChunkSize of 0 will disable chunked reading. +// A initialChunkSize of <= 0 will disable chunked reading. type ChunkedReader struct { - mu sync.Mutex - o fs.Object - rc io.ReadCloser - offset int64 - chunkOffset int64 - chunkSize int64 - initialChunkSize int64 - chunkGrowth bool - doSeek bool + mu sync.Mutex // protects following fields + o fs.Object // source to read from + rc io.ReadCloser // reader for the current open chunk + offset int64 // offset the next Read will start. -1 forces a reopen of o + chunkOffset int64 // beginning of the current or next chunk + chunkSize int64 // length of the current or next chunk. -1 will open o from chunkOffset to the end + initialChunkSize int64 // default chunkSize after the chunk specified by RangeSeek is complete + maxChunkSize int64 // consecutive read chunks will double in size until reached. -1 means no limit + customChunkSize bool // is the current chunkSize set by RangeSeek? + closed bool // has Close been called? } // New returns a ChunkedReader for the Object. // -// A initialChunkSize of 0 will disable chunked reading. -// If chunkGrowth is true, the chunk size will be doubled after each chunk read. +// A initialChunkSize of <= 0 will disable chunked reading. +// If maxChunkSize is greater than initialChunkSize, the chunk size will be +// doubled after each chunk read with a maximun of maxChunkSize. // A Seek or RangeSeek will reset the chunk size to it's initial value -func New(o fs.Object, initialChunkSize int64, chunkGrowth bool) *ChunkedReader { - if initialChunkSize < 0 { - initialChunkSize = 0 +func New(o fs.Object, initialChunkSize int64, maxChunkSize int64) *ChunkedReader { + if initialChunkSize <= 0 { + initialChunkSize = -1 + } + if maxChunkSize != -1 && maxChunkSize < initialChunkSize { + maxChunkSize = initialChunkSize } return &ChunkedReader{ o: o, offset: -1, chunkSize: initialChunkSize, initialChunkSize: initialChunkSize, - chunkGrowth: chunkGrowth, + maxChunkSize: maxChunkSize, } } @@ -46,21 +58,32 @@ func (cr *ChunkedReader) Read(p []byte) (n int, err error) { cr.mu.Lock() defer cr.mu.Unlock() + if cr.closed { + return 0, ErrorFileClosed + } + for reqSize := int64(len(p)); reqSize > 0; reqSize = int64(len(p)) { + // the current chunk boundary. valid only when chunkSize > 0 chunkEnd := cr.chunkOffset + cr.chunkSize fs.Debugf(cr.o, "ChunkedReader.Read at %d length %d chunkOffset %d chunkSize %d", cr.offset, reqSize, cr.chunkOffset, cr.chunkSize) - if atChunkEnd := cr.offset == chunkEnd; cr.offset == -1 || atChunkEnd { - if atChunkEnd && cr.chunkSize > 0 { - if cr.doSeek { - cr.doSeek = false - cr.chunkSize = cr.initialChunkSize - } else if cr.chunkGrowth { - cr.chunkSize *= 2 + switch { + case cr.chunkSize > 0 && cr.offset == chunkEnd: // last chunk read completely + cr.chunkOffset = cr.offset + if cr.customChunkSize { // last chunkSize was set by RangeSeek + cr.customChunkSize = false + cr.chunkSize = cr.initialChunkSize + } else { + cr.chunkSize *= 2 + if cr.chunkSize > cr.maxChunkSize && cr.maxChunkSize != -1 { + cr.chunkSize = cr.maxChunkSize } - cr.chunkOffset = cr.offset } + // recalculate the chunk boundary. valid only when chunkSize > 0 + chunkEnd = cr.chunkOffset + cr.chunkSize + fallthrough + case cr.offset == -1: // first Read or Read after RangeSeek err = cr.openRange() if err != nil { return @@ -69,7 +92,8 @@ func (cr *ChunkedReader) Read(p []byte) (n int, err error) { var buf []byte chunkRest := chunkEnd - cr.offset - if reqSize > chunkRest && cr.chunkSize != 0 { + // limit read to chunk boundaries if chunkSize > 0 + if reqSize > chunkRest && cr.chunkSize > 0 { buf, p = p[0:chunkRest], p[chunkRest:] } else { buf, p = p, nil @@ -79,6 +103,9 @@ func (cr *ChunkedReader) Read(p []byte) (n int, err error) { n += rn cr.offset += int64(rn) if err != nil { + if err == io.ErrUnexpectedEOF { + err = io.EOF + } return } } @@ -86,10 +113,17 @@ func (cr *ChunkedReader) Read(p []byte) (n int, err error) { } // Close the file - for details see io.Closer +// +// All methods on ChunkedReader will return ErrorFileClosed afterwards func (cr *ChunkedReader) Close() error { cr.mu.Lock() defer cr.mu.Unlock() + if cr.closed { + return ErrorFileClosed + } + cr.closed = true + return cr.resetReader(nil, 0) } @@ -99,11 +133,18 @@ func (cr *ChunkedReader) Seek(offset int64, whence int) (int64, error) { } // RangeSeek the file - for details see RangeSeeker +// +// The specified length will only apply to the next chunk opened. +// RangeSeek will not reopen the source until Read is called. func (cr *ChunkedReader) RangeSeek(offset int64, whence int, length int64) (int64, error) { cr.mu.Lock() defer cr.mu.Unlock() - fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d", cr.offset, offset) + fs.Debugf(cr.o, "ChunkedReader.RangeSeek from %d to %d length %d", cr.offset, offset, length) + + if cr.closed { + return 0, ErrorFileClosed + } size := cr.o.Size() switch whence { @@ -112,15 +153,21 @@ func (cr *ChunkedReader) RangeSeek(offset int64, whence int, length int64) (int6 case io.SeekEnd: cr.offset = size } + // set the new chunk start cr.chunkOffset = cr.offset + offset + // force reopen on next Read cr.offset = -1 - cr.doSeek = true if length > 0 { + cr.customChunkSize = true cr.chunkSize = length } else { cr.chunkSize = cr.initialChunkSize } - return cr.offset, nil + if cr.chunkOffset < 0 || cr.chunkOffset >= size { + cr.chunkOffset = 0 + return 0, ErrorInvalidSeek + } + return cr.chunkOffset, nil } // Open forces the connection to be opened @@ -128,15 +175,39 @@ func (cr *ChunkedReader) Open() (*ChunkedReader, error) { cr.mu.Lock() defer cr.mu.Unlock() + if cr.rc != nil && cr.offset != -1 { + return cr, nil + } return cr, cr.openRange() } -// openRange will open the source Object with the given range +// openRange will open the source Object with the current chunk range +// +// If the current open reader implenets RangeSeeker, it is tried first. +// When RangeSeek failes, o.Open with a RangeOption is used. +// // A length <= 0 will request till the end of the file func (cr *ChunkedReader) openRange() error { offset, length := cr.chunkOffset, cr.chunkSize fs.Debugf(cr.o, "ChunkedReader.openRange at %d length %d", offset, length) + if cr.closed { + return ErrorFileClosed + } + + if rs, ok := cr.rc.(fs.RangeSeeker); ok { + n, err := rs.RangeSeek(offset, io.SeekStart, length) + if err == nil && n == offset { + cr.offset = offset + return nil + } + if err != nil { + fs.Debugf(cr.o, "ChunkedReader.openRange seek failed (%s). Trying Open", err) + } else { + fs.Debugf(cr.o, "ChunkedReader.openRange seeked to wrong offset. Wanted %d, got %d. Trying Open", offset, n) + } + } + var rc io.ReadCloser var err error if length <= 0 { diff --git a/fs/chunkedreader/chunkedreader_test.go b/fs/chunkedreader/chunkedreader_test.go new file mode 100644 index 000000000..71a0a449e --- /dev/null +++ b/fs/chunkedreader/chunkedreader_test.go @@ -0,0 +1,111 @@ +package chunkedreader + +import ( + "fmt" + "io" + "math/rand" + "testing" + + "github.com/ncw/rclone/fstest/mockobject" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestChunkedReader(t *testing.T) { + content := makeContent(t, 1024) + + for _, mode := range mockobject.SeekModes { + t.Run(mode.String(), testRead(content, mode)) + } +} + +func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) { + return func(t *testing.T) { + chunkSizes := []int64{-1, 0, 1, 15, 16, 17, 1023, 1024, 1025, 2000} + offsets := []int64{0, 1, 2, 3, 4, 5, 7, 8, 9, 15, 16, 17, 31, 32, 33, + 63, 64, 65, 511, 512, 513, 1023, 1024, 1025} + limits := []int64{-1, 0, 1, 31, 32, 33, 1023, 1024, 1025} + cl := int64(len(content)) + bl := 32 + buf := make([]byte, bl) + + o := mockobject.New("test.bin").WithContent(content, mode) + for ics, cs := range chunkSizes { + for icsMax, csMax := range chunkSizes { + // skip tests where chunkSize is much bigger than maxChunkSize + if ics > icsMax+1 { + continue + } + + t.Run(fmt.Sprintf("Chunksize_%d_%d", cs, csMax), func(t *testing.T) { + cr := New(o, cs, csMax) + + for _, offset := range offsets { + for _, limit := range limits { + what := fmt.Sprintf("offset %d, limit %d", offset, limit) + + p, err := cr.RangeSeek(offset, io.SeekStart, limit) + if offset >= cl { + require.Error(t, err, what) + return + } + require.NoError(t, err, what) + require.Equal(t, offset, p, what) + + n, err := cr.Read(buf) + end := offset + int64(bl) + if end > cl { + end = cl + } + l := int(end - offset) + if l < bl { + require.Equal(t, io.EOF, err, what) + } else { + require.NoError(t, err, what) + } + require.Equal(t, l, n, what) + require.Equal(t, content[offset:end], buf[:n], what) + } + } + }) + } + } + } +} + +func TestErrorAfterClose(t *testing.T) { + content := makeContent(t, 1024) + o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone) + + // Close + cr := New(o, 0, 0) + require.NoError(t, cr.Close()) + require.Error(t, cr.Close()) + + // Read + cr = New(o, 0, 0) + require.NoError(t, cr.Close()) + var buf [1]byte + _, err := cr.Read(buf[:]) + require.Error(t, err) + + // Seek + cr = New(o, 0, 0) + require.NoError(t, cr.Close()) + _, err = cr.Seek(1, io.SeekCurrent) + require.Error(t, err) + + // RangeSeek + cr = New(o, 0, 0) + require.NoError(t, cr.Close()) + _, err = cr.RangeSeek(1, io.SeekCurrent, 0) + require.Error(t, err) +} + +func makeContent(t *testing.T, size int) []byte { + content := make([]byte, size) + r := rand.New(rand.NewSource(42)) + _, err := io.ReadFull(r, content) + assert.NoError(t, err) + return content +} diff --git a/fstest/mockobject/mockobject.go b/fstest/mockobject/mockobject.go index 9c83987a3..afeaad8ab 100644 --- a/fstest/mockobject/mockobject.go +++ b/fstest/mockobject/mockobject.go @@ -2,7 +2,9 @@ package mockobject import ( + "bytes" "errors" + "fmt" "io" "time" @@ -15,6 +17,11 @@ var errNotImpl = errors.New("not implemented") // Object is a mock fs.Object useful for testing type Object string +// New returns mock fs.Object useful for testing +func New(name string) Object { + return Object(name) +} + // String returns a description of the Object func (o Object) String() string { return string(o) @@ -69,3 +76,105 @@ func (o Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption func (o Object) Remove() error { return errNotImpl } + +// SeekMode specifies the optional Seek interface for the ReadCloser returned by Open +type SeekMode int + +const ( + // SeekModeNone specifies no seek interface + SeekModeNone SeekMode = iota + // SeekModeRegular specifies the regular io.Seek interface + SeekModeRegular + // SeekModeRange specifies the fs.RangeSeek interface + SeekModeRange +) + +// SeekModes contains all valid SeekMode's +var SeekModes = []SeekMode{SeekModeNone, SeekModeRegular, SeekModeRange} + +type contentMockObject struct { + Object + content []byte + seekMode SeekMode +} + +// WithContent returns a fs.Object with the given content. +func (o Object) WithContent(content []byte, mode SeekMode) fs.Object { + return &contentMockObject{ + Object: o, + content: content, + seekMode: mode, + } +} + +func (o *contentMockObject) Open(options ...fs.OpenOption) (io.ReadCloser, error) { + var offset, limit int64 = 0, -1 + for _, option := range options { + switch x := option.(type) { + case *fs.SeekOption: + offset = x.Offset + case *fs.RangeOption: + offset, limit = x.Decode(o.Size()) + default: + if option.Mandatory() { + return nil, fmt.Errorf("Unsupported mandatory option: %v", option) + } + } + } + if limit == -1 || offset+limit > o.Size() { + limit = o.Size() - offset + } + + var r *bytes.Reader + if o.seekMode == SeekModeNone { + r = bytes.NewReader(o.content[offset : offset+limit]) + } else { + r = bytes.NewReader(o.content) + _, err := r.Seek(offset, io.SeekStart) + if err != nil { + return nil, err + } + } + switch o.seekMode { + case SeekModeNone: + return &readCloser{r}, nil + case SeekModeRegular: + return &readSeekCloser{r}, nil + case SeekModeRange: + return &readRangeSeekCloser{r}, nil + default: + return nil, errors.New(o.seekMode.String()) + } +} +func (o *contentMockObject) Size() int64 { + return int64(len(o.content)) +} + +type readCloser struct{ io.Reader } + +func (r *readCloser) Close() error { return nil } + +type readSeekCloser struct{ io.ReadSeeker } + +func (r *readSeekCloser) Close() error { return nil } + +type readRangeSeekCloser struct{ io.ReadSeeker } + +func (r *readRangeSeekCloser) RangeSeek(offset int64, whence int, length int64) (int64, error) { + return r.ReadSeeker.Seek(offset, whence) +} + +func (r *readRangeSeekCloser) Close() error { return nil } + +func (m SeekMode) String() string { + switch m { + case SeekModeNone: + return "SeekModeNone" + case SeekModeRegular: + return "SeekModeRegular" + case SeekModeRange: + return "SeekModeRange" + default: + return fmt.Sprintf("SeekModeInvalid(%d)", m) + } +}