From 917cb4acb37131625bbbffa5f14208f150dcaa25 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Sat, 29 Feb 2020 18:08:22 +0000 Subject: [PATCH] vfs: implement partial reads in --vfs-cache-mode full This allows reads to only read part of the file and it keeps on disk a cache of what parts of each file have been loaded. File data itself is kept in sparse files. --- vfs/file.go | 162 +++---- vfs/read_write.go | 388 +++++---------- vfs/read_write_test.go | 61 +-- vfs/vfscache/cache.go | 513 ++++++++++++++++++++ vfs/vfscache/cache_test.go | 472 +++++++++++++++++++ vfs/vfscache/downloader.go | 310 ++++++++++++ vfs/vfscache/item.go | 858 ++++++++++++++++++++++++++++++++++ vfs/vfscache/vfscache.go | 600 ------------------------ vfs/vfscache/vfscache_test.go | 602 ------------------------ vfs/write.go | 3 +- 10 files changed, 2359 insertions(+), 1610 deletions(-) create mode 100644 vfs/vfscache/cache.go create mode 100644 vfs/vfscache/cache_test.go create mode 100644 vfs/vfscache/downloader.go create mode 100644 vfs/vfscache/item.go delete mode 100644 vfs/vfscache/vfscache.go delete mode 100644 vfs/vfscache/vfscache_test.go diff --git a/vfs/file.go b/vfs/file.go index aa11ca013..299def1e7 100644 --- a/vfs/file.go +++ b/vfs/file.go @@ -38,21 +38,17 @@ type File struct { inode uint64 // inode number - read only size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned - mu sync.RWMutex // protects the following - d *Dir // parent directory - dPath string // path of parent directory. NB dir rename means all Files are flushed - o fs.Object // NB o may be nil if file is being written - leaf string // leaf name of the object - rwOpenCount int // number of open files on this handle - writers []Handle // writers for this file - nwriters int32 // len(writers) which is read/updated with atomic - readWriters int // how many RWFileHandle are open for writing - readWriterClosing bool // is an RWFileHandle currently cosing? 
- modified bool // has the cache file be modified by an RWFileHandle? - pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written - pendingRenameFun func(ctx context.Context) error // will be run/renamed after all writers close - appendMode bool // file was opened with O_APPEND - sys interface{} // user defined info to be attached here + mu sync.RWMutex // protects the following + d *Dir // parent directory + dPath string // path of parent directory. NB dir rename means all Files are flushed + o fs.Object // NB o may be nil if file is being written + leaf string // leaf name of the object + writers []Handle // writers for this file + nwriters int32 // len(writers) which is read/updated with atomic + pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written + pendingRenameFun func(ctx context.Context) error // will be run/renamed after all writers close + appendMode bool // file was opened with O_APPEND + sys interface{} // user defined info to be attached here muRW sync.Mutex // synchronize RWFileHandle.openPending(), RWFileHandle.close() and File.Remove } @@ -124,11 +120,6 @@ func (f *File) Path() string { return path.Join(dPath, leaf) } -// osPath returns the full path of the file in the cache in OS format -func (f *File) osPath() string { - return f.d.vfs.cache.ToOSPath(f.Path()) -} - // Sys returns underlying data source (can be nil) - satisfies Node interface func (f *File) Sys() interface{} { f.mu.RLock() @@ -184,10 +175,11 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error { return err } + oldPath := f.Path() // File.mu is unlocked here to call Dir.Path() newPath := path.Join(destDir.Path(), newName) - renameCall := func(ctx context.Context) error { + renameCall := func(ctx context.Context) (err error) { // chain rename calls if any if oldPendingRenameFun != nil { err := oldPendingRenameFun(ctx) @@ -199,44 +191,46 @@ func (f *File) rename(ctx 
context.Context, destDir *Dir, newName string) error { f.mu.RLock() o := f.o f.mu.RUnlock() - if o == nil { - return errors.New("Cannot rename: file object is not available") - } - if o.Remote() == newPath { - return nil // no need to rename - } + var newObject fs.Object + // if o is nil then we are writing the file so no need to rename the object + if o != nil { + if o.Remote() == newPath { + return nil // no need to rename + } - // do the move of the remote object - dstOverwritten, _ := d.Fs().NewObject(ctx, newPath) - newObject, err := operations.Move(ctx, d.Fs(), dstOverwritten, newPath, o) - if err != nil { - fs.Errorf(f.Path(), "File.Rename error: %v", err) - return err - } + // do the move of the remote object + dstOverwritten, _ := d.Fs().NewObject(ctx, newPath) + newObject, err = operations.Move(ctx, d.Fs(), dstOverwritten, newPath, o) + if err != nil { + fs.Errorf(f.Path(), "File.Rename error: %v", err) + return err + } - // newObject can be nil here for example if --dry-run - if newObject == nil { - err = errors.New("rename failed: nil object returned") - fs.Errorf(f.Path(), "File.Rename %v", err) - return err + // newObject can be nil here for example if --dry-run + if newObject == nil { + err = errors.New("rename failed: nil object returned") + fs.Errorf(f.Path(), "File.Rename %v", err) + return err + } + } + // Rename in the cache + if f.d.vfs.cache != nil { + if err := f.d.vfs.cache.Rename(oldPath, newPath, newObject); err != nil { + fs.Infof(f.Path(), "File.Rename failed in Cache: %v", err) + } } // Update the node with the new details fs.Debugf(f.Path(), "Updating file with %v %p", newObject, f) // f.rename(destDir, newObject) f.mu.Lock() - f.o = newObject + if newObject != nil { + f.o = newObject + } f.pendingRenameFun = nil f.mu.Unlock() return nil } - // Rename in the cache if it exists - if f.d.vfs.cache != nil && f.d.vfs.cache.Exists(f.Path()) { - if err := f.d.vfs.cache.Rename(f.Path(), newPath); err != nil { - fs.Infof(f.Path(), "File.Rename 
failed in Cache: %v", err) - } - } - // rename the file object dPath := destDir.Path() f.mu.Lock() @@ -246,7 +240,12 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error { writing := f._writingInProgress() f.mu.Unlock() - if writing { + // Delay the rename if not using RW caching. For the minimal case we + // need to look in the cache to see if caching is in use. + CacheMode := d.vfs.Opt.CacheMode + if writing && + (CacheMode < vfscommon.CacheModeMinimal || + (CacheMode == vfscommon.CacheModeMinimal && !f.d.vfs.cache.Exists(f.Path()))) { fs.Debugf(f.Path(), "File is currently open, delaying rename %p", f) f.mu.Lock() f.pendingRenameFun = renameCall @@ -262,14 +261,11 @@ func (f *File) addWriter(h Handle) { f.mu.Lock() f.writers = append(f.writers, h) atomic.AddInt32(&f.nwriters, 1) - if _, ok := h.(*RWFileHandle); ok { - f.readWriters++ - } f.mu.Unlock() } // delWriter removes a write handle from the file -func (f *File) delWriter(h Handle, modifiedCacheFile bool) (lastWriterAndModified bool) { +func (f *File) delWriter(h Handle) { f.mu.Lock() defer f.applyPendingRename() defer f.mu.Unlock() @@ -286,51 +282,6 @@ func (f *File) delWriter(h Handle, modifiedCacheFile bool) (lastWriterAndModifie } else { fs.Debugf(f._path(), "File.delWriter couldn't find handle") } - if _, ok := h.(*RWFileHandle); ok { - f.readWriters-- - } - f.readWriterClosing = true - if modifiedCacheFile { - f.modified = true - } - lastWriterAndModified = len(f.writers) == 0 && f.modified - if lastWriterAndModified { - f.modified = false - } - return -} - -// addRWOpen should be called by ReadWriteHandle when they have -// actually opened the file for read or write. -func (f *File) addRWOpen() { - f.mu.Lock() - f.rwOpenCount++ - f.mu.Unlock() -} - -// delRWOpen should be called by ReadWriteHandle when they have closed -// an actually opene file for read or write. 
-func (f *File) delRWOpen() { - f.mu.Lock() - f.rwOpenCount-- - f.mu.Unlock() -} - -// rwOpens returns how many active open ReadWriteHandles there are. -// Note that file handles which are in pending open state aren't -// counted. -func (f *File) rwOpens() int { - f.mu.RLock() - defer f.mu.RUnlock() - return f.rwOpenCount -} - -// finishWriterClose resets the readWriterClosing flag -func (f *File) finishWriterClose() { - f.mu.Lock() - f.readWriterClosing = false - f.mu.Unlock() - f.applyPendingRename() } // activeWriters returns the number of writers on the file @@ -413,11 +364,6 @@ func (f *File) _applyPendingModTime() error { return errors.New("Cannot apply ModTime, file object is not available") } - // set the time of the file in the cache - if f.d.vfs.cache != nil { - f.d.vfs.cache.SetModTime(f._path(), f.pendingModTime) - } - // set the time of the object err := f.o.SetModTime(context.TODO(), f.pendingModTime) switch err { @@ -430,13 +376,18 @@ func (f *File) _applyPendingModTime() error { return err } + // set the time of the file in the cache + if f.d.vfs.cache != nil { + f.d.vfs.cache.SetModTime(f._path(), f.pendingModTime) + } + return nil } // _writingInProgress returns true of there are any open writers // Call with read lock held func (f *File) _writingInProgress() bool { - return f.o == nil || len(f.writers) != 0 || f.readWriterClosing + return f.o == nil || len(f.writers) != 0 } // Update the size while writing @@ -486,12 +437,11 @@ func (f *File) waitForValidObject() (o fs.Object, err error) { f.mu.RLock() o = f.o nwriters := len(f.writers) - wclosing := f.readWriterClosing f.mu.RUnlock() if o != nil { return o, nil } - if nwriters == 0 && !wclosing { + if nwriters == 0 { return nil, errors.New("can't open file - writer failed") } time.Sleep(100 * time.Millisecond) @@ -690,7 +640,7 @@ func (f *File) Open(flags int) (fd Handle, err error) { d := f.d f.mu.RUnlock() CacheMode := d.vfs.Opt.CacheMode - if CacheMode >= vfscommon.CacheModeMinimal && 
(d.vfs.cache.Opens(f.Path()) > 0 || d.vfs.cache.Exists(f.Path())) { + if CacheMode >= vfscommon.CacheModeMinimal && (d.vfs.cache.InUse(f.Path()) || d.vfs.cache.Exists(f.Path())) { fd, err = f.openRW(flags) } else if read && write { if CacheMode >= vfscommon.CacheModeMinimal { diff --git a/vfs/read_write.go b/vfs/read_write.go index 0eaef8155..19a18f445 100644 --- a/vfs/read_write.go +++ b/vfs/read_write.go @@ -1,18 +1,15 @@ package vfs import ( - "context" "fmt" "io" - "io/ioutil" "os" - "runtime" "sync" "github.com/pkg/errors" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/log" - "github.com/rclone/rclone/lib/file" + "github.com/rclone/rclone/vfs/vfscache" ) // RWFileHandle is a handle that can be open for read and write. @@ -20,21 +17,29 @@ import ( // It will be open to a temporary file which, when closed, will be // transferred to the remote. type RWFileHandle struct { + // read only variables + file *File + d *Dir + flags int // open flags + item *vfscache.Item // cached file item + + // read write variables protected by mutex mu sync.Mutex - fd *os.File offset int64 // file pointer offset - file *File - d *Dir - flags int // open flags - closed bool // set if handle has been closed + closed bool // set if handle has been closed opened bool writeCalled bool // if any Write() methods have been called - changed bool // file contents was changed in any other way } func newRWFileHandle(d *Dir, f *File, flags int) (fh *RWFileHandle, err error) { + defer log.Trace(f.Path(), "")("err=%v", &err) + // get an item to represent this from the cache + item := d.vfs.cache.Item(f.Path()) + + exists := f.exists() // || item.Exists() + // if O_CREATE and O_EXCL are set and if path already exists, then return EEXIST - if flags&(os.O_CREATE|os.O_EXCL) == os.O_CREATE|os.O_EXCL && f.exists() { + if flags&(os.O_CREATE|os.O_EXCL) == os.O_CREATE|os.O_EXCL && exists { return nil, EEXIST } @@ -42,130 +47,62 @@ func newRWFileHandle(d *Dir, f *File, flags int) (fh 
*RWFileHandle, err error) { file: f, d: d, flags: flags, + item: item, } - // mark the file as open in the cache - must be done before the mkdir - fh.d.VFS().cache.Open(fh.file.Path()) - - // Make a place for the file - _, err = d.VFS().cache.Mkdir(fh.file.Path()) - if err != nil { - fh.d.VFS().cache.Close(fh.file.Path()) - return nil, errors.Wrap(err, "open RW handle failed to make cache directory") - } - - rdwrMode := fh.flags & accessModeMask - if rdwrMode != os.O_RDONLY { - fh.file.addWriter(fh) - } - - // truncate or create files immediately to prepare the cache - if fh.flags&os.O_TRUNC != 0 || fh.flags&(os.O_CREATE) != 0 && !f.exists() { - if err := fh.openPending(false); err != nil { - fh.file.delWriter(fh, false) - return nil, err + // truncate immediately if O_TRUNC is set or O_CREATE is set and file doesn't exist + if !fh.readOnly() && (fh.flags&os.O_TRUNC != 0 || (fh.flags&os.O_CREATE != 0 && !(exists || item.Exists()))) { + err = fh.Truncate(0) + if err != nil { + return nil, errors.Wrap(err, "cache open with O_TRUNC: failed to truncate") } + // we definitely need to write back the item even if we don't write to it + item.Dirty() + } + + if !fh.readOnly() { + fh.file.addWriter(fh) } return fh, nil } +// readOnly returns whether flags say fh is read only +func (fh *RWFileHandle) readOnly() bool { + return (fh.flags & accessModeMask) == os.O_RDONLY +} + +// writeOnly returns whether flags say fh is write only +func (fh *RWFileHandle) writeOnly() bool { + return (fh.flags & accessModeMask) == os.O_WRONLY +} + // openPending opens the file if there is a pending open // // call with the lock held -func (fh *RWFileHandle) openPending(truncate bool) (err error) { +func (fh *RWFileHandle) openPending() (err error) { if fh.opened { return nil } + defer log.Trace(fh.logPrefix(), "")("err=%v", &err) fh.file.muRW.Lock() defer fh.file.muRW.Unlock() o := fh.file.getObject() + err = fh.item.Open(o) + if err != nil { + return errors.Wrap(err, "open RW handle failed to 
open cache file") + } - var fd *os.File - cacheFileOpenFlags := fh.flags - // if not truncating the file, need to read it first - if fh.flags&os.O_TRUNC == 0 && !truncate { - // If the remote object exists AND its cached file exists locally AND there are no - // other RW handles with it open, then attempt to update it. - if o != nil && fh.file.rwOpens() == 0 { - err = fh.d.VFS().cache.Check(context.TODO(), o, fh.file.Path()) - if err != nil { - return errors.Wrap(err, "open RW handle failed to check cache file") - } - } - - // try to open an existing cache file - fd, err = file.OpenFile(fh.file.osPath(), cacheFileOpenFlags&^os.O_CREATE, 0600) - if os.IsNotExist(err) { - // cache file does not exist, so need to fetch it if we have an object to fetch - // it from - if o != nil { - err = fh.d.VFS().cache.Fetch(context.TODO(), o, fh.file.Path()) - if err != nil { - cause := errors.Cause(err) - if cause != fs.ErrorObjectNotFound && cause != fs.ErrorDirNotFound { - // return any non NotFound errors - return errors.Wrap(err, "open RW handle failed to cache file") - } - // continue here with err=fs.Error{Object,Dir}NotFound - } - } - // if err == nil, then we have cached the file successfully, otherwise err is - // indicating some kind of non existent file/directory either - // os.IsNotExist(err) or fs.Error{Object,Dir}NotFound - if err != nil { - if fh.flags&os.O_CREATE != 0 { - // if the object wasn't found AND O_CREATE is set then - // ignore error as we are about to create the file - fh.file.setSize(0) - fh.changed = true - } else { - return errors.Wrap(err, "open RW handle failed to cache file") - } - } - } else if err != nil { - return errors.Wrap(err, "cache open file failed") - } else { - fs.Debugf(fh.logPrefix(), "Opened existing cached copy with flags=%s", decodeOpenFlags(fh.flags)) - } + size := fh._size() // update size in file and read size + if fh.flags&os.O_APPEND != 0 { + fh.offset = size + fs.Debugf(fh.logPrefix(), "open at offset %d", fh.offset) } else { 
- // Set the size to 0 since we are truncating and flag we need to write it back - fh.file.setSize(0) - fh.changed = true - if fh.flags&os.O_CREATE == 0 && fh.file.exists() { - // create an empty file if it exists on the source - err = ioutil.WriteFile(fh.file.osPath(), []byte{}, 0600) - if err != nil { - return errors.Wrap(err, "cache open failed to create zero length file") - } - } - // Windows doesn't seem to deal well with O_TRUNC and - // certain access modes so truncate the file if it - // exists in these cases. - if runtime.GOOS == "windows" && fh.flags&os.O_APPEND != 0 { - cacheFileOpenFlags &^= os.O_TRUNC - _, err = os.Stat(fh.file.osPath()) - if err == nil { - err = os.Truncate(fh.file.osPath(), 0) - if err != nil { - return errors.Wrap(err, "cache open failed to truncate") - } - } - } + fh.offset = 0 } - - if fd == nil { - fs.Debugf(fh.logPrefix(), "Opening cached copy with flags=%s", decodeOpenFlags(fh.flags)) - fd, err = file.OpenFile(fh.file.osPath(), cacheFileOpenFlags, 0600) - if err != nil { - return errors.Wrap(err, "cache open file failed") - } - } - fh.fd = fd fh.opened = true - fh.file.addRWOpen() fh.d.addObject(fh.file) // make sure the directory has this object in it now return nil } @@ -188,79 +125,16 @@ func (fh *RWFileHandle) Node() Node { return fh.file } -// Returns whether the file needs to be written back. 
-// -// If write hasn't been called and the file hasn't been changed in any other -// way we haven't modified it so we don't need to transfer it +// updateSize updates the size of the file if necessary // // Must be called with fh.mu held -func (fh *RWFileHandle) modified() bool { - if !fh.writeCalled && !fh.changed { - fs.Debugf(fh.logPrefix(), "not modified so not transferring") - return false +func (fh *RWFileHandle) updateSize() { + // If read only or not opened then ignore + if fh.readOnly() || !fh.opened { + return } - return true -} - -// flushWrites flushes any pending writes to cloud storage -// -// Must be called with fh.muRW held -func (fh *RWFileHandle) flushWrites(closeFile bool) error { - if fh.closed && !closeFile { - return nil - } - - rdwrMode := fh.flags & accessModeMask - writer := rdwrMode != os.O_RDONLY - - // If read only then return - if !fh.opened && rdwrMode == os.O_RDONLY { - return nil - } - - isCopied := false - if writer { - isCopied = fh.file.delWriter(fh, fh.modified()) - defer fh.file.finishWriterClose() - } - - // If we aren't creating or truncating the file then - // we haven't modified it so don't need to transfer it - if fh.flags&(os.O_CREATE|os.O_TRUNC) != 0 { - if err := fh.openPending(false); err != nil { - return err - } - } - - if writer && fh.opened { - fi, err := fh.fd.Stat() - if err != nil { - fs.Errorf(fh.logPrefix(), "Failed to stat cache file: %v", err) - } else { - fh.file.setSize(fi.Size()) - } - } - - // Close the underlying file - if fh.opened && closeFile { - err := fh.fd.Close() - if err != nil { - err = errors.Wrap(err, "failed to close cache file") - return err - } - } - - if isCopied { - o, err := fh.d.VFS().cache.Store(context.TODO(), fh.file.getObject(), fh.file.Path()) - if err != nil { - fs.Errorf(fh.logPrefix(), "%v", err) - return err - } - fh.file.setObject(o) - fs.Debugf(o, "transferred to remote") - } - - return nil + size := fh._size() + fh.file.setSize(size) } // close the file handle returning 
EBADF if it has been @@ -278,15 +152,19 @@ func (fh *RWFileHandle) close() (err error) { if fh.closed { return ECLOSED } - fh.closed = true - defer func() { - if fh.opened { - fh.file.delRWOpen() - } - fh.d.VFS().cache.Close(fh.file.Path()) - }() - return fh.flushWrites(true) + fh.closed = true + fh.updateSize() + if fh.opened { + err = fh.item.Close(fh.file.setObject) + fh.opened = false + } + + if !fh.readOnly() { + fh.file.delWriter(fh) + } + + return err } // Close closes the file @@ -301,37 +179,10 @@ func (fh *RWFileHandle) Close() error { // single opened file, Flush can be called multiple times. func (fh *RWFileHandle) Flush() error { fh.mu.Lock() - defer fh.mu.Unlock() - if !fh.opened { - return nil - } - if fh.closed { - fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush nothing to do") - return nil - } - // fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush") - if !fh.opened { - fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush ignoring flush on unopened handle") - return nil - } - - // If Write hasn't been called then ignore the Flush - Release - // will pick it up - if !fh.writeCalled { - fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush ignoring flush on unwritten handle") - return nil - } - - fh.file.muRW.Lock() - defer fh.file.muRW.Unlock() - err := fh.flushWrites(false) - - if err != nil { - fs.Errorf(fh.logPrefix(), "RWFileHandle.Flush error: %v", err) - } else { - // fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush OK") - } - return err + fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush") + fh.updateSize() + fh.mu.Unlock() + return nil } // Release is called when we are finished with the file handle @@ -341,35 +192,35 @@ func (fh *RWFileHandle) Flush() error { func (fh *RWFileHandle) Release() error { fh.mu.Lock() defer fh.mu.Unlock() + fs.Debugf(fh.logPrefix(), "RWFileHandle.Release") if fh.closed { - fs.Debugf(fh.logPrefix(), "RWFileHandle.Release nothing to do") + // Don't return an error if called twice return nil } - fs.Debugf(fh.logPrefix(), "RWFileHandle.Release 
closing") err := fh.close() if err != nil { fs.Errorf(fh.logPrefix(), "RWFileHandle.Release error: %v", err) - } else { - // fs.Debugf(fh.logPrefix(), "RWFileHandle.Release OK") } return err } -// _size returns the size of the underlying file +// _size returns the size of the underlying file and also sets it in +// the owning file // // call with the lock held -// -// FIXME what if a file was partially read in - this may return the wrong thing? -// FIXME need to make sure we extend the file to the maximum when creating it func (fh *RWFileHandle) _size() int64 { - if !fh.opened { - return fh.file.Size() - } - fi, err := fh.fd.Stat() + size, err := fh.item.GetSize() if err != nil { - return 0 + o := fh.file.getObject() + if o != nil { + size = o.Size() + } else { + fs.Errorf(fh.logPrefix(), "Couldn't read size of file") + size = 0 + } } - return fi.Size() + fh.file.setSize(size) + return size } // Size returns the size of the underlying file @@ -390,16 +241,20 @@ func (fh *RWFileHandle) Stat() (os.FileInfo, error) { // // call with lock held func (fh *RWFileHandle) _readAt(b []byte, off int64) (n int, err error) { + defer log.Trace(fh.logPrefix(), "size=%d, off=%d", len(b), off)("n=%d, err=%v", &n, &err) if fh.closed { return n, ECLOSED } - if fh.flags&accessModeMask == os.O_WRONLY { + if fh.writeOnly() { return n, EBADF } - if err = fh.openPending(false); err != nil { + if off >= fh._size() { + return n, io.EOF + } + if err = fh.openPending(); err != nil { return n, err } - return fh.fd.ReadAt(b, off) + return fh.item.ReadAt(b, off) } // ReadAt bytes from the file at off @@ -428,7 +283,7 @@ func (fh *RWFileHandle) Seek(offset int64, whence int) (ret int64, err error) { if !fh.opened && offset == 0 && whence != 2 { return 0, nil } - if err = fh.openPending(false); err != nil { + if err = fh.openPending(); err != nil { return ret, err } switch whence { @@ -444,32 +299,30 @@ func (fh *RWFileHandle) Seek(offset int64, whence int) (ret int64, err error) { // WriteAt 
bytes to the file at off func (fh *RWFileHandle) _writeAt(b []byte, off int64) (n int, err error) { + defer log.Trace(fh.logPrefix(), "size=%d, off=%d", len(b), off)("n=%d, err=%v", &n, &err) if fh.closed { return n, ECLOSED } - if fh.flags&accessModeMask == os.O_RDONLY { + if fh.readOnly() { return n, EBADF } - if err = fh.openPending(false); err != nil { + if err = fh.openPending(); err != nil { return n, err } + if fh.flags&os.O_APPEND != 0 { + // From open(2): Before each write(2), the file offset is + // positioned at the end of the file, as if with lseek(2). + size := fh._size() + fh.offset = size + off = fh.offset + } fh.writeCalled = true - - if fh.flags&os.O_APPEND != 0 { - // if append is set, call Write as WriteAt returns an error if append is set - n, err = fh.fd.Write(b) - } else { - n, err = fh.fd.WriteAt(b, off) - } + n, err = fh.item.WriteAt(b, off) if err != nil { return n, err } - fi, err := fh.fd.Stat() - if err != nil { - return n, errors.Wrap(err, "failed to stat cache file") - } - fh.file.setSize(fi.Size()) + _ = fh._size() return n, err } @@ -477,7 +330,11 @@ func (fh *RWFileHandle) _writeAt(b []byte, off int64) (n int, err error) { func (fh *RWFileHandle) WriteAt(b []byte, off int64) (n int, err error) { fh.mu.Lock() defer fh.mu.Unlock() - return fh._writeAt(b, off) + n, err = fh._writeAt(b, off) + if fh.flags&os.O_APPEND != 0 { + fh.offset += int64(n) + } + return n, err } // Write bytes to the file @@ -494,6 +351,17 @@ func (fh *RWFileHandle) WriteString(s string) (n int, err error) { return fh.Write([]byte(s)) } +// Truncate file to given size +// +// Call with mutex held +func (fh *RWFileHandle) _truncate(size int64) (err error) { + if size == fh._size() { + return nil + } + fh.file.setSize(size) + return fh.item.Truncate(size) +} + // Truncate file to given size func (fh *RWFileHandle) Truncate(size int64) (err error) { fh.mu.Lock() @@ -501,12 +369,10 @@ func (fh *RWFileHandle) Truncate(size int64) (err error) { if fh.closed { return 
ECLOSED } - if err = fh.openPending(size == 0); err != nil { + if err = fh.openPending(); err != nil { return err } - fh.changed = true - fh.file.setSize(size) - return fh.fd.Truncate(size) + return fh._truncate(size) } // Sync commits the current contents of the file to stable storage. Typically, @@ -521,10 +387,10 @@ func (fh *RWFileHandle) Sync() error { if !fh.opened { return nil } - if fh.flags&accessModeMask == os.O_RDONLY { + if fh.readOnly() { return nil } - return fh.fd.Sync() + return fh.item.Sync() } func (fh *RWFileHandle) logPrefix() string { @@ -549,7 +415,7 @@ func (fh *RWFileHandle) Chown(uid, gid int) error { // Fd returns the integer Unix file descriptor referencing the open file. func (fh *RWFileHandle) Fd() uintptr { - return fh.fd.Fd() + return 0xdeadbeef // FIXME } // Name returns the name of the file from the underlying Object. diff --git a/vfs/read_write_test.go b/vfs/read_write_test.go index c5753aa4e..b05dbe248 100644 --- a/vfs/read_write_test.go +++ b/vfs/read_write_test.go @@ -91,15 +91,9 @@ func TestRWFileHandleMethodsRead(t *testing.T) { // Size assert.Equal(t, int64(16), fh.Size()) - // No opens yet - assert.Equal(t, 0, fh.file.rwOpens()) - // Read 1 assert.Equal(t, "0", rwReadString(t, fh, 1)) - // Open after the read - assert.Equal(t, 1, fh.file.rwOpens()) - // Read remainder assert.Equal(t, "123456789abcdef", rwReadString(t, fh, 256)) @@ -124,9 +118,6 @@ func TestRWFileHandleMethodsRead(t *testing.T) { assert.Equal(t, nil, fh.Close()) assert.True(t, fh.closed) - // No opens again - assert.Equal(t, 0, fh.file.rwOpens()) - // Close again assert.Equal(t, ECLOSED, fh.Close()) } @@ -292,10 +283,6 @@ func TestRWFileHandleMethodsWrite(t *testing.T) { vfs, fh := rwHandleCreateWriteOnly(t, r) defer cleanup(t, r, vfs) - // 1 opens since we opened with O_CREATE and the file didn't - // exist in the cache - assert.Equal(t, 1, fh.file.rwOpens()) - // String assert.Equal(t, "file1 (rw)", fh.String()) assert.Equal(t, "", 
(*RWFileHandle)(nil).String()) @@ -323,9 +310,6 @@ func TestRWFileHandleMethodsWrite(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 5, n) - // Open after the write - assert.Equal(t, 1, fh.file.rwOpens()) - // Offset #2 assert.Equal(t, int64(5), offset()) assert.Equal(t, int64(5), node.Size()) @@ -356,9 +340,6 @@ func TestRWFileHandleMethodsWrite(t *testing.T) { // Close assert.NoError(t, fh.Close()) - // No opens again - assert.Equal(t, 0, fh.file.rwOpens()) - // Check double close err = fh.Close() assert.Equal(t, ECLOSED, err) @@ -388,7 +369,6 @@ func TestRWFileHandleWriteAt(t *testing.T) { assert.Equal(t, int64(0), offset()) assert.True(t, fh.opened) assert.False(t, fh.writeCalled) - assert.True(t, fh.changed) // Write the data n, err := fh.WriteAt([]byte("hello**"), 0) @@ -468,6 +448,7 @@ func TestRWFileHandleFlushWrite(t *testing.T) { n, err := fh.Write([]byte("hello")) assert.NoError(t, err) assert.Equal(t, 5, n) + assert.True(t, fh.opened) // Check Flush does not close file if write called err = fh.Flush() @@ -601,7 +582,7 @@ func testRWFileHandleOpenTest(t *testing.T, vfs *VFS, test *openTest) { // first try with file not existing _, err := vfs.Stat(fileName) - require.True(t, os.IsNotExist(err), test.what) + require.True(t, os.IsNotExist(err)) f, openNonExistentErr := vfs.OpenFile(fileName, test.flags, 0666) @@ -617,16 +598,16 @@ func testRWFileHandleOpenTest(t *testing.T, vfs *VFS, test *openTest) { // close err = f.Close() - require.NoError(t, err, test.what) + require.NoError(t, err) } // write the file f, err = vfs.OpenFile(fileName, os.O_WRONLY|os.O_CREATE, 0777) - require.NoError(t, err, test.what) + require.NoError(t, err) _, err = f.Write([]byte("hello")) - require.NoError(t, err, test.what) + require.NoError(t, err) err = f.Close() - require.NoError(t, err, test.what) + require.NoError(t, err) // then open file and try with file existing @@ -643,32 +624,32 @@ func testRWFileHandleOpenTest(t *testing.T, vfs *VFS, test *openTest) { // close 
err = f.Close() - require.NoError(t, err, test.what) + require.NoError(t, err) } // read the file f, err = vfs.OpenFile(fileName, os.O_RDONLY, 0) - require.NoError(t, err, test.what) + require.NoError(t, err) buf, err := ioutil.ReadAll(f) - require.NoError(t, err, test.what) + require.NoError(t, err) err = f.Close() - require.NoError(t, err, test.what) + require.NoError(t, err) contents := string(buf) // remove file node, err := vfs.Stat(fileName) - require.NoError(t, err, test.what) + require.NoError(t, err) err = node.Remove() - require.NoError(t, err, test.what) + require.NoError(t, err) // check - assert.Equal(t, test.openNonExistentErr, openNonExistentErr, "openNonExistentErr: %s: want=%v, got=%v", test.what, test.openNonExistentErr, openNonExistentErr) - assert.Equal(t, test.readNonExistentErr, readNonExistentErr, "readNonExistentErr: %s: want=%v, got=%v", test.what, test.readNonExistentErr, readNonExistentErr) - assert.Equal(t, test.writeNonExistentErr, writeNonExistentErr, "writeNonExistentErr: %s: want=%v, got=%v", test.what, test.writeNonExistentErr, writeNonExistentErr) - assert.Equal(t, test.openExistingErr, openExistingErr, "openExistingErr: %s: want=%v, got=%v", test.what, test.openExistingErr, openExistingErr) - assert.Equal(t, test.readExistingErr, readExistingErr, "readExistingErr: %s: want=%v, got=%v", test.what, test.readExistingErr, readExistingErr) - assert.Equal(t, test.writeExistingErr, writeExistingErr, "writeExistingErr: %s: want=%v, got=%v", test.what, test.writeExistingErr, writeExistingErr) - assert.Equal(t, test.contents, contents, test.what) + assert.Equal(t, test.openNonExistentErr, openNonExistentErr, "openNonExistentErr: want=%v, got=%v", test.openNonExistentErr, openNonExistentErr) + assert.Equal(t, test.readNonExistentErr, readNonExistentErr, "readNonExistentErr: want=%v, got=%v", test.readNonExistentErr, readNonExistentErr) + assert.Equal(t, test.writeNonExistentErr, writeNonExistentErr, "writeNonExistentErr: want=%v, got=%v", 
test.writeNonExistentErr, writeNonExistentErr) + assert.Equal(t, test.openExistingErr, openExistingErr, "openExistingErr: want=%v, got=%v", test.openExistingErr, openExistingErr) + assert.Equal(t, test.readExistingErr, readExistingErr, "readExistingErr: want=%v, got=%v", test.readExistingErr, readExistingErr) + assert.Equal(t, test.writeExistingErr, writeExistingErr, "writeExistingErr: want=%v, got=%v", test.writeExistingErr, writeExistingErr) + assert.Equal(t, test.contents, contents) } func TestRWFileHandleOpenTests(t *testing.T) { @@ -679,7 +660,9 @@ func TestRWFileHandleOpenTests(t *testing.T) { defer cleanup(t, r, vfs) for _, test := range openTests { - testRWFileHandleOpenTest(t, vfs, &test) + t.Run(test.what, func(t *testing.T) { + testRWFileHandleOpenTest(t, vfs, &test) + }) } } diff --git a/vfs/vfscache/cache.go b/vfs/vfscache/cache.go new file mode 100644 index 000000000..d1a1e1d36 --- /dev/null +++ b/vfs/vfscache/cache.go @@ -0,0 +1,513 @@ +// Package vfscache deals with caching of files locally for the VFS layer +package vfscache + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + "runtime" + "sort" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + fscache "github.com/rclone/rclone/fs/cache" + "github.com/rclone/rclone/fs/config" + "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/fs/log" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/vfs/vfscommon" +) + +// NB as Cache and Item are tightly linked it is necessary to have a +// total lock ordering between them. So Cache.mu must always be +// taken before Item.mu to avoid deadlocks. +// +// Cache may call into Item but care is needed if Item calls Cache + +// FIXME size in cache needs to be size on disk if we have sparse files... 
+ +// Cache opened files +type Cache struct { + // read only - no locking needed to read these + fremote fs.Fs // fs for the remote we are caching + fcache fs.Fs // fs for the cache directory + fcacheMeta fs.Fs // fs for the cache metadata directory + opt *vfscommon.Options // vfs Options + root string // root of the cache directory + metaRoot string // root of the cache metadata directory + hashType hash.Type // hash to use locally and remotely + hashOption *fs.HashesOption // corresponding OpenOption + + mu sync.Mutex // protects the following variables + item map[string]*Item // files/directories in the cache + used int64 // total size of files in the cache +} + +// New creates a new cache heirachy for fremote +// +// This starts background goroutines which can be cancelled with the +// context passed in. +func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options) (*Cache, error) { + fRoot := filepath.FromSlash(fremote.Root()) + if runtime.GOOS == "windows" { + if strings.HasPrefix(fRoot, `\\?`) { + fRoot = fRoot[3:] + } + fRoot = strings.Replace(fRoot, ":", "", -1) + } + root := filepath.Join(config.CacheDir, "vfs", fremote.Name(), fRoot) + fs.Debugf(nil, "vfs cache root is %q", root) + metaRoot := filepath.Join(config.CacheDir, "vfsMeta", fremote.Name(), fRoot) + fs.Debugf(nil, "vfs metadata cache root is %q", root) + + fcache, err := fscache.Get(root) + if err != nil { + return nil, errors.Wrap(err, "failed to create cache remote") + } + fcacheMeta, err := fscache.Get(root) + if err != nil { + return nil, errors.Wrap(err, "failed to create cache meta remote") + } + + hashType, hashOption := operations.CommonHash(fcache, fremote) + + c := &Cache{ + fremote: fremote, + fcache: fcache, + fcacheMeta: fcacheMeta, + opt: opt, + root: root, + metaRoot: metaRoot, + item: make(map[string]*Item), + hashType: hashType, + hashOption: hashOption, + } + + // Make sure cache directories exist + _, err = c.mkdir("") + if err != nil { + return nil, errors.Wrap(err, 
"failed to make cache directory") + } + + // load in the cache and metadata off disk + err = c.reload() + if err != nil { + return nil, errors.Wrap(err, "failed to load cache") + } + + // Remove any empty directories + c.purgeEmptyDirs() + + go c.cleaner(ctx) + + return c, nil +} + +// clean returns the cleaned version of name for use in the index map +// +// name should be a remote path not an osPath +func clean(name string) string { + name = strings.Trim(name, "/") + name = path.Clean(name) + if name == "." || name == "/" { + name = "" + } + return name +} + +// toOSPath turns a remote relative name into an OS path in the cache +func (c *Cache) toOSPath(name string) string { + return filepath.Join(c.root, filepath.FromSlash(name)) +} + +// toOSPathMeta turns a remote relative name into an OS path in the +// cache for the metadata +func (c *Cache) toOSPathMeta(name string) string { + return filepath.Join(c.metaRoot, filepath.FromSlash(name)) +} + +// mkdir makes the directory for name in the cache and returns an os +// path for the file +func (c *Cache) mkdir(name string) (string, error) { + parent := vfscommon.FindParent(name) + leaf := filepath.Base(name) + parentPath := c.toOSPath(parent) + err := os.MkdirAll(parentPath, 0700) + if err != nil { + return "", errors.Wrap(err, "make cache directory failed") + } + parentPathMeta := c.toOSPathMeta(parent) + err = os.MkdirAll(parentPathMeta, 0700) + if err != nil { + return "", errors.Wrap(err, "make cache meta directory failed") + } + return filepath.Join(parentPath, leaf), nil +} + +// _get gets name from the cache or creates a new one +// +// It returns the item and found as to whether this item was found in +// the cache (or just created). 
+// +// name should be a remote path not an osPath +// +// must be called with mu held +func (c *Cache) _get(name string) (item *Item, found bool) { + item = c.item[name] + found = item != nil + if !found { + item = newItem(c, name) + c.item[name] = item + } + return item, found +} + +// put puts item under name in the cache +// +// It returns an old item if there was one or nil if not. +// +// name should be a remote path not an osPath +func (c *Cache) put(name string, item *Item) (oldItem *Item) { + name = clean(name) + c.mu.Lock() + oldItem = c.item[name] + if oldItem != item { + c.item[name] = item + } else { + oldItem = nil + } + c.mu.Unlock() + return oldItem +} + +// InUse returns whether the name is in use in the cache +// +// name should be a remote path not an osPath +func (c *Cache) InUse(name string) bool { + name = clean(name) + c.mu.Lock() + item := c.item[name] + c.mu.Unlock() + if item == nil { + return false + } + return item.inUse() +} + +// get gets a file name from the cache or creates a new one +// +// It returns the item and found as to whether this item was found in +// the cache (or just created). +// +// name should be a remote path not an osPath +func (c *Cache) get(name string) (item *Item, found bool) { + name = clean(name) + c.mu.Lock() + item, found = c._get(name) + c.mu.Unlock() + return item, found +} + +// Item gets a cache item for name +// +// To use it item.Open will need to be called +// +// name should be a remote path not an osPath +func (c *Cache) Item(name string) (item *Item) { + item, _ = c.get(name) + return item +} + +// Exists checks to see if the file exists in the cache or not +// +// FIXME check the metadata exists here too? +func (c *Cache) Exists(name string) bool { + osPath := c.toOSPath(name) + fi, err := os.Stat(osPath) + if err != nil { + return false + } + // checks for non-regular files (e.g. directories, symlinks, devices, etc.) 
+ if !fi.Mode().IsRegular() { + return false + } + return true +} + +// rename with os.Rename and more checking +func rename(osOldPath, osNewPath string) error { + sfi, err := os.Stat(osOldPath) + if err != nil { + // Just do nothing if the source does not exist + if os.IsNotExist(err) { + return nil + } + return errors.Wrapf(err, "Failed to stat source: %s", osOldPath) + } + if !sfi.Mode().IsRegular() { + // cannot copy non-regular files (e.g., directories, symlinks, devices, etc.) + return errors.Errorf("Non-regular source file: %s (%q)", sfi.Name(), sfi.Mode().String()) + } + dfi, err := os.Stat(osNewPath) + if err != nil { + if !os.IsNotExist(err) { + return errors.Wrapf(err, "Failed to stat destination: %s", osNewPath) + } + parent := vfscommon.OsFindParent(osNewPath) + err = os.MkdirAll(parent, 0700) + if err != nil { + return errors.Wrapf(err, "Failed to create parent dir: %s", parent) + } + } else { + if !(dfi.Mode().IsRegular()) { + return errors.Errorf("Non-regular destination file: %s (%q)", dfi.Name(), dfi.Mode().String()) + } + if os.SameFile(sfi, dfi) { + return nil + } + } + if err = os.Rename(osOldPath, osNewPath); err != nil { + return errors.Wrapf(err, "Failed to rename in cache: %s to %s", osOldPath, osNewPath) + } + return nil +} + +// Rename the item in cache +func (c *Cache) Rename(name string, newName string, newObj fs.Object) (err error) { + item, _ := c.get(name) + err = item.rename(name, newName, newObj) + if err != nil { + return err + } + + // Move the item in the cache + c.mu.Lock() + if item, ok := c.item[name]; ok { + c.item[newName] = item + delete(c.item, name) + } + c.mu.Unlock() + + fs.Infof(name, "Renamed in cache to %q", newName) + return nil +} + +// Remove should be called if name is deleted +func (c *Cache) Remove(name string) { + item, _ := c.get(name) + item.remove("file deleted") +} + +// SetModTime should be called to set the modification time of the cache file +func (c *Cache) SetModTime(name string, modTime time.Time) { 
+ item, _ := c.get(name) + item.setModTime(modTime) +} + +// CleanUp empties the cache of everything +func (c *Cache) CleanUp() error { + err1 := os.RemoveAll(c.root) + err2 := os.RemoveAll(c.metaRoot) + if err1 != nil { + return err1 + } + return err2 +} + +// walk walks the cache calling the function +func (c *Cache) walk(dir string, fn func(osPath string, fi os.FileInfo, name string) error) error { + return filepath.Walk(dir, func(osPath string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + // Find path relative to the cache root + name, err := filepath.Rel(dir, osPath) + if err != nil { + return errors.Wrap(err, "filepath.Rel failed in walk") + } + if name == "." { + name = "" + } + // And convert into slashes + name = filepath.ToSlash(name) + + return fn(osPath, fi, name) + }) +} + +// reload walks the cache loading metadata files +func (c *Cache) reload() error { + err := c.walk(c.root, func(osPath string, fi os.FileInfo, name string) error { + if !fi.IsDir() { + _, _ = c.get(name) + } + return nil + }) + if err != nil { + return errors.Wrap(err, "failed to walk cache") + } + err = c.walk(c.root, func(osPathMeta string, fi os.FileInfo, name string) error { + if !fi.IsDir() { + _, _ = c.get(name) + } + return nil + }) + if err != nil { + return errors.Wrap(err, "failed to walk meta cache") + } + return err +} + +// purgeOld gets rid of any files that are over age +func (c *Cache) purgeOld(maxAge time.Duration) { + c._purgeOld(maxAge, func(item *Item) { + item.remove("too old") + }) +} + +func (c *Cache) _purgeOld(maxAge time.Duration, remove func(item *Item)) { + c.mu.Lock() + defer c.mu.Unlock() + cutoff := time.Now().Add(-maxAge) + for name, item := range c.item { + if !item.inUse() { + // If not locked and access time too long ago - delete the file + dt := item.getATime().Sub(cutoff) + // fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.info.ATime, cutoff, dt) + if dt < 0 { + remove(item) + // Remove the entry + delete(c.item, 
name) + } + } + } +} + +// Purge any empty directories +func (c *Cache) purgeEmptyDirs() { + ctx := context.Background() + err := operations.Rmdirs(ctx, c.fcache, "", true) + if err != nil { + fs.Errorf(c.fcache, "Failed to remove empty directories from cache: %v", err) + } + err = operations.Rmdirs(ctx, c.fcacheMeta, "", true) + if err != nil { + fs.Errorf(c.fcache, "Failed to remove empty directories from metadata cache: %v", err) + } +} + +// Remove any files that are over quota starting from the +// oldest first +func (c *Cache) purgeOverQuota(quota int64) { + c._purgeOverQuota(quota, func(item *Item) { + item.remove("over quota") + }) +} + +// updateUsed updates c.used so it is accurate +func (c *Cache) updateUsed() { + c.mu.Lock() + defer c.mu.Unlock() + + newUsed := int64(0) + for _, item := range c.item { + newUsed += item.getDiskSize() + } + c.used = newUsed +} + +func (c *Cache) _purgeOverQuota(quota int64, remove func(item *Item)) { + c.updateUsed() + + c.mu.Lock() + defer c.mu.Unlock() + + if quota <= 0 || c.used < quota { + return + } + + var items Items + + // Make a slice of unused files + for _, item := range c.item { + if !item.inUse() { + items = append(items, item) + } + } + + sort.Sort(items) + + // Remove items until the quota is OK + for _, item := range items { + if c.used < quota { + break + } + c.used -= item.getDiskSize() + remove(item) + // Remove the entry + delete(c.item, item.name) + } +} + +// clean empties the cache of stuff if it can +func (c *Cache) clean() { + // Cache may be empty so end + _, err := os.Stat(c.root) + if os.IsNotExist(err) { + return + } + + c.mu.Lock() + oldItems, oldUsed := len(c.item), fs.SizeSuffix(c.used) + c.mu.Unlock() + + // Remove any files that are over age + c.purgeOld(c.opt.CacheMaxAge) + + // Now remove any files that are over quota starting from the + // oldest first + c.purgeOverQuota(int64(c.opt.CacheMaxSize)) + + // Stats + c.mu.Lock() + newItems, newUsed := len(c.item), fs.SizeSuffix(c.used) + 
c.mu.Unlock() + + fs.Infof(nil, "Cleaned the cache: objects %d (was %d), total size %v (was %v)", newItems, oldItems, newUsed, oldUsed) +} + +// cleaner calls clean at regular intervals +// +// doesn't return until context is cancelled +func (c *Cache) cleaner(ctx context.Context) { + if c.opt.CachePollInterval <= 0 { + fs.Debugf(nil, "Cache cleaning thread disabled because poll interval <= 0") + return + } + // Start cleaning the cache immediately + c.clean() + // Then every interval specified + timer := time.NewTicker(c.opt.CachePollInterval) + defer timer.Stop() + for { + select { + case <-timer.C: + c.clean() + case <-ctx.Done(): + fs.Debugf(nil, "cache cleaner exiting") + return + } + } +} + +// Check the local file is up to date in the cache +func (c *Cache) Check(ctx context.Context, o fs.Object, remote string) (err error) { + defer log.Trace(o, "remote=%q", remote)("err=%v", &err) + item, _ := c.get(remote) + return item.checkObject(o) +} diff --git a/vfs/vfscache/cache_test.go b/vfs/vfscache/cache_test.go new file mode 100644 index 000000000..b9a317093 --- /dev/null +++ b/vfs/vfscache/cache_test.go @@ -0,0 +1,472 @@ +package vfscache + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sort" + "testing" + "time" + + _ "github.com/rclone/rclone/backend/local" // import the local backend + "github.com/rclone/rclone/fstest" + "github.com/rclone/rclone/vfs/vfscommon" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestMain drives the tests +func TestMain(m *testing.M) { + fstest.TestMain(m) +} + +// convert c.item to a string +func itemAsString(c *Cache) []string { + c.mu.Lock() + defer c.mu.Unlock() + var out []string + for name, item := range c.item { + out = append(out, fmt.Sprintf("name=%q opens=%d size=%d", filepath.ToSlash(name), item.opens, item.info.Size)) + } + sort.Strings(out) + return out +} + +func TestCacheNew(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + ctx, cancel := 
context.WithCancel(context.Background()) + defer cancel() + + // FIXME need to be writing to the actual file here + // need a item.WriteAt/item.ReadAt method I think + + // Disable the cache cleaner as it interferes with these tests + opt := vfscommon.DefaultOpt + opt.CachePollInterval = 0 + c, err := New(ctx, r.Fremote, &opt) + require.NoError(t, err) + + assert.Contains(t, c.root, "vfs") + assert.Contains(t, c.fcache.Root(), filepath.Base(r.Fremote.Root())) + assert.Equal(t, []string(nil), itemAsString(c)) + + // mkdir + p, err := c.mkdir("potato") + require.NoError(t, err) + assert.Equal(t, "potato", filepath.Base(p)) + assert.Equal(t, []string(nil), itemAsString(c)) + + fi, err := os.Stat(filepath.Dir(p)) + require.NoError(t, err) + assert.True(t, fi.IsDir()) + + // get + item, _ := c.get("potato") + item2, _ := c.get("potato") + assert.Equal(t, item, item2) + assert.WithinDuration(t, time.Now(), item.info.ATime, time.Second) + + // open + assert.Equal(t, []string{ + `name="potato" opens=0 size=0`, + }, itemAsString(c)) + potato := c.Item("/potato") + require.NoError(t, potato.Open(nil)) + assert.Equal(t, []string{ + `name="potato" opens=1 size=0`, + }, itemAsString(c)) + assert.WithinDuration(t, time.Now(), potato.info.ATime, time.Second) + assert.Equal(t, 1, potato.opens) + + // write the file + require.NoError(t, potato.Truncate(5)) + atime := time.Now() + potato.info.ATime = atime + // err = ioutil.WriteFile(p, []byte("hello"), 0600) + // require.NoError(t, err) + + // read its atime + + // updateAtimes + //potato.ATime = time.Now().Add(-24 * time.Hour) + + assert.Equal(t, []string{ + `name="potato" opens=1 size=5`, + }, itemAsString(c)) + assert.True(t, atime.Equal(potato.info.ATime), fmt.Sprintf("%v != %v", atime, potato.info.ATime)) + + // try purging with file open + c.purgeOld(10 * time.Second) + // _, err = os.Stat(p) + // assert.NoError(t, err) + + // close + assert.Equal(t, []string{ + `name="potato" opens=1 size=5`, + }, itemAsString(c)) + 
require.NoError(t, potato.Truncate(6)) + assert.Equal(t, []string{ + `name="potato" opens=1 size=6`, + }, itemAsString(c)) + require.NoError(t, potato.Close(nil)) + assert.Equal(t, []string{ + `name="potato" opens=0 size=6`, + }, itemAsString(c)) + item, _ = c.get("potato") + assert.WithinDuration(t, time.Now(), item.info.ATime, time.Second) + assert.Equal(t, 0, item.opens) + + // try purging with file closed + c.purgeOld(10 * time.Second) + // ...nothing should happend + // _, err = os.Stat(p) + // assert.NoError(t, err) + + //.. purge again with -ve age + c.purgeOld(-10 * time.Second) + _, err = os.Stat(p) + assert.True(t, os.IsNotExist(err)) + + // clean - have tested the internals already + c.clean() + + // cleanup + err = c.CleanUp() + require.NoError(t, err) + _, err = os.Stat(c.root) + assert.True(t, os.IsNotExist(err)) +} + +func TestCacheOpens(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt) + require.NoError(t, err) + defer func() { require.NoError(t, c.CleanUp()) }() + + assert.Equal(t, []string(nil), itemAsString(c)) + potato := c.Item("potato") + require.NoError(t, potato.Open(nil)) + assert.Equal(t, []string{ + `name="potato" opens=1 size=0`, + }, itemAsString(c)) + require.NoError(t, potato.Open(nil)) + assert.Equal(t, []string{ + `name="potato" opens=2 size=0`, + }, itemAsString(c)) + require.NoError(t, potato.Close(nil)) + assert.Equal(t, []string{ + `name="potato" opens=1 size=0`, + }, itemAsString(c)) + require.NoError(t, potato.Close(nil)) + assert.Equal(t, []string{ + `name="potato" opens=0 size=0`, + }, itemAsString(c)) + + require.NoError(t, potato.Open(nil)) + a1 := c.Item("a//b/c/d/one") + a2 := c.Item("a/b/c/d/e/two") + a3 := c.Item("a/b/c/d/e/f/three") + require.NoError(t, a1.Open(nil)) + require.NoError(t, a2.Open(nil)) + require.NoError(t, a3.Open(nil)) + assert.Equal(t, []string{ + 
`name="a/b/c/d/e/f/three" opens=1 size=0`, + `name="a/b/c/d/e/two" opens=1 size=0`, + `name="a/b/c/d/one" opens=1 size=0`, + `name="potato" opens=1 size=0`, + }, itemAsString(c)) + require.NoError(t, potato.Close(nil)) + require.NoError(t, a1.Close(nil)) + require.NoError(t, a2.Close(nil)) + require.NoError(t, a3.Close(nil)) + assert.Equal(t, []string{ + `name="a/b/c/d/e/f/three" opens=0 size=0`, + `name="a/b/c/d/e/two" opens=0 size=0`, + `name="a/b/c/d/one" opens=0 size=0`, + `name="potato" opens=0 size=0`, + }, itemAsString(c)) +} + +// test the open, mkdir, purge, close, purge sequence +func TestCacheOpenMkdir(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Disable the cache cleaner as it interferes with these tests + opt := vfscommon.DefaultOpt + opt.CachePollInterval = 0 + c, err := New(ctx, r.Fremote, &opt) + require.NoError(t, err) + defer func() { require.NoError(t, c.CleanUp()) }() + + // open + potato := c.Item("sub/potato") + require.NoError(t, potato.Open(nil)) + + assert.Equal(t, []string{ + `name="sub/potato" opens=1 size=0`, + }, itemAsString(c)) + + // mkdir + p, err := c.mkdir("sub/potato") + require.NoError(t, err) + assert.Equal(t, "potato", filepath.Base(p)) + assert.Equal(t, []string{ + `name="sub/potato" opens=1 size=0`, + }, itemAsString(c)) + + // test directory exists + fi, err := os.Stat(filepath.Dir(p)) + require.NoError(t, err) + assert.True(t, fi.IsDir()) + + // clean the cache + c.purgeOld(-10 * time.Second) + + // test directory still exists + fi, err = os.Stat(filepath.Dir(p)) + require.NoError(t, err) + assert.True(t, fi.IsDir()) + + // close + require.NoError(t, potato.Close(nil)) + + assert.Equal(t, []string{ + `name="sub/potato" opens=0 size=0`, + }, itemAsString(c)) + + // clean the cache + c.purgeOld(-10 * time.Second) + c.purgeEmptyDirs() + + assert.Equal(t, []string(nil), itemAsString(c)) + + // FIXME test directory does not exist 
+ // _, err = os.Stat(filepath.Dir(p)) + // require.True(t, os.IsNotExist(err)) +} + +func TestCachePurgeOld(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt) + require.NoError(t, err) + defer func() { require.NoError(t, c.CleanUp()) }() + + // Test funcs + var removed []string + //removedDir := true + removeFile := func(item *Item) { + removed = append(removed, item.name) + item._remove("TestCachePurgeOld") + } + // removeDir := func(name string) bool { + // if removedDir { + // removed = append(removed, filepath.ToSlash(name)+"/") + // } + // return removedDir + // } + + removed = nil + c._purgeOld(-10*time.Second, removeFile) + // FIXME c._purgeEmptyDirs(removeDir) + assert.Equal(t, []string(nil), removed) + + potato2 := c.Item("sub/dir2/potato2") + require.NoError(t, potato2.Open(nil)) + potato := c.Item("sub/dir/potato") + require.NoError(t, potato.Open(nil)) + require.NoError(t, potato2.Close(nil)) + require.NoError(t, potato.Open(nil)) + + assert.Equal(t, []string{ + `name="sub/dir/potato" opens=2 size=0`, + `name="sub/dir2/potato2" opens=0 size=0`, + }, itemAsString(c)) + + removed = nil + // removedDir = true + c._purgeOld(-10*time.Second, removeFile) + // FIXME c._purgeEmptyDirs(removeDir) + assert.Equal(t, []string{ + "sub/dir2/potato2", + }, removed) + + require.NoError(t, potato.Close(nil)) + + removed = nil + // removedDir = true + c._purgeOld(-10*time.Second, removeFile) + // FIXME c._purgeEmptyDirs(removeDir) + assert.Equal(t, []string(nil), removed) + + require.NoError(t, potato.Close(nil)) + + assert.Equal(t, []string{ + `name="sub/dir/potato" opens=0 size=0`, + }, itemAsString(c)) + + removed = nil + // removedDir = false + c._purgeOld(10*time.Second, removeFile) + // FIXME c._purgeEmptyDirs(removeDir) + assert.Equal(t, []string(nil), removed) + + assert.Equal(t, []string{ + `name="sub/dir/potato" 
opens=0 size=0`, + }, itemAsString(c)) + + removed = nil + // removedDir = true + c._purgeOld(-10*time.Second, removeFile) + // FIXME c._purgeEmptyDirs(removeDir) + assert.Equal(t, []string{ + "sub/dir/potato", + }, removed) + + assert.Equal(t, []string(nil), itemAsString(c)) +} + +func TestCachePurgeOverQuota(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Disable the cache cleaner as it interferes with these tests + opt := vfscommon.DefaultOpt + opt.CachePollInterval = 0 + c, err := New(ctx, r.Fremote, &opt) + require.NoError(t, err) + + // Test funcs + var removed []string + remove := func(item *Item) { + removed = append(removed, item.name) + item._remove("TestCachePurgeOverQuota") + } + + removed = nil + c._purgeOverQuota(-1, remove) + assert.Equal(t, []string(nil), removed) + + removed = nil + c._purgeOverQuota(0, remove) + assert.Equal(t, []string(nil), removed) + + removed = nil + c._purgeOverQuota(1, remove) + assert.Equal(t, []string(nil), removed) + + // Make some test files + potato := c.Item("sub/dir/potato") + require.NoError(t, potato.Open(nil)) + require.NoError(t, potato.Truncate(5)) + + potato2 := c.Item("sub/dir2/potato2") + require.NoError(t, potato2.Open(nil)) + require.NoError(t, potato2.Truncate(6)) + + assert.Equal(t, []string{ + `name="sub/dir/potato" opens=1 size=5`, + `name="sub/dir2/potato2" opens=1 size=6`, + }, itemAsString(c)) + + // Check nothing removed + removed = nil + c._purgeOverQuota(1, remove) + assert.Equal(t, []string(nil), removed) + + // Close the files + require.NoError(t, potato.Close(nil)) + require.NoError(t, potato2.Close(nil)) + + assert.Equal(t, []string{ + `name="sub/dir/potato" opens=0 size=5`, + `name="sub/dir2/potato2" opens=0 size=6`, + }, itemAsString(c)) + + // Update the stats to read the total size + c.updateUsed() + + // make potato2 definitely after potato + t1 := time.Now().Add(10 * time.Second) + 
require.NoError(t, potato2.Truncate(6)) + potato2.info.ATime = t1 + + // Check only potato removed to get below quota + removed = nil + c._purgeOverQuota(10, remove) + assert.Equal(t, []string{ + "sub/dir/potato", + }, removed) + assert.Equal(t, int64(6), c.used) + + assert.Equal(t, []string{ + `name="sub/dir2/potato2" opens=0 size=6`, + }, itemAsString(c)) + + // Put potato back + potato = c.Item("sub/dir/potato") + require.NoError(t, potato.Open(nil)) + require.NoError(t, potato.Truncate(5)) + require.NoError(t, potato.Close(nil)) + + // Update the stats to read the total size + c.updateUsed() + + assert.Equal(t, []string{ + `name="sub/dir/potato" opens=0 size=5`, + `name="sub/dir2/potato2" opens=0 size=6`, + }, itemAsString(c)) + + // make potato definitely after potato2 + t2 := t1.Add(20 * time.Second) + require.NoError(t, potato.Truncate(5)) + potato.info.ATime = t2 + + // Check only potato2 removed to get below quota + removed = nil + c._purgeOverQuota(10, remove) + assert.Equal(t, []string{ + "sub/dir2/potato2", + }, removed) + assert.Equal(t, int64(5), c.used) + c.purgeEmptyDirs() + + assert.Equal(t, []string{ + `name="sub/dir/potato" opens=0 size=5`, + }, itemAsString(c)) + + // Now purge everything + removed = nil + c._purgeOverQuota(1, remove) + assert.Equal(t, []string{ + "sub/dir/potato", + }, removed) + assert.Equal(t, int64(0), c.used) + c.purgeEmptyDirs() + + assert.Equal(t, []string(nil), itemAsString(c)) + + // Check nothing left behind + c.clean() + assert.Equal(t, int64(0), c.used) + assert.Equal(t, []string(nil), itemAsString(c)) +} diff --git a/vfs/vfscache/downloader.go b/vfs/vfscache/downloader.go new file mode 100644 index 000000000..ea16b45fd --- /dev/null +++ b/vfs/vfscache/downloader.go @@ -0,0 +1,310 @@ +package vfscache + +import ( + "context" + "io" + "io/ioutil" + "os" + "sync" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/fs/asyncreader" + 
"github.com/rclone/rclone/fs/log" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/lib/file" + "github.com/rclone/rclone/lib/ranges" + "github.com/rclone/rclone/lib/readers" +) + +// downloader represents a running download for a file +type downloader struct { + // write only + mu sync.Mutex + ctx context.Context + item *Item + src fs.Object // source object + fcache fs.Fs // destination Fs + osPath string + + // per download + out *os.File // file we are writing to + offset int64 // current offset + waiters []waiter + tr *accounting.Transfer + in *accounting.Account // input we are reading from + downloading bool // whether the download thread is running + finished chan struct{} // closed when download finished +} + +// waiter is a range we are waiting for and a channel to signal +type waiter struct { + r ranges.Range + errChan chan<- error +} + +func newDownloader(item *Item, fcache fs.Fs, remote string, src fs.Object) (dl *downloader, err error) { + defer log.Trace(src, "remote=%q", remote)("dl=%+v, err=%v", &dl, &err) + + dl = &downloader{ + ctx: context.Background(), + item: item, + src: src, + fcache: fcache, + osPath: item.c.toOSPath(remote), + } + + // make sure there is a cache file + _, err = os.Stat(dl.osPath) + if err == nil { + // do nothing + } else if os.IsNotExist(err) { + fs.Debugf(src, "creating empty file") + err = item._truncateToCurrentSize() + if err != nil { + return nil, errors.Wrap(err, "newDownloader: failed to create empty file") + } + } else { + return nil, errors.Wrap(err, "newDownloader: failed to stat cache file") + } + + return dl, nil +} + +// close any waiters with the error passed in +// +// call with lock held +func (dl *downloader) _closeWaiters(err error) { + for _, waiter := range dl.waiters { + waiter.errChan <- err + } + dl.waiters = nil +} + +// Write writes len(p) bytes from p to the underlying data stream. 
It +// returns the number of bytes written from p (0 <= n <= len(p)) and +// any error encountered that caused the write to stop early. Write +// must return a non-nil error if it returns n < len(p). Write must +// not modify the slice data, even temporarily. +// +// Implementations must not retain p. +func (dl *downloader) Write(p []byte) (n int, err error) { + defer log.Trace(dl.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err) + + var ( + // Range we wish to write + r = ranges.Range{Pos: dl.offset, Size: int64(len(p))} + curr ranges.Range + present bool + nn int + ) + + // Check to see what regions are already present + dl.mu.Lock() + defer dl.mu.Unlock() + dl.item.mu.Lock() + defer dl.item.mu.Unlock() + + // Write the range out ignoring already written chunks + // FIXME might stop downloading if we are ignoring chunks? + for err == nil && !r.IsEmpty() { + curr, r, present = dl.item.info.Rs.Find(r) + if curr.Pos != dl.offset { + return n, errors.New("internal error: offset of range is wrong") + } + if present { + // if present want to skip this range + fs.Debugf(dl.src, "skip chunk offset=%d size=%d", dl.offset, curr.Size) + nn = int(curr.Size) + _, err = dl.out.Seek(curr.Size, io.SeekCurrent) + if err != nil { + nn = 0 + } + } else { + // if range not present then we want to write it + fs.Debugf(dl.src, "write chunk offset=%d size=%d", dl.offset, curr.Size) + nn, err = dl.out.Write(p[:curr.Size]) + dl.item.info.Rs.Insert(ranges.Range{Pos: dl.offset, Size: int64(nn)}) + } + dl.offset += int64(nn) + p = p[nn:] + n += nn + } + if n > 0 { + if len(dl.waiters) > 0 { + newWaiters := dl.waiters[:0] + for _, waiter := range dl.waiters { + if dl.item.info.Rs.Present(waiter.r) { + waiter.errChan <- nil + } else { + newWaiters = append(newWaiters, waiter) + } + } + dl.waiters = newWaiters + } + } + if err != nil && err != io.EOF { + dl._closeWaiters(err) + } + return n, err +} + +// start the download running from offset +func (dl *downloader) start(offset int64) (err 
error) { + err = dl.open(offset) + if err != nil { + _ = dl.close(err) + return errors.Wrap(err, "failed to open downloader") + } + + go func() { + err := dl.download() + _ = dl.close(err) + if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned { + fs.Errorf(dl.src, "Failed to download: %v", err) + // FIXME set an error here???? + } + }() + + return nil +} + +// open the file from offset +// +// should be called on a fresh downloader +func (dl *downloader) open(offset int64) (err error) { + defer log.Trace(dl.src, "offset=%d", offset)("err=%v", &err) + dl.finished = make(chan struct{}) + defer close(dl.finished) + dl.downloading = true + dl.tr = accounting.Stats(dl.ctx).NewTransfer(dl.src) + + size := dl.src.Size() + if size < 0 { + // FIXME should just completely download these + return errors.New("can't open unknown sized file") + } + + // FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag. + var rangeOption *fs.RangeOption + if offset > 0 { + rangeOption = &fs.RangeOption{Start: offset, End: size - 1} + } + in0, err := operations.NewReOpen(dl.ctx, dl.src, fs.Config.LowLevelRetries, dl.item.c.hashOption, rangeOption) + if err != nil { + return errors.Wrap(err, "vfs reader: failed to open source file") + } + dl.in = dl.tr.Account(in0).WithBuffer() // account and buffer the transfer + + dl.out, err = file.OpenFile(dl.osPath, os.O_CREATE|os.O_WRONLY, 0700) + if err != nil { + return errors.Wrap(err, "vfs reader: failed to open cache file") + } + + dl.offset = offset + + err = file.SetSparse(dl.out) + if err != nil { + fs.Debugf(dl.src, "vfs reader: failed to set as a sparse file: %v", err) + } + + _, err = dl.out.Seek(offset, io.SeekStart) + if err != nil { + return errors.Wrap(err, "vfs reader: failed to seek") + } + + // FIXME set mod time + // FIXME check checksums + + return nil +} + +var errStop = errors.New("vfs downloader: reading stopped") + +// stop the downloader if running and close everything +func (dl 
*downloader) stop() { + defer log.Trace(dl.src, "")("") + + dl.mu.Lock() + if !dl.downloading || dl.in == nil { + dl.mu.Unlock() + return + } + + // stop the downloader + dl.in.StopBuffering() + oldReader := dl.in.GetReader() + dl.in.UpdateReader(ioutil.NopCloser(readers.ErrorReader{Err: errStop})) + err := oldReader.Close() + if err != nil { + fs.Debugf(dl.src, "vfs downloader: stop close old failed: %v", err) + } + + dl.mu.Unlock() + + // wait for downloader to finish... + <-dl.finished +} + +func (dl *downloader) close(inErr error) (err error) { + defer log.Trace(dl.src, "inErr=%v", err)("err=%v", &err) + dl.stop() + dl.mu.Lock() + if dl.in != nil { + fs.CheckClose(dl.in, &err) + dl.in = nil + } + if dl.tr != nil { + dl.tr.Done(inErr) + dl.tr = nil + } + if dl.out != nil { + fs.CheckClose(dl.out, &err) + dl.out = nil + } + dl._closeWaiters(err) + dl.downloading = false + dl.mu.Unlock() + return nil +} + +/* +FIXME +need gating at all the Read/Write sites +need to pass in offset somehow and start the readfile off +need to end when offset is reached +need to be able to quit on demand +Need offset to be passed to NewReOpen +*/ +// fetch the (offset, size) block from the remote file +func (dl *downloader) download() (err error) { + defer log.Trace(dl.src, "")("err=%v", &err) + _, err = dl.in.WriteTo(dl) + if err != nil { + return errors.Wrap(err, "vfs reader: failed to write to cache file") + } + return nil +} + +// ensure the range is present +func (dl *downloader) ensure(r ranges.Range) (err error) { + defer log.Trace(dl.src, "r=%+v", r)("err=%v", &err) + errChan := make(chan error) + waiter := waiter{ + r: r, + errChan: errChan, + } + dl.mu.Lock() + // FIXME racey - might have finished here + dl.waiters = append(dl.waiters, waiter) + dl.mu.Unlock() + return <-errChan +} + +// ensure the range is present +func (dl *downloader) running() bool { + dl.mu.Lock() + defer dl.mu.Unlock() + return dl.downloading +} diff --git a/vfs/vfscache/item.go b/vfs/vfscache/item.go 
new file mode 100644 index 000000000..aaa848907 --- /dev/null +++ b/vfs/vfscache/item.go @@ -0,0 +1,858 @@ +package vfscache + +import ( + "context" + "encoding/json" + "fmt" + "os" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/log" + "github.com/rclone/rclone/fs/operations" + "github.com/rclone/rclone/lib/file" + "github.com/rclone/rclone/lib/ranges" +) + +// NB as Cache and Item are tightly linked it is necessary to have a +// total lock ordering between them. So Cache.mu must always be +// taken before Item.mu to avoid deadlocks. +// +// Cache may call into Item but care is needed if Item calls Cache +// +// A lot of the Cache methods do not require locking, these include +// +// - Cache.toOSPath +// - Cache.toOSPathMeta +// - Cache.mkdir +// - Cache.objectFingerprint + +// NB Item and downloader are tightly linked so it is necessary to +// have a total lock ordering between them. downloader.mu must always +// be taken before Item.mu. 
downloader may call into Item but Item may +// **not** call downloader methods with Item.mu held, except for +// +// - downloader.running + +// Item is stored in the item map +// +// These are written to the backing store to store status +type Item struct { + // read only + c *Cache // cache this is part of + + mu sync.Mutex // protect the variables + name string // name in the VFS + opens int // number of times file is open + downloader *downloader // if the file is being downloaded to cache + o fs.Object // object we are caching - may be nil + fd *os.File // handle we are using to read and write to the file + metaDirty bool // set if the info needs writeback + info Info // info about the file to persist to backing store + +} + +// Info is persisted to backing store +type Info struct { + ModTime time.Time // last time file was modified + ATime time.Time // last time file was accessed + Size int64 // size of the file + Rs ranges.Ranges // which parts of the file are present + Fingerprint string // fingerprint of remote object + Dirty bool // set if the backing file has been modifed +} + +// Items are a slice of *Item ordered by ATime +type Items []*Item + +func (v Items) Len() int { return len(v) } +func (v Items) Swap(i, j int) { v[i], v[j] = v[j], v[i] } +func (v Items) Less(i, j int) bool { + if i == j { + return false + } + iItem := v[i] + jItem := v[j] + iItem.mu.Lock() + defer iItem.mu.Unlock() + jItem.mu.Lock() + defer jItem.mu.Unlock() + + return iItem.info.ATime.Before(jItem.info.ATime) +} + +// clean the item after its cache file has been deleted +func (info *Info) clean() { + *info = Info{} + info.ModTime = time.Now() + info.ATime = info.ModTime +} + +// StoreFn is called back with an object after it has been uploaded +type StoreFn func(fs.Object) + +// newItem returns an item for the cache +func newItem(c *Cache, name string) (item *Item) { + now := time.Now() + item = &Item{ + c: c, + name: name, + info: Info{ + ModTime: now, + ATime: now, + }, + } + + 
// check the cache file exists + osPath := c.toOSPath(name) + fi, statErr := os.Stat(osPath) + if statErr != nil { + if os.IsNotExist(statErr) { + item._removeMeta("cache file doesn't exist") + } else { + item._remove(fmt.Sprintf("failed to stat cache file: %v", statErr)) + } + } + + // Try to load the metadata + exists, err := item.load() + if !exists { + item._removeFile("metadata doesn't exist") + } else if err != nil { + item._remove(fmt.Sprintf("failed to load metadata: %v", err)) + } + + // Get size estimate (which is best we can do until Open() called) + if statErr == nil { + item.info.Size = fi.Size() + } + return item +} + +// inUse returns true if the item is open or dirty +func (item *Item) inUse() bool { + item.mu.Lock() + defer item.mu.Unlock() + return item.opens != 0 || item.metaDirty || item.info.Dirty +} + +// getATime returns the ATime of the item +func (item *Item) getATime() time.Time { + item.mu.Lock() + defer item.mu.Unlock() + return item.info.ATime +} + +// getDiskSize returns the size on disk (approximately) of the item +// +// We return the sizes of the chunks we have fetched, however there is +// likely to be some overhead which we are not taking into account. 
+func (item *Item) getDiskSize() int64 { + item.mu.Lock() + defer item.mu.Unlock() + return item.info.Rs.Size() +} + +// load reads an item from the disk or returns nil if not found +func (item *Item) load() (exists bool, err error) { + item.mu.Lock() + defer item.mu.Unlock() + osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache + in, err := os.Open(osPathMeta) + if err != nil { + if os.IsNotExist(err) { + return false, err + } + return true, errors.Wrap(err, "vfs cache item: failed to read metadata") + } + defer fs.CheckClose(in, &err) + decoder := json.NewDecoder(in) + err = decoder.Decode(&item.info) + if err != nil { + return true, errors.Wrap(err, "vfs cache item: corrupt metadata") + } + item.metaDirty = false + return true, nil +} + +// save writes an item to the disk +// +// call with the lock held +func (item *Item) _save() (err error) { + osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache + out, err := os.Create(osPathMeta) + if err != nil { + return errors.Wrap(err, "vfs cache item: failed to write metadata") + } + defer fs.CheckClose(out, &err) + encoder := json.NewEncoder(out) + encoder.SetIndent("", "\t") + err = encoder.Encode(item.info) + if err != nil { + return errors.Wrap(err, "vfs cache item: failed to encode metadata") + } + item.metaDirty = false + return nil +} + +// truncate the item to the given size, creating it if necessary +// +// this does not mark the object as dirty +// +// call with the lock held +func (item *Item) _truncate(size int64) (err error) { + if size < 0 { + // FIXME ignore unknown length files + return nil + } + + // Use open handle if available + fd := item.fd + if fd == nil { + osPath := item.c.toOSPath(item.name) // No locking in Cache + fd, err = file.OpenFile(osPath, os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + return errors.Wrap(err, "vfs item truncate: failed to open cache file") + } + + defer fs.CheckClose(fd, &err) + + err = file.SetSparse(fd) + if err != nil { + 
fs.Debugf(item.name, "vfs item truncate: failed to set as a sparse file: %v", err) + } + } + + fs.Debugf(item.name, "vfs cache: truncate to size=%d", size) + + err = fd.Truncate(size) + if err != nil { + return errors.Wrap(err, "vfs truncate: failed to truncate") + } + + item.info.Size = size + + return nil +} + +// Truncate the item to the current size, creating if necessary +// +// This does not mark the object as dirty +// +// call with the lock held +func (item *Item) _truncateToCurrentSize() (err error) { + size, err := item._getSize() + if err != nil && !os.IsNotExist(errors.Cause(err)) { + return errors.Wrap(err, "truncate to current size") + } + if size < 0 { + // FIXME ignore unknown length files + return nil + } + err = item._truncate(size) + if err != nil { + return err + } + return nil +} + +// Truncate the item to the given size, creating it if necessary +// +// If the new size is shorter than the existing size then the object +// will be shortened and marked as dirty. +// +// If the new size is longer than the old size then the object will be +// extended and the extended data will be filled with zeros. The +// object will be marked as dirty in this case also. +func (item *Item) Truncate(size int64) (err error) { + item.mu.Lock() + defer item.mu.Unlock() + + // Read old size + oldSize, err := item._getSize() + if err != nil { + if !os.IsNotExist(errors.Cause(err)) { + return errors.Wrap(err, "truncate failed to read size") + } + oldSize = 0 + } + + err = item._truncate(size) + if err != nil { + return err + } + + changed := true + if size > oldSize { + // Truncate extends the file in which case all new bytes are + // read as zeros. In this case we must show we have written to + // the new parts of the file. 
+ item._written(oldSize, size) + } else if size < oldSize { + // Truncate shrinks the file so clip the downloaded ranges + item.info.Rs = item.info.Rs.Intersection(ranges.Range{Pos: 0, Size: size}) + } else { + changed = item.o == nil + } + if changed { + item._dirty() + } + + return nil +} + +// _getSize gets the current size of the item and updates item.info.Size +// +// Call with mutex held +func (item *Item) _getSize() (size int64, err error) { + var fi os.FileInfo + if item.fd != nil { + fi, err = item.fd.Stat() + } else { + osPath := item.c.toOSPath(item.name) // No locking in Cache + fi, err = os.Stat(osPath) + } + if err != nil { + if os.IsNotExist(err) && item.o != nil { + size = item.o.Size() + err = nil + } + } else { + size = fi.Size() + } + if err == nil { + item.info.Size = size + } + return size, err +} + +// GetSize gets the current size of the item +func (item *Item) GetSize() (size int64, err error) { + item.mu.Lock() + defer item.mu.Unlock() + return item._getSize() +} + +// _exists returns whether the backing file for the item exists or not +// +// call with mutex held +func (item *Item) _exists() bool { + osPath := item.c.toOSPath(item.name) // No locking in Cache + _, err := os.Stat(osPath) + return err == nil +} + +// Exists returns whether the backing file for the item exists or not +func (item *Item) Exists() bool { + item.mu.Lock() + defer item.mu.Unlock() + return item._exists() +} + +// _dirty marks the item as changed and needing writeback +// +// call with lock held +func (item *Item) _dirty() { + item.info.ModTime = time.Now() + item.info.ATime = item.info.ModTime + item.metaDirty = true + if !item.info.Dirty { + item.info.Dirty = true + err := item._save() + if err != nil { + fs.Errorf(item.name, "vfs cache: failed to save item info: %v", err) + } + } +} + +// Dirty marks the item as changed and needing writeback +func (item *Item) Dirty() { + item.mu.Lock() + item._dirty() + item.mu.Unlock() +} + +// Open the local file from the 
object passed in (which may be nil) +// which implies we are about to create the file +func (item *Item) Open(o fs.Object) (err error) { + defer log.Trace(o, "item=%p", item)("err=%v", &err) + item.mu.Lock() + defer item.mu.Unlock() + + item.info.ATime = time.Now() + item.opens++ + + osPath, err := item.c.mkdir(item.name) // No locking in Cache + if err != nil { + return errors.Wrap(err, "vfs cache item: open mkdir failed") + } + + err = item._checkObject(o) + if err != nil { + return errors.Wrap(err, "vfs cache item: check object failed") + } + + if item.opens != 1 { + return nil + } + if item.fd != nil { + return errors.New("vfs cache item: internal error: didn't Close file") + } + + fd, err := file.OpenFile(osPath, os.O_RDWR, 0600) + if err != nil { + return errors.Wrap(err, "vfs cache item: open failed") + } + err = file.SetSparse(fd) + if err != nil { + fs.Debugf(item.name, "vfs cache item: failed to set as a sparse file: %v", err) + } + item.fd = fd + + err = item._save() + if err != nil { + return err + } + + // Unlock the Item.mu so we can call some methods which take Cache.mu + item.mu.Unlock() + + // Ensure this item is in the cache. It is possible a cache + // expiry has run and removed the item if it had no opens so + // we put it back here. If there was an item with opens + // already then return an error. This shouldn't happen because + // there should only be one vfs.File with a pointer to this + // item in at a time. + oldItem := item.c.put(item.name, item) // LOCKING in Cache method + if oldItem != nil { + oldItem.mu.Lock() + if oldItem.opens != 0 { + // Put the item back and return an error + item.c.put(item.name, oldItem) // LOCKING in Cache method + err = errors.Errorf("internal error: item %q already open in the cache", item.name) + } + oldItem.mu.Unlock() + } + + // Relock the Item.mu for the return + item.mu.Lock() + + return err +} + +// Store stores the local cache file to the remote object, returning +// the new remote object. 
objOld is the old object if known. +// +// call with item lock held +func (item *Item) _store() (err error) { + defer log.Trace(item.name, "item=%p", item)("err=%v", &err) + ctx := context.Background() + + // Ensure any segments not transferred are brought in + err = item._ensure(0, item.info.Size) + if err != nil { + return errors.Wrap(err, "vfs cache: failed to download missing parts of cache file") + } + + // Transfer the temp file to the remote + cacheObj, err := item.c.fcache.NewObject(ctx, item.name) + if err != nil { + return errors.Wrap(err, "vfs cache: failed to find cache file") + } + + o, err := operations.Copy(ctx, item.c.fremote, item.o, item.name, cacheObj) + if err != nil { + return errors.Wrap(err, "vfs cache: failed to transfer file from cache to remote") + } + item.o = o + item._updateFingerprint() + return nil +} + +// Close the cache file +func (item *Item) Close(storeFn StoreFn) (err error) { + defer log.Trace(item.o, "Item.Close")("err=%v", &err) + var ( + downloader *downloader + o fs.Object + ) + // close downloader and set item with mutex unlocked + defer func() { + if downloader != nil { + closeErr := downloader.close(nil) + if closeErr != nil && err == nil { + err = closeErr + } + } + if err == nil && storeFn != nil && o != nil { + // Write the object back to the VFS layer + storeFn(item.o) + } + }() + item.mu.Lock() + defer item.mu.Unlock() + + item.info.ATime = time.Now() + item.opens-- + + if item.opens < 0 { + return os.ErrClosed + } else if item.opens > 0 { + return nil + } + + // Update the size on close + _, _ = item._getSize() + err = item._save() + if err != nil { + return errors.Wrap(err, "close failed to save item") + } + + // close the downloader + downloader = item.downloader + item.downloader = nil + + // close the file handle + if item.fd == nil { + return errors.New("vfs cache item: internal error: didn't Open file") + } + err = item.fd.Close() + item.fd = nil + + // if the item hasn't been changed but has been completed 
then + // set the modtime from the object otherwise set it from the info + if item._exists() { + if !item.info.Dirty && item.o != nil { + item._setModTime(item.o.ModTime(context.Background())) + } else { + item._setModTime(item.info.ModTime) + } + } + + // upload the file to backing store if changed + if item.info.Dirty { + fs.Debugf(item.name, "item changed - writeback") + err = item._store() + if err != nil { + fs.Errorf(item.name, "%v", err) + return err + } + fs.Debugf(item.o, "transferred to remote") + item.info.Dirty = false + o = item.o + } + + return err +} + +// check the fingerprint of an object and update the item or delete +// the cached file accordingly +// +// It ensures the file is the correct size for the object +// +// call with lock held +func (item *Item) _checkObject(o fs.Object) error { + if o == nil { + if item.info.Fingerprint != "" { + // no remote object && local object + // remove local object + item._remove("stale (remote deleted)") + } else { + // no remote object && no local object + // OK + } + } else { + remoteFingerprint := fs.Fingerprint(context.TODO(), o, false) + fs.Debugf(item.name, "vfs cache: checking remote fingerprint %q against cached fingerprint %q", remoteFingerprint, item.info.Fingerprint) + if item.info.Fingerprint != "" { + // remote object && local object + if remoteFingerprint != item.info.Fingerprint { + fs.Debugf(item.name, "vfs cache: removing cached entry as stale (remote fingerprint %q != cached fingerprint %q)", remoteFingerprint, item.info.Fingerprint) + item._remove("stale (remote is different)") + } + } else { + // remote object && no local object + // Set fingerprint + item.info.Fingerprint = remoteFingerprint + item.metaDirty = true + } + item.info.Size = o.Size() + } + item.o = o + + err := item._truncateToCurrentSize() + if err != nil { + return errors.Wrap(err, "vfs cache item: open truncate failed") + } + + return nil +} + +// check the fingerprint of an object and update the item or delete +// the 
cached file accordingly. +// +// It ensures the file is the correct size for the object +func (item *Item) checkObject(o fs.Object) error { + item.mu.Lock() + defer item.mu.Unlock() + return item._checkObject(o) +} + +// remove the cached file +// +// call with lock held +func (item *Item) _removeFile(reason string) { + osPath := item.c.toOSPath(item.name) // No locking in Cache + err := os.Remove(osPath) + if err != nil { + if !os.IsNotExist(err) { + fs.Errorf(item.name, "Failed to remove cache file as %s: %v", reason, err) + } + } else { + fs.Infof(item.name, "Removed cache file as %s", reason) + } +} + +// remove the metadata +// +// call with lock held +func (item *Item) _removeMeta(reason string) { + osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache + err := os.Remove(osPathMeta) + if err != nil { + if !os.IsNotExist(err) { + fs.Errorf(item.name, "Failed to remove metadata from cache as %s: %v", reason, err) + } + } else { + fs.Infof(item.name, "Removed metadata from cache as %s", reason) + } +} + +// remove the cached file and empty the metadata +// +// call with lock held +func (item *Item) _remove(reason string) { + item.info.clean() + item.metaDirty = false + item._removeFile(reason) + item._removeMeta(reason) +} + +// remove the cached file and empty the metadata +func (item *Item) remove(reason string) { + item.mu.Lock() + item._remove(reason) + item.mu.Unlock() +} + +// create a downloader for the item +// +// call with item mutex held +func (item *Item) _newDownloader() (err error) { + // If no cached object then can't download + if item.o == nil { + return errors.New("vfs cache: internal error: tried to download nil object") + } + // If downloading the object already stop the downloader and restart it + if item.downloader != nil { + item.mu.Unlock() + _ = item.downloader.close(nil) + item.mu.Lock() + item.downloader = nil + } + item.downloader, err = newDownloader(item, item.c.fremote, item.name, item.o) + return err +} + +// 
_present returns true if the whole file has been downloaded +// +// call with the lock held +func (item *Item) _present() bool { + if item.downloader != nil && item.downloader.running() { + return false + } + return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size}) +} + +// ensure the range from offset, size is present in the backing file +// +// call with the item lock held +func (item *Item) _ensure(offset, size int64) (err error) { + defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("err=%v", &err) + if offset+size > item.info.Size { + size = item.info.Size - offset + } + r := ranges.Range{Pos: offset, Size: size} + present := item.info.Rs.Present(r) + downloader := item.downloader + fs.Debugf(nil, "looking for range=%+v in %+v - present %v", r, item.info.Rs, present) + if present { + return nil + } + // FIXME pass in offset here to decide to seek? + err = item._newDownloader() + if err != nil { + return errors.Wrap(err, "Ensure: failed to start downloader") + } + downloader = item.downloader + if downloader == nil { + return errors.New("internal error: downloader is nil") + } + if !downloader.running() { + // FIXME need to make sure we start in the correct place because some of offset,size might exist + // FIXME this could stop an old download + item.mu.Unlock() + err = downloader.start(offset) + item.mu.Lock() + if err != nil { + return errors.Wrap(err, "Ensure: failed to run downloader") + } + } + item.mu.Unlock() + defer item.mu.Lock() + return item.downloader.ensure(r) +} + +// _written marks the (offset, size) as present in the backing file +// +// This is called by the downloader downloading file segments and the +// vfs layer writing to the file. 
+// +// call with lock held +func (item *Item) _written(offset, size int64) { + defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("") + item.info.Rs.Insert(ranges.Range{Pos: offset, Size: offset + size}) + item.metaDirty = true +} + +// update the fingerprint of the object if any +// +// call with lock held +func (item *Item) _updateFingerprint() { + if item.o == nil { + return + } + oldFingerprint := item.info.Fingerprint + item.info.Fingerprint = fs.Fingerprint(context.TODO(), item.o, false) + if oldFingerprint != item.info.Fingerprint { + fs.Debugf(item.o, "fingerprint now %q", item.info.Fingerprint) + item.metaDirty = true + } +} + +// setModTime of the cache file +// +// call with lock held +func (item *Item) _setModTime(modTime time.Time) { + fs.Debugf(item.name, "vfs cache: setting modification time to %v", modTime) + osPath := item.c.toOSPath(item.name) // No locking in Cache + err := os.Chtimes(osPath, modTime, modTime) + if err != nil { + fs.Errorf(item.name, "Failed to set modification time of cached file: %v", err) + } +} + +// setModTime of the cache file and in the Item +func (item *Item) setModTime(modTime time.Time) { + defer log.Trace(item.name, "modTime=%v", modTime)("") + item.mu.Lock() + item._updateFingerprint() + item._setModTime(modTime) + item.mu.Unlock() +} + +// ReadAt bytes from the file at off +func (item *Item) ReadAt(b []byte, off int64) (n int, err error) { + item.mu.Lock() + if item.fd == nil { + item.mu.Unlock() + return 0, errors.New("vfs cache item ReadAt: internal error: didn't Open file") + } + err = item._ensure(off, int64(len(b))) + if err != nil { + item.mu.Unlock() + return n, err + } + item.info.ATime = time.Now() + item.mu.Unlock() + // Do the reading with Item.mu unlocked + return item.fd.ReadAt(b, off) +} + +// WriteAt bytes to the file at off +func (item *Item) WriteAt(b []byte, off int64) (n int, err error) { + item.mu.Lock() + if item.fd == nil { + item.mu.Unlock() + return 0, errors.New("vfs cache item 
WriteAt: internal error: didn't Open file") + } + item.mu.Unlock() + // Do the writing with Item.mu unlocked + n, err = item.fd.WriteAt(b, off) + item.mu.Lock() + item._written(off, int64(n)) + if n > 0 { + item._dirty() + } + item.mu.Unlock() + return n, err +} + +// Sync commits the current contents of the file to stable storage. Typically, +// this means flushing the file system's in-memory copy of recently written +// data to disk. +func (item *Item) Sync() (err error) { + item.mu.Lock() + defer item.mu.Unlock() + if item.fd == nil { + return errors.New("vfs cache item sync: internal error: didn't Open file") + } + // sync the file and the metadata to disk + err = item.fd.Sync() + if err != nil { + return errors.Wrap(err, "vfs cache item sync: failed to sync file") + } + err = item._save() + if err != nil { + return errors.Wrap(err, "vfs cache item sync: failed to sync metadata") + } + return nil +} + +// rename the item +func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) { + var downloader *downloader + // close downloader with mutex unlocked + defer func() { + if downloader != nil { + _ = downloader.close(nil) + } + }() + + item.mu.Lock() + defer item.mu.Unlock() + + // stop downloader + downloader = item.downloader + item.downloader = nil + + // Set internal state + item.name = newName + item.o = newObj + + // Rename cache file if it exists + err = rename(item.c.toOSPath(name), item.c.toOSPath(newName)) // No locking in Cache + if err != nil { + return err + } + + // Rename meta file if it exists + err = rename(item.c.toOSPathMeta(name), item.c.toOSPathMeta(newName)) // No locking in Cache + if err != nil { + return err + } + + return nil +} diff --git a/vfs/vfscache/vfscache.go b/vfs/vfscache/vfscache.go deleted file mode 100644 index fc4938f7e..000000000 --- a/vfs/vfscache/vfscache.go +++ /dev/null @@ -1,600 +0,0 @@ -// Package vfscache deals with caching of files locally for the VFS layer -package vfscache - -import ( - 
"context" - "os" - "path" - "path/filepath" - "runtime" - "sort" - "strings" - "sync" - "time" - - "github.com/djherbis/times" - "github.com/pkg/errors" - "github.com/rclone/rclone/fs" - fscache "github.com/rclone/rclone/fs/cache" - "github.com/rclone/rclone/fs/config" - "github.com/rclone/rclone/fs/operations" - "github.com/rclone/rclone/vfs/vfscommon" -) - -// Cache opened files -type Cache struct { - fremote fs.Fs // fs for the remote we are caching - fcache fs.Fs // fs for the cache directory - opt *vfscommon.Options // vfs Options - root string // root of the cache directory - itemMu sync.Mutex // protects the following variables - item map[string]*cacheItem // files/directories in the cache - used int64 // total size of files in the cache -} - -// cacheItem is stored in the item map -type cacheItem struct { - opens int // number of times file is open - atime time.Time // last time file was accessed - isFile bool // if this is a file or a directory - size int64 // size of the cached item -} - -// newCacheItem returns an item for the cache -func newCacheItem(isFile bool) *cacheItem { - return &cacheItem{atime: time.Now(), isFile: isFile} -} - -// New creates a new cache heirachy for fremote -// -// This starts background goroutines which can be cancelled with the -// context passed in. 
-func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options) (*Cache, error) { - fRoot := filepath.FromSlash(fremote.Root()) - if runtime.GOOS == "windows" { - if strings.HasPrefix(fRoot, `\\?`) { - fRoot = fRoot[3:] - } - fRoot = strings.Replace(fRoot, ":", "", -1) - } - root := filepath.Join(config.CacheDir, "vfs", fremote.Name(), fRoot) - fs.Debugf(nil, "vfs cache root is %q", root) - - fcache, err := fscache.Get(root) - if err != nil { - return nil, errors.Wrap(err, "failed to create cache remote") - } - - c := &Cache{ - fremote: fremote, - fcache: fcache, - opt: opt, - root: root, - item: make(map[string]*cacheItem), - } - - go c.cleaner(ctx) - - return c, nil -} - -// clean returns the cleaned version of name for use in the index map -// -// name should be a remote path not an osPath -func clean(name string) string { - name = strings.Trim(name, "/") - name = path.Clean(name) - if name == "." || name == "/" { - name = "" - } - return name -} - -// ToOSPath turns a remote relative name into an OS path in the cache -func (c *Cache) ToOSPath(name string) string { - return filepath.Join(c.root, filepath.FromSlash(name)) -} - -// Mkdir makes the directory for name in the cache and returns an os -// path for the file -// -// name should be a remote path not an osPath -func (c *Cache) Mkdir(name string) (string, error) { - parent := vfscommon.FindParent(name) - leaf := path.Base(name) - parentPath := c.ToOSPath(parent) - err := os.MkdirAll(parentPath, 0700) - if err != nil { - return "", errors.Wrap(err, "make cache directory failed") - } - c.cacheDir(parent) - return filepath.Join(parentPath, leaf), nil -} - -// _get gets name from the cache or creates a new one -// -// It returns the item and found as to whether this item was found in -// the cache (or just created). 
-// -// name should be a remote path not an osPath -// -// must be called with itemMu held -func (c *Cache) _get(isFile bool, name string) (item *cacheItem, found bool) { - item = c.item[name] - found = item != nil - if !found { - item = newCacheItem(isFile) - c.item[name] = item - } - return item, found -} - -// Opens returns the number of opens that are on the file -// -// name should be a remote path not an osPath -func (c *Cache) Opens(name string) int { - name = clean(name) - c.itemMu.Lock() - defer c.itemMu.Unlock() - item := c.item[name] - if item == nil { - return 0 - } - return item.opens -} - -// get gets name from the cache or creates a new one -// -// name should be a remote path not an osPath -func (c *Cache) get(name string) *cacheItem { - name = clean(name) - c.itemMu.Lock() - item, _ := c._get(true, name) - c.itemMu.Unlock() - return item -} - -// updateStat sets the atime of the name to that passed in if it is -// newer than the existing or there isn't an existing time. 
-// -// it also sets the size -// -// name should be a remote path not an osPath -func (c *Cache) updateStat(name string, when time.Time, size int64) { - name = clean(name) - c.itemMu.Lock() - item, found := c._get(true, name) - if !found || when.Sub(item.atime) > 0 { - fs.Debugf(name, "updateTime: setting atime to %v", when) - item.atime = when - } - item.size = size - c.itemMu.Unlock() -} - -// _open marks name as open, must be called with the lock held -// -// name should be a remote path not an osPath -func (c *Cache) _open(isFile bool, name string) { - for { - item, _ := c._get(isFile, name) - item.opens++ - item.atime = time.Now() - if name == "" { - break - } - isFile = false - name = vfscommon.FindParent(name) - } -} - -// Open marks name as open -// -// name should be a remote path not an osPath -func (c *Cache) Open(name string) { - name = clean(name) - c.itemMu.Lock() - c._open(true, name) - c.itemMu.Unlock() -} - -// cacheDir marks a directory and its parents as being in the cache -// -// name should be a remote path not an osPath -func (c *Cache) cacheDir(name string) { - name = clean(name) - c.itemMu.Lock() - defer c.itemMu.Unlock() - for { - item := c.item[name] - if item != nil { - break - } - c.item[name] = newCacheItem(false) - if name == "" { - break - } - name = vfscommon.FindParent(name) - } -} - -// Exists checks to see if the file exists in the cache or not -func (c *Cache) Exists(name string) bool { - osPath := c.ToOSPath(name) - fi, err := os.Stat(osPath) - if err != nil { - return false - } - // checks for non-regular files (e.g. directories, symlinks, devices, etc.) 
- if !fi.Mode().IsRegular() { - return false - } - return true -} - -// Rename the file in cache -func (c *Cache) Rename(name string, newName string) (err error) { - osOldPath := c.ToOSPath(name) - osNewPath := c.ToOSPath(newName) - sfi, err := os.Stat(osOldPath) - if err != nil { - return errors.Wrapf(err, "Failed to stat source: %s", osOldPath) - } - if !sfi.Mode().IsRegular() { - // cannot copy non-regular files (e.g., directories, symlinks, devices, etc.) - return errors.Errorf("Non-regular source file: %s (%q)", sfi.Name(), sfi.Mode().String()) - } - dfi, err := os.Stat(osNewPath) - if err != nil { - if !os.IsNotExist(err) { - return errors.Wrapf(err, "Failed to stat destination: %s", osNewPath) - } - parent := vfscommon.OsFindParent(osNewPath) - err = os.MkdirAll(parent, 0700) - if err != nil { - return errors.Wrapf(err, "Failed to create parent dir: %s", parent) - } - } else { - if !(dfi.Mode().IsRegular()) { - return errors.Errorf("Non-regular destination file: %s (%q)", dfi.Name(), dfi.Mode().String()) - } - if os.SameFile(sfi, dfi) { - return nil - } - } - if err = os.Rename(osOldPath, osNewPath); err != nil { - return errors.Wrapf(err, "Failed to rename in cache: %s to %s", osOldPath, osNewPath) - } - // Rename the cache item - c.itemMu.Lock() - if oldItem, ok := c.item[name]; ok { - c.item[newName] = oldItem - delete(c.item, name) - } - c.itemMu.Unlock() - fs.Infof(name, "Renamed in cache") - return nil -} - -// _close marks name as closed - must be called with the lock held -func (c *Cache) _close(isFile bool, name string) { - for { - item, _ := c._get(isFile, name) - item.opens-- - item.atime = time.Now() - if item.opens < 0 { - fs.Errorf(name, "cache: double close") - } - osPath := c.ToOSPath(name) - fi, err := os.Stat(osPath) - // Update the size on close - if err == nil && !fi.IsDir() { - item.size = fi.Size() - } - if name == "" { - break - } - isFile = false - name = vfscommon.FindParent(name) - } -} - -// Close marks name as closed -// -// name 
should be a remote path not an osPath -func (c *Cache) Close(name string) { - name = clean(name) - c.itemMu.Lock() - c._close(true, name) - c.itemMu.Unlock() -} - -// Remove should be called if name is deleted -func (c *Cache) Remove(name string) { - osPath := c.ToOSPath(name) - err := os.Remove(osPath) - if err != nil && !os.IsNotExist(err) { - fs.Errorf(name, "Failed to remove from cache: %v", err) - } else { - fs.Infof(name, "Removed from cache") - } -} - -// removeDir should be called if dir is deleted and returns true if -// the directory is gone. -func (c *Cache) removeDir(dir string) bool { - osPath := c.ToOSPath(dir) - err := os.Remove(osPath) - if err == nil || os.IsNotExist(err) { - if err == nil { - fs.Debugf(dir, "Removed empty directory") - } - return true - } - if !os.IsExist(err) { - fs.Errorf(dir, "Failed to remove cached dir: %v", err) - } - return false -} - -// SetModTime should be called to set the modification time of the cache file -func (c *Cache) SetModTime(name string, modTime time.Time) { - osPath := c.ToOSPath(name) - err := os.Chtimes(osPath, modTime, modTime) - if err != nil { - fs.Errorf(name, "Failed to set modification time of cached file: %v", err) - } -} - -// CleanUp empties the cache of everything -func (c *Cache) CleanUp() error { - return os.RemoveAll(c.root) -} - -// walk walks the cache calling the function -func (c *Cache) walk(fn func(osPath string, fi os.FileInfo, name string) error) error { - return filepath.Walk(c.root, func(osPath string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - // Find path relative to the cache root - name, err := filepath.Rel(c.root, osPath) - if err != nil { - return errors.Wrap(err, "filepath.Rel failed in walk") - } - if name == "." 
{ - name = "" - } - // And convert into slashes - name = filepath.ToSlash(name) - - return fn(osPath, fi, name) - }) -} - -// updateStats walks the cache updating any atimes and sizes it finds -// -// it also updates used -func (c *Cache) updateStats() error { - var newUsed int64 - err := c.walk(func(osPath string, fi os.FileInfo, name string) error { - if !fi.IsDir() { - // Update the atime with that of the file - atime := times.Get(fi).AccessTime() - c.updateStat(name, atime, fi.Size()) - newUsed += fi.Size() - } else { - c.cacheDir(name) - } - return nil - }) - c.itemMu.Lock() - c.used = newUsed - c.itemMu.Unlock() - return err -} - -// purgeOld gets rid of any files that are over age -func (c *Cache) purgeOld(maxAge time.Duration) { - c._purgeOld(maxAge, c.Remove) -} - -func (c *Cache) _purgeOld(maxAge time.Duration, remove func(name string)) { - c.itemMu.Lock() - defer c.itemMu.Unlock() - cutoff := time.Now().Add(-maxAge) - for name, item := range c.item { - if item.isFile && item.opens == 0 { - // If not locked and access time too long ago - delete the file - dt := item.atime.Sub(cutoff) - // fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.atime, cutoff, dt) - if dt < 0 { - remove(name) - // Remove the entry - delete(c.item, name) - } - } - } -} - -// Purge any empty directories -func (c *Cache) purgeEmptyDirs() { - c._purgeEmptyDirs(c.removeDir) -} - -func (c *Cache) _purgeEmptyDirs(removeDir func(name string) bool) { - c.itemMu.Lock() - defer c.itemMu.Unlock() - var dirs []string - for name, item := range c.item { - if !item.isFile && item.opens == 0 { - dirs = append(dirs, name) - } - } - // remove empty directories in reverse alphabetical order - sort.Strings(dirs) - for i := len(dirs) - 1; i >= 0; i-- { - dir := dirs[i] - // Remove the entry - if removeDir(dir) { - delete(c.item, dir) - } - } -} - -// This is a cacheItem with a name for sorting -type cacheNamedItem struct { - name string - item *cacheItem -} -type cacheNamedItems []cacheNamedItem - 
-func (v cacheNamedItems) Len() int { return len(v) } -func (v cacheNamedItems) Swap(i, j int) { v[i], v[j] = v[j], v[i] } -func (v cacheNamedItems) Less(i, j int) bool { return v[i].item.atime.Before(v[j].item.atime) } - -// Remove any files that are over quota starting from the -// oldest first -func (c *Cache) purgeOverQuota(quota int64) { - c._purgeOverQuota(quota, c.Remove) -} - -func (c *Cache) _purgeOverQuota(quota int64, remove func(name string)) { - c.itemMu.Lock() - defer c.itemMu.Unlock() - - if quota <= 0 || c.used < quota { - return - } - - var items cacheNamedItems - - // Make a slice of unused files - for name, item := range c.item { - if item.isFile && item.opens == 0 { - items = append(items, cacheNamedItem{ - name: name, - item: item, - }) - } - } - sort.Sort(items) - - // Remove items until the quota is OK - for _, item := range items { - if c.used < quota { - break - } - remove(item.name) - // Remove the entry - delete(c.item, item.name) - c.used -= item.item.size - } -} - -// clean empties the cache of stuff if it can -func (c *Cache) clean() { - // Cache may be empty so end - _, err := os.Stat(c.root) - if os.IsNotExist(err) { - return - } - - c.itemMu.Lock() - oldItems, oldUsed := len(c.item), fs.SizeSuffix(c.used) - c.itemMu.Unlock() - - // first walk the FS to update the atimes and sizes - err = c.updateStats() - if err != nil { - fs.Errorf(nil, "Error traversing cache %q: %v", c.root, err) - } - - // Remove any files that are over age - c.purgeOld(c.opt.CacheMaxAge) - - // Now remove any files that are over quota starting from the - // oldest first - c.purgeOverQuota(int64(c.opt.CacheMaxSize)) - - // Remove any empty directories - c.purgeEmptyDirs() - - // Stats - c.itemMu.Lock() - newItems, newUsed := len(c.item), fs.SizeSuffix(c.used) - c.itemMu.Unlock() - - fs.Infof(nil, "Cleaned the cache: objects %d (was %d), total size %v (was %v)", newItems, oldItems, newUsed, oldUsed) -} - -// cleaner calls clean at regular intervals -// -// 
doesn't return until context is cancelled -func (c *Cache) cleaner(ctx context.Context) { - if c.opt.CachePollInterval <= 0 { - fs.Debugf(nil, "Cache cleaning thread disabled because poll interval <= 0") - return - } - // Start cleaning the cache immediately - c.clean() - // Then every interval specified - timer := time.NewTicker(c.opt.CachePollInterval) - defer timer.Stop() - for { - select { - case <-timer.C: - c.clean() - case <-ctx.Done(): - fs.Debugf(nil, "cache cleaner exiting") - return - } - } -} - -// copy an object to or from the remote while accounting for it -func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) { - if operations.NeedTransfer(context.TODO(), dst, src) { - newDst, err = operations.Copy(context.TODO(), f, dst, remote, src) - } else { - newDst = dst - } - return newDst, err -} - -// Check the local file is up to date in the cache -func (c *Cache) Check(ctx context.Context, o fs.Object, remote string) error { - cacheObj, err := c.fcache.NewObject(ctx, remote) - if err == nil && cacheObj != nil { - _, err = copyObj(c.fcache, cacheObj, remote, o) - if err != nil { - return errors.Wrap(err, "failed to update cached file") - } - } - return nil -} - -// Fetch fetches the object to the cache file -func (c *Cache) Fetch(ctx context.Context, o fs.Object, remote string) error { - _, err := copyObj(c.fcache, nil, remote, o) - return err -} - -// Store stores the local cache file to the remote object, returning -// the new remote object. objOld is the old object if known. 
-func (c *Cache) Store(ctx context.Context, objOld fs.Object, remote string) (fs.Object, error) { - // Transfer the temp file to the remote - cacheObj, err := c.fcache.NewObject(ctx, remote) - if err != nil { - return nil, errors.Wrap(err, "failed to find cache file") - } - - if objOld != nil { - remote = objOld.Remote() // use the path of the actual object if available - } - o, err := copyObj(c.fremote, objOld, remote, cacheObj) - if err != nil { - return nil, errors.Wrap(err, "failed to transfer file from cache to remote") - } - return o, nil -} diff --git a/vfs/vfscache/vfscache_test.go b/vfs/vfscache/vfscache_test.go deleted file mode 100644 index 1980871a7..000000000 --- a/vfs/vfscache/vfscache_test.go +++ /dev/null @@ -1,602 +0,0 @@ -package vfscache - -import ( - "context" - "fmt" - "io/ioutil" - "os" - "path/filepath" - "sort" - "testing" - "time" - - "github.com/djherbis/times" - _ "github.com/rclone/rclone/backend/local" // import the local backend - "github.com/rclone/rclone/fstest" - "github.com/rclone/rclone/vfs/vfscommon" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// TestMain drives the tests -func TestMain(m *testing.M) { - fstest.TestMain(m) -} - -// convert c.item to a string -func itemAsString(c *Cache) []string { - c.itemMu.Lock() - defer c.itemMu.Unlock() - var out []string - for name, item := range c.item { - out = append(out, fmt.Sprintf("name=%q isFile=%v opens=%d size=%d", filepath.ToSlash(name), item.isFile, item.opens, item.size)) - } - sort.Strings(out) - return out -} - -func TestCacheNew(t *testing.T) { - r := fstest.NewRun(t) - defer r.Finalise() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Disable the cache cleaner as it interferes with these tests - opt := vfscommon.DefaultOpt - opt.CachePollInterval = 0 - c, err := New(ctx, r.Fremote, &opt) - require.NoError(t, err) - - assert.Contains(t, c.root, "vfs") - assert.Contains(t, c.fcache.Root(), 
filepath.Base(r.Fremote.Root())) - assert.Equal(t, []string(nil), itemAsString(c)) - - // mkdir - p, err := c.Mkdir("potato") - require.NoError(t, err) - assert.Equal(t, "potato", filepath.Base(p)) - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - }, itemAsString(c)) - - fi, err := os.Stat(filepath.Dir(p)) - require.NoError(t, err) - assert.True(t, fi.IsDir()) - - // get - item := c.get("potato") - item2 := c.get("potato") - assert.Equal(t, item, item2) - assert.WithinDuration(t, time.Now(), item.atime, time.Second) - - // updateTime - //.. before - t1 := time.Now().Add(-60 * time.Minute) - c.updateStat("potato", t1, 0) - item = c.get("potato") - assert.NotEqual(t, t1, item.atime) - assert.Equal(t, 0, item.opens) - //..after - t2 := time.Now().Add(60 * time.Minute) - c.updateStat("potato", t2, 0) - item = c.get("potato") - assert.Equal(t, t2, item.atime) - assert.Equal(t, 0, item.opens) - - // open - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="potato" isFile=true opens=0 size=0`, - }, itemAsString(c)) - c.Open("/potato") - assert.Equal(t, []string{ - `name="" isFile=false opens=1 size=0`, - `name="potato" isFile=true opens=1 size=0`, - }, itemAsString(c)) - item = c.get("potato") - assert.WithinDuration(t, time.Now(), item.atime, time.Second) - assert.Equal(t, 1, item.opens) - - // write the file - err = ioutil.WriteFile(p, []byte("hello"), 0600) - require.NoError(t, err) - - // read its atime - fi, err = os.Stat(p) - assert.NoError(t, err) - atime := times.Get(fi).AccessTime() - - // updateAtimes - item = c.get("potato") - item.atime = time.Now().Add(-24 * time.Hour) - err = c.updateStats() - require.NoError(t, err) - assert.Equal(t, []string{ - `name="" isFile=false opens=1 size=0`, - `name="potato" isFile=true opens=1 size=5`, - }, itemAsString(c)) - item = c.get("potato") - assert.Equal(t, atime, item.atime) - - // updateAtimes - not in the cache - oldItem := item - c.itemMu.Lock() - delete(c.item, "potato") 
// remove from cache - c.itemMu.Unlock() - err = c.updateStats() - require.NoError(t, err) - assert.Equal(t, []string{ - `name="" isFile=false opens=1 size=0`, - `name="potato" isFile=true opens=0 size=5`, - }, itemAsString(c)) - item = c.get("potato") - assert.Equal(t, atime, item.atime) - c.itemMu.Lock() - c.item["potato"] = oldItem // restore to cache - c.itemMu.Unlock() - - // try purging with file open - c.purgeOld(10 * time.Second) - _, err = os.Stat(p) - assert.NoError(t, err) - - // close - assert.Equal(t, []string{ - `name="" isFile=false opens=1 size=0`, - `name="potato" isFile=true opens=1 size=5`, - }, itemAsString(c)) - c.updateStat("potato", t2, 6) - assert.Equal(t, []string{ - `name="" isFile=false opens=1 size=0`, - `name="potato" isFile=true opens=1 size=6`, - }, itemAsString(c)) - c.Close("potato/") - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="potato" isFile=true opens=0 size=5`, - }, itemAsString(c)) - item = c.get("potato") - assert.WithinDuration(t, time.Now(), item.atime, time.Second) - assert.Equal(t, 0, item.opens) - - // try purging with file closed - c.purgeOld(10 * time.Second) - // ...nothing should happen - _, err = os.Stat(p) - assert.NoError(t, err) - - //.. 
purge again with -ve age - c.purgeOld(-10 * time.Second) - _, err = os.Stat(p) - assert.True(t, os.IsNotExist(err)) - - // clean - have tested the internals already - c.clean() - - // cleanup - err = c.CleanUp() - require.NoError(t, err) - _, err = os.Stat(c.root) - assert.True(t, os.IsNotExist(err)) -} - -func TestCacheOpens(t *testing.T) { - r := fstest.NewRun(t) - defer r.Finalise() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt) - require.NoError(t, err) - - assert.Equal(t, []string(nil), itemAsString(c)) - c.Open("potato") - assert.Equal(t, []string{ - `name="" isFile=false opens=1 size=0`, - `name="potato" isFile=true opens=1 size=0`, - }, itemAsString(c)) - c.Open("potato") - assert.Equal(t, []string{ - `name="" isFile=false opens=2 size=0`, - `name="potato" isFile=true opens=2 size=0`, - }, itemAsString(c)) - c.Close("potato") - assert.Equal(t, []string{ - `name="" isFile=false opens=1 size=0`, - `name="potato" isFile=true opens=1 size=0`, - }, itemAsString(c)) - c.Close("potato") - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="potato" isFile=true opens=0 size=0`, - }, itemAsString(c)) - - c.Open("potato") - c.Open("a//b/c/d/one") - c.Open("a/b/c/d/e/two") - c.Open("a/b/c/d/e/f/three") - assert.Equal(t, []string{ - `name="" isFile=false opens=4 size=0`, - `name="a" isFile=false opens=3 size=0`, - `name="a/b" isFile=false opens=3 size=0`, - `name="a/b/c" isFile=false opens=3 size=0`, - `name="a/b/c/d" isFile=false opens=3 size=0`, - `name="a/b/c/d/e" isFile=false opens=2 size=0`, - `name="a/b/c/d/e/f" isFile=false opens=1 size=0`, - `name="a/b/c/d/e/f/three" isFile=true opens=1 size=0`, - `name="a/b/c/d/e/two" isFile=true opens=1 size=0`, - `name="a/b/c/d/one" isFile=true opens=1 size=0`, - `name="potato" isFile=true opens=1 size=0`, - }, itemAsString(c)) - c.Close("potato") - c.Close("a/b/c/d/one") - c.Close("a/b/c/d/e/two") - 
c.Close("a/b/c//d/e/f/three") - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="a" isFile=false opens=0 size=0`, - `name="a/b" isFile=false opens=0 size=0`, - `name="a/b/c" isFile=false opens=0 size=0`, - `name="a/b/c/d" isFile=false opens=0 size=0`, - `name="a/b/c/d/e" isFile=false opens=0 size=0`, - `name="a/b/c/d/e/f" isFile=false opens=0 size=0`, - `name="a/b/c/d/e/f/three" isFile=true opens=0 size=0`, - `name="a/b/c/d/e/two" isFile=true opens=0 size=0`, - `name="a/b/c/d/one" isFile=true opens=0 size=0`, - `name="potato" isFile=true opens=0 size=0`, - }, itemAsString(c)) -} - -// test the open, mkdir, purge, close, purge sequence -func TestCacheOpenMkdir(t *testing.T) { - r := fstest.NewRun(t) - defer r.Finalise() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Disable the cache cleaner as it interferes with these tests - opt := vfscommon.DefaultOpt - opt.CachePollInterval = 0 - c, err := New(ctx, r.Fremote, &opt) - require.NoError(t, err) - - // open - c.Open("sub/potato") - - assert.Equal(t, []string{ - `name="" isFile=false opens=1 size=0`, - `name="sub" isFile=false opens=1 size=0`, - `name="sub/potato" isFile=true opens=1 size=0`, - }, itemAsString(c)) - - // mkdir - p, err := c.Mkdir("sub/potato") - require.NoError(t, err) - assert.Equal(t, "potato", filepath.Base(p)) - assert.Equal(t, []string{ - `name="" isFile=false opens=1 size=0`, - `name="sub" isFile=false opens=1 size=0`, - `name="sub/potato" isFile=true opens=1 size=0`, - }, itemAsString(c)) - - // test directory exists - fi, err := os.Stat(filepath.Dir(p)) - require.NoError(t, err) - assert.True(t, fi.IsDir()) - - // clean the cache - c.purgeOld(-10 * time.Second) - - // test directory still exists - fi, err = os.Stat(filepath.Dir(p)) - require.NoError(t, err) - assert.True(t, fi.IsDir()) - - // close - c.Close("sub/potato") - - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="sub" isFile=false opens=0 
size=0`, - `name="sub/potato" isFile=true opens=0 size=0`, - }, itemAsString(c)) - - // clean the cache - c.purgeOld(-10 * time.Second) - c.purgeEmptyDirs() - - assert.Equal(t, []string(nil), itemAsString(c)) - - // test directory does not exist - _, err = os.Stat(filepath.Dir(p)) - require.True(t, os.IsNotExist(err)) -} - -func TestCacheCacheDir(t *testing.T) { - r := fstest.NewRun(t) - defer r.Finalise() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt) - require.NoError(t, err) - - assert.Equal(t, []string(nil), itemAsString(c)) - - c.cacheDir("dir") - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="dir" isFile=false opens=0 size=0`, - }, itemAsString(c)) - - c.cacheDir("dir/sub") - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="dir" isFile=false opens=0 size=0`, - `name="dir/sub" isFile=false opens=0 size=0`, - }, itemAsString(c)) - - c.cacheDir("dir/sub2/subsub2") - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="dir" isFile=false opens=0 size=0`, - `name="dir/sub" isFile=false opens=0 size=0`, - `name="dir/sub2" isFile=false opens=0 size=0`, - `name="dir/sub2/subsub2" isFile=false opens=0 size=0`, - }, itemAsString(c)) -} - -func TestCachePurgeOld(t *testing.T) { - r := fstest.NewRun(t) - defer r.Finalise() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt) - require.NoError(t, err) - - // Test funcs - var removed []string - removedDir := true - removeFile := func(name string) { - removed = append(removed, filepath.ToSlash(name)) - } - removeDir := func(name string) bool { - if removedDir { - removed = append(removed, filepath.ToSlash(name)+"/") - } - return removedDir - } - - removed = nil - c._purgeOld(-10*time.Second, removeFile) - c._purgeEmptyDirs(removeDir) - assert.Equal(t, []string(nil), removed) - - 
c.Open("sub/dir2/potato2") - c.Open("sub/dir/potato") - c.Close("sub/dir2/potato2") - c.Open("sub/dir/potato") - - assert.Equal(t, []string{ - `name="" isFile=false opens=2 size=0`, - `name="sub" isFile=false opens=2 size=0`, - `name="sub/dir" isFile=false opens=2 size=0`, - `name="sub/dir/potato" isFile=true opens=2 size=0`, - `name="sub/dir2" isFile=false opens=0 size=0`, - `name="sub/dir2/potato2" isFile=true opens=0 size=0`, - }, itemAsString(c)) - - removed = nil - removedDir = true - c._purgeOld(-10*time.Second, removeFile) - c._purgeEmptyDirs(removeDir) - assert.Equal(t, []string{ - "sub/dir2/potato2", - "sub/dir2/", - }, removed) - - c.Close("sub/dir/potato") - - removed = nil - removedDir = true - c._purgeOld(-10*time.Second, removeFile) - c._purgeEmptyDirs(removeDir) - assert.Equal(t, []string(nil), removed) - - c.Close("sub/dir/potato") - - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="sub" isFile=false opens=0 size=0`, - `name="sub/dir" isFile=false opens=0 size=0`, - `name="sub/dir/potato" isFile=true opens=0 size=0`, - }, itemAsString(c)) - - removed = nil - removedDir = false - c._purgeOld(10*time.Second, removeFile) - c._purgeEmptyDirs(removeDir) - assert.Equal(t, []string(nil), removed) - - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="sub" isFile=false opens=0 size=0`, - `name="sub/dir" isFile=false opens=0 size=0`, - `name="sub/dir/potato" isFile=true opens=0 size=0`, - }, itemAsString(c)) - - removed = nil - removedDir = true - c._purgeOld(-10*time.Second, removeFile) - c._purgeEmptyDirs(removeDir) - assert.Equal(t, []string{ - "sub/dir/potato", - "sub/dir/", - "sub/", - "/", - }, removed) - - assert.Equal(t, []string(nil), itemAsString(c)) -} - -func TestCachePurgeOverQuota(t *testing.T) { - r := fstest.NewRun(t) - defer r.Finalise() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Disable the cache cleaner as it interferes with these tests - opt := 
vfscommon.DefaultOpt - opt.CachePollInterval = 0 - c, err := New(ctx, r.Fremote, &opt) - require.NoError(t, err) - - // Test funcs - var removed []string - remove := func(name string) { - removed = append(removed, filepath.ToSlash(name)) - c.Remove(name) - } - - removed = nil - c._purgeOverQuota(-1, remove) - assert.Equal(t, []string(nil), removed) - - removed = nil - c._purgeOverQuota(0, remove) - assert.Equal(t, []string(nil), removed) - - removed = nil - c._purgeOverQuota(1, remove) - assert.Equal(t, []string(nil), removed) - - // Make some test files - c.Open("sub/dir/potato") - p, err := c.Mkdir("sub/dir/potato") - require.NoError(t, err) - err = ioutil.WriteFile(p, []byte("hello"), 0600) - require.NoError(t, err) - - p, err = c.Mkdir("sub/dir2/potato2") - c.Open("sub/dir2/potato2") - require.NoError(t, err) - err = ioutil.WriteFile(p, []byte("hello2"), 0600) - require.NoError(t, err) - - assert.Equal(t, []string{ - `name="" isFile=false opens=2 size=0`, - `name="sub" isFile=false opens=2 size=0`, - `name="sub/dir" isFile=false opens=1 size=0`, - `name="sub/dir/potato" isFile=true opens=1 size=0`, - `name="sub/dir2" isFile=false opens=1 size=0`, - `name="sub/dir2/potato2" isFile=true opens=1 size=0`, - }, itemAsString(c)) - - // Check nothing removed - removed = nil - c._purgeOverQuota(1, remove) - assert.Equal(t, []string(nil), removed) - - // Close the files - c.Close("sub/dir/potato") - c.Close("sub/dir2/potato2") - - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="sub" isFile=false opens=0 size=0`, - `name="sub/dir" isFile=false opens=0 size=0`, - `name="sub/dir/potato" isFile=true opens=0 size=5`, - `name="sub/dir2" isFile=false opens=0 size=0`, - `name="sub/dir2/potato2" isFile=true opens=0 size=6`, - }, itemAsString(c)) - - // Update the stats to read the total size - err = c.updateStats() - require.NoError(t, err) - assert.Equal(t, int64(11), c.used) - - // make potato2 definitely after potato - t1 := time.Now().Add(10 * 
time.Second) - c.updateStat("sub/dir2/potato2", t1, 6) - - // Check only potato removed to get below quota - removed = nil - c._purgeOverQuota(10, remove) - assert.Equal(t, []string{ - "sub/dir/potato", - }, removed) - assert.Equal(t, int64(6), c.used) - - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="sub" isFile=false opens=0 size=0`, - `name="sub/dir" isFile=false opens=0 size=0`, - `name="sub/dir2" isFile=false opens=0 size=0`, - `name="sub/dir2/potato2" isFile=true opens=0 size=6`, - }, itemAsString(c)) - - // Put potato back - c.Open("sub/dir/potato") - p, err = c.Mkdir("sub/dir/potato") - require.NoError(t, err) - err = ioutil.WriteFile(p, []byte("hello"), 0600) - require.NoError(t, err) - c.Close("sub/dir/potato") - - // Update the stats to read the total size - err = c.updateStats() - require.NoError(t, err) - assert.Equal(t, int64(11), c.used) - - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="sub" isFile=false opens=0 size=0`, - `name="sub/dir" isFile=false opens=0 size=0`, - `name="sub/dir/potato" isFile=true opens=0 size=5`, - `name="sub/dir2" isFile=false opens=0 size=0`, - `name="sub/dir2/potato2" isFile=true opens=0 size=6`, - }, itemAsString(c)) - - // make potato definitely after potato2 - t2 := t1.Add(20 * time.Second) - c.updateStat("sub/dir/potato", t2, 5) - - // Check only potato2 removed to get below quota - removed = nil - c._purgeOverQuota(10, remove) - assert.Equal(t, []string{ - "sub/dir2/potato2", - }, removed) - assert.Equal(t, int64(5), c.used) - c.purgeEmptyDirs() - - assert.Equal(t, []string{ - `name="" isFile=false opens=0 size=0`, - `name="sub" isFile=false opens=0 size=0`, - `name="sub/dir" isFile=false opens=0 size=0`, - `name="sub/dir/potato" isFile=true opens=0 size=5`, - }, itemAsString(c)) - - // Now purge everything - removed = nil - c._purgeOverQuota(1, remove) - assert.Equal(t, []string{ - "sub/dir/potato", - }, removed) - assert.Equal(t, int64(0), c.used) - 
c.purgeEmptyDirs() - - assert.Equal(t, []string(nil), itemAsString(c)) - - // Check nothing left behind - c.clean() - assert.Equal(t, int64(0), c.used) - assert.Equal(t, []string(nil), itemAsString(c)) -} diff --git a/vfs/write.go b/vfs/write.go index e845a3f24..2f5d039d6 100644 --- a/vfs/write.go +++ b/vfs/write.go @@ -189,8 +189,7 @@ func (fh *WriteFileHandle) close() (err error) { fh.closed = true // leave writer open until file is transferred defer func() { - fh.file.delWriter(fh, false) - fh.file.finishWriterClose() + fh.file.delWriter(fh) }() // If file not opened and not safe to truncate then leave file intact if !fh.opened && !fh.safeToTruncate() {