1
mirror of https://github.com/rclone/rclone synced 2024-11-21 22:50:16 +01:00

vfs: Make tests run reliably

On file Remove
- cancel any writebacks in progress
- ignore error message deleting non existent file if file was in the
  process of being uploaded

Writeback
- Don't transfer the file if it has disappeared in the meantime
- Take our own copy of the file name to avoid deadlocks
- Fix delayed retry logic
- Wait for upload to finish when cancelling upload

Fix race condition in item saving

Fix race condition in vfscache test

Make sure we delete the file on the error path - this makes cascading
failures much less likely
This commit is contained in:
Nick Craig-Wood 2020-06-03 15:49:41 +01:00
parent 496a87a665
commit 58a7faa281
6 changed files with 128 additions and 62 deletions

View File

@ -527,7 +527,8 @@ func (f *File) Sync() error {
}
// Remove the file
func (f *File) Remove() error {
func (f *File) Remove() (err error) {
defer log.Trace(f.Path(), "")("err=%v", &err)
f.mu.RLock()
d := f.d
f.mu.RUnlock()
@ -535,28 +536,33 @@ func (f *File) Remove() error {
if d.vfs.Opt.ReadOnly {
return EROFS
}
f.muRW.Lock() // muRW must be locked before mu to avoid
f.mu.Lock() // deadlock in RWFileHandle.openPending and .close
if f.o != nil {
err := f.o.Remove(context.TODO())
if err != nil {
fs.Debugf(f._path(), "File.Remove file error: %v", err)
f.mu.Unlock()
f.muRW.Unlock()
return err
}
// Remove the object from the cache
wasWriting := false
if d.vfs.cache != nil {
wasWriting = d.vfs.cache.Remove(f.Path())
}
f.mu.Unlock()
f.muRW.Unlock()
// Remove the item from the directory listing
// called with File.mu released
d.delObject(f.Name())
// Remove the object from the cache
if d.vfs.cache != nil {
d.vfs.cache.Remove(f.Path())
f.muRW.Lock() // muRW must be locked before mu to avoid
f.mu.Lock() // deadlock in RWFileHandle.openPending and .close
if f.o != nil {
err = f.o.Remove(context.TODO())
}
return nil
f.mu.Unlock()
f.muRW.Unlock()
if err != nil {
if wasWriting {
// Ignore error deleting file if was writing it as it may not be uploaded yet
err = nil
} else {
fs.Debugf(f._path(), "File.Remove file error: %v", err)
}
}
return err
}
// RemoveAll the file - same as remove for files

View File

@ -565,6 +565,11 @@ func TestRWFileHandleSizeCreateNew(t *testing.T) {
func testRWFileHandleOpenTest(t *testing.T, vfs *VFS, test *openTest) {
fileName := "open-test-file"
// Make sure we delete the file on failure too
defer func() {
_ = vfs.Remove(fileName)
}()
// first try with file not existing
_, err := vfs.Stat(fileName)
require.True(t, os.IsNotExist(err))

View File

@ -309,14 +309,16 @@ func (c *Cache) Rename(name string, newName string, newObj fs.Object) (err error
}
// Remove should be called if name is deleted
func (c *Cache) Remove(name string) {
//
// This returns true if the file was in the transfer queue so may not
// have completedly uploaded yet.
func (c *Cache) Remove(name string) (wasWriting bool) {
name = clean(name)
c.mu.Lock()
item, _ := c._get(name)
delete(c.item, name)
c.mu.Unlock()
item.remove("file deleted")
return item.remove("file deleted")
}
// SetModTime should be called to set the modification time of the cache file

View File

@ -76,7 +76,7 @@ func newTestCache(t *testing.T) (r *fstest.Run, c *Cache, cleanup func()) {
// Disable the cache cleaner as it interferes with these tests
opt.CachePollInterval = 0
// Enable synchronous write
// Disable synchronous write
opt.WriteBack = 0
return newTestCacheOpt(t, opt)
@ -264,7 +264,7 @@ func TestCachePurgeOld(t *testing.T) {
var removed []string
removeFile := func(item *Item) {
removed = append(removed, item.name)
item._remove("TestCachePurgeOld")
item.remove("TestCachePurgeOld")
}
removed = nil
@ -326,7 +326,7 @@ func TestCachePurgeOverQuota(t *testing.T) {
var removed []string
remove := func(item *Item) {
removed = append(removed, item.name)
item._remove("TestCachePurgeOverQuota")
item.remove("TestCachePurgeOverQuota")
}
removed = nil
@ -402,7 +402,6 @@ func TestCachePurgeOverQuota(t *testing.T) {
// make potato definitely after potato2
t2 := t1.Add(20 * time.Second)
require.NoError(t, potato.Truncate(5))
potato.info.ATime = t2
// Check only potato2 removed to get below quota
@ -563,13 +562,13 @@ func TestCacheCleaner(t *testing.T) {
potato := c.Item("potato")
potato2, found := c.get("potato")
assert.Equal(t, potato, potato2)
assert.Equal(t, fmt.Sprintf("%p", potato), fmt.Sprintf("%p", potato2))
assert.True(t, found)
time.Sleep(10 * opt.CachePollInterval)
potato2, found = c.get("potato")
assert.NotEqual(t, potato, potato2)
assert.NotEqual(t, fmt.Sprintf("%p", potato), fmt.Sprintf("%p", potato2))
assert.False(t, found)
}

View File

@ -471,18 +471,22 @@ func (item *Item) _store(ctx context.Context, storeFn StoreFn) (err error) {
// Transfer the temp file to the remote
cacheObj, err := item.c.fcache.NewObject(ctx, item.name)
if err != nil {
if err != nil && err != fs.ErrorObjectNotFound {
return errors.Wrap(err, "vfs cache: failed to find cache file")
}
item.mu.Unlock()
o, err := operations.Copy(ctx, item.c.fremote, item.o, item.name, cacheObj)
item.mu.Lock()
if err != nil {
return errors.Wrap(err, "vfs cache: failed to transfer file from cache to remote")
// Object has disappeared if cacheObj == nil
if cacheObj != nil {
item.mu.Unlock()
o, err := operations.Copy(ctx, item.c.fremote, item.o, item.name, cacheObj)
item.mu.Lock()
if err != nil {
return errors.Wrap(err, "vfs cache: failed to transfer file from cache to remote")
}
item.o = o
item._updateFingerprint()
}
item.o = o
item._updateFingerprint()
item.info.Dirty = false
err = item._save()
if err != nil {
@ -529,10 +533,12 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
}
// save the metadata once more since it may be dirty
// after the downloader
item.mu.Lock()
saveErr := item._save()
if saveErr != nil && err == nil {
err = errors.Wrap(saveErr, "close failed to save item")
}
item.mu.Unlock()
}()
item.mu.Lock()
defer item.mu.Unlock()
@ -582,7 +588,7 @@ func (item *Item) Close(storeFn StoreFn) (err error) {
err = item._store(context.Background(), storeFn)
} else {
// asynchronous writeback
item.c.writeback.add(item, storeFn)
item.c.writeback.add(item, item.name, storeFn)
}
}
@ -666,10 +672,10 @@ func (item *Item) _removeFile(reason string) {
err := os.Remove(osPath)
if err != nil {
if !os.IsNotExist(err) {
fs.Errorf(item.name, "Failed to remove cache file as %s: %v", reason, err)
fs.Errorf(item.name, "vfs cache: failed to remove cache file as %s: %v", reason, err)
}
} else {
fs.Infof(item.name, "Removed cache file as %s", reason)
fs.Infof(item.name, "vfs cache: removed cache file as %s", reason)
}
}
@ -681,28 +687,37 @@ func (item *Item) _removeMeta(reason string) {
err := os.Remove(osPathMeta)
if err != nil {
if !os.IsNotExist(err) {
fs.Errorf(item.name, "Failed to remove metadata from cache as %s: %v", reason, err)
fs.Errorf(item.name, "vfs cache: failed to remove metadata from cache as %s: %v", reason, err)
}
} else {
fs.Infof(item.name, "Removed metadata from cache as %s", reason)
fs.Infof(item.name, "vfs cache: removed metadata from cache as %s", reason)
}
}
// remove the cached file and empty the metadata
//
// This returns true if the file was in the transfer queue so may not
// have completedly uploaded yet.
//
// call with lock held
func (item *Item) _remove(reason string) {
func (item *Item) _remove(reason string) (wasWriting bool) {
// Cancel writeback, if any
wasWriting = item.c.writeback.cancel(item)
item.info.clean()
item.metaDirty = false
item._removeFile(reason)
item._removeMeta(reason)
return wasWriting
}
// remove the cached file and empty the metadata
func (item *Item) remove(reason string) {
//
// This returns true if the file was in the transfer queue so may not
// have completedly uploaded yet.
func (item *Item) remove(reason string) (wasWriting bool) {
item.mu.Lock()
item._remove(reason)
item.mu.Unlock()
defer item.mu.Unlock()
return item._remove(reason)
}
// create a downloader for the item

View File

@ -13,15 +13,14 @@ import (
)
const (
uploadDelay = 10 * time.Second // delay betwen upload attempts
maxUploadAttempts = 10 // max number of times to try to upload
maxUploadDelay = 5 * time.Minute // max delay betwen upload attempts
)
// writeBack keeps track of the items which need to be written back to the disk at some point
type writeBack struct {
mu sync.Mutex
items writeBackItems // priority queue of *writeBackItem
lookup map[*Item]*writeBackItem // for getting a *writeBackItem from a *Item
items writeBackItems // priority queue of *writeBackItem - writeBackItems are in here while awaiting transfer only
lookup map[*Item]*writeBackItem // for getting a *writeBackItem from a *Item - writeBackItems are in here until cancelled
opt *vfscommon.Options // VFS options
timer *time.Timer // next scheduled time for the uploader
kick chan struct{} // send on this channel to wake up the uploader
@ -47,13 +46,19 @@ func newWriteBack(ctx context.Context, opt *vfscommon.Options) *writeBack {
// writeBackItem stores an Item awaiting writeback
//
// These are stored on the items heap when awaiting transfer but
// removed from the items heap when transferring. They remain in the
// lookup map until cancelled.
//
// writeBack.mu must be held to manipulate this
type writeBackItem struct {
name string // name of the item so we don't have to read it from item
index int // index into the priority queue for update
item *Item // Item that needs writeback
expiry time.Time // When this expires we will write it back
uploading bool // If we are uploading the item
cancel context.CancelFunc // To cancel the upload with
done chan struct{} // closed when the cancellation completes
storeFn StoreFn // To write the object back with
tries int // number of times we have tried to upload
delay time.Duration // delay between upload attempts
@ -114,11 +119,12 @@ func (wb *writeBack) _newExpiry() time.Time {
// make a new writeBackItem
//
// call with the lock held
func (wb *writeBack) _newItem(item *Item) *writeBackItem {
func (wb *writeBack) _newItem(item *Item, name string) *writeBackItem {
wbItem := &writeBackItem{
name: name,
item: item,
expiry: wb._newExpiry(),
delay: uploadDelay,
delay: wb.opt.WriteBack,
}
wb._addItem(wbItem)
wb._pushItem(wbItem)
@ -153,6 +159,13 @@ func (wb *writeBack) _pushItem(wbItem *writeBackItem) {
heap.Push(&wb.items, wbItem)
}
// remove a writeBackItem from the items heap
//
// call with the lock held
func (wb *writeBack) _removeItem(wbItem *writeBackItem) {
heap.Remove(&wb.items, wbItem.index)
}
// peek the oldest writeBackItem - may be nil
//
// call with the lock held
@ -179,13 +192,13 @@ func (wb *writeBack) _resetTimer() {
// add adds an item to the writeback queue or resets its timer if it
// is already there
func (wb *writeBack) add(item *Item, storeFn StoreFn) {
func (wb *writeBack) add(item *Item, name string, storeFn StoreFn) {
wb.mu.Lock()
defer wb.mu.Unlock()
wbItem, ok := wb.lookup[item]
if !ok {
wbItem = wb._newItem(item)
wbItem = wb._newItem(item, name)
} else {
if wbItem.uploading {
// We are uploading already so cancel the upload
@ -198,6 +211,28 @@ func (wb *writeBack) add(item *Item, storeFn StoreFn) {
wb._resetTimer()
}
// cancel a writeback if there is one
func (wb *writeBack) cancel(item *Item) (found bool) {
wb.mu.Lock()
defer wb.mu.Unlock()
wbItem, found := wb.lookup[item]
if found {
fs.Debugf(wbItem.name, "vfs cache: cancelling writeback")
if wbItem.uploading {
// We are uploading already so cancel the upload
wb._cancelUpload(wbItem)
} else {
// Remove the item from the heap
wb._removeItem(wbItem)
}
// Remove the item from the lookup map
wb._delItem(wbItem)
}
wb._resetTimer()
return found
}
// kick the upload checker
//
// This should be called at the end of uploads just in case we had to
@ -229,20 +264,22 @@ func (wb *writeBack) upload(ctx context.Context, wbItem *writeBackItem) {
}
if err != nil {
if wbItem.tries < maxUploadAttempts {
fs.Errorf(item.getName(), "vfs cache: failed to upload, will retry in %v: %v", wb.opt.WriteBack, err)
// push the item back on the queue for retry
wb._pushItem(wbItem)
wb.items._update(wbItem, time.Now().Add(wbItem.delay))
wbItem.delay *= 2
} else {
fs.Errorf(item.getName(), "vfs cache: failed to upload, will retry in %v: %v", wb.opt.WriteBack, err)
// FIXME should this have a max number of transfer attempts?
wbItem.delay *= 2
if wbItem.delay > maxUploadDelay {
wbItem.delay = maxUploadDelay
}
fs.Errorf(wbItem.name, "vfs cache: failed to upload try #%d, will retry in %v: %v", wbItem.tries, wbItem.delay, err)
// push the item back on the queue for retry
wb._pushItem(wbItem)
wb.items._update(wbItem, time.Now().Add(wbItem.delay))
} else {
fs.Infof(wbItem.name, "vfs cache: upload succeeded try #%d", wbItem.tries)
// show that we are done with the item
wb._delItem(wbItem)
}
wb._kickUploader()
close(wbItem.done)
}
// cancel the upload
@ -252,11 +289,12 @@ func (wb *writeBack) _cancelUpload(wbItem *writeBackItem) {
if !wbItem.uploading {
return
}
fs.Debugf(wbItem.item.getName(), "vfs cache: canceling upload")
fs.Debugf(wbItem.name, "vfs cache: cancelling upload")
if wbItem.cancel != nil {
// Cancel the upload - this may or may not be effective
// we don't wait for the completion
wbItem.cancel()
// wait for the uploader to finish
<-wbItem.done
}
if wbItem.uploading {
wbItem.uploading = false
@ -275,7 +313,7 @@ func (wb *writeBack) processItems(ctx context.Context) {
for wbItem := wb._peekItem(); wbItem != nil && time.Until(wbItem.expiry) <= 0; wbItem = wb._peekItem() {
// If reached transfer limit don't restart the timer
if wb.uploads >= fs.Config.Transfers {
fs.Debugf(wbItem.item.getName(), "vfs cache: delaying writeback as --transfers exceeded")
fs.Debugf(wbItem.name, "vfs cache: delaying writeback as --transfers exceeded")
resetTimer = false
break
}
@ -286,6 +324,7 @@ func (wb *writeBack) processItems(ctx context.Context) {
wb.uploads++
newCtx, cancel := context.WithCancel(ctx)
wbItem.cancel = cancel
wbItem.done = make(chan struct{})
go wb.upload(newCtx, wbItem)
}