1
mirror of https://github.com/rclone/rclone synced 2024-12-11 23:54:00 +01:00
rclone/fs/asyncreader/asyncreader_test.go
albertony 5d6b8141ec Replace deprecated ioutil
As of Go 1.16, the same functionality is now provided by package io or
package os, and those implementations should be preferred in new code.
2022-11-07 11:41:47 +00:00

380 lines
9.1 KiB
Go

package asyncreader
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"math/rand"
"strings"
"sync"
"testing"
"testing/iotest"
"time"
"github.com/rclone/rclone/lib/israce"
"github.com/rclone/rclone/lib/readers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestAsyncReader checks the basic Read/Close life cycle of an AsyncReader:
// reading a short input to the end, reading again after the end has been
// reported, closing twice, and closing a reader whose input was never read.
func TestAsyncReader(t *testing.T) {
	ctx := context.Background()

	src := io.NopCloser(bytes.NewBufferString("Testbuffer"))
	ar, err := New(ctx, src, 4)
	require.NoError(t, err)

	// The destination is larger than the input; the first Read is expected
	// to deliver all 10 bytes together with io.EOF.
	out := make([]byte, 100)
	got, err := ar.Read(out)
	assert.Equal(t, io.EOF, err)
	assert.Equal(t, 10, got)

	// Further reads, including a read after the error has already been
	// returned once, must keep reporting 0 bytes and io.EOF.
	for attempt := 0; attempt < 2; attempt++ {
		got, err = ar.Read(out)
		assert.Equal(t, io.EOF, err)
		assert.Equal(t, 0, got)
	}

	// Close must succeed, and so must a second (double) Close.
	for attempt := 0; attempt < 2; attempt++ {
		err = ar.Close()
		require.NoError(t, err)
	}

	// Test Close without reading everything
	src = io.NopCloser(bytes.NewBuffer(make([]byte, 50000)))
	ar, err = New(ctx, src, 4)
	require.NoError(t, err)
	require.NoError(t, ar.Close())
}
// TestAsyncWriteTo drains an AsyncReader with io.Copy and checks that a
// second copy from the already exhausted reader moves no bytes and still
// reports no error.
func TestAsyncWriteTo(t *testing.T) {
	ctx := context.Background()

	ar, err := New(ctx, io.NopCloser(bytes.NewBufferString("Testbuffer")), 4)
	require.NoError(t, err)

	var sink bytes.Buffer

	// First pass: the whole 10 byte input should be copied.
	copied, err := io.Copy(&sink, ar)
	require.NoError(t, err)
	assert.Equal(t, int64(10), copied)

	// Should still not return any errors
	copied, err = io.Copy(&sink, ar)
	require.NoError(t, err)
	assert.Equal(t, int64(0), copied)

	require.NoError(t, ar.Close())
}
// TestAsyncReaderErrors checks that New refuses a nil reader and refuses
// buffer counts that are zero or negative.
func TestAsyncReaderErrors(t *testing.T) {
	ctx := context.Background()

	// test nil reader
	_, err := New(ctx, nil, 4)
	require.Error(t, err)

	// invalid buffer number
	src := io.NopCloser(bytes.NewBufferString("Testbuffer"))
	for _, buffers := range []int{0, -1} {
		_, err = New(ctx, src, buffers)
		require.Error(t, err)
	}
}
// Complex read tests, leveraged from "bufio".

// readMaker pairs a descriptive name with a function that wraps an
// io.Reader in another io.Reader.
type readMaker struct {
	name string
	fn   func(io.Reader) io.Reader
}

// readMakers is the table of reader wrappers the size tests iterate over:
// the unmodified reader plus four wrappers from testing/iotest.
var readMakers = []readMaker{
	{
		name: "full",
		fn:   func(src io.Reader) io.Reader { return src },
	},
	{name: "byte", fn: iotest.OneByteReader},
	{name: "half", fn: iotest.HalfReader},
	{name: "data+err", fn: iotest.DataErrReader},
	{name: "timeout", fn: iotest.TimeoutReader},
}
// reads accumulates the text of buf by calling Read repeatedly, asking for
// at most m bytes per call, and returns everything collected as a string.
//
// Reading stops at io.EOF or iotest.ErrTimeout; bytes returned alongside
// either of those are still included. Any other error panics with the
// message "Data: " followed by the error text. The data is gathered in a
// fixed 1000 byte array, so the input plus m must fit within it.
func reads(buf io.Reader, m int) string {
	var scratch [1000]byte
	total := 0
	for {
		got, err := buf.Read(scratch[total : total+m])
		// Count the bytes before looking at the error: a Read may return
		// data and an error in the same call.
		total += got
		switch err {
		case nil:
			continue
		case io.EOF, iotest.ErrTimeout:
			return string(scratch[:total])
		default:
			panic("Data: " + err.Error())
		}
	}
}
// bufReader pairs a descriptive name with a function that reads an
// io.Reader to its end and returns what was read as a string.
type bufReader struct {
	name string
	fn   func(io.Reader) string
}

// bufreaders lists consumers that drain a reader through reads using a
// fixed maximum chunk size per Read call; each name is that chunk size.
var bufreaders = []bufReader{
	{name: "1", fn: func(src io.Reader) string { return reads(src, 1) }},
	{name: "2", fn: func(src io.Reader) string { return reads(src, 2) }},
	{name: "3", fn: func(src io.Reader) string { return reads(src, 3) }},
	{name: "4", fn: func(src io.Reader) string { return reads(src, 4) }},
	{name: "5", fn: func(src io.Reader) string { return reads(src, 5) }},
	{name: "7", fn: func(src io.Reader) string { return reads(src, 7) }},
}
// minReadBufferSize is the smallest non-zero buffer size used by the tests.
// NOTE(review): presumably mirrors bufio's internal minimum read buffer
// size — confirm.
const minReadBufferSize = 16

// bufsizes holds the buffer sizes handed to bufio.NewReaderSize by the
// size tests.
var bufsizes = []int{
	0,
	minReadBufferSize,
	23,
	32,
	46,
	64,
	93,
	128,
	1024,
	4096,
}
// TestAsyncReaderSizes reads a set of texts through an AsyncReader for every
// combination of source reader wrapper (readMakers), Read chunk size
// (bufreaders), bufio buffer size (bufsizes) and AsyncReader buffer count
// (1 to 9), and checks that the text comes back unchanged.
func TestAsyncReaderSizes(t *testing.T) {
	ctx := context.Background()

	// texts[i] for i < 30 is i letters (cycling through a-z) followed by a
	// newline; the last entry is all of the earlier entries concatenated.
	var texts [31]string
	str := ""
	all := ""
	for i := 0; i < len(texts)-1; i++ {
		texts[i] = str + "\n"
		all += texts[i]
		str += string(rune(i)%26 + 'a')
	}
	texts[len(texts)-1] = all

	for _, text := range texts {
		for _, readmaker := range readMakers {
			for _, bufreader := range bufreaders {
				for _, bufsize := range bufsizes {
					for buffers := 1; buffers < 10; buffers++ {
						read := readmaker.fn(strings.NewReader(text))
						buf := bufio.NewReaderSize(read, bufsize)
						// Fail with context here rather than dereferencing
						// an unusable reader if construction goes wrong.
						ar, err := New(ctx, io.NopCloser(buf), buffers)
						require.NoError(t, err, "reader=%s fn=%s bufsize=%d buffers=%d",
							readmaker.name, bufreader.name, bufsize, buffers)
						s := bufreader.fn(ar)
						// "timeout" expects the Reader to recover, AsyncReader does not.
						if s != text && readmaker.name != "timeout" {
							t.Errorf("reader=%s fn=%s bufsize=%d buffers=%d want=%q got=%q",
								readmaker.name, bufreader.name, bufsize, buffers, text, s)
						}
						err = ar.Close()
						require.NoError(t, err)
					}
				}
			}
		}
	}
}
// TestAsyncReaderWriteTo streams a set of texts through AsyncReader.WriteTo
// for every combination of source reader wrapper (readMakers), bufio buffer
// size (bufsizes) and AsyncReader buffer count (1 to 9), and checks that the
// text arrives unchanged.
//
// Unlike TestAsyncReaderSizes there is no read-size dimension: WriteTo
// drains the reader itself, so iterating over bufreaders would only repeat
// identical cases.
func TestAsyncReaderWriteTo(t *testing.T) {
	ctx := context.Background()

	// texts[i] for i < 30 is i letters (cycling through a-z) followed by a
	// newline; the last entry is all of the earlier entries concatenated.
	var texts [31]string
	str := ""
	all := ""
	for i := 0; i < len(texts)-1; i++ {
		texts[i] = str + "\n"
		all += texts[i]
		str += string(rune(i)%26 + 'a')
	}
	texts[len(texts)-1] = all

	for _, text := range texts {
		for _, readmaker := range readMakers {
			for _, bufsize := range bufsizes {
				for buffers := 1; buffers < 10; buffers++ {
					read := readmaker.fn(strings.NewReader(text))
					buf := bufio.NewReaderSize(read, bufsize)
					// Fail with context here rather than dereferencing an
					// unusable reader if construction goes wrong.
					ar, err := New(ctx, io.NopCloser(buf), buffers)
					require.NoError(t, err, "reader=%s bufsize=%d buffers=%d",
						readmaker.name, bufsize, buffers)
					dst := &bytes.Buffer{}
					_, err = ar.WriteTo(dst)
					// io.EOF and the iotest timeout are tolerated outcomes;
					// anything else aborts the test.
					if err != nil && err != io.EOF && err != iotest.ErrTimeout {
						t.Fatal("Copy:", err)
					}
					s := dst.String()
					// "timeout" expects the Reader to recover, AsyncReader does not.
					if s != text && readmaker.name != "timeout" {
						t.Errorf("reader=%s bufsize=%d buffers=%d want=%q got=%q",
							readmaker.name, bufsize, buffers, text, s)
					}
					err = ar.Close()
					require.NoError(t, err)
				}
			}
		}
	}
}
// zeroReader is an endless source of zero bytes. Once it has been closed
// it reports io.EOF instead.
type zeroReader struct {
	closed bool
}

// Read fills all of p with zeros and reports len(p) bytes read. After
// Close it returns 0 and io.EOF without touching p.
func (z *zeroReader) Read(p []byte) (n int, err error) {
	if !z.closed {
		for i := 0; i < len(p); i++ {
			p[i] = 0
		}
		return len(p), nil
	}
	return 0, io.EOF
}

// Close marks the reader as closed and returns nil. Calling it a second
// time panics, so a test using it notices a double close.
func (z *zeroReader) Close() error {
	if !z.closed {
		z.closed = true
		return nil
	}
	panic("double close on zeroReader")
}
// Test closing and abandoning
func testAsyncReaderClose(t *testing.T, writeto bool) {
ctx := context.Background()
zr := &zeroReader{}
a, err := New(ctx, zr, 16)
require.NoError(t, err)
var copyN int64
var copyErr error
var wg sync.WaitGroup
started := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
close(started)
if writeto {
// exercise the WriteTo path
copyN, copyErr = a.WriteTo(io.Discard)
} else {
// exercise the Read path
buf := make([]byte, 64*1024)
for {
var n int
n, copyErr = a.Read(buf)
copyN += int64(n)
if copyErr != nil {
break
}
}
}
}()
// Do some copying
<-started
time.Sleep(100 * time.Millisecond)
// Abandon the copy
a.Abandon()
wg.Wait()
assert.Equal(t, ErrorStreamAbandoned, copyErr)
// t.Logf("Copied %d bytes, err %v", copyN, copyErr)
assert.True(t, copyN > 0)
}
// TestAsyncReaderCloseRead runs the abandon-while-draining test with the
// reader drained through Read.
func TestAsyncReaderCloseRead(t *testing.T) {
	const viaWriteTo = false
	testAsyncReaderClose(t, viaWriteTo)
}
// TestAsyncReaderCloseWriteTo runs the abandon-while-draining test with the
// reader drained through WriteTo.
func TestAsyncReaderCloseWriteTo(t *testing.T) {
	const viaWriteTo = true
	testAsyncReaderClose(t, viaWriteTo)
}
// TestAsyncReaderSkipBytes exercises SkipBytes for a range of buffer counts,
// initial read lengths and skip distances (negative, zero and positive).
//
// Each case reads initialRead bytes from a fresh AsyncReader over 15000
// bytes of seeded pseudo-random data, calls SkipBytes(skip), then reads up
// to 1024 more bytes. If SkipBytes reports true, the follow-up read must
// return the data at offset initialRead+skip, or io.EOF when that offset is
// at or past the end. If it reports false, the follow-up read must fail
// with ErrorStreamAbandoned or io.EOF.
func TestAsyncReaderSkipBytes(t *testing.T) {
	ctx := context.Background()
	t.Parallel()
	data := make([]byte, 15000)
	// buf is shared by all sub-tests and resliced for each read; the
	// sub-tests run one after another, so they do not overlap.
	buf := make([]byte, len(data))
	// Fixed seed so every run sees the same data.
	r := rand.New(rand.NewSource(42))

	n, err := r.Read(data)
	require.NoError(t, err)
	require.Equal(t, len(data), n)

	initialReads := []int{0, 1, 100, 2048,
		softStartInitial - 1, softStartInitial, softStartInitial + 1,
		8000, len(data)}
	skips := []int{-1000, -101, -100, -99, 0, 1, 2048,
		softStartInitial - 1, softStartInitial, softStartInitial + 1,
		8000, len(data), BufferSize, 2 * BufferSize}

	for buffers := 1; buffers <= 5; buffers++ {
		if israce.Enabled && buffers > 1 {
			t.Skip("FIXME Skipping further tests with race detector until https://github.com/golang/go/issues/27070 is fixed.")
		}
		t.Run(fmt.Sprintf("%d", buffers), func(t *testing.T) {
			for _, initialRead := range initialReads {
				t.Run(fmt.Sprintf("%d", initialRead), func(t *testing.T) {
					for _, skip := range skips {
						t.Run(fmt.Sprintf("%d", skip), func(t *testing.T) {
							ar, err := New(ctx, io.NopCloser(bytes.NewReader(data)), buffers)
							require.NoError(t, err)

							// wantSkipFalse is set when the initial read
							// consumes the whole input; SkipBytes is then
							// expected to report false.
							wantSkipFalse := false
							buf = buf[:initialRead]
							n, err := readers.ReadFill(ar, buf)
							if initialRead >= len(data) {
								wantSkipFalse = true
								if initialRead > len(data) {
									assert.Equal(t, io.EOF, err)
								} else {
									assert.True(t, err == nil || err == io.EOF)
								}
								assert.Equal(t, len(data), n)
								assert.Equal(t, data, buf[:len(data)])
							} else {
								assert.NoError(t, err)
								assert.Equal(t, initialRead, n)
								assert.Equal(t, data[:initialRead], buf)
							}

							skipped := ar.SkipBytes(skip)

							// Read up to 1024 bytes from wherever the
							// reader now stands.
							buf = buf[:1024]
							n, err = readers.ReadFill(ar, buf)
							offset := initialRead + skip
							if skipped {
								assert.False(t, wantSkipFalse)
								l := len(buf)
								if offset >= len(data) {
									assert.Equal(t, io.EOF, err)
								} else {
									// Clamp the expected length when fewer
									// than 1024 bytes remain after offset.
									if offset+1024 >= len(data) {
										l = len(data) - offset
									}
									assert.Equal(t, l, n)
									assert.Equal(t, data[offset:offset+l], buf[:l])
								}
							} else {
								if initialRead >= len(data) {
									assert.Equal(t, io.EOF, err)
								} else {
									assert.True(t, err == ErrorStreamAbandoned || err == io.EOF)
								}
							}
						})
					}
				})
			}
		})
	}
}