1
mirror of https://github.com/rclone/rclone synced 2024-11-24 01:26:25 +01:00

dropbox: fix async batch missing the last few entries

This commit is contained in:
Nick Craig-Wood 2021-04-09 17:25:25 +01:00
parent 5ee646f264
commit 75c417ad93

View File

@ -8,6 +8,7 @@ package dropbox
import (
"context"
"fmt"
"sync"
"time"
@ -34,7 +35,7 @@ type batcher struct {
timeout time.Duration // idle timeout for batch
async bool // whether we are using async batching
in chan batcherRequest // incoming items to batch
quit chan struct{} // close to quit the loop
closed chan struct{} // close to indicate batcher shut down
atexit atexit.FnHandle // atexit handle
shutOnce sync.Once // make sure we shutdown once only
wg sync.WaitGroup // wait for shutdown
@ -46,6 +47,14 @@ type batcherRequest struct {
result chan<- batcherResponse
}
// Return true if batcherRequest is the quit request
func (br *batcherRequest) isQuit() bool {
return br.commitInfo == nil
}
// Send this to get the engine to quit
var quitRequest = batcherRequest{}
// batcherResponse holds a response to be delivered to clients waiting
// for a batch to complete.
type batcherResponse struct {
@ -92,7 +101,7 @@ func newBatcher(ctx context.Context, f *Fs, mode string, size int, timeout time.
timeout: timeout,
async: async,
in: make(chan batcherRequest, size),
quit: make(chan struct{}),
closed: make(chan struct{}),
}
if b.Batching() {
b.atexit = atexit.Register(b.Shutdown)
@ -178,7 +187,8 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF
}
}
}()
fs.Debugf(b.f, "Committing %s batch length %d", b.mode, len(items))
desc := fmt.Sprintf("%s batch length %d starting with: %s", b.mode, len(items), items[0].Commit.Path)
fs.Debugf(b.f, "Committing %s", desc)
// finalise the batch getting either a result or a job id to poll
batchStatus, err := b.finishBatch(ctx, items)
@ -246,6 +256,7 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF
return errors.Errorf("batch had %d errors: last error: %s", errorCount, errorTag)
}
fs.Debugf(b.f, "Committed %s", desc)
return nil
}
@ -270,10 +281,8 @@ func (b *batcher) commitLoop(ctx context.Context) {
outer:
for {
select {
case <-b.quit:
break outer
case req, ok := <-b.in:
if !ok {
case req := <-b.in:
if req.isQuit() {
break outer
}
items = append(items, req.commitInfo)
@ -304,9 +313,15 @@ outer:
func (b *batcher) Shutdown() {
b.shutOnce.Do(func() {
atexit.Unregister(b.atexit)
// quit the commitLoop. Note that we don't close b.in
// because that will cause write to closed channel
close(b.quit)
fs.Infof(b.f, "Commiting uploads - please wait...")
// show that batcher is shutting down
close(b.closed)
// quit the commitLoop by sending a quitRequest message
//
// Note that we don't close b.in because that will
// cause write to closed channel in Commit when we are
// exiting due to a signal.
b.in <- quitRequest
b.wg.Wait()
})
}
@ -315,6 +330,11 @@ func (b *batcher) Shutdown() {
// batch and then waiting for the batch to complete in a synchronous
// way if async is not set.
func (b *batcher) Commit(ctx context.Context, commitInfo *files.UploadSessionFinishArg) (entry *files.FileMetadata, err error) {
select {
case <-b.closed:
return nil, fserrors.FatalError(errors.New("batcher is shutting down"))
default:
}
fs.Debugf(b.f, "Adding %q to batch", commitInfo.Commit.Path)
resp := make(chan batcherResponse, 1)
b.in <- batcherRequest{