From b94806a1430172419bd945340611068bcaf6ff00 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 12 Sep 2023 16:10:45 +0100 Subject: [PATCH] dropbox: factor batcher into lib/batcher --- backend/dropbox/batcher.go | 244 ++----------------------------- backend/dropbox/dropbox.go | 106 ++++---------- lib/batcher/batcher.go | 282 ++++++++++++++++++++++++++++++++++++ lib/batcher/batcher_test.go | 275 +++++++++++++++++++++++++++++++++++ lib/batcher/options.go | 73 ++++++++++ 5 files changed, 668 insertions(+), 312 deletions(-) create mode 100644 lib/batcher/batcher.go create mode 100644 lib/batcher/batcher_test.go create mode 100644 lib/batcher/options.go diff --git a/backend/dropbox/batcher.go b/backend/dropbox/batcher.go index d057724f0..f55223d1f 100644 --- a/backend/dropbox/batcher.go +++ b/backend/dropbox/batcher.go @@ -8,121 +8,19 @@ package dropbox import ( "context" - "errors" "fmt" - "sync" - "time" "github.com/dropbox/dropbox-sdk-go-unofficial/v6/dropbox/files" - "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/fserrors" - "github.com/rclone/rclone/lib/atexit" ) -const ( - maxBatchSize = 1000 // max size the batch can be - defaultTimeoutSync = 500 * time.Millisecond // kick off the batch if nothing added for this long (sync) - defaultTimeoutAsync = 10 * time.Second // kick off the batch if nothing added for this long (ssync) - defaultBatchSizeAsync = 100 // default batch size if async -) - -// batcher holds info about the current items waiting for upload -type batcher struct { - f *Fs // Fs this batch is part of - mode string // configured batch mode - size int // maximum size for batch - timeout time.Duration // idle timeout for batch - async bool // whether we are using async batching - in chan batcherRequest // incoming items to batch - closed chan struct{} // close to indicate batcher shut down - atexit atexit.FnHandle // atexit handle - shutOnce sync.Once // make sure we shutdown once only - wg sync.WaitGroup // wait for shutdown -} - -// batcherRequest holds an incoming request with a place for a reply -type batcherRequest struct { - commitInfo *files.UploadSessionFinishArg - result chan<- batcherResponse -} - -// Return true if batcherRequest is the quit request -func (br *batcherRequest) isQuit() bool { - return br.commitInfo == nil -} - -// Send this to get the engine to quit -var quitRequest = batcherRequest{} - -// batcherResponse holds a response to be delivered to clients waiting -// for a batch to complete. 
-type batcherResponse struct { - err error - entry *files.FileMetadata -} - -// newBatcher creates a new batcher structure -func newBatcher(ctx context.Context, f *Fs, mode string, size int, timeout time.Duration) (*batcher, error) { - // fs.Debugf(f, "Creating batcher with mode %q, size %d, timeout %v", mode, size, timeout) - if size > maxBatchSize || size < 0 { - return nil, fmt.Errorf("dropbox: batch size must be < %d and >= 0 - it is currently %d", maxBatchSize, size) - } - - async := false - - switch mode { - case "sync": - if size <= 0 { - ci := fs.GetConfig(ctx) - size = ci.Transfers - } - if timeout <= 0 { - timeout = defaultTimeoutSync - } - case "async": - if size <= 0 { - size = defaultBatchSizeAsync - } - if timeout <= 0 { - timeout = defaultTimeoutAsync - } - async = true - case "off": - size = 0 - default: - return nil, fmt.Errorf("dropbox: batch mode must be sync|async|off not %q", mode) - } - - b := &batcher{ - f: f, - mode: mode, - size: size, - timeout: timeout, - async: async, - in: make(chan batcherRequest, size), - closed: make(chan struct{}), - } - if b.Batching() { - b.atexit = atexit.Register(b.Shutdown) - b.wg.Add(1) - go b.commitLoop(context.Background()) - } - return b, nil - -} - -// Batching returns true if batching is active -func (b *batcher) Batching() bool { - return b.size > 0 -} - // finishBatch commits the batch, returning a batch status to poll or maybe complete -func (b *batcher) finishBatch(ctx context.Context, items []*files.UploadSessionFinishArg) (complete *files.UploadSessionFinishBatchResult, err error) { +func (f *Fs) finishBatch(ctx context.Context, items []*files.UploadSessionFinishArg) (complete *files.UploadSessionFinishBatchResult, err error) { var arg = &files.UploadSessionFinishBatchArg{ Entries: items, } - err = b.f.pacer.Call(func() (bool, error) { - complete, err = b.f.srv.UploadSessionFinishBatchV2(arg) + err = f.pacer.Call(func() (bool, error) { + complete, err = f.srv.UploadSessionFinishBatchV2(arg) // If error is insufficient space then don't retry if e, ok := err.(files.UploadSessionFinishAPIError); ok { if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace { @@ -139,23 +37,10 @@ func (b *batcher) finishBatch(ctx context.Context, items []*files.UploadSessionF return complete, nil } -// commit a batch -func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionFinishArg, results []chan<- batcherResponse) (err error) { - // If commit fails then signal clients if sync - var signalled = b.async - defer func() { - if err != nil && !signalled { - // Signal to clients that there was an error - for _, result := range results { - result <- batcherResponse{err: err} - } - } - }() - desc := fmt.Sprintf("%s batch length %d starting with: %s", b.mode, len(items), items[0].Commit.Path) - fs.Debugf(b.f, "Committing %s", desc) - +// Called by the batcher to commit a batch +func (f *Fs) commitBatch(ctx context.Context, items []*files.UploadSessionFinishArg, results []*files.FileMetadata, errors []error) (err error) { // finalise the batch getting either a result or a job id to poll - complete, err := b.finishBatch(ctx, items) + complete, err := f.finishBatch(ctx, items) if err != nil { return err } @@ -166,19 +51,13 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF return fmt.Errorf("expecting %d items in batch but got %d", len(results), len(entries)) } - // Report results to clients - var ( - errorTag = "" - errorCount = 0 - ) 
+ // Format results for return for i := range results { item := entries[i] - resp := batcherResponse{} if item.Tag == "success" { - resp.entry = item.Success + results[i] = item.Success } else { - errorCount++ - errorTag = item.Tag + errorTag := item.Tag if item.Failure != nil { errorTag = item.Failure.Tag if item.Failure.LookupFailed != nil { @@ -191,112 +70,9 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF errorTag += "/" + item.Failure.PropertiesError.Tag } } - resp.err = fmt.Errorf("batch upload failed: %s", errorTag) - } - if !b.async { - results[i] <- resp + errors[i] = fmt.Errorf("upload failed: %s", errorTag) } } - // Show signalled so no need to report error to clients from now on - signalled = true - // Report an error if any failed in the batch - if errorTag != "" { - return fmt.Errorf("batch had %d errors: last error: %s", errorCount, errorTag) - } - - fs.Debugf(b.f, "Committed %s", desc) return nil } - -// commitLoop runs the commit engine in the background -func (b *batcher) commitLoop(ctx context.Context) { - var ( - items []*files.UploadSessionFinishArg // current batch of uncommitted files - results []chan<- batcherResponse // current batch of clients awaiting results - idleTimer = time.NewTimer(b.timeout) - commit = func() { - err := b.commitBatch(ctx, items, results) - if err != nil { - fs.Errorf(b.f, "%s batch commit: failed to commit batch length %d: %v", b.mode, len(items), err) - } - items, results = nil, nil - } - ) - defer b.wg.Done() - defer idleTimer.Stop() - idleTimer.Stop() - -outer: - for { - select { - case req := <-b.in: - if req.isQuit() { - break outer - } - items = append(items, req.commitInfo) - results = append(results, req.result) - idleTimer.Stop() - if len(items) >= b.size { - commit() - } else { - idleTimer.Reset(b.timeout) - } - case <-idleTimer.C: - if len(items) > 0 { - fs.Debugf(b.f, "Batch idle for %v so committing", b.timeout) - commit() - } - } - - } - // commit any remaining items - if len(items) > 0 { - commit() - } -} - -// Shutdown finishes any pending batches then shuts everything down -// -// Can be called from atexit handler -func (b *batcher) Shutdown() { - if !b.Batching() { - return - } - b.shutOnce.Do(func() { - atexit.Unregister(b.atexit) - fs.Infof(b.f, "Committing uploads - please wait...") - // show that batcher is shutting down - close(b.closed) - // quit the commitLoop by sending a quitRequest message - // - // Note that we don't close b.in because that will - // cause write to closed channel in Commit when we are - // exiting due to a signal. - b.in <- quitRequest - b.wg.Wait() - }) -} - -// Commit commits the file using a batch call, first adding it to the -// batch and then waiting for the batch to complete in a synchronous -// way if async is not set. 
-func (b *batcher) Commit(ctx context.Context, commitInfo *files.UploadSessionFinishArg) (entry *files.FileMetadata, err error) { - select { - case <-b.closed: - return nil, fserrors.FatalError(errors.New("batcher is shutting down")) - default: - } - fs.Debugf(b.f, "Adding %q to batch", commitInfo.Commit.Path) - resp := make(chan batcherResponse, 1) - b.in <- batcherRequest{ - commitInfo: commitInfo, - result: resp, - } - // If running async then don't wait for the result - if b.async { - return nil, nil - } - result := <-resp - return result.entry, result.err -} diff --git a/backend/dropbox/dropbox.go b/backend/dropbox/dropbox.go index 6a9ced2c1..4447e13d5 100644 --- a/backend/dropbox/dropbox.go +++ b/backend/dropbox/dropbox.go @@ -47,6 +47,7 @@ import ( "github.com/rclone/rclone/fs/config/obscure" "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/lib/batcher" "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/oauthutil" "github.com/rclone/rclone/lib/pacer" @@ -121,6 +122,14 @@ var ( // Errors errNotSupportedInSharedMode = fserrors.NoRetryError(errors.New("not supported in shared files mode")) + + // Configure the batcher + defaultBatcherOptions = batcher.Options{ + MaxBatchSize: 1000, + DefaultTimeoutSync: 500 * time.Millisecond, + DefaultTimeoutAsync: 10 * time.Second, + DefaultBatchSizeAsync: 100, + } ) // Gets an oauth config with the right scopes @@ -152,7 +161,7 @@ func init() { }, }) }, - Options: append(oauthutil.SharedOptions, []fs.Option{{ + Options: append(append(oauthutil.SharedOptions, []fs.Option{{ Name: "chunk_size", Help: fmt.Sprintf(`Upload chunk size (< %v). @@ -210,68 +219,6 @@ Note that we don't unmount the shared folder afterwards so the shared folder.`, Default: false, Advanced: true, - }, { - Name: "batch_mode", - Help: `Upload file batching sync|async|off. - -This sets the batch mode used by rclone. - -For full info see [the main docs](https://rclone.org/dropbox/#batch-mode) - -This has 3 possible values - -- off - no batching -- sync - batch uploads and check completion (default) -- async - batch upload and don't check completion - -Rclone will close any outstanding batches when it exits which may make -a delay on quit. -`, - Default: "sync", - Advanced: true, - }, { - Name: "batch_size", - Help: `Max number of files in upload batch. - -This sets the batch size of files to upload. It has to be less than 1000. - -By default this is 0 which means rclone which calculate the batch size -depending on the setting of batch_mode. - -- batch_mode: async - default batch_size is 100 -- batch_mode: sync - default batch_size is the same as --transfers -- batch_mode: off - not in use - -Rclone will close any outstanding batches when it exits which may make -a delay on quit. - -Setting this is a great idea if you are uploading lots of small files -as it will make them a lot quicker. You can use --transfers 32 to -maximise throughput. -`, - Default: 0, - Advanced: true, - }, { - Name: "batch_timeout", - Help: `Max time to allow an idle upload batch before uploading. - -If an upload batch is idle for more than this long then it will be -uploaded. - -The default for this is 0 which means rclone will choose a sensible -default based on the batch_mode in use. 
- -- batch_mode: async - default batch_timeout is 10s -- batch_mode: sync - default batch_timeout is 500ms -- batch_mode: off - not in use -`, - Default: fs.Duration(0), - Advanced: true, - }, { - Name: "batch_commit_timeout", - Help: `Max time to wait for a batch to finish committing`, - Default: fs.Duration(10 * time.Minute), - Advanced: true, }, { Name: "pacer_min_sleep", Default: defaultMinSleep, @@ -290,23 +237,22 @@ default based on the batch_mode in use. encoder.EncodeDel | encoder.EncodeRightSpace | encoder.EncodeInvalidUtf8, - }}...), + }}...), defaultBatcherOptions.FsOptions("For full info see [the main docs](https://rclone.org/dropbox/#batch-mode)\n\n")...), }) } // Options defines the configuration for this backend type Options struct { - ChunkSize fs.SizeSuffix `config:"chunk_size"` - Impersonate string `config:"impersonate"` - SharedFiles bool `config:"shared_files"` - SharedFolders bool `config:"shared_folders"` - BatchMode string `config:"batch_mode"` - BatchSize int `config:"batch_size"` - BatchTimeout fs.Duration `config:"batch_timeout"` - BatchCommitTimeout fs.Duration `config:"batch_commit_timeout"` - AsyncBatch bool `config:"async_batch"` - PacerMinSleep fs.Duration `config:"pacer_min_sleep"` - Enc encoder.MultiEncoder `config:"encoding"` + ChunkSize fs.SizeSuffix `config:"chunk_size"` + Impersonate string `config:"impersonate"` + SharedFiles bool `config:"shared_files"` + SharedFolders bool `config:"shared_folders"` + BatchMode string `config:"batch_mode"` + BatchSize int `config:"batch_size"` + BatchTimeout fs.Duration `config:"batch_timeout"` + AsyncBatch bool `config:"async_batch"` + PacerMinSleep fs.Duration `config:"pacer_min_sleep"` + Enc encoder.MultiEncoder `config:"encoding"` } // Fs represents a remote dropbox server @@ -325,7 +271,7 @@ type Fs struct { slashRootSlash string // root with "/" prefix and postfix, lowercase pacer *fs.Pacer // To pace the API calls ns string // The namespace we are using or "" for none - batcher *batcher // batch builder + batcher *batcher.Batcher[*files.UploadSessionFinishArg, *files.FileMetadata] } // Object describes a dropbox object @@ -451,7 +397,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e ci: ci, pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(opt.PacerMinSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } - f.batcher, err = newBatcher(ctx, f, f.opt.BatchMode, f.opt.BatchSize, time.Duration(f.opt.BatchTimeout)) + batcherOptions := defaultBatcherOptions + batcherOptions.Mode = f.opt.BatchMode + batcherOptions.Size = f.opt.BatchSize + batcherOptions.Timeout = time.Duration(f.opt.BatchTimeout) + f.batcher, err = batcher.New(ctx, f, f.commitBatch, batcherOptions) if err != nil { return nil, err } @@ -1722,7 +1672,7 @@ func (o *Object) uploadChunked(ctx context.Context, in0 io.Reader, commitInfo *f // If we are batching then we should have written all the data now // store the commit info now for a batch commit if o.fs.batcher.Batching() { - return o.fs.batcher.Commit(ctx, args) + return o.fs.batcher.Commit(ctx, o.remote, args) } err = o.fs.pacer.Call(func() (bool, error) { diff --git a/lib/batcher/batcher.go b/lib/batcher/batcher.go new file mode 100644 index 000000000..1ac7f589d --- /dev/null +++ b/lib/batcher/batcher.go @@ -0,0 +1,282 @@ +// Package batcher implements a generic batcher. 
+// +// It uses two types: +// +// Item - the thing to be batched +// Result - the result from the batching +// +// And one function of type CommitBatchFn which is called to do the actual batching. +package batcher + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/fserrors" + "github.com/rclone/rclone/lib/atexit" +) + +// Options for configuring the batcher +type Options struct { + Mode string // mode of the batcher "sync", "async" or "off" + Size int // size of batch + Timeout time.Duration // timeout before committing the batch + MaxBatchSize int // max size the batch can be + DefaultTimeoutSync time.Duration // default time to kick off the batch if nothing added for this long (sync) + DefaultTimeoutAsync time.Duration // default time to kick off the batch if nothing added for this long (async) + DefaultBatchSizeAsync int // default batch size if async +} + +// CommitBatchFn is called to commit a batch of Item and return Result to the callers. +// +// It should commit the batch of items then for each result i (of +// which there should be len(items)) it should set either results[i] +// or errors[i] +type CommitBatchFn[Item, Result any] func(ctx context.Context, items []Item, results []Result, errors []error) (err error) + +// Batcher holds info about the current items waiting to be acted on. +type Batcher[Item, Result any] struct { + opt Options // options for configuring the batcher + f any // logging identity for fs.Debugf(f, ...) + commit CommitBatchFn[Item, Result] // User defined function to commit the batch + async bool // whether we are using async batching + in chan request[Item, Result] // incoming items to batch + closed chan struct{} // close to indicate batcher shut down + atexit atexit.FnHandle // atexit handle + shutOnce sync.Once // make sure we shutdown once only + wg sync.WaitGroup // wait for shutdown +} + +// request holds an incoming request with a place for a reply +type request[Item, Result any] struct { + item Item + name string + result chan<- response[Result] + quit bool // if set then quit +} + +// response holds a response to be delivered to clients waiting +// for a batch to complete. +type response[Result any] struct { + err error + entry Result +} + +// New creates a Batcher for Item and Result calling commit to do the actual committing. 
+func New[Item, Result any](ctx context.Context, f any, commit CommitBatchFn[Item, Result], opt Options) (*Batcher[Item, Result], error) { + // fs.Debugf(f, "Creating batcher with mode %q, size %d, timeout %v", mode, size, timeout) + if opt.Size > opt.MaxBatchSize || opt.Size < 0 { + return nil, fmt.Errorf("batcher: batch size must be < %d and >= 0 - it is currently %d", opt.MaxBatchSize, opt.Size) + } + + async := false + + switch opt.Mode { + case "sync": + if opt.Size <= 0 { + ci := fs.GetConfig(ctx) + opt.Size = ci.Transfers + } + if opt.Timeout <= 0 { + opt.Timeout = opt.DefaultTimeoutSync + } + case "async": + if opt.Size <= 0 { + opt.Size = opt.DefaultBatchSizeAsync + } + if opt.Timeout <= 0 { + opt.Timeout = opt.DefaultTimeoutAsync + } + async = true + case "off": + opt.Size = 0 + default: + return nil, fmt.Errorf("batcher: batch mode must be sync|async|off not %q", opt.Mode) + } + + b := &Batcher[Item, Result]{ + opt: opt, + f: f, + commit: commit, + async: async, + in: make(chan request[Item, Result], opt.Size), + closed: make(chan struct{}), + } + if b.Batching() { + b.atexit = atexit.Register(b.Shutdown) + b.wg.Add(1) + go b.commitLoop(context.Background()) + } + return b, nil + +} + +// Batching returns true if batching is active +func (b *Batcher[Item, Result]) Batching() bool { + return b.opt.Size > 0 +} + +// commit a batch calling the user defined commit function then distributing the results. +func (b *Batcher[Item, Result]) commitBatch(ctx context.Context, requests []request[Item, Result]) (err error) { + // If commit fails then signal clients if sync + var signalled = b.async + defer func() { + if err != nil && !signalled { + // Signal to clients that there was an error + for _, req := range requests { + req.result <- response[Result]{err: err} + } + } + }() + desc := fmt.Sprintf("%s batch length %d starting with: %s", b.opt.Mode, len(requests), requests[0].name) + fs.Debugf(b.f, "Committing %s", desc) + + var ( + items = make([]Item, len(requests)) + results = make([]Result, len(requests)) + errors = make([]error, len(requests)) + ) + + for i := range requests { + items[i] = requests[i].item + } + + // Commit the batch + err = b.commit(ctx, items, results, errors) + if err != nil { + return err + } + + // Report results to clients + var ( + lastError error + errorCount = 0 + ) + for i, req := range requests { + result := results[i] + err := errors[i] + resp := response[Result]{} + if err == nil { + resp.entry = result + } else { + errorCount++ + lastError = err + resp.err = fmt.Errorf("batch upload failed: %w", err) + } + if !b.async { + req.result <- resp + } + } + + // show signalled so no need to report error to clients from now on + signalled = true + + // Report an error if any failed in the batch + if lastError != nil { + return fmt.Errorf("batch had %d errors: last error: %w", errorCount, lastError) + } + + fs.Debugf(b.f, "Committed %s", desc) + return nil +} + +// commitLoop runs the commit engine in the background +func (b *Batcher[Item, Result]) commitLoop(ctx context.Context) { + var ( + requests []request[Item, Result] // current batch of uncommitted Items + idleTimer = time.NewTimer(b.opt.Timeout) + commit = func() { + err := b.commitBatch(ctx, requests) + if err != nil { + fs.Errorf(b.f, "%s batch commit: failed to commit batch length %d: %v", b.opt.Mode, len(requests), err) + } + requests = nil + } + ) + defer b.wg.Done() + defer idleTimer.Stop() + idleTimer.Stop() + +outer: + for { + select { + case req := <-b.in: + if req.quit { + break outer + } + 
requests = append(requests, req) + idleTimer.Stop() + if len(requests) >= b.opt.Size { + commit() + } else { + idleTimer.Reset(b.opt.Timeout) + } + case <-idleTimer.C: + if len(requests) > 0 { + fs.Debugf(b.f, "Batch idle for %v so committing", b.opt.Timeout) + commit() + } + } + + } + // commit any remaining items + if len(requests) > 0 { + commit() + } +} + +// Shutdown finishes any pending batches then shuts everything down. +// +// This is registered as an atexit handler by New. +func (b *Batcher[Item, Result]) Shutdown() { + if !b.Batching() { + return + } + b.shutOnce.Do(func() { + atexit.Unregister(b.atexit) + fs.Infof(b.f, "Committing uploads - please wait...") + // show that batcher is shutting down + close(b.closed) + // quit the commitLoop by sending a quitRequest message + // + // Note that we don't close b.in because that will + // cause write to closed channel in Commit when we are + // exiting due to a signal. + b.in <- request[Item, Result]{quit: true} + b.wg.Wait() + }) +} + +// Commit commits the Item getting a Result or error using a batch +// call, first adding it to the batch and then waiting for the batch +// to complete in a synchronous way if async is not set. +// +// If async is set then this will return no error and a nil/empty +// Result. +// +// This should not be called if batching is off - check first with +// IsBatching. +func (b *Batcher[Item, Result]) Commit(ctx context.Context, name string, item Item) (entry Result, err error) { + select { + case <-b.closed: + return entry, fserrors.FatalError(errors.New("batcher is shutting down")) + default: + } + fs.Debugf(b.f, "Adding %q to batch", name) + resp := make(chan response[Result], 1) + b.in <- request[Item, Result]{ + item: item, + name: name, + result: resp, + } + // If running async then don't wait for the result + if b.async { + return entry, nil + } + result := <-resp + return result.entry, result.err +} diff --git a/lib/batcher/batcher_test.go b/lib/batcher/batcher_test.go new file mode 100644 index 000000000..e1f5d5551 --- /dev/null +++ b/lib/batcher/batcher_test.go @@ -0,0 +1,275 @@ +package batcher + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/rclone/rclone/fs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type ( + Result string + Item string +) + +func TestBatcherNew(t *testing.T) { + ctx := context.Background() + ci := fs.GetConfig(ctx) + + opt := Options{ + Mode: "async", + Size: 100, + Timeout: 1 * time.Second, + MaxBatchSize: 1000, + DefaultTimeoutSync: 500 * time.Millisecond, + DefaultTimeoutAsync: 10 * time.Second, + DefaultBatchSizeAsync: 100, + } + commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) { + return nil + } + + b, err := New[Item, Result](ctx, nil, commitBatch, opt) + require.NoError(t, err) + require.True(t, b.Batching()) + b.Shutdown() + + opt.Mode = "sync" + b, err = New[Item, Result](ctx, nil, commitBatch, opt) + require.NoError(t, err) + require.True(t, b.Batching()) + b.Shutdown() + + opt.Mode = "off" + b, err = New[Item, Result](ctx, nil, commitBatch, opt) + require.NoError(t, err) + require.False(t, b.Batching()) + b.Shutdown() + + opt.Mode = "bad" + _, err = New[Item, Result](ctx, nil, commitBatch, opt) + require.ErrorContains(t, err, "batch mode") + + opt.Mode = "async" + opt.Size = opt.MaxBatchSize + 1 + _, err = New[Item, Result](ctx, nil, commitBatch, opt) + require.ErrorContains(t, err, "batch size") + + opt.Mode = "sync" 
+ opt.Size = 0 + opt.Timeout = 0 + b, err = New[Item, Result](ctx, nil, commitBatch, opt) + require.NoError(t, err) + assert.Equal(t, ci.Transfers, b.opt.Size) + assert.Equal(t, opt.DefaultTimeoutSync, b.opt.Timeout) + b.Shutdown() + + opt.Mode = "async" + opt.Size = 0 + opt.Timeout = 0 + b, err = New[Item, Result](ctx, nil, commitBatch, opt) + require.NoError(t, err) + assert.Equal(t, opt.DefaultBatchSizeAsync, b.opt.Size) + assert.Equal(t, opt.DefaultTimeoutAsync, b.opt.Timeout) + b.Shutdown() + + // Check we get an error on commit + _, err = b.Commit(ctx, "last", Item("last")) + require.ErrorContains(t, err, "shutting down") + +} + +func TestBatcherCommit(t *testing.T) { + ctx := context.Background() + + opt := Options{ + Mode: "sync", + Size: 3, + Timeout: 1 * time.Second, + MaxBatchSize: 1000, + DefaultTimeoutSync: 500 * time.Millisecond, + DefaultTimeoutAsync: 10 * time.Second, + DefaultBatchSizeAsync: 100, + } + var wg sync.WaitGroup + errFail := errors.New("fail") + var commits int + var totalSize int + commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) { + commits += 1 + totalSize += len(items) + for i := range items { + if items[i] == "5" { + errors[i] = errFail + } else { + results[i] = Result(items[i]) + " result" + } + } + return nil + } + b, err := New[Item, Result](ctx, nil, commitBatch, opt) + require.NoError(t, err) + defer b.Shutdown() + + for i := 0; i < 10; i++ { + wg.Add(1) + s := fmt.Sprintf("%d", i) + go func() { + defer wg.Done() + result, err := b.Commit(ctx, s, Item(s)) + if s == "5" { + assert.True(t, errors.Is(err, errFail)) + } else { + require.NoError(t, err) + assert.Equal(t, Result(s+" result"), result) + } + }() + } + wg.Wait() + assert.Equal(t, 4, commits) + assert.Equal(t, 10, totalSize) +} + +func TestBatcherCommitFail(t *testing.T) { + ctx := context.Background() + + opt := Options{ + Mode: "sync", + Size: 3, + Timeout: 1 * time.Second, + MaxBatchSize: 1000, + DefaultTimeoutSync: 500 * time.Millisecond, + DefaultTimeoutAsync: 10 * time.Second, + DefaultBatchSizeAsync: 100, + } + var wg sync.WaitGroup + errFail := errors.New("fail") + var commits int + var totalSize int + commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) { + commits += 1 + totalSize += len(items) + return errFail + } + b, err := New[Item, Result](ctx, nil, commitBatch, opt) + require.NoError(t, err) + defer b.Shutdown() + + for i := 0; i < 10; i++ { + wg.Add(1) + s := fmt.Sprintf("%d", i) + go func() { + defer wg.Done() + _, err := b.Commit(ctx, s, Item(s)) + assert.True(t, errors.Is(err, errFail)) + }() + } + wg.Wait() + assert.Equal(t, 4, commits) + assert.Equal(t, 10, totalSize) +} + +func TestBatcherCommitShutdown(t *testing.T) { + ctx := context.Background() + + opt := Options{ + Mode: "sync", + Size: 3, + Timeout: 1 * time.Second, + MaxBatchSize: 1000, + DefaultTimeoutSync: 500 * time.Millisecond, + DefaultTimeoutAsync: 10 * time.Second, + DefaultBatchSizeAsync: 100, + } + var wg sync.WaitGroup + var commits int + var totalSize int + commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) { + commits += 1 + totalSize += len(items) + for i := range items { + results[i] = Result(items[i]) + } + return nil + } + b, err := New[Item, Result](ctx, nil, commitBatch, opt) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + wg.Add(1) + s := fmt.Sprintf("%d", i) + go func() { + defer wg.Done() + result, err := b.Commit(ctx, s, Item(s)) + 
assert.NoError(t, err) + assert.Equal(t, Result(s), result) + }() + } + + time.Sleep(100 * time.Millisecond) + b.Shutdown() // shutdown with batches outstanding + + wg.Wait() + assert.Equal(t, 4, commits) + assert.Equal(t, 10, totalSize) +} + +func TestBatcherCommitAsync(t *testing.T) { + ctx := context.Background() + + opt := Options{ + Mode: "async", + Size: 3, + Timeout: 1 * time.Second, + MaxBatchSize: 1000, + DefaultTimeoutSync: 500 * time.Millisecond, + DefaultTimeoutAsync: 10 * time.Second, + DefaultBatchSizeAsync: 100, + } + var wg sync.WaitGroup + errFail := errors.New("fail") + var commits atomic.Int32 + var totalSize atomic.Int32 + commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) (err error) { + wg.Add(1) + defer wg.Done() + // t.Logf("commit %d", len(items)) + commits.Add(1) + totalSize.Add(int32(len(items))) + for i := range items { + if items[i] == "5" { + errors[i] = errFail + } else { + results[i] = Result(items[i]) + " result" + } + } + return nil + } + b, err := New[Item, Result](ctx, nil, commitBatch, opt) + require.NoError(t, err) + defer b.Shutdown() + + for i := 0; i < 10; i++ { + wg.Add(1) + s := fmt.Sprintf("%d", i) + go func() { + defer wg.Done() + result, err := b.Commit(ctx, s, Item(s)) + // Async just returns straight away + require.NoError(t, err) + assert.Equal(t, Result(""), result) + }() + } + time.Sleep(2 * time.Second) // wait for batch timeout - needed with async + wg.Wait() + + assert.Equal(t, int32(4), commits.Load()) + assert.Equal(t, int32(10), totalSize.Load()) +} diff --git a/lib/batcher/options.go b/lib/batcher/options.go new file mode 100644 index 000000000..a4cc47c71 --- /dev/null +++ b/lib/batcher/options.go @@ -0,0 +1,73 @@ +package batcher + +import ( + "fmt" + "time" + + "github.com/rclone/rclone/fs" +) + +// FsOptions returns the batch mode fs.Options +func (opt *Options) FsOptions(extra string) []fs.Option { + return []fs.Option{{ + Name: "batch_mode", + Help: fmt.Sprintf(`Upload file batching sync|async|off. + +This sets the batch mode used by rclone. + +%sThis has 3 possible values + +- off - no batching +- sync - batch uploads and check completion (default) +- async - batch upload and don't check completion + +Rclone will close any outstanding batches when it exits which may make +a delay on quit. +`, extra), + Default: "sync", + Advanced: true, + }, { + Name: "batch_size", + Help: fmt.Sprintf(`Max number of files in upload batch. + +This sets the batch size of files to upload. It has to be less than %d. + +By default this is 0 which means rclone which calculate the batch size +depending on the setting of batch_mode. + +- batch_mode: async - default batch_size is %d +- batch_mode: sync - default batch_size is the same as --transfers +- batch_mode: off - not in use + +Rclone will close any outstanding batches when it exits which may make +a delay on quit. + +Setting this is a great idea if you are uploading lots of small files +as it will make them a lot quicker. You can use --transfers 32 to +maximise throughput. +`, opt.MaxBatchSize, opt.DefaultBatchSizeAsync), + Default: 0, + Advanced: true, + }, { + Name: "batch_timeout", + Help: fmt.Sprintf(`Max time to allow an idle upload batch before uploading. + +If an upload batch is idle for more than this long then it will be +uploaded. + +The default for this is 0 which means rclone will choose a sensible +default based on the batch_mode in use. 
+ +- batch_mode: async - default batch_timeout is %v +- batch_mode: sync - default batch_timeout is %v +- batch_mode: off - not in use +`, opt.DefaultTimeoutAsync, opt.DefaultTimeoutSync), + Default: fs.Duration(0), + Advanced: true, + }, { + Name: "batch_commit_timeout", + Help: `Max time to wait for a batch to finish committing`, + Default: fs.Duration(10 * time.Minute), + Advanced: true, + }} +}
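
A minimal usage sketch of the new lib/batcher package introduced by this
patch (not part of the patch itself). It mirrors the unit tests above:
the Item/Result types, the commitBatch function and the option values
below are illustrative assumptions, and the example only compiles once
this patch is applied so that "github.com/rclone/rclone/lib/batcher"
exists.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/rclone/rclone/lib/batcher"
)

// Item is what callers hand to the batcher, Result is what they get back.
type (
	Item   string
	Result string
)

func main() {
	ctx := context.Background()

	// These values mirror the defaults the dropbox backend now passes in.
	opt := batcher.Options{
		Mode:                  "sync", // "sync", "async" or "off"
		Size:                  0,      // 0 => pick a default based on Mode
		Timeout:               0,      // 0 => pick a default based on Mode
		MaxBatchSize:          1000,
		DefaultTimeoutSync:    500 * time.Millisecond,
		DefaultTimeoutAsync:   10 * time.Second,
		DefaultBatchSizeAsync: 100,
	}

	// commitBatch is handed a whole batch and must fill in results[i]
	// or errors[i] for every item.
	commitBatch := func(ctx context.Context, items []Item, results []Result, errors []error) error {
		for i := range items {
			results[i] = Result(items[i]) + " committed"
		}
		return nil
	}

	b, err := batcher.New[Item, Result](ctx, nil, commitBatch, opt)
	if err != nil {
		panic(err)
	}
	defer b.Shutdown()

	if b.Batching() {
		// In sync mode Commit blocks until the batch containing this
		// item has been committed (here, after the idle timeout fires).
		result, err := b.Commit(ctx, "example", Item("example"))
		fmt.Println(result, err)
	}
}

In async mode Commit returns immediately with an empty Result, and
Shutdown (also registered as an atexit handler by New) flushes any
batches still outstanding on exit.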