vfs: support synchronous cache space recovery upon ENOSPC

This patch adds support for synchronous cache space recovery, allowing
read threads to recover from ENOSPC errors when cache space can be
reclaimed from cache items that are not in use or are safe to be
reset/emptied.

The patch complements the existing cache cleaning process in two ways.

Firstly, the existing cache cleaning process is time-driven, running
periodically. The cache space can run out while the cache cleaner
thread is still waiting for its next scheduled run. In this case the
IO threads encountering ENOSPC return an internal error to the
applications even when cache space could be recovered to avoid the
error. This patch addresses the problem by having the read threads
kick the cache cleaner thread in this condition to recover cache
space, preventing unnecessary ENOSPC errors from being seen by the
applications.
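
A minimal, self-contained sketch of this kick-and-wait pattern (the
names cleaner, recoverSpace and kickAndWait are hypothetical; the real
implementation is in Cache.KickCleaner and Cache.cleaner below):

package main

import (
	"fmt"
	"sync"
	"time"
)

type cleaner struct {
	mu         sync.Mutex
	cond       *sync.Cond
	outOfSpace bool
	kick       chan struct{} // capacity 1 so redundant kicks are dropped
}

func newCleaner() *cleaner {
	c := &cleaner{kick: make(chan struct{}, 1)}
	c.cond = sync.NewCond(&c.mu)
	go c.run()
	return c
}

// run wakes on a timer (periodic cleaning) or on a kick (ENOSPC).
func (c *cleaner) run() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-c.kick:
			c.recoverSpace()
		case <-ticker.C:
			c.recoverSpace()
		}
	}
}

// recoverSpace stands in for the purge logic; it clears the flag and
// wakes every reader blocked in kickAndWait.
func (c *cleaner) recoverSpace() {
	c.mu.Lock()
	c.outOfSpace = false
	c.cond.Broadcast()
	c.mu.Unlock()
}

// kickAndWait is what a read thread does after seeing ENOSPC: kick the
// cleaner without blocking, then wait until space has been recovered.
func (c *cleaner) kickAndWait() {
	c.mu.Lock()
	c.outOfSpace = true
	c.mu.Unlock()
	select {
	case c.kick <- struct{}{}:
	default: // someone already kicked
	}
	c.mu.Lock()
	for c.outOfSpace {
		c.cond.Wait()
	}
	c.mu.Unlock()
}

func main() {
	c := newCleaner()
	c.kickAndWait() // simulate a reader hitting ENOSPC
	fmt.Println("cache space recovered, read can be retried")
}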

Secondly, this patch enhances the cache cleaner to support cache
item reset. Currently the cache purge process removes cache
items that are not in use. This may not be sufficient when the
total size of the working set exceeds the cache directory's
capacity. As in the current code, this patch starts the purge
process by removing cache files that are not in use. Cache items
whose access times are older than vfs-cache-max-age are removed first.
After that, other not-in-use items are removed in LRU order until
the cache size drops below vfs-cache-max-size. If the cache size
still exceeds vfs-cache-max-size (the quota) at this point, this
patch adds a cache reset step to reset/empty cache files that are
still in use but not dirty. This enables application processes to
continue without seeing an error even when the working set depletes
the cache space, as long as there is not a large write working set
hoarding the entire cache space.
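
The resulting order of operations in the cleaner, as a pseudocode
sketch (purgeOld, purgeOverQuota, purgeClean and retryFailedResets
mirror the methods added in this patch; used and failedResets are
stand-ins for c.updateUsed() and len(c.errItems)):

// One cleaner pass; kicked is true when a read thread hit ENOSPC.
func cleanSketch(kicked bool) {
	for {
		purgeOld(vfsCacheMaxAge)        // 1. remove not-in-use items past max age
		purgeOverQuota(vfsCacheMaxSize) // 2. remove more not-in-use items, LRU first
		if !kicked {
			return // periodic run: in-use items are never touched
		}
		purgeClean(vfsCacheMaxSize) // 3. reset in-use items that are not dirty
		retryFailedResets()         // redo resets that failed for lack of space
		if used() <= vfsCacheMaxSize && failedResets() == 0 {
			return
		}
	}
}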

By design this patch does not add ENOSPC error recovery for write
IOs. Rclone does not empty a write cache item until the file data
is written back to the backend upon close. Allowing more cache
space to be consumed by dirty cache items when the cache space is
already running low would increase the risk of exhausting the cache
space to the point that the VFS mount becomes unreadable.
Commit c665201b85 (parent d6996e3347)
Author: Leo Luan, 2020-08-25 08:20:29 -07:00; committed by Nick Craig-Wood
9 changed files with 656 additions and 149 deletions


@ -0,0 +1,23 @@
// +build !plan9
package fserrors
import (
"syscall"
"github.com/rclone/rclone/lib/errors"
)
// IsErrNoSpace checks a possibly wrapped error to
// see if it contains an ENOSPC error
func IsErrNoSpace(cause error) (isNoSpc bool) {
errors.Walk(cause, func(c error) bool {
if c == syscall.ENOSPC {
isNoSpc = true
return true
}
isNoSpc = false
return false
})
return
}
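
For illustration, a small usage sketch (hypothetical main package;
fserrors.IsErrNoSpace is the helper added above, and errors.Wrap is the
github.com/pkg/errors wrapping used elsewhere in this patch). Because
IsErrNoSpace walks the cause chain, it detects ENOSPC even through
wrapping; on plan9 the stub below always returns false:

package main

import (
	"fmt"
	"syscall"

	"github.com/pkg/errors"
	"github.com/rclone/rclone/fs/fserrors"
)

func main() {
	wrapped := errors.Wrap(syscall.ENOSPC, "vfs cache item: open failed")
	fmt.Println(fserrors.IsErrNoSpace(wrapped))            // true: ENOSPC found in the chain
	fmt.Println(fserrors.IsErrNoSpace(errors.New("boom"))) // false: no ENOSPC anywhere
}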


@ -0,0 +1,10 @@
// +build plan9
package fserrors
// IsErrNoSpace on plan9 always returns false because
// plan9 does not support the syscall.ENOSPC error.
func IsErrNoSpace(cause error) (isNoSpc bool) {
isNoSpc = false
return
}

go.mod

@ -63,6 +63,7 @@ require (
golang.org/x/sys v0.0.0-20200720211630-cb9d2d5c5666
golang.org/x/text v0.3.3
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1
golang.org/x/tools v0.0.0-20200820180210-c8f393745106 // indirect
google.golang.org/api v0.28.0
google.golang.org/genproto v0.0.0-20200626011028-ee7919e894b5 // indirect
google.golang.org/grpc v1.30.0 // indirect

go.sum

@ -427,6 +427,7 @@ github.com/youmark/pkcs8 v0.0.0-20200520070018-fad002e585ce h1:F5MEHq8k6JiE10MNY
github.com/youmark/pkcs8 v0.0.0-20200520070018-fad002e585ce/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yunify/qingstor-sdk-go/v3 v3.2.0 h1:9sB2WZMgjwSUNZhrgvaNGazVltoFUUfuS9f0uCWtTr8=
github.com/yunify/qingstor-sdk-go/v3 v3.2.0/go.mod h1:KciFNuMu6F4WLk9nGwwK69sCGKLCdd9f97ac/wfumS4=
github.com/zeebo/admission/v3 v3.0.1/go.mod h1:BP3isIv9qa2A7ugEratNq1dnl2oZRXaQUGdU7WXKtbw=
@ -543,6 +544,7 @@ golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@ -654,6 +656,8 @@ golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200622203043-20e05c1c8ffa h1:mMXQKlWCw9mIWgVLLfiycDZjMHMMYqiuakI4E/l2xcA=
golang.org/x/tools v0.0.0-20200622203043-20e05c1c8ffa/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200820180210-c8f393745106 h1:42Zs/g7pjhSIE/wiAuKcp8zp20zv7W2diNU6arpshOA=
golang.org/x/tools v0.0.0-20200820180210-c8f393745106/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=


@ -260,7 +260,9 @@ func (fh *RWFileHandle) _readAt(b []byte, off int64, release bool) (n int, err e
// Do the reading with fh.mu unlocked
fh.mu.Unlock()
}
n, err = fh.item.ReadAt(b, off)
if release {
fh.mu.Lock()
}


@ -17,6 +17,7 @@ import (
"github.com/rclone/rclone/fs"
fscache "github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/file"
@ -47,9 +48,16 @@ type Cache struct {
writeback *writeback.WriteBack // holds Items for writeback
avFn AddVirtualFn // if set, can be called to add dir entries
mu sync.Mutex // protects the following variables
item map[string]*Item // files/directories in the cache
used int64 // total size of files in the cache
mu sync.Mutex // protects the following variables
cond *sync.Cond // cond lock for synchronous cache cleaning
item map[string]*Item // files/directories in the cache
errItems map[string]error // items in error state
used int64 // total size of files in the cache
outOfSpace bool // out of space
cleanerKicked bool // some thread kicked the cleaner upon out of space
kickerMu sync.Mutex // mutex for cleanerKicked
kick chan struct{} // channel for kicking the cleaner to start
}
// AddVirtualFn if registered by the WithAddVirtual method, can be
@ -96,6 +104,7 @@ func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options, avFn AddVir
root: root,
metaRoot: metaRoot,
item: make(map[string]*Item),
errItems: make(map[string]error),
hashType: hashType,
hashOption: hashOption,
writeback: writeback.New(ctx, opt),
@ -117,6 +126,10 @@ func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options, avFn AddVir
// Remove any empty directories
c.purgeEmptyDirs()
// Create a channel for the cleaner to be kicked upon an out-of-space condition
c.kick = make(chan struct{}, 1)
c.cond = sync.NewCond(&c.mu)
go c.cleaner(ctx)
return c, nil
@ -401,28 +414,125 @@ func (c *Cache) reload(ctx context.Context) error {
return nil
}
// purgeOld gets rid of any files that are over age
func (c *Cache) purgeOld(maxAge time.Duration) {
c._purgeOld(maxAge, func(item *Item) {
item.remove("too old")
})
// KickCleaner kicks cache cleaner upon out of space situation
func (c *Cache) KickCleaner() {
/* Use a separate kicker mutex for the kick to go through without waiting for the
cache mutex, and to avoid letting a thread kick again right after the cleaner
has finished cleaning and unlocked the cache mutex. */
fs.Debugf(nil, "vfs cache: at the beginning of KickCleaner")
c.kickerMu.Lock()
if !c.cleanerKicked {
c.cleanerKicked = true
fs.Debugf(nil, "vfs cache: in KickCleaner, ready to lock cache mutex")
c.mu.Lock()
c.outOfSpace = true
fs.Logf(nil, "vfs cache: in KickCleaner, ready to kick cleaner")
c.kick <- struct{}{}
c.mu.Unlock()
}
c.kickerMu.Unlock()
c.mu.Lock()
for c.outOfSpace {
fs.Debugf(nil, "vfs cache: in KickCleaner, looping on c.outOfSpace")
c.cond.Wait()
}
fs.Debugf(nil, "vfs cache: in KickCleaner, leaving c.outOfSpace loop")
c.mu.Unlock()
}
func (c *Cache) _purgeOld(maxAge time.Duration, remove func(item *Item)) {
c.mu.Lock()
defer c.mu.Unlock()
cutoff := time.Now().Add(-maxAge)
for name, item := range c.item {
if !item.inUse() {
// If not locked and access time too long ago - delete the file
dt := item.getATime().Sub(cutoff)
// fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.info.ATime, cutoff, dt)
if dt < 0 {
remove(item)
// Remove the entry
delete(c.item, name)
// removeNotInUse removes items not in use with a possible maxAge cutoff
// called with cache mutex locked and up-to-date c.used (as we update it directly here)
func (c *Cache) removeNotInUse(item *Item, maxAge time.Duration, emptyOnly bool) {
removed, spaceFreed := item.RemoveNotInUse(maxAge, emptyOnly)
// The item space might be freed even if we get an error after the cache file is removed
// The item will not be removed or reset if the cache data is dirty (DataDirty)
c.used -= spaceFreed
if removed {
fs.Infof(nil, "vfs cache RemoveNotInUse (maxAge=%d, emptyOnly=%v): item %s was removed, freed %d bytes", maxAge, emptyOnly, item.GetName(), spaceFreed)
// Remove the entry
delete(c.item, item.name)
} else {
fs.Infof(nil, "vfs cache RemoveNotInUse (maxAge=%d, emptyOnly=%v): item %s not removed, freed %d bytes", maxAge, emptyOnly, item.GetName(), spaceFreed)
}
return
}
// Retry failed resets during purgeClean()
func (c *Cache) retryFailedResets() {
// Some items may have failed to reset because there was not enough space
// for saving the cache item's metadata. Redo the Reset()'s here now that
// we may have some available space.
if len(c.errItems) != 0 {
fs.Debugf(nil, "vfs cache reset: before redoing reset errItems = %v", c.errItems)
for itemName := range c.errItems {
_, _, err := c.item[itemName].Reset()
if err == nil || !fserrors.IsErrNoSpace(err) {
// TODO: not trying to handle non-ENOSPC errors yet
delete(c.errItems, itemName)
}
}
fs.Debugf(nil, "vfs cache reset: after redoing reset errItems = %v", c.errItems)
}
}
func (c *Cache) purgeClean(quota int64) {
c.mu.Lock()
defer c.mu.Unlock()
var items Items
if quota <= 0 || c.used < quota {
return
}
// Make a slice of clean cache files
for _, item := range c.item {
if !item.IsDataDirty() {
items = append(items, item)
}
}
sort.Sort(items)
// Reset items until the quota is OK
for _, item := range items {
if c.used < quota {
break
}
resetResult, spaceFreed, err := item.Reset()
// The item space might be freed even if we get an error after the cache file is removed
// The item will not be removed or reset if the cache data is dirty (DataDirty)
c.used -= spaceFreed
fs.Infof(nil, "vfs cache purgeClean item.Reset %s: %s, freed %d bytes", item.GetName(), resetResult.String(), spaceFreed)
if resetResult == RemovedNotInUse {
delete(c.item, item.name)
}
if err != nil {
fs.Errorf(nil, "vfs cache purgeClean item.Reset %s reset failed, err = %v, freed %d bytes", item.GetName(), err, spaceFreed)
c.errItems[item.name] = err
}
}
// Reset outOfSpace without checking whether we have reduced cache space below the quota.
// This allows some files to reduce their pendingAccesses count to allow them to be reset
// in the next iteration of the purge cleaner loop.
c.outOfSpace = false
c.cond.Broadcast()
}
// purgeOld gets rid of any files that are over age
func (c *Cache) purgeOld(maxAge time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()
// cutoff := time.Now().Add(-maxAge)
for _, item := range c.item {
c.removeNotInUse(item, maxAge, false)
}
if c.used < int64(c.opt.CacheMaxSize) {
c.outOfSpace = false
c.cond.Broadcast()
}
}
@ -439,16 +549,8 @@ func (c *Cache) purgeEmptyDirs() {
}
}
// Remove any files that are over quota starting from the
// oldest first
func (c *Cache) purgeOverQuota(quota int64) {
c._purgeOverQuota(quota, func(item *Item) {
item.remove("over quota")
})
}
// updateUsed updates c.used so it is accurate
func (c *Cache) updateUsed() {
func (c *Cache) updateUsed() (used int64) {
c.mu.Lock()
defer c.mu.Unlock()
@ -457,15 +559,19 @@ func (c *Cache) updateUsed() {
newUsed += item.getDiskSize()
}
c.used = newUsed
return newUsed
}
func (c *Cache) _purgeOverQuota(quota int64, remove func(item *Item)) {
// Remove clean cache files that are not open until the total space
// is reduced below quota starting from the oldest first
func (c *Cache) purgeOverQuota(quota int64) {
c.updateUsed()
c.mu.Lock()
defer c.mu.Unlock()
if quota <= 0 || c.used < quota {
return
}
@ -482,18 +588,16 @@ func (c *Cache) _purgeOverQuota(quota int64, remove func(item *Item)) {
// Remove items until the quota is OK
for _, item := range items {
if c.used < quota {
break
}
c.used -= item.getDiskSize()
remove(item)
// Remove the entry
delete(c.item, item.name)
c.removeNotInUse(item, 0, c.used <= quota)
}
if c.used < quota {
c.outOfSpace = false
c.cond.Broadcast()
}
}
// clean empties the cache of stuff if it can
func (c *Cache) clean() {
func (c *Cache) clean(removeCleanFiles bool) {
// Cache may be empty so end
_, err := os.Stat(c.root)
if os.IsNotExist(err) {
@ -504,12 +608,37 @@ func (c *Cache) clean() {
oldItems, oldUsed := len(c.item), fs.SizeSuffix(c.used)
c.mu.Unlock()
// Remove any files that are over age
c.purgeOld(c.opt.CacheMaxAge)
// Loop cleaning the cache until we are below the cache quota
for {
// Remove any files that are over age
c.purgeOld(c.opt.CacheMaxAge)
// Now remove any files that are over quota starting from the
// oldest first
c.purgeOverQuota(int64(c.opt.CacheMaxSize))
// Now remove files not in use until cache size is below quota starting from the
// oldest first
c.purgeOverQuota(int64(c.opt.CacheMaxSize))
// removeCleanFiles indicates that we got an ENOSPC error
// We remove cache files that are not dirty if we are still above the max cache size
if removeCleanFiles {
c.purgeClean(int64(c.opt.CacheMaxSize))
c.retryFailedResets()
} else {
break
}
used := c.updateUsed()
if used <= int64(c.opt.CacheMaxSize) && len(c.errItems) == 0 {
break
}
}
// Was kicked?
if removeCleanFiles {
c.kickerMu.Lock() // Make sure this is called with cache mutex unlocked
// Reenable io threads to kick me
c.cleanerKicked = false
c.kickerMu.Unlock()
}
// Stats
c.mu.Lock()
@ -526,7 +655,7 @@ func (c *Cache) clean() {
fs.Infof(nil, "vfs cache: cleaned: objects %d (was %d) in use %d, to upload %d, uploading %d, total size %v (was %v)", newItems, oldItems, totalInUse, uploadsQueued, uploadsInProgress, newUsed, oldUsed)
}
// cleaner calls clean at regular intervals
// cleaner calls clean at regular intervals and upon being kicked for out-of-space condition
//
// doesn't return until context is cancelled
func (c *Cache) cleaner(ctx context.Context) {
@ -535,14 +664,16 @@ func (c *Cache) cleaner(ctx context.Context) {
return
}
// Start cleaning the cache immediately
c.clean()
c.clean(false)
// Then every interval specified
timer := time.NewTicker(c.opt.CachePollInterval)
defer timer.Stop()
for {
select {
case <-c.kick: // a thread encountering ENOSPC kicked me
c.clean(true) // remove inUse files that are clean (!item.info.Dirty)
case <-timer.C:
c.clean()
c.clean(false) // do not remove inUse files
case <-ctx.Done():
fs.Debugf(nil, "vfs cache: cleaner exiting")
return


@ -33,6 +33,19 @@ func itemAsString(c *Cache) []string {
return out
}
// convert c.item to a string including each item's space usage
func itemSpaceAsString(c *Cache) []string {
c.mu.Lock()
defer c.mu.Unlock()
var out []string
for name, item := range c.item {
space := item.info.Rs.Size()
out = append(out, fmt.Sprintf("name=%q opens=%d size=%d space=%d", filepath.ToSlash(name), item.opens, item.info.Size, space))
}
sort.Strings(out)
return out
}
// open an item and write to it
func itemWrite(t *testing.T, item *Item, contents string) {
require.NoError(t, item.Open(nil))
@ -174,7 +187,7 @@ func TestCacheNew(t *testing.T) {
assertPathNotExist(t, p)
// clean - have tested the internals already
c.clean()
c.clean(false)
}
func TestCacheOpens(t *testing.T) {
@ -279,15 +292,7 @@ func TestCachePurgeOld(t *testing.T) {
defer cleanup()
// Test funcs
var removed []string
removeFile := func(item *Item) {
removed = append(removed, item.name)
item.remove("TestCachePurgeOld")
}
removed = nil
c._purgeOld(-10*time.Second, removeFile)
assert.Equal(t, []string(nil), removed)
c.purgeOld(-10 * time.Second)
potato2 := c.Item("sub/dir2/potato2")
require.NoError(t, potato2.Open(nil))
@ -301,17 +306,23 @@ func TestCachePurgeOld(t *testing.T) {
`name="sub/dir2/potato2" opens=0 size=0`,
}, itemAsString(c))
removed = nil
c._purgeOld(-10*time.Second, removeFile)
c.purgeOld(-10 * time.Second)
assert.Equal(t, []string{
"sub/dir2/potato2",
}, removed)
`name="sub/dir/potato" opens=2 size=0`,
}, itemAsString(c))
require.NoError(t, potato.Close(nil))
removed = nil
c._purgeOld(-10*time.Second, removeFile)
assert.Equal(t, []string(nil), removed)
assert.Equal(t, []string{
`name="sub/dir/potato" opens=1 size=0`,
}, itemAsString(c))
c.purgeOld(-10 * time.Second)
assert.Equal(t, []string{
`name="sub/dir/potato" opens=1 size=0`,
}, itemAsString(c))
require.NoError(t, potato.Close(nil))
@ -319,19 +330,13 @@ func TestCachePurgeOld(t *testing.T) {
`name="sub/dir/potato" opens=0 size=0`,
}, itemAsString(c))
removed = nil
c._purgeOld(10*time.Second, removeFile)
assert.Equal(t, []string(nil), removed)
c.purgeOld(10 * time.Second)
assert.Equal(t, []string{
`name="sub/dir/potato" opens=0 size=0`,
}, itemAsString(c))
removed = nil
c._purgeOld(-10*time.Second, removeFile)
assert.Equal(t, []string{
"sub/dir/potato",
}, removed)
c.purgeOld(-10 * time.Second)
assert.Equal(t, []string(nil), itemAsString(c))
}
@ -341,23 +346,6 @@ func TestCachePurgeOverQuota(t *testing.T) {
defer cleanup()
// Test funcs
var removed []string
remove := func(item *Item) {
removed = append(removed, item.name)
item.remove("TestCachePurgeOverQuota")
}
removed = nil
c._purgeOverQuota(-1, remove)
assert.Equal(t, []string(nil), removed)
removed = nil
c._purgeOverQuota(0, remove)
assert.Equal(t, []string(nil), removed)
removed = nil
c._purgeOverQuota(1, remove)
assert.Equal(t, []string(nil), removed)
// Make some test files
potato := c.Item("sub/dir/potato")
@ -372,9 +360,7 @@ func TestCachePurgeOverQuota(t *testing.T) {
}, itemAsString(c))
// Check nothing removed
removed = nil
c._purgeOverQuota(1, remove)
assert.Equal(t, []string(nil), removed)
c.purgeOverQuota(1)
// Close the files
require.NoError(t, potato.Close(nil))
@ -393,11 +379,7 @@ func TestCachePurgeOverQuota(t *testing.T) {
potato2.info.ATime = t1
// Check only potato removed to get below quota
removed = nil
c._purgeOverQuota(10, remove)
assert.Equal(t, []string{
"sub/dir/potato",
}, removed)
c.purgeOverQuota(10)
assert.Equal(t, int64(6), c.used)
assert.Equal(t, []string{
@ -423,11 +405,7 @@ func TestCachePurgeOverQuota(t *testing.T) {
potato.info.ATime = t2
// Check only potato2 removed to get below quota
removed = nil
c._purgeOverQuota(10, remove)
assert.Equal(t, []string{
"sub/dir2/potato2",
}, removed)
c.purgeOverQuota(10)
assert.Equal(t, int64(5), c.used)
c.purgeEmptyDirs()
@ -436,22 +414,82 @@ func TestCachePurgeOverQuota(t *testing.T) {
}, itemAsString(c))
// Now purge everything
removed = nil
c._purgeOverQuota(1, remove)
assert.Equal(t, []string{
"sub/dir/potato",
}, removed)
c.purgeOverQuota(1)
assert.Equal(t, int64(0), c.used)
c.purgeEmptyDirs()
assert.Equal(t, []string(nil), itemAsString(c))
// Check nothing left behind
c.clean()
c.clean(false)
assert.Equal(t, int64(0), c.used)
assert.Equal(t, []string(nil), itemAsString(c))
}
// test resetting clean files
func TestCachePurgeClean(t *testing.T) {
r, c, cleanup := newItemTestCache(t)
defer cleanup()
contents, obj, patato1 := newFile(t, r, c, "existing")
_ = contents
// Open the object to create metadata for it
require.NoError(t, patato1.Open(obj))
require.NoError(t, patato1.Open(obj))
size, err := patato1.GetSize()
require.NoError(t, err)
assert.Equal(t, int64(100), size)
// Read something to instantiate the cache file
buf := make([]byte, 10)
_, err = patato1.ReadAt(buf, 10)
require.NoError(t, err)
// Test cache file present
_, err = os.Stat(patato1.c.toOSPath(patato1.name))
require.NoError(t, err)
// Add some potatoes
potato2 := c.Item("sub/dir/potato2")
require.NoError(t, potato2.Open(nil))
require.NoError(t, potato2.Truncate(5))
potato3 := c.Item("sub/dir/potato3")
require.NoError(t, potato3.Open(nil))
require.NoError(t, potato3.Truncate(6))
c.updateUsed()
c.purgeClean(1)
assert.Equal(t, []string{
`name="existing" opens=2 size=100 space=0`,
`name="sub/dir/potato2" opens=1 size=5 space=5`,
`name="sub/dir/potato3" opens=1 size=6 space=6`,
}, itemSpaceAsString(c))
assert.Equal(t, int64(11), c.used)
require.NoError(t, potato2.Close(nil))
c.purgeClean(1)
assert.Equal(t, []string{
`name="existing" opens=2 size=100 space=0`,
`name="sub/dir/potato3" opens=1 size=6 space=6`,
}, itemSpaceAsString(c))
assert.Equal(t, int64(6), c.used)
require.NoError(t, patato1.Close(nil))
require.NoError(t, patato1.Close(nil))
require.NoError(t, potato3.Close(nil))
// Remove all files now. They are all not in use.
// purgeClean does not remove empty cache files. purgeOverQuota does.
// So we use purgeOverQuota here for the cleanup.
c.purgeOverQuota(1)
c.purgeEmptyDirs()
assert.Equal(t, []string(nil), itemAsString(c))
}
func TestCacheInUse(t *testing.T) {
_, c, cleanup := newTestCache(t)
defer cleanup()


@ -10,6 +10,7 @@ import (
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/asyncreader"
"github.com/rclone/rclone/fs/chunkedreader"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/lib/ranges"
"github.com/rclone/rclone/vfs/vfscommon"
)
@ -146,7 +147,9 @@ func (dls *Downloaders) _countErrors(n int64, err error) {
return
}
if err != nil {
//if err != syscall.ENOSPC {
dls.errorCount++
//}
dls.lastErr = err
fs.Infof(dls.src, "vfs cache: downloader: error count now %d: %v", dls.errorCount, err)
}
@ -404,6 +407,11 @@ func (dls *Downloaders) kickWaiters() (err error) {
fs.Errorf(dls.src, "vfs cache: restart download failed: %v", err)
}
}
if fserrors.IsErrNoSpace(dls.lastErr) {
fs.Errorf(dls.src, "vfs cache: cache is out of space %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
dls._closeWaiters(dls.lastErr)
return dls.lastErr
}
if dls.errorCount > maxErrorCount {
fs.Errorf(dls.src, "vfs cache: too many errors %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
@ -600,6 +608,7 @@ func (dl *downloader) download() (n int64, err error) {
if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
return n, errors.Wrap(err, "vfs reader: failed to write to cache file")
}
return n, nil
}


@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/ranges"
@ -47,19 +48,20 @@ import (
// The Info field is written to the backing store to store status
type Item struct {
// read only
c *Cache // cache this is part of
mu sync.Mutex // protect the variables
name string // name in the VFS
opens int // number of times file is open
downloaders *downloaders.Downloaders // a record of the downloaders in action - may be nil
o fs.Object // object we are caching - may be nil
fd *os.File // handle we are using to read and write to the file
metaDirty bool // set if the info needs writeback
modified bool // set if the file has been modified since the last Open
info Info // info about the file to persist to backing store
writeBackID writeback.Handle // id of any writebacks in progress
c *Cache // cache this is part of
mu sync.Mutex // protect the variables
cond *sync.Cond // synchronize with cache cleaner
name string // name in the VFS
opens int // number of times file is open
downloaders *downloaders.Downloaders // a record of the downloaders in action - may be nil
o fs.Object // object we are caching - may be nil
fd *os.File // handle we are using to read and write to the file
metaDirty bool // set if the info needs writeback
modified bool // set if the file has been modified since the last Open
info Info // info about the file to persist to backing store
writeBackID writeback.Handle // id of any writebacks in progress
pendingAccesses int // number of threads - cache reset not allowed if not zero
beingReset bool // cache cleaner is resetting the cache file, access not allowed
}
// Info is persisted to backing store
@ -75,6 +77,24 @@ type Info struct {
// Items are a slice of *Item ordered by ATime
type Items []*Item
// ResetResult reports the actual action taken in the Reset function and reason
type ResetResult int
// Constants used to report actual action taken in the Reset function and reason
const (
SkippedDirty ResetResult = iota // Dirty item cannot be reset
SkippedPendingAccess // Reset pending access can lead to deadlock
SkippedEmpty // Reset empty item does not save space
RemovedNotInUse // Item not used. Remove instead of reset
ResetFailed // Reset failed with an error
ResetComplete // Reset completed successfully
)
func (rr ResetResult) String() string {
return [...]string{"Dirty item skipped", "In-access item skipped", "Empty item skipped",
"Not-in-use item removed", "Item reset failed", "Item reset completed"}[rr]
}
func (v Items) Len() int { return len(v) }
func (v Items) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
func (v Items) Less(i, j int) bool {
@ -112,7 +132,7 @@ func newItem(c *Cache, name string) (item *Item) {
ATime: now,
},
}
item.cond = sync.NewCond(&item.mu)
// check the cache file exists
osPath := c.toOSPath(name)
fi, statErr := os.Stat(osPath)
@ -340,6 +360,13 @@ func (item *Item) _getSize() (size int64, err error) {
return size, err
}
// GetName gets the vfs name of the item
func (item *Item) GetName() (name string) {
item.mu.Lock()
defer item.mu.Unlock()
return item.name
}
// GetSize gets the current size of the item
func (item *Item) GetSize() (size int64, err error) {
item.mu.Lock()
@ -399,34 +426,20 @@ func (item *Item) IsDirty() bool {
return item.metaDirty || item.info.Dirty
}
// Open the local file from the object passed in (which may be nil)
// which implies we are about to create the file
func (item *Item) Open(o fs.Object) (err error) {
// defer log.Trace(o, "item=%p", item)("err=%v", &err)
// IsDataDirty returns true if the item's data is dirty
func (item *Item) IsDataDirty() bool {
item.mu.Lock()
defer item.mu.Unlock()
return item.info.Dirty
}
item.info.ATime = time.Now()
item.opens++
osPath, err := item.c.mkdir(item.name) // No locking in Cache
if err != nil {
return errors.Wrap(err, "vfs cache item: open mkdir failed")
}
err = item._checkObject(o)
if err != nil {
return errors.Wrap(err, "vfs cache item: check object failed")
}
if item.opens != 1 {
return nil
}
// Create the cache file and store the metadata on disk
// Called with item.mu locked
func (item *Item) _createFile(osPath string) (err error) {
if item.fd != nil {
return errors.New("vfs cache item: internal error: didn't Close file")
}
item.modified = false
fd, err := file.OpenFile(osPath, os.O_RDWR, 0600)
if err != nil {
return errors.Wrap(err, "vfs cache item: open failed")
@ -439,9 +452,67 @@ func (item *Item) Open(o fs.Object) (err error) {
err = item._save()
if err != nil {
return err
closeErr := item.fd.Close()
if closeErr != nil {
fs.Errorf(item.name, "vfs cache: item.fd.Close: closeErr: %v", closeErr)
}
item.fd = nil
return errors.Wrap(err, "vfs cache item: _save failed")
}
return err
}
// Open the local file from the object passed in. Wraps open()
// to provide recovery from out-of-space errors.
func (item *Item) Open(o fs.Object) (err error) {
for retries := 0; retries < fs.Config.LowLevelRetries; retries++ {
item.preAccess()
err = item.open(o)
item.postAccess()
if err == nil {
break
}
fs.Errorf(item.name, "vfs cache: failed to open item: %v", err)
if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" {
fs.Errorf(item.name, "Non-out-of-space error encountered during open")
break
}
item.c.KickCleaner()
}
return err
}
// Open the local file from the object passed in (which may be nil)
// which implies we are about to create the file
func (item *Item) open(o fs.Object) (err error) {
// defer log.Trace(o, "item=%p", item)("err=%v", &err)
item.mu.Lock()
defer item.mu.Unlock()
item.info.ATime = time.Now()
osPath, err := item.c.mkdir(item.name) // No locking in Cache
if err != nil {
return errors.Wrap(err, "vfs cache item: open mkdir failed")
}
err = item._checkObject(o)
if err != nil {
return errors.Wrap(err, "vfs cache item: check object failed")
}
item.opens++
if item.opens != 1 {
return nil
}
err = item._createFile(osPath)
if err != nil {
item._remove("item.open failed on _createFile, remove cache data/metadata files")
item.fd = nil
item.opens--
return errors.Wrap(err, "vfs cache item: create cache file failed")
}
// Unlock the Item.mu so we can call some methods which take Cache.mu
item.mu.Unlock()
@ -767,6 +838,197 @@ func (item *Item) remove(reason string) (wasWriting bool) {
return item._remove(reason)
}
// RemoveNotInUse is called to remove a cache file that has not been accessed recently.
// It may also be called to remove empty cache files when the quota has already been reached.
func (item *Item) RemoveNotInUse(maxAge time.Duration, emptyOnly bool) (removed bool, spaceFreed int64) {
item.mu.Lock()
defer item.mu.Unlock()
spaceFreed = 0
removed = false
if item.opens != 0 || item.metaDirty || item.info.Dirty {
return
}
removeIt := false
if maxAge == 0 {
removeIt = true // quota-driven removal
}
if maxAge != 0 {
cutoff := time.Now().Add(-maxAge)
// If not locked and access time too long ago - delete the file
accessTime := item.info.ATime
if accessTime.Sub(cutoff) <= 0 {
removeIt = true
}
}
if removeIt {
spaceUsed := item.info.Rs.Size()
if !emptyOnly || spaceUsed == 0 {
spaceFreed = spaceUsed
removed = true
if item._remove("Removing old cache file not in use") {
fs.Errorf(item.name, "item removed when it was writing/uploaded")
}
}
}
return
}
// Reset is called by the cache purge functions only to reset (empty the contents of) cache files that
// are not dirty. It is used when cache space runs out and we see an ENOSPC error.
func (item *Item) Reset() (rr ResetResult, spaceFreed int64, err error) {
item.mu.Lock()
defer item.mu.Unlock()
// The item is not being used now. Just remove it instead of resetting it.
if item.opens == 0 && !item.metaDirty && !item.info.Dirty {
spaceFreed = item.info.Rs.Size()
if item._remove("Removing old cache file not in use") {
fs.Errorf(item.name, "item removed when it was writing/uploaded")
}
return RemovedNotInUse, spaceFreed, nil
}
// do not reset dirty file
if item.info.Dirty {
return SkippedDirty, 0, nil
}
/* A wait on pendingAccesses to become 0 can lead to deadlock when an item.Open bumps
up the pendingAccesses count, calls item.open, which calls cache.put. The cache.put
operation needs the cache mutex, which is held here. We skip this file now. The
caller (the cache cleaner thread) may retry resetting this item if the cache size does
not reduce below quota. */
if item.pendingAccesses > 0 {
return SkippedPendingAccess, 0, nil
}
/* There is no need to reset an empty cache file unless it was being reset and the reset failed.
Some thread(s) may be waiting on the reset's successful completion in that case. */
if item.info.Rs.Size() == 0 && !item.beingReset {
return SkippedEmpty, 0, nil
}
item.beingReset = true
/* Error handling from this point on (setting item.fd and item.beingReset):
Since Reset is called by the cache cleaner thread, there is no direct way to return
the error to the io threads. Set item.fd to nil upon internal errors, so that the
io threads will return internal errors seeing a nil fd. In the case when the error
is ENOSPC, keep the item in the beingReset state and that will keep item.ReadAt
waiting at its beginning. The cache purge loop will try to redo the reset after cache
space is made available again. This recovery design should allow most io threads to
eventually go through, unless large files are written/overwritten concurrently and
the total size of these files exceeds the cache storage limit. */
// Close the downloaders
// Accumulate and log errors
checkErr := func(e error) {
if e != nil {
fs.Errorf(item.o, "vfs cache: item reset failed: %v", e)
if err == nil {
err = e
}
}
}
if downloaders := item.downloaders; downloaders != nil {
item.downloaders = nil
// FIXME need to unlock to kill downloader - should we
// re-arrange locking so this isn't necessary? maybe
// downloader should use the item mutex for locking? or put a
// finer lock on Rs?
//
// downloader.Write calls ensure which needs the lock
// close downloader with mutex unlocked
item.mu.Unlock()
checkErr(downloaders.Close(nil))
item.mu.Lock()
}
// close the file handle
// fd can be nil if we tried Reset and failed before because of ENOSPC during reset
if item.fd != nil {
checkErr(item.fd.Close())
if err != nil {
// Could not close the cache file
item.beingReset = false
item.cond.Broadcast()
return ResetFailed, 0, err
}
item.fd = nil
}
spaceFreed = item.info.Rs.Size()
// This should not be possible. We get here only if cache data is not dirty.
if item._remove("cache out of space, item is clean") {
fs.Errorf(item.o, "vfs cache item removed when it was writing/uploaded")
}
// can we have an item with no dirty data (so that we can get here) and nil item.o at the same time?
fso := item.o
checkErr(item._checkObject(fso))
if err != nil {
item.beingReset = false
item.cond.Broadcast()
return ResetFailed, spaceFreed, err
}
osPath := item.c.toOSPath(item.name)
checkErr(item._createFile(osPath))
if err != nil {
item._remove("cache reset failed on _createFile, removed cache data file")
item.fd = nil // This allows a new Reset redo to have a clean state to deal with
if !fserrors.IsErrNoSpace(err) {
item.beingReset = false
item.cond.Broadcast()
}
return ResetFailed, spaceFreed, err
}
// Create the downloaders
if item.o != nil {
item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
}
/* The item will stay in the beingReset state if we get an error that prevents us from
reaching this point. The cache purge loop will redo the failed Reset. */
item.beingReset = false
item.cond.Broadcast()
return ResetComplete, spaceFreed, err
}
// preAccess either waits for an ongoing cache reset to finish or increases pendingAccesses
// to protect against a cache reset on this item while the thread potentially uses the cache file.
// The cache cleaner waits until pendingAccesses is zero before resetting the cache.
func (item *Item) preAccess() {
item.mu.Lock()
defer item.mu.Unlock()
if item.beingReset {
for {
item.cond.Wait()
if !item.beingReset {
break
}
}
}
item.pendingAccesses++
}
// postAccess reduces the pendingAccesses count enabling cache reset upon ENOSPC
func (item *Item) postAccess() {
item.mu.Lock()
defer item.mu.Unlock()
item.pendingAccesses--
item.cond.Broadcast()
}
// _present returns true if the whole file has been downloaded
//
// call with the lock held
@ -811,6 +1073,10 @@ func (item *Item) _ensure(offset, size int64) (err error) {
}
r := ranges.Range{Pos: offset, Size: size}
present := item.info.Rs.Present(r)
/* This statement simulates a cache space error for test purposes */
/* if present != true && item.info.Rs.Size() > 32*1024*1024 {
return errors.New("no space left on device")
} */
fs.Debugf(nil, "vfs cache: looking for range=%+v in %+v - present %v", r, item.info.Rs, present)
item.mu.Unlock()
defer item.mu.Lock()
@ -887,6 +1153,27 @@ func (item *Item) setModTime(modTime time.Time) {
// ReadAt bytes from the file at off
func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
n = 0
for retries := 0; retries < fs.Config.LowLevelRetries; retries++ {
item.preAccess()
n, err = item.readAt(b, off)
item.postAccess()
if err == nil {
break
}
fs.Errorf(item.name, "vfs cache: failed to _ensure cache %v", err)
if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" {
fs.Debugf(item.name, "vfs cache: failed to _ensure cache %v is not out of space", err)
break
}
item.c.KickCleaner()
}
return n, err
}
// readAt reads bytes from the file at off (the unexported implementation behind ReadAt)
func (item *Item) readAt(b []byte, off int64) (n int, err error) {
item.mu.Lock()
if item.fd == nil {
item.mu.Unlock()
@ -896,15 +1183,17 @@ func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
item.mu.Unlock()
return 0, io.EOF
}
defer item.mu.Unlock()
err = item._ensure(off, int64(len(b)))
if err != nil {
item.mu.Unlock()
return n, err
return 0, err
}
item.info.ATime = time.Now()
item.mu.Unlock()
// Do the reading with Item.mu unlocked
return item.fd.ReadAt(b, off)
// Do the reading with Item.mu unlocked and cache protected by preAccess
n, err = item.fd.ReadAt(b, off)
return n, err
}
// WriteAt bytes to the file at off