b2: fix chunked streaming uploads

Streaming uploads are used by rclone rcat and rclone mount
--vfs-cache-mode off.
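
For example (illustrative invocations, not part of this commit), both of the
following take the streaming path because the size isn't known up front:

cat big.file | rclone rcat remote:path/to/big.file
rclone mount remote: /mnt/remote --vfs-cache-mode off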

After the multipart chunker refactor, the chunked streaming upload was
accidentally mixing up the first and second parts, which caused corrupted
uploads.

This was caused by a simple off-by-one error introduced when the
refactoring switched from 1-based to 0-based part numbering.
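
To make the mix-up concrete, here is a small self-contained Go sketch (not
rclone's code; chunkParts and its arguments are invented for illustration) of a
streaming uploader whose caller has already read the first chunk into a buffer
before the loop starts. With 0-based counting, part 0 must reuse that buffer;
keeping the old 1-based check swaps the first and second chunks:

// Sketch only: simulates the part-numbering bug, not rclone's uploader.
package main

import (
	"fmt"
	"io"
	"strings"
)

// chunkParts splits in into chunkSize pieces. The first chunk has already
// been read into a buffer before the loop starts, mirroring how Stream()
// receives initialUploadBlock. buggy selects the stale 1-based check.
func chunkParts(in io.Reader, chunkSize int, buggy bool) []string {
	first := make([]byte, chunkSize)
	n, _ := io.ReadFull(in, first)
	initial := string(first[:n])

	var parts []string
	for part := 0; ; part++ {
		useInitial := part == 0
		if buggy {
			useInitial = part == 1 // off by one: reuses the pre-read buffer too late
		}
		var chunk string
		if useInitial {
			chunk = initial
		} else {
			buf := make([]byte, chunkSize)
			n, _ := io.ReadFull(in, buf)
			if n == 0 {
				break
			}
			chunk = string(buf[:n])
		}
		parts = append(parts, chunk)
		if len(chunk) < chunkSize {
			break // short chunk means we hit the end of the stream
		}
	}
	return parts
}

func main() {
	fmt.Println(chunkParts(strings.NewReader("AAABBBCCC"), 3, false)) // [AAA BBB CCC]
	fmt.Println(chunkParts(strings.NewReader("AAABBBCCC"), 3, true))  // [BBB AAA CCC]
}

Run as-is it prints the parts in order for the fixed check and with the first
two chunks swapped for the stale one, which is exactly the kind of reordering
that corrupted the uploads.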

Fixing this revealed that the metadata wasn't being re-read for the
copied object either.

This fixes both of those issues and adds an integration test so they don't
happen again.

Fixes #7367
Nick Craig-Wood 2023-10-13 15:46:36 +01:00
parent d24e5bc7e4
commit 31486aa7e3
3 changed files with 62 additions and 5 deletions


@@ -1888,7 +1888,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 				return err
 			}
 			// NB Stream returns the buffer and token
-			return up.Stream(ctx, rw)
+			err = up.Stream(ctx, rw)
+			if err != nil {
+				return err
+			}
+			return o.decodeMetaDataFileInfo(up.info)
 		} else if err == io.EOF {
 			fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
 			defer o.fs.putRW(rw)
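
The hunk above is the metadata half of the fix: instead of returning Stream's
result directly, Update now decodes the file info recorded by the multipart
upload (up.info) into the object, so the metadata of the returned object
matches the file that was actually uploaded.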


@@ -1,10 +1,14 @@
 package b2

 import (
+	"bytes"
 	"context"
+	"fmt"
 	"testing"
 	"time"

+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/fs/object"
 	"github.com/rclone/rclone/fstest"
 	"github.com/rclone/rclone/fstest/fstests"
 	"github.com/rclone/rclone/lib/random"
@@ -191,7 +195,7 @@ func (f *Fs) InternalTestChunkedCopy(t *testing.T) {

 	// Set copy cutoff to minimum value so we make chunks
 	origCutoff := f.opt.CopyCutoff
-	f.opt.CopyCutoff = 5 * 1024 * 1024
+	f.opt.CopyCutoff = minChunkSize
 	defer func() {
 		f.opt.CopyCutoff = origCutoff
 	}()
@@ -216,9 +220,57 @@ func (f *Fs) InternalTestChunkedCopy(t *testing.T) {
 	gotContents := fstests.ReadObject(ctx, t, dst, -1)
 	assert.Equal(t, contents, gotContents)
 }

+// The integration tests do a reasonable job of testing the normal
+// streaming upload but don't test the chunked streaming upload.
+func (f *Fs) InternalTestChunkedStreamingUpload(t *testing.T, size int) {
+	ctx := context.Background()
+	contents := random.String(size)
+	item := fstest.NewItem(fmt.Sprintf("chunked-streaming-upload-%d", size), contents, fstest.Time("2001-05-06T04:05:06.499Z"))
+	// Set chunk size to minimum value so we make chunks
+	origOpt := f.opt
+	f.opt.ChunkSize = minChunkSize
+	f.opt.UploadCutoff = 0
+	defer func() {
+		f.opt = origOpt
+	}()
+
+	// Do the streaming upload
+	src := object.NewStaticObjectInfo(item.Path, item.ModTime, -1, true, item.Hashes, f)
+	in := bytes.NewBufferString(contents)
+	dst, err := f.PutStream(ctx, in, src)
+	require.NoError(t, err)
+	defer func() {
+		assert.NoError(t, dst.Remove(ctx))
+	}()
+
+	// Check size
+	assert.Equal(t, int64(size), dst.Size())
+
+	// Check modtime
+	srcModTime := src.ModTime(ctx)
+	dstModTime := dst.ModTime(ctx)
+	assert.Equal(t, srcModTime, dstModTime)
+
+	// Make sure contents are correct
+	gotContents := fstests.ReadObject(ctx, t, dst, -1)
+	assert.Equal(t, contents, gotContents, "Contents incorrect")
+}
+
 // -run TestIntegration/FsMkdir/FsPutFiles/Internal
 func (f *Fs) InternalTest(t *testing.T) {
 	t.Run("ChunkedCopy", f.InternalTestChunkedCopy)
+	for _, size := range []fs.SizeSuffix{
+		minChunkSize - 1,
+		minChunkSize,
+		minChunkSize + 1,
+		(3 * minChunkSize) / 2,
+		(5 * minChunkSize) / 2,
+	} {
+		t.Run(fmt.Sprintf("ChunkedStreamingUpload/%d", size), func(t *testing.T) {
+			f.InternalTestChunkedStreamingUpload(t, int(size))
+		})
+	}
 }

 var _ fstests.InternalTester = (*Fs)(nil)
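
Assuming a remote named TestB2: is configured and the standard rclone
integration test harness is used, the new internal tests can be run from
backend/b2 with something like:

go test -v -remote TestB2: -run 'TestIntegration/FsMkdir/FsPutFiles/Internal'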


@@ -393,10 +393,11 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW)
 		hasMoreParts = true
 	)
 	up.size = initialUploadBlock.Size()
+	up.parts = 0
 	for part := 0; hasMoreParts; part++ {
 		// Get a block of memory from the pool and token which limits concurrency.
 		var rw *pool.RW
-		if part == 1 {
+		if part == 0 {
 			rw = initialUploadBlock
 		} else {
 			rw = up.f.getRW(false)
@@ -411,7 +412,7 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW)

 		// Read the chunk
 		var n int64
-		if part == 1 {
+		if part == 0 {
 			n = rw.Size()
 		} else {
 			n, err = io.CopyN(rw, up.in, up.chunkSize)
@@ -426,7 +427,7 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW)
 		}

 		// Keep stats up to date
-		up.parts = part
+		up.parts += 1
 		up.size += n
 		if part > maxParts {
 			up.f.putRW(rw)
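
A note on the part counter changes above: with 0-based part numbers,
up.parts = part records one part too few (after uploading part 0 it would
leave the counter at 0), so the counter is now explicitly initialised to 0
before the loop and incremented once per uploaded part instead.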