// This file contains the implementation of the sync batcher for uploads
// Dropbox rules say you can start as many batches as you want, but
// you may only have one batch being committed and must wait for the
// batch to be finished before committing another.

package dropbox

import (


const (
	maxBatchSize          = 1000                   // max size the batch can be
	defaultTimeoutSync    = 500 * time.Millisecond // kick off the batch if nothing added for this long (sync)
	defaultTimeoutAsync   = 10 * time.Second       // kick off the batch if nothing added for this long (ssync)
	defaultBatchSizeAsync = 100                    // default batch size if async

// batcher holds info about the current items waiting for upload
type batcher struct {
	f        *Fs                 // Fs this batch is part of
	mode     string              // configured batch mode
	size     int                 // maximum size for batch
	timeout  time.Duration       // idle timeout for batch
	async    bool                // whether we are using async batching
	in       chan batcherRequest // incoming items to batch
	closed   chan struct{}       // close to indicate batcher shut down
	atexit   atexit.FnHandle     // atexit handle
	shutOnce sync.Once           // make sure we shutdown once only
	wg       sync.WaitGroup      // wait for shutdown

// batcherRequest holds an incoming request with a place for a reply
type batcherRequest struct {
	commitInfo *files.UploadSessionFinishArg
	result     chan<- batcherResponse

// Return true if batcherRequest is the quit request
func (br *batcherRequest) isQuit() bool {
	return br.commitInfo == nil

// Send this to get the engine to quit
var quitRequest = batcherRequest{}

// batcherResponse holds a response to be delivered to clients waiting
// for a batch to complete.
type batcherResponse struct {
	err   error
	entry *files.FileMetadata

// newBatcher creates a new batcher structure
func newBatcher(ctx context.Context, f *Fs, mode string, size int, timeout time.Duration) (*batcher, error) {
	// fs.Debugf(f, "Creating batcher with mode %q, size %d, timeout %v", mode, size, timeout)
	if size > maxBatchSize || size < 0 {
		return nil, errors.Errorf("dropbox: batch size must be < %d and >= 0 - it is currently %d", maxBatchSize, size)

	async := false

	switch mode {
	case "sync":
		if size <= 0 {
			ci := fs.GetConfig(ctx)
			size = ci.Transfers
		if timeout <= 0 {
			timeout = defaultTimeoutSync
	case "async":
		if size <= 0 {
			size = defaultBatchSizeAsync
		if timeout <= 0 {
			timeout = defaultTimeoutAsync
		async = true
	case "off":
		size = 0
		return nil, errors.Errorf("dropbox: batch mode must be sync|async|off not %q", mode)

	b := &batcher{
		f:       f,
		mode:    mode,
		size:    size,
		timeout: timeout,
		async:   async,
		in:      make(chan batcherRequest, size),
		closed:  make(chan struct{}),
	if b.Batching() {
		b.atexit = atexit.Register(b.Shutdown)
		go b.commitLoop(context.Background())
	return b, nil


// Batching returns true if batching is active
func (b *batcher) Batching() bool {
	return b.size > 0

// finishBatch commits the batch, returning a batch status to poll or maybe complete
func (b *batcher) finishBatch(ctx context.Context, items []*files.UploadSessionFinishArg) (batchStatus *files.UploadSessionFinishBatchLaunch, err error) {
	var arg = &files.UploadSessionFinishBatchArg{
		Entries: items,
	err = b.f.pacer.Call(func() (bool, error) {
		batchStatus, err = b.f.srv.UploadSessionFinishBatch(arg)
		// If error is insufficient space then don't retry
		if e, ok := err.(files.UploadSessionFinishAPIError); ok {
			if e.EndpointError != nil && e.EndpointError.Path != nil && e.EndpointError.Path.Tag == files.WriteErrorInsufficientSpace {
				err = fserrors.NoRetryError(err)
				return false, err
		// after the first chunk is uploaded, we retry everything
		return err != nil, err
	if err != nil {
		return nil, errors.Wrap(err, "batch commit failed")
	return batchStatus, nil

// finishBatchJobStatus waits for the batch to complete returning completed entries
func (b *batcher) finishBatchJobStatus(ctx context.Context, launchBatchStatus *files.UploadSessionFinishBatchLaunch) (complete *files.UploadSessionFinishBatchResult, err error) {
	if launchBatchStatus.AsyncJobId == "" {
		return nil, errors.New("wait for batch completion: empty job ID")
	var batchStatus *files.UploadSessionFinishBatchJobStatus
	sleepTime := 100 * time.Millisecond
	const maxTries = 120
	for try := 1; try <= maxTries; try++ {
		err = b.f.pacer.Call(func() (bool, error) {
			batchStatus, err = b.f.srv.UploadSessionFinishBatchCheck(&async.PollArg{
				AsyncJobId: launchBatchStatus.AsyncJobId,
			return shouldRetry(ctx, err)
		if err != nil {
			fs.Debugf(b.f, "Wait for batch: sleeping for %v after error: %v: try %d/%d", sleepTime, err, try, maxTries)
		} else {
			if batchStatus.Tag == "complete" {
				return batchStatus.Complete, nil
			fs.Debugf(b.f, "Wait for batch: sleeping for %v after status: %q: try %d/%d", sleepTime, batchStatus.Tag, try, maxTries)
		sleepTime *= 2
		if sleepTime > time.Second {
			sleepTime = time.Second
	if err == nil {
		err = errors.New("batch didn't complete")
	return nil, errors.Wrapf(err, "wait for batch failed after %d tries", maxTries)

// commit a batch
func (b *batcher) commitBatch(ctx context.Context, items []*files.UploadSessionFinishArg, results []chan<- batcherResponse) (err error) {
	// If commit fails then signal clients if sync
	var signalled = b.async
	defer func() {
		if err != nil && signalled {
			// Signal to clients that there was an error
			for _, result := range results {
				result <- batcherResponse{err: err}
	desc := fmt.Sprintf("%s batch length %d starting with: %s", b.mode, len(items), items[0].Commit.Path)
	fs.Debugf(b.f, "Committing %s", desc)

	// finalise the batch getting either a result or a job id to poll
	batchStatus, err := b.finishBatch(ctx, items)
	if err != nil {
		return err

	// check whether batch is complete
	var complete *files.UploadSessionFinishBatchResult
	switch batchStatus.Tag {
	case "async_job_id":
		// wait for batch to complete
		complete, err = b.finishBatchJobStatus(ctx, batchStatus)
		if err != nil {
			return err
	case "complete":
		complete = batchStatus.Complete
		return errors.Errorf("batch returned unknown status %q", batchStatus.Tag)

	// Check we got the right number of entries
	entries := complete.Entries
	if len(entries) != len(results) {
		return errors.Errorf("expecting %d items in batch but got %d", len(results), len(entries))

	// Report results to clients
	var (
		errorTag   = ""
		errorCount = 0
	for i := range results {
		item := entries[i]
		resp := batcherResponse{}
		if item.Tag == "success" {
			resp.entry = item.Success
		} else {
			errorTag = item.Tag
			if item.Failure != nil {
				errorTag = item.Failure.Tag
				if item.Failure.LookupFailed != nil {
					errorTag += "/" + item.Failure.LookupFailed.Tag
				if item.Failure.Path != nil {
					errorTag += "/" + item.Failure.Path.Tag
				if item.Failure.PropertiesError != nil {
					errorTag += "/" + item.Failure.PropertiesError.Tag
			resp.err = errors.Errorf("batch upload failed: %s", errorTag)
		if !b.async {
			results[i] <- resp
	// Show signalled so no need to report error to clients from now on
	signalled = true

	// Report an error if any failed in the batch
	if errorTag != "" {
		return errors.Errorf("batch had %d errors: last error: %s", errorCount, errorTag)

	fs.Debugf(b.f, "Committed %s", desc)
	return nil

// commitLoop runs the commit engine in the background
func (b *batcher) commitLoop(ctx context.Context) {
	var (
		items     []*files.UploadSessionFinishArg // current batch of uncommitted files
		results   []chan<- batcherResponse        // current batch of clients awaiting results
		idleTimer = time.NewTimer(b.timeout)
		commit    = func() {
			err := b.commitBatch(ctx, items, results)
			if err != nil {
				fs.Errorf(b.f, "%s batch commit: failed to commit batch length %d: %v", b.mode, len(items), err)
			items, results = nil, nil
	defer b.wg.Done()
	defer idleTimer.Stop()

	for {
		select {
		case req := <-b.in:
			if req.isQuit() {
				break outer
			items = append(items, req.commitInfo)
			results = append(results, req.result)
			if len(items) >= b.size {
			} else {
		case <-idleTimer.C:
			if len(items) > 0 {
				fs.Debugf(b.f, "Batch idle for %v so committing", b.timeout)

	// commit any remaining items
	if len(items) > 0 {

// Shutdown finishes any pending batches then shuts everything down
// Can be called from atexit handler
func (b *batcher) Shutdown() {
	b.shutOnce.Do(func() {
		fs.Infof(b.f, "Commiting uploads - please wait...")
		// show that batcher is shutting down
		// quit the commitLoop by sending a quitRequest message
		// Note that we don't close b.in because that will
		// cause write to closed channel in Commit when we are
		// exiting due to a signal.
		b.in <- quitRequest

// Commit commits the file using a batch call, first adding it to the
// batch and then waiting for the batch to complete in a synchronous
// way if async is not set.
func (b *batcher) Commit(ctx context.Context, commitInfo *files.UploadSessionFinishArg) (entry *files.FileMetadata, err error) {
	select {
	case <-b.closed:
		return nil, fserrors.FatalError(errors.New("batcher is shutting down"))
	fs.Debugf(b.f, "Adding %q to batch", commitInfo.Commit.Path)
	resp := make(chan batcherResponse, 1)
	b.in <- batcherRequest{
		commitInfo: commitInfo,
		result:     resp,
	// If running async then don't wait for the result
	if b.async {
		return nil, nil
	result := <-resp
	return result.entry, result.err