chunkedreader: add --vfs-read-chunk-streams parameter

Nick Craig-Wood 2024-03-12 16:57:16 +00:00
parent 6dad953dd8
commit eac8996d84
8 changed files with 33 additions and 12 deletions

View File

@@ -38,6 +38,7 @@ import (
 const (
 	initialChunkSize = 262144  // Initial and max sizes of chunks when reading parts of the file. Currently
 	maxChunkSize     = 8388608 // at 256 KiB and 8 MiB.
+	chunkStreams     = 0       // Streams to use for reading
 	bufferSize       = 8388608
 	heuristicBytes   = 1048576
@@ -1351,7 +1352,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.Read
 		}
 	}
 	// Get a chunkedreader for the wrapped object
-	chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize)
+	chunkedReader := chunkedreader.New(ctx, o.Object, initialChunkSize, maxChunkSize, chunkStreams)
 	// Get file handle
 	var file io.Reader
 	if offset != 0 {

View File

@@ -1,4 +1,4 @@
-// Package chunkedreader provides functionality for reading in chunks.
+// Package chunkedreader provides functionality for reading a stream in chunks.
 package chunkedreader
 
 import (
@@ -31,6 +31,7 @@ type ChunkedReader struct {
 	chunkSize        int64 // length of the current or next chunk. -1 will open o from chunkOffset to the end
 	initialChunkSize int64 // default chunkSize after the chunk specified by RangeSeek is complete
 	maxChunkSize     int64 // consecutive read chunks will double in size until reached. -1 means no limit
+	streams          int   // number of streams to use
 	customChunkSize  bool  // is the current chunkSize set by RangeSeek?
 	closed           bool  // has Close been called?
 }
@@ -41,7 +42,7 @@ type ChunkedReader struct {
 // If maxChunkSize is greater than initialChunkSize, the chunk size will be
 // doubled after each chunk read with a maximum of maxChunkSize.
 // A Seek or RangeSeek will reset the chunk size to its initial value
-func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64) *ChunkedReader {
+func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize int64, streams int) *ChunkedReader {
 	if initialChunkSize <= 0 {
 		initialChunkSize = -1
 	}
@@ -55,6 +56,7 @@ func New(ctx context.Context, o fs.Object, initialChunkSize int64, maxChunkSize
 		chunkSize:        initialChunkSize,
 		initialChunkSize: initialChunkSize,
 		maxChunkSize:     maxChunkSize,
+		streams:          streams,
 	}
 }
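
For illustration, a minimal sketch of the extended constructor in use. The object, sizes, and read loop are invented for this example; `mockobject` is the same test helper the package's own tests use below.

```go
package main

import (
	"context"
	"io"
	"log"

	"github.com/rclone/rclone/fs/chunkedreader"
	"github.com/rclone/rclone/fstest/mockobject"
)

func main() {
	// An in-memory test object standing in for a real remote object.
	o := mockobject.New("test.bin").WithContent(make([]byte, 1024*1024), mockobject.SeekModeNone)
	// 256 KiB initial chunk, doubling up to 8 MiB; streams == 0 keeps
	// the sequential, doubling behaviour.
	cr := chunkedreader.New(context.Background(), o, 262144, 8388608, 0)
	defer func() { _ = cr.Close() }()
	if _, err := io.Copy(io.Discard, cr); err != nil {
		log.Fatal(err)
	}
}
```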

View File

@@ -39,7 +39,7 @@ func testRead(content []byte, mode mockobject.SeekMode) func(*testing.T) {
 			}
 			t.Run(fmt.Sprintf("Chunksize_%d_%d", cs, csMax), func(t *testing.T) {
-				cr := New(context.Background(), o, cs, csMax)
+				cr := New(context.Background(), o, cs, csMax, 0)
 				for _, offset := range offsets {
 					for _, limit := range limits {
@@ -79,25 +79,25 @@ func TestErrorAfterClose(t *testing.T) {
 	o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone)
 
 	// Close
-	cr := New(context.Background(), o, 0, 0)
+	cr := New(context.Background(), o, 0, 0, 0)
 	require.NoError(t, cr.Close())
 	require.Error(t, cr.Close())
 
 	// Read
-	cr = New(context.Background(), o, 0, 0)
+	cr = New(context.Background(), o, 0, 0, 0)
 	require.NoError(t, cr.Close())
 	var buf [1]byte
 	_, err := cr.Read(buf[:])
 	require.Error(t, err)
 
 	// Seek
-	cr = New(context.Background(), o, 0, 0)
+	cr = New(context.Background(), o, 0, 0, 0)
 	require.NoError(t, cr.Close())
 	_, err = cr.Seek(1, io.SeekCurrent)
 	require.Error(t, err)
 
 	// RangeSeek
-	cr = New(context.Background(), o, 0, 0)
+	cr = New(context.Background(), o, 0, 0, 0)
 	require.NoError(t, cr.Close())
 	_, err = cr.RangeSeek(context.Background(), 1, io.SeekCurrent, 0)
 	require.Error(t, err)

View File

@@ -75,7 +75,8 @@ func (fh *ReadFileHandle) openPending() (err error) {
 		return nil
 	}
 	o := fh.file.getObject()
-	r, err := chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit)).Open()
+	opt := &fh.file.VFS().Opt
+	r, err := chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams).Open()
 	if err != nil {
 		return err
 	}
@@ -127,7 +128,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
 	}
 	fh.r.StopBuffering() // stop the background reading first
 	oldReader := fh.r.GetReader()
-	r, ok := oldReader.(*chunkedreader.ChunkedReader)
+	r, ok := oldReader.(chunkedreader.ChunkedReader)
 	if !ok {
 		fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader)
 		reopen = true
@@ -148,7 +149,8 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
 	}
 	// re-open with a seek
 	o := fh.file.getObject()
-	r = chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit))
+	opt := &fh.file.VFS().Opt
+	r = chunkedreader.New(context.TODO(), o, int64(opt.ChunkSize), int64(opt.ChunkSizeLimit), opt.ChunkStreams)
 	_, err := r.Seek(offset, 0)
 	if err != nil {
 		fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
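
The type assertion above drops the pointer, which suggests `ChunkedReader` becomes an interface elsewhere in this commit (that change is not shown in this excerpt). A self-contained sketch of the pattern, using hypothetical stand-in types rather than the real ones:

```go
package main

import (
	"fmt"
	"io"
)

// Stand-ins for the real types in this sketch.
type ChunkedReader interface {
	io.ReadSeekCloser
}

type chunkedReader struct{ io.ReadSeekCloser } // hypothetical concrete type

func main() {
	var oldReader io.Reader = &chunkedReader{}
	// With an interface, callers assert to the interface type and accept
	// any implementation behind it...
	if r, ok := oldReader.(ChunkedReader); ok {
		fmt.Printf("reusable reader: %T\n", r)
	}
	// ...whereas asserting to a concrete pointer (*chunkedReader) would
	// tie the caller to one implementation.
}
```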

View File

@@ -230,6 +230,11 @@ These flags control the chunking:
     --vfs-read-chunk-size SizeSuffix        Read the source objects in chunks (default 128M)
     --vfs-read-chunk-size-limit SizeSuffix  Max chunk doubling size (default off)
+    --vfs-read-chunk-streams int            The number of parallel streams to read at once
+
+The chunking behaves differently depending on the `--vfs-read-chunk-streams` parameter.
+
+#### `--vfs-read-chunk-streams` == 0
 
 Rclone will start reading a chunk of size `--vfs-read-chunk-size`,
 and then double the size for each read. When `--vfs-read-chunk-size-limit` is
@@ -245,6 +250,14 @@ When `--vfs-read-chunk-size-limit 500M` is specified, the result would be
 Setting `--vfs-read-chunk-size` to `0` or "off" disables chunked reading.
 The chunks will not be buffered in memory.
 
+#### `--vfs-read-chunk-streams` > 0
+
+Rclone will read `--vfs-read-chunk-streams` chunks of size
+`--vfs-read-chunk-size` in parallel. The size for each read will stay
+constant.
+
 ### VFS Performance
 
 These flags may be used to enable/disable features of the VFS for
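
For illustration, the parallel mode documented above could be enabled on a mount like this (the remote name and mount point are placeholders):

    rclone mount remote: /mnt/remote --vfs-read-chunk-size 64M --vfs-read-chunk-streams 16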

View File

@@ -533,7 +533,7 @@ func (dl *downloader) open(offset int64) (err error) {
 	// }
 	// in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, ci.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption)
-	in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit))
+	in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit), dl.dls.opt.ChunkStreams)
 	_, err = in0.Seek(offset, 0)
 	if err != nil {
 		return fmt.Errorf("vfs reader: failed to open source file: %w", err)

View File

@@ -24,6 +24,7 @@ type Options struct {
 	FilePerms      os.FileMode
 	ChunkSize      fs.SizeSuffix // if > 0 read files in chunks
 	ChunkSizeLimit fs.SizeSuffix // if > ChunkSize double the chunk size after each chunk until reached
+	ChunkStreams   int           // Number of download streams to use
 	CacheMode      CacheMode
 	CacheMaxAge    time.Duration
 	CacheMaxSize   fs.SizeSuffix
@@ -59,6 +60,7 @@ var DefaultOpt = Options{
 	CachePollInterval: 60 * time.Second,
 	ChunkSize:         128 * fs.Mebi,
 	ChunkSizeLimit:    -1,
+	ChunkStreams:      0,
 	CacheMaxSize:      -1,
 	CacheMinFreeSpace: -1,
 	CaseInsensitive:   runtime.GOOS == "windows" || runtime.GOOS == "darwin", // default to true on Windows and Mac, false otherwise
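
For library users, a hedged sketch of setting the new option programmatically. It assumes `vfs.New(f, &opt)` as the entry point and follows the package layout implied by this diff; the values are illustrative.

```go
package example

import (
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/vfs"
	"github.com/rclone/rclone/vfs/vfscommon"
)

// newVFS shows the option plumbing only; f is an already-created remote.
func newVFS(f fs.Fs) *vfs.VFS {
	opt := vfscommon.DefaultOpt // copy the defaults shown above
	opt.ChunkSize = 128 * fs.Mebi
	opt.ChunkStreams = 16 // 16 parallel streams; 0 keeps the doubling mode
	return vfs.New(f, &opt)
}
```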

View File

@@ -32,6 +32,7 @@ func AddFlags(flagSet *pflag.FlagSet) {
 	flags.FVarP(flagSet, &Opt.CacheMinFreeSpace, "vfs-cache-min-free-space", "", "Target minimum free space on the disk containing the cache", "VFS")
 	flags.FVarP(flagSet, &Opt.ChunkSize, "vfs-read-chunk-size", "", "Read the source objects in chunks", "VFS")
 	flags.FVarP(flagSet, &Opt.ChunkSizeLimit, "vfs-read-chunk-size-limit", "", "If greater than --vfs-read-chunk-size, double the chunk size after each chunk read, until the limit is reached ('off' is unlimited)", "VFS")
+	flags.IntVarP(flagSet, &Opt.ChunkStreams, "vfs-read-chunk-streams", "", Opt.ChunkStreams, "The number of parallel streams to read at once", "VFS")
 	flags.FVarP(flagSet, DirPerms, "dir-perms", "", "Directory permissions", "VFS")
 	flags.FVarP(flagSet, FilePerms, "file-perms", "", "File permissions", "VFS")
 	flags.BoolVarP(flagSet, &Opt.CaseInsensitive, "vfs-case-insensitive", "", Opt.CaseInsensitive, "If a file name not found, find a case insensitive match", "VFS")
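
As a minimal illustration of the flag wiring, using pflag directly (rclone's `flags.IntVarP` is a thin wrapper over `pflag` that also records the "VFS" help group); the flag set name and arguments are invented for the example:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	var chunkStreams int
	flagSet := pflag.NewFlagSet("vfs", pflag.ContinueOnError)
	flagSet.IntVar(&chunkStreams, "vfs-read-chunk-streams", 0, "The number of parallel streams to read at once")
	_ = flagSet.Parse([]string{"--vfs-read-chunk-streams", "16"})
	fmt.Println(chunkStreams) // prints 16
}
```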