diff --git a/backend/dropbox/batcher.go b/backend/dropbox/batcher.go index 11303de81..a4bc3bf0d 100644 --- a/backend/dropbox/batcher.go +++ b/backend/dropbox/batcher.go @@ -8,6 +8,7 @@ package dropbox import ( "context" + "fmt" "sync" "time" @@ -34,7 +35,7 @@ type batcher struct { timeout time.Duration // idle timeout for batch async bool // whether we are using async batching in chan batcherRequest // incoming items to batch - quit chan struct{} // close to quit the loop + closed chan struct{} // close to indicate batcher shut down atexit atexit.FnHandle // atexit handle shutOnce sync.Once // make sure we shutdown once only wg sync.WaitGroup // wait for shutdown @@ -46,6 +47,14 @@ type batcherRequest struct { result chan<- batcherResponse } +// Return true if batcherRequest is the quit request +func (br *batcherRequest) isQuit() bool { + return br.commitInfo == nil +} + +// Send this to get the engine to quit +var quitRequest = batcherRequest{} + // batcherResponse holds a response to be delivered to clients waiting // for a batch to complete. type batcherResponse struct { @@ -92,7 +101,7 @@ func newBatcher(ctx context.Context, f *Fs, mode string, size int, timeout time. timeout: timeout, async: async, in: make(chan batcherRequest, size), - quit: make(chan struct{}), + closed: make(chan struct{}), } if b.Batching() { b.atexit = atexit.Register(b.Shutdown) @@ -178,7 +187,8 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF } } }() - fs.Debugf(b.f, "Committing %s batch length %d", b.mode, len(items)) + desc := fmt.Sprintf("%s batch length %d starting with: %s", b.mode, len(items), items[0].Commit.Path) + fs.Debugf(b.f, "Committing %s", desc) // finalise the batch getting either a result or a job id to poll batchStatus, err := b.finishBatch(ctx, items) @@ -246,6 +256,7 @@ func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionF return errors.Errorf("batch had %d errors: last error: %s", errorCount, errorTag) } + fs.Debugf(b.f, "Committed %s", desc) return nil } @@ -270,10 +281,8 @@ func (b *batcher) commitLoop(ctx context.Context) { outer: for { select { - case <-b.quit: - break outer - case req, ok := <-b.in: - if !ok { + case req := <-b.in: + if req.isQuit() { break outer } items = append(items, req.commitInfo) @@ -304,9 +313,15 @@ outer: func (b *batcher) Shutdown() { b.shutOnce.Do(func() { atexit.Unregister(b.atexit) - // quit the commitLoop. Note that we don't close b.in - // because that will cause write to closed channel - close(b.quit) + fs.Infof(b.f, "Commiting uploads - please wait...") + // show that batcher is shutting down + close(b.closed) + // quit the commitLoop by sending a quitRequest message + // + // Note that we don't close b.in because that will + // cause write to closed channel in Commit when we are + // exiting due to a signal. + b.in <- quitRequest b.wg.Wait() }) } @@ -315,6 +330,11 @@ func (b *batcher) Shutdown() { // batch and then waiting for the batch to complete in a synchronous // way if async is not set. func (b *batcher) Commit(ctx context.Context, commitInfo *files.UploadSessionFinishArg) (entry *files.FileMetadata, err error) { + select { + case <-b.closed: + return nil, fserrors.FatalError(errors.New("batcher is shutting down")) + default: + } fs.Debugf(b.f, "Adding %q to batch", commitInfo.Commit.Path) resp := make(chan batcherResponse, 1) b.in <- batcherRequest{