diff --git a/backend/b2/api/types.go b/backend/b2/api/types.go
index fcf0c9834..a9d6af2b1 100644
--- a/backend/b2/api/types.go
+++ b/backend/b2/api/types.go
@@ -337,3 +337,11 @@ type CopyFileRequest struct {
 	Info              map[string]string `json:"fileInfo,omitempty"`            // This field stores the metadata that will be stored with the file. (REPLACE only)
 	DestBucketID      string            `json:"destinationBucketId,omitempty"` // The destination ID of the bucket if set, if not the source bucket will be used
 }
+
+// CopyPartRequest is the request for b2_copy_part - the response is UploadPartResponse
+type CopyPartRequest struct {
+	SourceID    string `json:"sourceFileId"`    // The ID of the source file being copied.
+	LargeFileID string `json:"largeFileId"`     // The ID of the large file the part will belong to, as returned by b2_start_large_file.
+	PartNumber  int64  `json:"partNumber"`      // Which part this is (starting from 1)
+	Range       string `json:"range,omitempty"` // The range of bytes to copy. If not provided, the whole source file will be copied.
+}
diff --git a/backend/b2/b2.go b/backend/b2/b2.go
index bddb09ca0..df8e9fc3f 100644
--- a/backend/b2/b2.go
+++ b/backend/b2/b2.go
@@ -33,6 +33,7 @@ import (
 	"github.com/rclone/rclone/lib/bucket"
 	"github.com/rclone/rclone/lib/encoder"
 	"github.com/rclone/rclone/lib/pacer"
+	"github.com/rclone/rclone/lib/pool"
 	"github.com/rclone/rclone/lib/rest"
 )
 
@@ -54,6 +55,9 @@ const (
 	minChunkSize        = 5 * fs.MebiByte
 	defaultChunkSize    = 96 * fs.MebiByte
 	defaultUploadCutoff = 200 * fs.MebiByte
+	largeFileCopyCutoff = 4 * fs.GibiByte          // 5E9 is the max
+	memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long
+	memoryPoolUseMmap   = false
)
 
 // Globals
@@ -113,6 +117,16 @@ Files above this size will be uploaded in chunks of "--b2-chunk-size".
 
 This value should be set no larger than 4.657GiB (== 5GB).`,
 			Default:  defaultUploadCutoff,
 			Advanced: true,
+		}, {
+			Name: "copy_cutoff",
+			Help: `Cutoff for switching to multipart copy
+
+Any files larger than this that need to be server side copied will be
+copied in chunks of this size.
+
+The minimum is 0 and the maximum is 4.6GB.`,
+			Default:  largeFileCopyCutoff,
+			Advanced: true,
 		}, {
 			Name: "chunk_size",
 			Help: `Upload chunk size. Must fit in memory.
@@ -150,6 +164,18 @@ The duration before the download authorization token will expire.
 
 The minimum value is 1 second. The maximum value is one week.`,
 			Default:  fs.Duration(7 * 24 * time.Hour),
 			Advanced: true,
+		}, {
+			Name:     "memory_pool_flush_time",
+			Default:  memoryPoolFlushTime,
+			Advanced: true,
+			Help: `How often internal memory buffer pools will be flushed.
+Uploads which require additional buffers (e.g. multipart) will use the memory pool for allocations.
+This option controls how often unused buffers will be removed from the pool.`,
+		}, {
+			Name:     "memory_pool_use_mmap",
+			Default:  memoryPoolUseMmap,
+			Advanced: true,
+			Help:     `Whether to use mmap buffers in internal memory pool.`,
 		}, {
 			Name:     config.ConfigEncoding,
 			Help:     config.ConfigEncodingHelp,
@@ -173,10 +199,13 @@ type Options struct {
 	Versions                      bool                 `config:"versions"`
 	HardDelete                    bool                 `config:"hard_delete"`
 	UploadCutoff                  fs.SizeSuffix        `config:"upload_cutoff"`
+	CopyCutoff                    fs.SizeSuffix        `config:"copy_cutoff"`
 	ChunkSize                     fs.SizeSuffix        `config:"chunk_size"`
 	DisableCheckSum               bool                 `config:"disable_checksum"`
 	DownloadURL                   string               `config:"download_url"`
 	DownloadAuthorizationDuration fs.Duration          `config:"download_auth_duration"`
+	MemoryPoolFlushTime           fs.Duration          `config:"memory_pool_flush_time"`
+	MemoryPoolUseMmap             bool                 `config:"memory_pool_use_mmap"`
 	Enc                           encoder.MultiEncoder `config:"encoding"`
 }
 
@@ -199,7 +228,8 @@ type Fs struct {
 	uploads      map[string][]*api.GetUploadURLResponse // Upload URLs by buckedID
 	authMu       sync.Mutex                             // lock for authorizing the account
 	pacer        *fs.Pacer                              // To pace and retry the API calls
-	bufferTokens chan []byte                            // control concurrency of multipart uploads
+	uploadToken  *pacer.TokenDispenser                  // control concurrency
+	pool         *pool.Pool                             // memory pool
 }
 
 // Object describes a b2 object
@@ -335,7 +365,6 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
 	err = checkUploadChunkSize(cs)
 	if err == nil {
 		old, f.opt.ChunkSize = f.opt.ChunkSize, cs
-		f.fillBufferTokens() // reset the buffer tokens
 	}
 	return
 }
@@ -396,6 +425,13 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		_bucketType: make(map[string]string, 1),
 		uploads:     make(map[string][]*api.GetUploadURLResponse),
 		pacer:       fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
+		uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
+		pool: pool.New(
+			time.Duration(opt.MemoryPoolFlushTime),
+			int(opt.ChunkSize),
+			fs.Config.Transfers,
+			opt.MemoryPoolUseMmap,
+		),
 	}
 	f.setRoot(root)
 	f.features = (&fs.Features{
@@ -410,7 +446,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
 		f.srv.SetHeader(testModeHeader, testMode)
 		fs.Debugf(f, "Setting test header \"%s: %s\"", testModeHeader, testMode)
 	}
-	f.fillBufferTokens()
 	err = f.authorizeAccount(ctx)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to authorize account")
 	}
@@ -533,32 +568,25 @@ func (f *Fs) clearUploadURL(bucketID string) {
 	f.uploadMu.Unlock()
 }
 
-// Fill up (or reset) the buffer tokens
-func (f *Fs) fillBufferTokens() {
-	f.bufferTokens = make(chan []byte, fs.Config.Transfers)
-	for i := 0; i < fs.Config.Transfers; i++ {
-		f.bufferTokens <- nil
+// getBuf gets a buffer of f.opt.ChunkSize and an upload token
+//
+// If noBuf is set then it just gets an upload token
+func (f *Fs) getBuf(noBuf bool) (buf []byte) {
+	f.uploadToken.Get()
+	if !noBuf {
+		buf = f.pool.Get()
 	}
-}
-
-// getUploadBlock gets a block from the pool of size chunkSize
-func (f *Fs) getUploadBlock() []byte {
-	buf := <-f.bufferTokens
-	if buf == nil {
-		buf = make([]byte, f.opt.ChunkSize)
-	}
-	// fs.Debugf(f, "Getting upload block %p", buf)
 	return buf
 }
 
-// putUploadBlock returns a block to the pool of size chunkSize
-func (f *Fs) putUploadBlock(buf []byte) {
-	buf = buf[:cap(buf)]
-	if len(buf) != int(f.opt.ChunkSize) {
-		panic("bad blocksize returned to pool")
+// putBuf returns a buffer to the memory pool and an upload token
+//
+// If noBuf is set then it just returns the upload token
+func (f *Fs) putBuf(buf []byte, noBuf bool) {
+	if !noBuf {
+		f.pool.Put(buf)
 	}
-	// fs.Debugf(f, "Returning upload block %p", buf)
-	f.bufferTokens <- buf
+	f.uploadToken.Put()
 }
 
 // Return an Object from a path
@@ -1205,6 +1233,63 @@ func (f *Fs) CleanUp(ctx context.Context) error {
 	return f.purge(ctx, f.rootBucket, f.rootDirectory, true)
 }
 
+// copy does a server side copy from dstObj <- srcObj
+//
+// If newInfo is nil then the metadata will be copied otherwise it
+// will be replaced with newInfo
+func (f *Fs) copy(ctx context.Context, dstObj *Object, srcObj *Object, newInfo *api.File) (err error) {
+	if srcObj.size >= int64(f.opt.CopyCutoff) {
+		if newInfo == nil {
+			newInfo, err = srcObj.getMetaData(ctx)
+			if err != nil {
+				return err
+			}
+		}
+		up, err := f.newLargeUpload(ctx, dstObj, nil, srcObj, f.opt.CopyCutoff, true, newInfo)
+		if err != nil {
+			return err
+		}
+		return up.Upload(ctx)
+	}
+
+	dstBucket, dstPath := dstObj.split()
+	err = f.makeBucket(ctx, dstBucket)
+	if err != nil {
+		return err
+	}
+
+	destBucketID, err := f.getBucketID(ctx, dstBucket)
+	if err != nil {
+		return err
+	}
+
+	opts := rest.Opts{
+		Method: "POST",
+		Path:   "/b2_copy_file",
+	}
+	var request = api.CopyFileRequest{
+		SourceID:     srcObj.id,
+		Name:         f.opt.Enc.FromStandardPath(dstPath),
+		DestBucketID: destBucketID,
+	}
+	if newInfo == nil {
+		request.MetadataDirective = "COPY"
+	} else {
+		request.MetadataDirective = "REPLACE"
+		request.ContentType = newInfo.ContentType
+		request.Info = newInfo.Info
+	}
+	var response api.FileInfo
+	err = f.pacer.Call(func() (bool, error) {
+		resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
+		return f.shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return err
+	}
+	return dstObj.decodeMetaDataFileInfo(&response)
+}
+
 // Copy src to this remote using server side copy operations.
 //
 // This is stored with the remote path given
@@ -1215,47 +1300,21 @@ func (f *Fs) CleanUp(ctx context.Context) error {
 //
 // If it isn't possible then return fs.ErrorCantCopy
 func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
-	dstBucket, dstPath := f.split(remote)
-	err := f.makeBucket(ctx, dstBucket)
-	if err != nil {
-		return nil, err
-	}
 	srcObj, ok := src.(*Object)
 	if !ok {
 		fs.Debugf(src, "Can't copy - not same remote type")
 		return nil, fs.ErrorCantCopy
 	}
-	destBucketID, err := f.getBucketID(ctx, dstBucket)
-	if err != nil {
-		return nil, err
-	}
-	opts := rest.Opts{
-		Method: "POST",
-		Path:   "/b2_copy_file",
-	}
-	var request = api.CopyFileRequest{
-		SourceID:          srcObj.id,
-		Name:              f.opt.Enc.FromStandardPath(dstPath),
-		MetadataDirective: "COPY",
-		DestBucketID:      destBucketID,
-	}
-	var response api.FileInfo
-	err = f.pacer.Call(func() (bool, error) {
-		resp, err := f.srv.CallJSON(ctx, &opts, &request, &response)
-		return f.shouldRetry(ctx, resp, err)
-	})
-	if err != nil {
-		return nil, err
-	}
-	o := &Object{
+	// Temporary Object under construction
+	dstObj := &Object{
 		fs:     f,
 		remote: remote,
 	}
-	err = o.decodeMetaDataFileInfo(&response)
+	err := f.copy(ctx, dstObj, srcObj, nil)
 	if err != nil {
 		return nil, err
 	}
-	return o, nil
+	return dstObj, nil
 }
 
 // Hashes returns the supported hash sets.
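
Review note, not part of the patch: when the source is at or above `copy_cutoff`, the new `f.copy` starts a large file and issues one `b2_copy_part` per chunk, with parts numbered from 1 and an inclusive `Range` header of the form `bytes=start-end` (see `copyChunk` below). A minimal standalone sketch of that range arithmetic, using hypothetical names rather than rclone's:

```go
package main

import "fmt"

// partRanges mirrors the chunking in copyChunk/Upload: each part starts at
// offset (part-1)*chunkSize and the Range header end is inclusive, so a part
// of size n covers "bytes=offset-(offset+n-1)".
func partRanges(size, chunkSize int64) []string {
	var ranges []string
	for part, remaining := int64(1), size; remaining > 0; part++ {
		reqSize := remaining
		if reqSize >= chunkSize {
			reqSize = chunkSize
		}
		offset := (part - 1) * chunkSize // where this part starts in the source
		ranges = append(ranges, fmt.Sprintf("bytes=%d-%d", offset, offset+reqSize-1))
		remaining -= reqSize
	}
	return ranges
}

func main() {
	// e.g. a 10 GiB source with a 4 GiB copy cutoff splits into 3 parts
	for _, r := range partRanges(10<<30, 4<<30) {
		fmt.Println(r)
	}
}
```
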
@@ -1526,28 +1585,10 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
 	if err != nil {
 		return err
 	}
-	_, bucketPath := o.split()
 	info.Info[timeKey] = timeString(modTime)
-	opts := rest.Opts{
-		Method: "POST",
-		Path:   "/b2_copy_file",
-	}
-	var request = api.CopyFileRequest{
-		SourceID:          o.id,
-		Name:              o.fs.opt.Enc.FromStandardPath(bucketPath), // copy to same name
-		MetadataDirective: "REPLACE",
-		ContentType:       info.ContentType,
-		Info:              info.Info,
-	}
-	var response api.FileInfo
-	err = o.fs.pacer.Call(func() (bool, error) {
-		resp, err := o.fs.srv.CallJSON(ctx, &opts, &request, &response)
-		return o.fs.shouldRetry(ctx, resp, err)
-	})
-	if err != nil {
-		return err
-	}
-	return o.decodeMetaDataFileInfo(&response)
+
+	// Copy to the same name, overwriting the metadata only
+	return o.fs.copy(ctx, o, o, info)
 }
 
 // Storable returns if this object is storable
@@ -1723,7 +1764,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	}
 	if size == -1 { // Check if the file is large enough for a chunked upload (needs to be at least two chunks)
-		buf := o.fs.getUploadBlock()
+		buf := o.fs.getBuf(false)
+
 		n, err := io.ReadFull(in, buf)
 		if err == nil {
 			bufReader := bufio.NewReader(in)
@@ -1733,22 +1775,24 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 		if err == nil {
 			fs.Debugf(o, "File is big enough for chunked streaming")
-			up, err := o.fs.newLargeUpload(ctx, o, in, src)
+			up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
 			if err != nil {
-				o.fs.putUploadBlock(buf)
+				o.fs.putBuf(buf, false)
 				return err
 			}
+			// NB Stream returns the buffer and token
 			return up.Stream(ctx, buf)
 		} else if err == io.EOF || err == io.ErrUnexpectedEOF {
 			fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
-			defer o.fs.putUploadBlock(buf)
+			defer o.fs.putBuf(buf, false)
 			size = int64(n)
 			in = bytes.NewReader(buf[:n])
 		} else {
+			o.fs.putBuf(buf, false)
 			return err
 		}
 	} else if size > int64(o.fs.opt.UploadCutoff) {
-		up, err := o.fs.newLargeUpload(ctx, o, in, src)
+		up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
 		if err != nil {
 			return err
 		}
diff --git a/backend/b2/upload.go b/backend/b2/upload.go
index d390c340b..bc179c714 100644
--- a/backend/b2/upload.go
+++ b/backend/b2/upload.go
@@ -21,6 +21,7 @@ import (
 	"github.com/rclone/rclone/fs/accounting"
 	"github.com/rclone/rclone/fs/hash"
 	"github.com/rclone/rclone/lib/rest"
+	"golang.org/x/sync/errgroup"
 )
 
 type hashAppendingReader struct {
@@ -68,20 +69,26 @@ func newHashAppendingReader(in io.Reader, h gohash.Hash) *hashAppendingReader {
 
 // largeUpload is used to control the upload of large files which need chunking
 type largeUpload struct {
-	f        *Fs                             // parent Fs
-	o        *Object                         // object being uploaded
-	in       io.Reader                       // read the data from here
-	wrap     accounting.WrapFn               // account parts being transferred
-	id       string                          // ID of the file being uploaded
-	size     int64                           // total size
-	parts    int64                           // calculated number of parts, if known
-	sha1s    []string                        // slice of SHA1s for each part
-	uploadMu sync.Mutex                      // lock for upload variable
-	uploads  []*api.GetUploadPartURLResponse // result of get upload URL calls
+	f         *Fs                             // parent Fs
+	o         *Object                         // object being uploaded
+	doCopy    bool                            // doing copy rather than upload
+	what      string                          // text name of operation for logs
+	in        io.Reader                       // read the data from here
+	wrap      accounting.WrapFn               // account parts being transferred
+	id        string                          // ID of the file being uploaded
+	size      int64                           // total size
+	parts     int64                           // calculated number of parts, if known
+	sha1s     []string                        // slice of SHA1s for each part
+	uploadMu  sync.Mutex                      // lock for upload variable
+	uploads   []*api.GetUploadPartURLResponse // result of get upload URL calls
+	chunkSize int64                           // chunk size to use
+	src       *Object                         // if copying, object we are reading from
 }
 
 // newLargeUpload starts an upload of object o from in with metadata in src
-func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo) (up *largeUpload, err error) {
+//
+// If newInfo is set then metadata from that will be used instead of reading it from src
+func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, chunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File) (up *largeUpload, err error) {
 	remote := o.remote
 	size := src.Size()
 	parts := int64(0)
@@ -89,8 +96,8 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
 	if size == -1 {
 		fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", f.opt.ChunkSize, maxParts*f.opt.ChunkSize)
 	} else {
-		parts = size / int64(o.fs.opt.ChunkSize)
-		if size%int64(o.fs.opt.ChunkSize) != 0 {
+		parts = size / int64(chunkSize)
+		if size%int64(chunkSize) != 0 {
 			parts++
 		}
 		if parts > maxParts {
@@ -99,7 +106,6 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
 		sha1SliceSize = parts
 	}
 
-	modTime := src.ModTime(ctx)
 	opts := rest.Opts{
 		Method: "POST",
 		Path:   "/b2_start_large_file",
 	}
@@ -110,18 +116,24 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
 		return nil, err
 	}
 	var request = api.StartLargeFileRequest{
-		BucketID:    bucketID,
-		Name:        f.opt.Enc.FromStandardPath(bucketPath),
-		ContentType: fs.MimeType(ctx, src),
-		Info: map[string]string{
-			timeKey: timeString(modTime),
-		},
+		BucketID: bucketID,
+		Name:     f.opt.Enc.FromStandardPath(bucketPath),
 	}
-	// Set the SHA1 if known
-	if !o.fs.opt.DisableCheckSum {
-		if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" {
-			request.Info[sha1Key] = calculatedSha1
+	if newInfo == nil {
+		modTime := src.ModTime(ctx)
+		request.ContentType = fs.MimeType(ctx, src)
+		request.Info = map[string]string{
+			timeKey: timeString(modTime),
 		}
+		// Set the SHA1 if known
+		if !o.fs.opt.DisableCheckSum || doCopy {
+			if calculatedSha1, err := src.Hash(ctx, hash.SHA1); err == nil && calculatedSha1 != "" {
+				request.Info[sha1Key] = calculatedSha1
+			}
+		}
+	} else {
+		request.ContentType = newInfo.ContentType
+		request.Info = newInfo.Info
 	}
 	var response api.StartLargeFileResponse
 	err = f.pacer.Call(func() (bool, error) {
@@ -131,18 +143,24 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
 	if err != nil {
 		return nil, err
 	}
+	up = &largeUpload{
+		f:         f,
+		o:         o,
+		doCopy:    doCopy,
+		what:      "upload",
+		id:        response.ID,
+		size:      size,
+		parts:     parts,
+		sha1s:     make([]string, sha1SliceSize),
+		chunkSize: int64(chunkSize),
+	}
 	// unwrap the accounting from the input, we use wrap to put it
 	// back on after the buffering
-	in, wrap := accounting.UnWrap(in)
-	up = &largeUpload{
-		f:     f,
-		o:     o,
-		in:    in,
-		wrap:  wrap,
-		id:    response.ID,
-		size:  size,
-		parts: parts,
-		sha1s: make([]string, sha1SliceSize),
+	if doCopy {
+		up.what = "copy"
+		up.src = src.(*Object)
+	} else {
+		up.in, up.wrap = accounting.UnWrap(in)
 	}
 	return up, nil
 }
@@ -256,9 +274,41 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
 	return err
 }
 
+// Copy a chunk
+func (up *largeUpload) copyChunk(ctx context.Context, part int64, partSize int64) error {
+	err := up.f.pacer.Call(func() (bool, error) {
+		fs.Debugf(up.o, "Copying chunk %d length %d", part, partSize)
+		opts := rest.Opts{
+			Method: "POST",
+			Path:   "/b2_copy_part",
+		}
+		offset := (part - 1) * up.chunkSize // where we are in the source file
+		var request = api.CopyPartRequest{
+			SourceID:    up.src.id,
+			LargeFileID: up.id,
+			PartNumber:  part,
+			Range:       fmt.Sprintf("bytes=%d-%d", offset, offset+partSize-1),
+		}
+		var response api.UploadPartResponse
+		resp, err := up.f.srv.CallJSON(ctx, &opts, &request, &response)
+		retry, err := up.f.shouldRetry(ctx, resp, err)
+		if err != nil {
+			fs.Debugf(up.o, "Error copying chunk %d (retry=%v): %v: %#v", part, retry, err, err)
+		}
+		up.sha1s[part-1] = response.SHA1
+		return retry, err
+	})
+	if err != nil {
+		fs.Debugf(up.o, "Error copying chunk %d: %v", part, err)
+	} else {
+		fs.Debugf(up.o, "Done copying chunk %d", part)
+	}
+	return err
+}
+
 // finish closes off the large upload
 func (up *largeUpload) finish(ctx context.Context) error {
-	fs.Debugf(up.o, "Finishing large file upload with %d parts", up.parts)
+	fs.Debugf(up.o, "Finishing large file %s with %d parts", up.what, up.parts)
 	opts := rest.Opts{
 		Method: "POST",
 		Path:   "/b2_finish_large_file",
 	}
@@ -295,136 +345,145 @@ func (up *largeUpload) cancel(ctx context.Context) error {
 	return err
 }
 
-func (up *largeUpload) managedTransferChunk(ctx context.Context, wg *sync.WaitGroup, errs chan error, part int64, buf []byte) {
-	wg.Add(1)
-	go func(part int64, buf []byte) {
-		defer wg.Done()
-		defer up.f.putUploadBlock(buf)
-		err := up.transferChunk(ctx, part, buf)
-		if err != nil {
-			select {
-			case errs <- err:
-			default:
-			}
-		}
-	}(part, buf)
-}
-
-func (up *largeUpload) finishOrCancelOnError(ctx context.Context, err error, errs chan error) error {
-	if err == nil {
-		select {
-		case err = <-errs:
-		default:
-		}
+// If the error pointed to is not nil then cancel the transfer
+func (up *largeUpload) cancelOnError(ctx context.Context, err *error) {
+	if *err == nil {
+		return
 	}
-	if err != nil {
-		fs.Debugf(up.o, "Cancelling large file upload due to error: %v", err)
-		cancelErr := up.cancel(ctx)
-		if cancelErr != nil {
-			fs.Errorf(up.o, "Failed to cancel large file upload: %v", cancelErr)
-		}
-		return err
+	fs.Debugf(up.o, "Cancelling large file %s due to error: %v", up.what, *err)
+	cancelErr := up.cancel(ctx)
+	if cancelErr != nil {
+		fs.Errorf(up.o, "Failed to cancel large file %s: %v", up.what, cancelErr)
 	}
-	return up.finish(ctx)
 }
 
 // Stream uploads the chunks from the input, starting with a required initial
 // chunk. Assumes the file size is unknown and will upload until the input
 // reaches EOF.
+//
+// Note that initialUploadBlock must be returned to f.putBuf()
func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock []byte) (err error) {
+	defer up.cancelOnError(ctx, &err)
 	fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
-	errs := make(chan error, 1)
-	hasMoreParts := true
-	var wg sync.WaitGroup
-
-	// Transfer initial chunk
+	var (
+		g, gCtx      = errgroup.WithContext(ctx)
+		hasMoreParts = true
+	)
 	up.size = int64(len(initialUploadBlock))
-	up.managedTransferChunk(ctx, &wg, errs, 1, initialUploadBlock)
+	g.Go(func() error {
+		for part := int64(1); hasMoreParts; part++ {
+			// Get a block of memory from the pool and token which limits concurrency.
+			var buf []byte
+			if part == 1 {
+				buf = initialUploadBlock
+			} else {
+				buf = up.f.getBuf(false)
+			}
 
-outer:
-	for part := int64(2); hasMoreParts; part++ {
-		// Check any errors
-		select {
-		case err = <-errs:
-			break outer
-		default:
+			// Fail fast: if an errgroup managed function returns an error,
+			// gCtx is cancelled. There is no point in uploading all the other parts.
+			if gCtx.Err() != nil {
+				up.f.putBuf(buf, false)
+				return nil
+			}
+
+			// Read the chunk
+			var n int
+			if part == 1 {
+				n = len(buf)
+			} else {
+				n, err = io.ReadFull(up.in, buf)
+				if err == io.ErrUnexpectedEOF {
+					fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
+					buf = buf[:n]
+					hasMoreParts = false
+				} else if err == io.EOF {
+					fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.")
+					up.f.putBuf(buf, false)
+					return nil
+				} else if err != nil {
+					// other kinds of errors indicate failure
+					up.f.putBuf(buf, false)
+					return err
+				}
+			}
+
+			// Keep stats up to date
+			up.parts = part
+			up.size += int64(n)
+			if part > maxParts {
+				up.f.putBuf(buf, false)
+				return errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
+			}
+
+			part := part // for the closure
+			g.Go(func() (err error) {
+				defer up.f.putBuf(buf, false)
+				return up.transferChunk(gCtx, part, buf)
+			})
 		}
-
-		// Get a block of memory
-		buf := up.f.getUploadBlock()
-
-		// Read the chunk
-		var n int
-		n, err = io.ReadFull(up.in, buf)
-		if err == io.ErrUnexpectedEOF {
-			fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
-			buf = buf[:n]
-			hasMoreParts = false
-			err = nil
-		} else if err == io.EOF {
-			fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.")
-			up.f.putUploadBlock(buf)
-			err = nil
-			break outer
-		} else if err != nil {
-			// other kinds of errors indicate failure
-			up.f.putUploadBlock(buf)
-			break outer
-		}
-
-		// Keep stats up to date
-		up.parts = part
-		up.size += int64(n)
-		if part > maxParts {
-			err = errors.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
-			break outer
-		}
-
-		// Transfer the chunk
-		up.managedTransferChunk(ctx, &wg, errs, part, buf)
+		return nil
+	})
+	err = g.Wait()
+	if err != nil {
+		return err
 	}
-	wg.Wait()
 	up.sha1s = up.sha1s[:up.parts]
-
-	return up.finishOrCancelOnError(ctx, err, errs)
+	return up.finish(ctx)
 }
 
 // Upload uploads the chunks from the input
-func (up *largeUpload) Upload(ctx context.Context) error {
-	fs.Debugf(up.o, "Starting upload of large file in %d chunks (id %q)", up.parts, up.id)
-	remaining := up.size
-	errs := make(chan error, 1)
-	var wg sync.WaitGroup
-	var err error
-outer:
-	for part := int64(1); part <= up.parts; part++ {
-		// Check any errors
-		select {
-		case err = <-errs:
-			break outer
-		default:
+func (up *largeUpload) Upload(ctx context.Context) (err error) {
+	defer up.cancelOnError(ctx, &err)
+	fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id)
+	var (
+		g, gCtx   = errgroup.WithContext(ctx)
+		remaining = up.size
+	)
+	g.Go(func() error {
+		for part := int64(1); part <= up.parts; part++ {
+			// Get a block of memory from the pool and token which limits concurrency.
+			buf := up.f.getBuf(up.doCopy)
+
+			// Fail fast: if an errgroup managed function returns an error,
+			// gCtx is cancelled. There is no point in uploading all the other parts.
+			if gCtx.Err() != nil {
+				up.f.putBuf(buf, up.doCopy)
+				return nil
+			}
+
+			reqSize := remaining
+			if reqSize >= up.chunkSize {
+				reqSize = up.chunkSize
+			}
+
+			if !up.doCopy {
+				// Read the chunk
+				buf = buf[:reqSize]
+				_, err = io.ReadFull(up.in, buf)
+				if err != nil {
+					up.f.putBuf(buf, up.doCopy)
+					return err
+				}
+			}
+
+			part := part // for the closure
+			g.Go(func() (err error) {
+				defer up.f.putBuf(buf, up.doCopy)
+				if !up.doCopy {
+					err = up.transferChunk(gCtx, part, buf)
+				} else {
+					err = up.copyChunk(gCtx, part, reqSize)
+				}
+				return err
+			})
+			remaining -= reqSize
 		}
-
-		reqSize := remaining
-		if reqSize >= int64(up.f.opt.ChunkSize) {
-			reqSize = int64(up.f.opt.ChunkSize)
-		}
-
-		// Get a block of memory
-		buf := up.f.getUploadBlock()[:reqSize]
-
-		// Read the chunk
-		_, err = io.ReadFull(up.in, buf)
-		if err != nil {
-			up.f.putUploadBlock(buf)
-			break outer
-		}
-
-		// Transfer the chunk
-		up.managedTransferChunk(ctx, &wg, errs, part, buf)
-		remaining -= reqSize
+		return nil
+	})
+	err = g.Wait()
+	if err != nil {
+		return err
 	}
-	wg.Wait()
-
-	return up.finishOrCancelOnError(ctx, err, errs)
+	return up.finish(ctx)
 }
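
Review note, not part of the patch: `Stream` and `Upload` now share the same dispatch shape, with one errgroup goroutine feeding per-part goroutines, a token dispenser bounding concurrency, and `gCtx.Err()` giving the fail-fast exit once any part fails. A self-contained sketch of that pattern under those assumptions (names here are illustrative, not rclone's):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func transferAll(ctx context.Context, parts int64, concurrency int) error {
	g, gCtx := errgroup.WithContext(ctx)
	tokens := make(chan struct{}, concurrency) // stand-in for pacer.TokenDispenser
	for i := 0; i < concurrency; i++ {
		tokens <- struct{}{}
	}
	g.Go(func() error {
		for part := int64(1); part <= parts; part++ {
			<-tokens // blocks until a transfer slot is free
			// Fail fast: once any worker fails, gCtx is cancelled and
			// there is no point dispatching the remaining parts.
			if gCtx.Err() != nil {
				tokens <- struct{}{}
				return nil
			}
			part := part // capture for the closure
			g.Go(func() error {
				defer func() { tokens <- struct{}{} }()
				fmt.Printf("transferring part %d\n", part)
				return nil // a real worker would return the transfer error
			})
		}
		return nil
	})
	return g.Wait()
}

func main() {
	if err := transferAll(context.Background(), 8, 4); err != nil {
		fmt.Println("transfer failed:", err)
	}
}
```
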