1
mirror of https://github.com/rclone/rclone synced 2024-11-20 21:27:33 +01:00
rclone/fs/chunkedreader/parallel_test.go
Nick Craig-Wood 27b281ef69 chunkedreader: add --vfs-read-chunk-streams to parallel read chunks
This converts the ChunkedReader into an interface and provides two
implementations one sequential and one parallel.

This can be used to improve the performance of the VFS on high
bandwidth or high latency links.

Fixes #4760
2024-08-14 21:13:09 +01:00

103 lines
2.4 KiB
Go

package chunkedreader
import (
"context"
"io"
"math/rand"
"testing"
"github.com/rclone/rclone/fstest/mockobject"
"github.com/rclone/rclone/lib/multipart"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParallel(t *testing.T) {
content := makeContent(t, 1024)
for _, mode := range mockobject.SeekModes {
t.Run(mode.String(), testRead(content, mode, 3))
}
}
func TestParallelErrorAfterClose(t *testing.T) {
testErrorAfterClose(t, 3)
}
func TestParallelLarge(t *testing.T) {
ctx := context.Background()
const streams = 3
const chunkSize = multipart.BufferSize
const size = (2*streams+1)*chunkSize + 255
content := makeContent(t, size)
o := mockobject.New("test.bin").WithContent(content, mockobject.SeekModeNone)
cr := New(ctx, o, chunkSize, 0, streams)
for _, test := range []struct {
name string
offset int64
seekMode int
}{
{name: "Straight", offset: 0, seekMode: -1},
{name: "Rewind", offset: 0, seekMode: io.SeekStart},
{name: "NearStart", offset: 1, seekMode: io.SeekStart},
{name: "NearEnd", offset: size - 2*chunkSize - 127, seekMode: io.SeekEnd},
} {
t.Run(test.name, func(t *testing.T) {
if test.seekMode >= 0 {
var n int64
var err error
if test.seekMode == io.SeekEnd {
n, err = cr.Seek(test.offset-size, test.seekMode)
} else {
n, err = cr.Seek(test.offset, test.seekMode)
}
require.NoError(t, err)
assert.Equal(t, test.offset, n)
}
got, err := io.ReadAll(cr)
require.NoError(t, err)
require.Equal(t, len(content[test.offset:]), len(got))
assert.Equal(t, content[test.offset:], got)
})
}
require.NoError(t, cr.Close())
t.Run("Seeky", func(t *testing.T) {
cr := New(ctx, o, chunkSize, 0, streams)
offset := 0
buf := make([]byte, 1024)
for {
// Read and check a random read
readSize := rand.Intn(1024)
readBuf := buf[:readSize]
n, err := cr.Read(readBuf)
require.Equal(t, content[offset:offset+n], readBuf[:n])
offset += n
if err == io.EOF {
assert.Equal(t, size, offset)
break
}
require.NoError(t, err)
// Now do a smaller random seek backwards
seekSize := rand.Intn(512)
if offset-seekSize < 0 {
seekSize = offset
}
nn, err := cr.Seek(-int64(seekSize), io.SeekCurrent)
offset -= seekSize
require.NoError(t, err)
assert.Equal(t, nn, int64(offset))
}
require.NoError(t, cr.Close())
})
}