1
mirror of https://github.com/rclone/rclone synced 2024-11-24 01:26:25 +01:00
rclone/fs/chunkedreader/parallel.go
Nick Craig-Wood 27b281ef69 chunkedreader: add --vfs-read-chunk-streams to parallel read chunks
This converts the ChunkedReader into an interface and provides two
implementations one sequential and one parallel.

This can be used to improve the performance of the VFS on high
bandwidth or high latency links.

Fixes #4760
2024-08-14 21:13:09 +01:00

384 lines
9.7 KiB
Go

package chunkedreader
import (
"context"
"fmt"
"io"
"sync"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pool"
)
// parallel reads Object in chunks of a given size in parallel.
type parallel struct {
ctx context.Context
o fs.Object // source to read from
mu sync.Mutex // protects following fields
endStream int64 // offset we have started streams for
offset int64 // offset the read file pointer is at
chunkSize int64 // length of the chunks to read
nstreams int // number of streams to use
streams []*stream // the opened streams in offset order - the current one is first
closed bool // has Close been called?
}
// stream holds the info about a single download
type stream struct {
cr *parallel // parent reader
ctx context.Context // ctx to cancel if needed
cancel func() // cancel the stream
rc io.ReadCloser // reader that it is reading from, may be nil
offset int64 // where the stream is reading from
size int64 // and the size it is reading
readBytes int64 // bytes read from the stream
rw *pool.RW // buffer for read
err chan error // error returned from the read
name string // name of this stream for debugging
}
// Start a stream reading (offset, offset+size)
func (cr *parallel) newStream(ctx context.Context, offset, size int64) (s *stream, err error) {
ctx, cancel := context.WithCancel(ctx)
// Create the stream
rw := multipart.NewRW()
s = &stream{
cr: cr,
ctx: ctx,
cancel: cancel,
offset: offset,
size: size,
rw: rw,
err: make(chan error, 1),
}
s.name = fmt.Sprintf("stream(%d,%d,%p)", s.offset, s.size, s)
// Start the background read into the buffer
go s.readFrom(ctx)
// Return the stream to the caller
return s, nil
}
// read the file into the buffer
func (s *stream) readFrom(ctx context.Context) {
// Open the object at the correct range
fs.Debugf(s.cr.o, "%s: open", s.name)
rc, err := operations.Open(ctx, s.cr.o,
&fs.HashesOption{Hashes: hash.Set(hash.None)},
&fs.RangeOption{Start: s.offset, End: s.offset + s.size - 1})
if err != nil {
s.err <- fmt.Errorf("parallel chunked reader: failed to open stream at %d size %d: %w", s.offset, s.size, err)
return
}
s.rc = rc
fs.Debugf(s.cr.o, "%s: readfrom started", s.name)
_, err = s.rw.ReadFrom(s.rc)
fs.Debugf(s.cr.o, "%s: readfrom finished (%d bytes): %v", s.name, s.rw.Size(), err)
s.err <- err
}
// eof is true when we've read all the data we are expecting
func (s *stream) eof() bool {
return s.readBytes >= s.size
}
// read reads up to len(p) bytes into p. It returns the number of
// bytes read (0 <= n <= len(p)) and any error encountered. If some
// data is available but not len(p) bytes, read returns what is
// available instead of waiting for more.
func (s *stream) read(p []byte) (n int, err error) {
defer log.Trace(s.cr.o, "%s: Read len(p)=%d", s.name, len(p))("n=%d, err=%v", &n, &err)
if len(p) == 0 {
return n, nil
}
for {
var nn int
nn, err = s.rw.Read(p[n:])
fs.Debugf(s.cr.o, "%s: rw.Read nn=%d, err=%v", s.name, nn, err)
s.readBytes += int64(nn)
n += nn
if err != nil && err != io.EOF {
return n, err
}
if s.eof() {
return n, io.EOF
}
// Received a faux io.EOF because we haven't read all the data yet
if n >= len(p) {
break
}
// Wait for a write to happen to read more
s.rw.WaitWrite(s.ctx)
}
return n, nil
}
// Sets *perr to newErr if err is nil
func orErr(perr *error, newErr error) {
if *perr == nil {
*perr = newErr
}
}
// Close a stream
func (s *stream) close() (err error) {
defer log.Trace(s.cr.o, "%s: close", s.name)("err=%v", &err)
s.cancel()
err = <-s.err // wait for readFrom to stop and return error
orErr(&err, s.rw.Close())
if s.rc != nil {
orErr(&err, s.rc.Close())
}
if err != nil && err != io.EOF {
return fmt.Errorf("parallel chunked reader: failed to read stream at %d size %d: %w", s.offset, s.size, err)
}
return nil
}
// Make a new parallel chunked reader
//
// Mustn't be called for an unknown size object
func newParallel(ctx context.Context, o fs.Object, chunkSize int64, streams int) ChunkedReader {
// Make sure chunkSize is a multiple of multipart.BufferSize
if chunkSize < 0 {
chunkSize = multipart.BufferSize
}
newChunkSize := multipart.BufferSize * (chunkSize / multipart.BufferSize)
if newChunkSize < chunkSize {
newChunkSize += multipart.BufferSize
}
fs.Debugf(o, "newParallel chunkSize=%d, streams=%d", chunkSize, streams)
return &parallel{
ctx: ctx,
o: o,
offset: 0,
chunkSize: newChunkSize,
nstreams: streams,
}
}
// _open starts the file transferring at offset
//
// Call with the lock held
func (cr *parallel) _open() (err error) {
size := cr.o.Size()
if size < 0 {
return fmt.Errorf("parallel chunked reader: can't use multiple threads for unknown sized object %q", cr.o)
}
// Launched enough streams already
if cr.endStream >= size {
return nil
}
// Make sure cr.nstreams are running
for i := len(cr.streams); i < cr.nstreams; i++ {
// clip to length of file
chunkSize := cr.chunkSize
newEndStream := cr.endStream + chunkSize
if newEndStream > size {
chunkSize = size - cr.endStream
newEndStream = cr.endStream + chunkSize
}
s, err := cr.newStream(cr.ctx, cr.endStream, chunkSize)
if err != nil {
return err
}
cr.streams = append(cr.streams, s)
cr.endStream = newEndStream
if cr.endStream >= size {
break
}
}
return nil
}
// Finished reading the current stream so pop it off and destroy it
//
// Call with lock held
func (cr *parallel) _popStream() (err error) {
defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
if len(cr.streams) == 0 {
return nil
}
stream := cr.streams[0]
err = stream.close()
cr.streams[0] = nil
cr.streams = cr.streams[1:]
return err
}
// Get rid of all the streams
//
// Call with lock held
func (cr *parallel) _popStreams() (err error) {
defer log.Trace(cr.o, "streams=%+v", cr.streams)("streams=%+v, err=%v", &cr.streams, &err)
for len(cr.streams) > 0 {
orErr(&err, cr._popStream())
}
cr.streams = nil
return err
}
// Read from the file - for details see io.Reader
func (cr *parallel) Read(p []byte) (n int, err error) {
defer log.Trace(cr.o, "Read len(p)=%d", len(p))("n=%d, err=%v", &n, &err)
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return 0, ErrorFileClosed
}
for n < len(p) {
// Make sure we have the correct number of streams open
err = cr._open()
if err != nil {
return n, err
}
// No streams left means EOF
if len(cr.streams) == 0 {
return n, io.EOF
}
// Read from the stream
stream := cr.streams[0]
nn, err := stream.read(p[n:])
n += nn
cr.offset += int64(nn)
if err == io.EOF {
err = cr._popStream()
if err != nil {
break
}
} else if err != nil {
break
}
}
return n, err
}
// Close the file - for details see io.Closer
//
// All methods on ChunkedReader will return ErrorFileClosed afterwards
func (cr *parallel) Close() error {
cr.mu.Lock()
defer cr.mu.Unlock()
if cr.closed {
return ErrorFileClosed
}
cr.closed = true
// Close all the streams
return cr._popStreams()
}
// Seek the file - for details see io.Seeker
func (cr *parallel) Seek(offset int64, whence int) (int64, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
fs.Debugf(cr.o, "parallel chunked reader: seek from %d to %d whence %d", cr.offset, offset, whence)
if cr.closed {
return 0, ErrorFileClosed
}
size := cr.o.Size()
currentOffset := cr.offset
switch whence {
case io.SeekStart:
currentOffset = 0
case io.SeekEnd:
currentOffset = size
}
// set the new chunk start
newOffset := currentOffset + offset
if newOffset < 0 || newOffset >= size {
return 0, ErrorInvalidSeek
}
// If seek pointer didn't move, return now
if newOffset == cr.offset {
fs.Debugf(cr.o, "parallel chunked reader: seek pointer didn't move")
return cr.offset, nil
}
cr.offset = newOffset
// Ditch out of range streams
for len(cr.streams) > 0 {
stream := cr.streams[0]
if newOffset >= stream.offset+stream.size {
_ = cr._popStream()
} else {
break
}
}
// If no streams remain we can just restart
if len(cr.streams) == 0 {
fs.Debugf(cr.o, "parallel chunked reader: no streams remain")
cr.endStream = cr.offset
return cr.offset, nil
}
// Current stream
stream := cr.streams[0]
// If new offset is before current stream then ditch all the streams
if newOffset < stream.offset {
_ = cr._popStreams()
fs.Debugf(cr.o, "parallel chunked reader: new offset is before current stream - ditch all")
cr.endStream = cr.offset
return cr.offset, nil
}
// Seek the current stream
streamOffset := newOffset - stream.offset
stream.readBytes = streamOffset // correct read value
fs.Debugf(cr.o, "parallel chunked reader: seek the current stream to %d", streamOffset)
// Wait for the read to the correct part of the data
for stream.rw.Size() < streamOffset {
stream.rw.WaitWrite(cr.ctx)
}
_, err := stream.rw.Seek(streamOffset, io.SeekStart)
if err != nil {
return cr.offset, fmt.Errorf("parallel chunked reader: failed to seek stream: %w", err)
}
return cr.offset, nil
}
// RangeSeek the file - for details see RangeSeeker
//
// In the parallel chunked reader this just acts like Seek
func (cr *parallel) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
return cr.Seek(offset, whence)
}
// Open forces the connection to be opened
func (cr *parallel) Open() (ChunkedReader, error) {
cr.mu.Lock()
defer cr.mu.Unlock()
return cr, cr._open()
}
var (
_ ChunkedReader = (*parallel)(nil)
)