From a5700a4a53abc7714461f13aee15bae8591cde18 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Thu, 16 May 2024 12:18:00 +0100
Subject: [PATCH] operations: rework rcat so that it doesn't call the
 --metadata-mapper twice

The --metadata-mapper was being called twice for files that rclone
needed to stream to disk. This happened only for:

- files bigger than --streaming-upload-cutoff
- on backends which didn't support PutStream

This also meant that these files were being logged as two transfers,
which was a little strange.

This fixes the problem by no longer using operations.Copy to upload
the file once it has been streamed to disk, using the backend's Put
method instead. This should have no effect on the reliability of the
transfers, as we retry Put if possible.

This also tidies up the Rcat function to make the different ways of
uploading the data clearer and to make it easy to see that the data
gets verified on all of those paths.

See #7848
---
 fs/operations/operations.go      | 158 +++++++++++++++++--------------
 fs/operations/operations_test.go |   7 +-
 2 files changed, 93 insertions(+), 72 deletions(-)

diff --git a/fs/operations/operations.go b/fs/operations/operations.go
index 59877bcc8..37dc4246d 100644
--- a/fs/operations/operations.go
+++ b/fs/operations/operations.go
@@ -36,6 +36,7 @@ import (
 	"github.com/rclone/rclone/fs/walk"
 	"github.com/rclone/rclone/lib/atexit"
 	"github.com/rclone/rclone/lib/errcount"
+	"github.com/rclone/rclone/lib/pacer"
 	"github.com/rclone/rclone/lib/random"
 	"github.com/rclone/rclone/lib/readers"
 	"golang.org/x/sync/errgroup"
@@ -719,13 +720,18 @@ func Retry(ctx context.Context, o interface{}, maxTries int, fn func() error) (e
 		if err == nil {
 			break
 		}
-		// Retry if err returned a retry error
+		// End if ctx is in error
 		if fserrors.ContextError(ctx, &err) {
 			break
 		}
+		// Retry if err returned a retry error
 		if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) {
 			fs.Debugf(o, "Received error: %v - low level retry %d/%d", err, tries, maxTries)
 			continue
+		} else if t, ok := pacer.IsRetryAfter(err); ok {
+			fs.Debugf(o, "Sleeping for %v (as indicated by the server) to obey Retry-After error: %v", t, err)
+			time.Sleep(t)
+			continue
 		}
 		break
 	}
@@ -1269,22 +1275,32 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b
 }
 
 // Rcat reads data from the Reader until EOF and uploads it to a file on remote
+//
+// in is closed at the end of the transfer
 func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) {
 	return rcatSrc(ctx, fdst, dstFileName, in, modTime, meta, nil)
 }
 
 // rcatSrc reads data from the Reader until EOF and uploads it to a file on remote
 //
+// in is closed at the end of the transfer
+//
 // Pass in fsrc if known or nil if not
 func rcatSrc(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata, fsrc fs.Fs) (dst fs.Object, err error) {
+	if SkipDestructive(ctx, dstFileName, "upload from pipe") {
+		// prevents "broken pipe" errors
+		_, err = io.Copy(io.Discard, in)
+		return nil, err
+	}
+
 	ci := fs.GetConfig(ctx)
 	tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1, nil, fdst)
 	defer func() {
 		tr.Done(ctx, err)
 	}()
-	in = tr.Account(ctx, in).WithBuffer()
+	var streamIn io.Reader = tr.Account(ctx, in).WithBuffer()
 
-	readCounter := readers.NewCountingReader(in)
+	readCounter := readers.NewCountingReader(streamIn)
 	var trackingIn io.Reader
 	var hasher *hash.MultiHasher
 	var options []fs.OpenOption
@@ -1307,86 +1323,90 @@ func rcatSrc(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClos
 		options = append(options, fs.MetadataOption(ci.MetadataSet))
 	}
 
-	compare := func(dst fs.Object) error {
-		var sums map[hash.Type]string
-		opt := defaultEqualOpt(ctx)
+	// get the sums from the hasher if in use, or nil
+	getSums := func() (sums map[hash.Type]string) {
 		if hasher != nil {
-			// force --checksum on if we have hashes
-			opt.checkSum = true
 			sums = hasher.Sums()
 		}
-		src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst).WithMetadata(meta)
-		if !equal(ctx, src, dst, opt) {
-			err = fmt.Errorf("corrupted on transfer")
-			err = fs.CountError(err)
-			fs.Errorf(dst, "%v", err)
-			return err
-		}
-		return nil
+		return sums
 	}
 
-	// check if file small enough for direct upload
+	// Read the start of the input and check if it is small enough for direct upload
 	buf := make([]byte, ci.StreamingUploadCutoff)
+	fileIsSmall := false
 	if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
-		fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n)
-		src := object.NewMemoryObject(dstFileName, modTime, buf[:n]).WithMetadata(meta).SetFs(fsrc)
-		return Copy(ctx, fdst, nil, dstFileName, src)
+		fileIsSmall = true
+		buf = buf[:n]
 	}
 
-	// Make a new ReadCloser with the bits we've already read
-	in = &readCloser{
-		Reader: io.MultiReader(bytes.NewReader(buf), trackingIn),
-		Closer: in,
-	}
+	// Read the data we have already read in buf and any further unread
+	streamIn = io.MultiReader(bytes.NewReader(buf), trackingIn)
 
-	fStreamTo := fdst
-	canStream := fdst.Features().PutStream != nil
-	if !canStream {
-		fs.Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file")
-		tmpLocalFs, err := fs.TemporaryLocalFs(ctx)
-		if err != nil {
-			return nil, fmt.Errorf("failed to create temporary local FS to spool file: %w", err)
-		}
-		defer func() {
-			err := Purge(ctx, tmpLocalFs, "")
+	doPutStream := fdst.Features().PutStream
+
+	// Upload the input
+	if fileIsSmall || doPutStream == nil {
+		var rs io.ReadSeeker
+		if fileIsSmall {
+			fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", len(buf))
+			rs = bytes.NewReader(buf)
+		} else {
+			fs.Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file")
+			spool, err := os.CreateTemp("", "rclone-spool")
 			if err != nil {
-				fs.Infof(tmpLocalFs, "Failed to cleanup temporary FS: %v", err)
+				return nil, fmt.Errorf("failed to create temporary spool file: %v", err)
 			}
-		}()
-		fStreamTo = tmpLocalFs
-	}
-
-	if SkipDestructive(ctx, dstFileName, "upload from pipe") {
-		// prevents "broken pipe" errors
-		_, err = io.Copy(io.Discard, in)
-		return nil, err
-	}
-
-	objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, fsrc).WithMetadata(meta)
-	if dst, err = fStreamTo.Features().PutStream(ctx, in, objInfo, options...); err != nil {
-		return dst, err
-	}
-	if err = compare(dst); err != nil {
-		return dst, err
-	}
-	if !canStream {
-		// copy dst (which is the local object we have just streamed to) to the remote
-		newCtx := ctx
-		if ci.Metadata && len(meta) != 0 {
-			// If we have metadata and we are setting it then use
-			// the --metadataset mechanism to supply it to Copy
-			var newCi *fs.ConfigInfo
-			newCtx, newCi = fs.AddConfig(ctx)
-			if len(newCi.MetadataSet) == 0 {
-				newCi.MetadataSet = meta
-			} else {
-				var newMeta fs.Metadata
-				newMeta.Merge(meta)
-				newMeta.Merge(newCi.MetadataSet) // --metadata-set takes priority
-				newCi.MetadataSet = newMeta
+			fileName := spool.Name()
+			defer func() {
+				err := spool.Close()
+				if err != nil {
+					fs.Errorf(fileName, "Failed to close temporary spool file: %v", err)
+				}
+				err = os.Remove(fileName)
+				if err != nil {
+					fs.Errorf(fileName, "Failed to delete temporary spool file: %v", err)
+				}
+			}()
+			_, err = io.Copy(spool, streamIn)
+			if err != nil {
+				return nil, fmt.Errorf("failed to copy to temporary spool file: %v", err)
 			}
+			rs = spool
 		}
-		return Copy(newCtx, fdst, nil, dstFileName, dst)
+		// Upload with Put with retries - since we have downloaded the file we know the size, and the hashes
+		sums := getSums()
+		size := int64(readCounter.BytesRead())
+		objInfo := object.NewStaticObjectInfo(dstFileName, modTime, size, false, sums, fsrc).WithMetadata(meta)
+		err = Retry(ctx, objInfo, ci.LowLevelRetries, func() error {
+			_, err = rs.Seek(0, io.SeekStart)
+			if err != nil {
+				return fmt.Errorf("failed to rewind temporary spool file: %v", err)
+			}
+			dst, err = fdst.Put(ctx, rs, objInfo, options...)
+			return err
+		})
+	} else {
+		// Upload with PutStream with no retries
+		objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, fsrc).WithMetadata(meta)
+		dst, err = doPutStream(ctx, streamIn, objInfo, options...)
+	}
+	if err != nil {
+		return dst, err
+	}
+
+	// Check transfer
+	sums := getSums()
+	opt := defaultEqualOpt(ctx)
+	if sums != nil {
+		// force --checksum on if we have hashes
+		opt.checkSum = true
+	}
+	src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst).WithMetadata(meta)
+	if !equal(ctx, src, dst, opt) {
+		err = fmt.Errorf("corrupted on transfer")
+		err = fs.CountError(err)
+		fs.Errorf(dst, "%v", err)
+		return dst, err
 	}
 	return dst, nil
 }
diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go
index d14678f13..3e66bb33a 100644
--- a/fs/operations/operations_test.go
+++ b/fs/operations/operations_test.go
@@ -42,6 +42,7 @@ import (
 	"github.com/rclone/rclone/fs/operations"
 	"github.com/rclone/rclone/fstest"
 	"github.com/rclone/rclone/fstest/fstests"
+	"github.com/rclone/rclone/lib/pacer"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/text/cases"
@@ -504,12 +505,12 @@ func TestRetry(t *testing.T) {
 		return err
 	}
 
-	i, err = 3, io.EOF
+	i, err = 3, fmt.Errorf("Wrapped EOF is retriable: %w", io.EOF)
 	assert.Equal(t, nil, operations.Retry(ctx, nil, 5, fn))
 	assert.Equal(t, 0, i)
 
-	i, err = 10, io.EOF
-	assert.Equal(t, io.EOF, operations.Retry(ctx, nil, 5, fn))
+	i, err = 10, pacer.RetryAfterError(errors.New("BANG"), 10*time.Millisecond)
+	assert.Equal(t, err, operations.Retry(ctx, nil, 5, fn))
 	assert.Equal(t, 5, i)
 
 	i, err = 10, fs.ErrorObjectNotFound
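
The new pacer.IsRetryAfter branch in Retry sleeps for the server-requested
interval and then retries. Below is a minimal, self-contained sketch of that
control flow, not rclone's implementation: retryAfterError and isRetryAfter
are hypothetical stand-ins for pacer.RetryAfterError and pacer.IsRetryAfter,
and the sketch omits the retriable-error and context-cancellation checks the
real Retry performs.

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryAfterError is a hypothetical stand-in for the error type behind
// pacer.RetryAfterError: an error carrying a server-requested wait.
type retryAfterError struct {
	err   error
	delay time.Duration
}

func (e *retryAfterError) Error() string { return e.err.Error() }

// isRetryAfter is a stand-in for pacer.IsRetryAfter: it reports the requested
// wait if err (or anything it wraps) carries one.
func isRetryAfter(err error) (time.Duration, bool) {
	var ra *retryAfterError
	if errors.As(err, &ra) {
		return ra.delay, true
	}
	return 0, false
}

// retry mirrors the shape of the patched operations.Retry loop: sleep and
// retry when the error carries a Retry-After hint, otherwise give up.
func retry(maxTries int, fn func() error) (err error) {
	for tries := 1; tries <= maxTries; tries++ {
		err = fn()
		if err == nil {
			break
		}
		if t, ok := isRetryAfter(err); ok {
			fmt.Printf("sleeping for %v to obey Retry-After: %v\n", t, err)
			time.Sleep(t)
			continue
		}
		break
	}
	return err
}

func main() {
	calls := 0
	err := retry(5, func() error {
		calls++
		if calls < 3 {
			return &retryAfterError{errors.New("429 too many requests"), 10 * time.Millisecond}
		}
		return nil
	})
	fmt.Printf("calls=%d err=%v\n", calls, err)
}

Note that in the patch itself the Retry-After branch is an else-if after the
generic retriable-error check, so an error that is both retriable and carries
a Retry-After hint is retried immediately rather than slept on.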
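The reworked rcatSrc has three clearly separated upload paths that all feed
the same final verification: Put from memory for small files, PutStream where
the backend supports it, and spool-to-temporary-file plus retried Put
otherwise. The sketch below shows just that branching decision under stand-in
types; backend, put and putStream are illustrative, not rclone's fs.Fs API,
and accounting, hashing and the equality check are omitted.

package main

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"strings"
)

// backend is a hypothetical stand-in for an rclone backend: putStream is nil
// when streaming uploads are unsupported, mirroring fdst.Features().PutStream.
type backend struct {
	put       func(r io.ReadSeeker, size int64) error // upload of known size
	putStream func(r io.Reader) error                 // nil => cannot stream
}

// rcatSketch mirrors the three paths of the reworked rcatSrc: small inputs
// are uploaded from memory, streaming backends get the joined reader, and
// everything else is spooled to disk so the size is known and the upload can
// be retried from a seekable file.
func rcatSketch(b *backend, in io.Reader, cutoff int64) error {
	// Peek at up to cutoff bytes to decide whether the input is "small".
	buf := make([]byte, cutoff)
	n, err := io.ReadFull(in, buf)
	small := err == io.EOF || err == io.ErrUnexpectedEOF
	if err != nil && !small {
		return err
	}
	buf = buf[:n]
	// Re-join what was peeked at with the unread remainder.
	stream := io.MultiReader(bytes.NewReader(buf), in)

	switch {
	case small:
		// Path 1: the whole file fits below the cutoff - Put from memory.
		return b.put(bytes.NewReader(buf), int64(n))
	case b.putStream != nil:
		// Path 2: the backend accepts a stream of unknown length.
		return b.putStream(stream)
	default:
		// Path 3: spool to a temporary file, then Put with the known size.
		spool, err := os.CreateTemp("", "spool")
		if err != nil {
			return err
		}
		defer os.Remove(spool.Name())
		defer spool.Close()
		size, err := io.Copy(spool, stream)
		if err != nil {
			return err
		}
		// Rewind so the upload reads from the start (and could be retried).
		if _, err := spool.Seek(0, io.SeekStart); err != nil {
			return err
		}
		return b.put(spool, size)
	}
}

func main() {
	b := &backend{
		put: func(r io.ReadSeeker, size int64) error {
			fmt.Printf("Put %d bytes\n", size)
			return nil
		},
		// putStream left nil to force the spool path for large inputs
	}
	fmt.Println(rcatSketch(b, strings.NewReader(strings.Repeat("x", 100)), 16))
}

Because the spool path yields a seekable reader of known size, the patch can
wrap fdst.Put in Retry directly instead of going through operations.Copy, and
it is that removed Copy which was invoking the --metadata-mapper a second
time.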