diff --git a/vfs/read.go b/vfs/read.go
index c51f46008..c44d9118f 100644
--- a/vfs/read.go
+++ b/vfs/read.go
@@ -212,6 +212,43 @@ func (fh *ReadFileHandle) ReadAt(p []byte, off int64) (n int, err error) {
 	return fh.readAt(p, off)
 }
 
+// waitSequential waits for *poff to equal off or aborts after maxWait.
+//
+// Waits here potentially affect all seeks so they need to be kept short.
+//
+// Call with cond.L locked.
+func waitSequential(what string, remote string, cond *sync.Cond, maxWait time.Duration, poff *int64, off int64) {
+	var (
+		timeout = time.NewTimer(maxWait)
+		done    = make(chan struct{})
+		abort   = false
+	)
+	go func() {
+		select {
+		case <-timeout.C:
+			// take the lock to make sure that cond.Wait() is called before
+			// cond.Broadcast. NB cond.L == mu
+			cond.L.Lock()
+			// set abort flag and give all the waiting goroutines a kick on timeout
+			abort = true
+			fs.Debugf(remote, "aborting in-sequence %s wait, off=%d", what, off)
+			cond.Broadcast()
+			cond.L.Unlock()
+		case <-done:
+		}
+	}()
+	for *poff != off && !abort {
+		fs.Debugf(remote, "waiting for in-sequence %s to %d for %v", what, off, maxWait)
+		cond.Wait()
+	}
+	// tidy up end timer
+	close(done)
+	timeout.Stop()
+	if *poff != off {
+		fs.Debugf(remote, "failed to wait for in-sequence %s to %d", what, off)
+	}
+}
+
 // Implementation of ReadAt - call with lock held
 func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
 	// defer log.Trace(fh.remote, "p[%d], off=%d", len(p), off)("n=%d, err=%v", &n, &err)
@@ -229,39 +266,7 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
 		maxBuf = len(p)
 	}
 	if gap := off - fh.offset; gap > 0 && gap < int64(8*maxBuf) {
-		// Set a background timer so we don't wait for long
-		// Waits here potentially affect all seeks so need to keep them short
-		// The default time here was made by finding the
-		// smallest when mounting a local backend that didn't
-		// cause seeks.
-		maxWait := fh.file.VFS().Opt.ReadWait
-		timeout := time.NewTimer(maxWait)
-		done := make(chan struct{})
-		abort := false
-		go func() {
-			select {
-			case <-timeout.C:
-				// take the lock to make sure that fh.cond.Wait() is called before
-				// fh.cond.Broadcast. NB fh.cond.L == fh.mu
-				fh.mu.Lock()
-				// set abort flag and give all the waiting goroutines a kick on timeout
-				abort = true
-				fs.Debugf(fh.remote, "aborting in-sequence read wait, off=%d", off)
-				fh.cond.Broadcast()
-				fh.mu.Unlock()
-			case <-done:
-			}
-		}()
-		for fh.offset != off && !abort {
-			fs.Debugf(fh.remote, "waiting for in-sequence read to %d for %v", off, maxWait)
-			fh.cond.Wait()
-		}
-		// tidy up end timer
-		close(done)
-		timeout.Stop()
-		if fh.offset != off {
-			fs.Debugf(fh.remote, "failed to wait for in-sequence read to %d", off)
-		}
+		waitSequential("read", fh.remote, fh.cond, fh.file.VFS().Opt.ReadWait, &fh.offset, off)
 	}
 	doSeek := off != fh.offset
 	if doSeek && fh.noSeek {
diff --git a/vfs/write.go b/vfs/write.go
index b7401bbe9..dd45fdcfd 100644
--- a/vfs/write.go
+++ b/vfs/write.go
@@ -5,7 +5,6 @@ import (
 	"io"
 	"os"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/rclone/rclone/fs"
@@ -131,28 +130,7 @@ func (fh *WriteFileHandle) writeAt(p []byte, off int64) (n int, err error) {
 		return 0, ECLOSED
 	}
 	if fh.offset != off {
-		// Set a background timer so we don't wait forever
-		maxWait := fh.file.VFS().Opt.WriteWait
-		timeout := time.NewTimer(maxWait)
-		done := make(chan struct{})
-		abort := int32(0)
-		go func() {
-			select {
-			case <-timeout.C:
-				// set abort flag an give all the waiting goroutines a kick on timeout
-				atomic.StoreInt32(&abort, 1)
-				fh.cond.Broadcast()
-			case <-done:
-			}
-		}()
-		// Wait for an in-sequence write or abort
-		for fh.offset != off && atomic.LoadInt32(&abort) == 0 {
-			// fs.Debugf(fh.remote, "waiting for in-sequence write to %d", off)
-			fh.cond.Wait()
-		}
-		// tidy up end timer
-		close(done)
-		timeout.Stop()
+		waitSequential("write", fh.remote, fh.cond, fh.file.VFS().Opt.WriteWait, &fh.offset, off)
 	}
 	if fh.offset != off {
 		fs.Errorf(fh.remote, "WriteFileHandle.Write: can't seek in file without --vfs-cache-mode >= writes")
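
Below is a minimal usage sketch (not part of the patch) of how the extracted waitSequential helper is driven. The remote name "file.bin" and the target offset 4096 are illustrative only; the helper's fs.Debugf calls rely on the rclone fs package already imported in vfs/read.go. One goroutine advances the shared offset and broadcasts on the condition variable, while the caller, holding cond.L, blocks until the offset matches or maxWait expires.

	var (
		mu     sync.Mutex
		cond   = sync.NewCond(&mu)
		offset int64 // plays the role of fh.offset
	)

	// Another handle catches the shared offset up to the wanted position
	// and wakes any waiters, much as readAt/writeAt do after each transfer.
	go func() {
		time.Sleep(10 * time.Millisecond)
		mu.Lock()
		offset = 4096
		cond.Broadcast()
		mu.Unlock()
	}()

	mu.Lock() // waitSequential must be called with cond.L held
	waitSequential("read", "file.bin", cond, time.Second, &offset, 4096)
	mu.Unlock()

If the offset is never advanced, the timer inside waitSequential fires after the maxWait argument (time.Second here), sets the abort flag, and the call returns with *poff still short of off, which the callers then treat as an out-of-order access.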