1
mirror of https://github.com/rclone/rclone synced 2025-01-03 03:46:24 +01:00

rc: expire remote cache and fix tests under race detector

This commit is contained in:
Nick Craig-Wood 2018-11-01 21:14:00 +00:00
parent b961e07c57
commit 8c8b58a7de
3 changed files with 101 additions and 10 deletions

View File

@ -4,29 +4,66 @@ package rc
import ( import (
"sync" "sync"
"time"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
) )
var ( var (
fsCacheMu sync.Mutex fsCacheMu sync.Mutex
fsCache = map[string]fs.Fs{} fsCache = map[string]*cacheEntry{}
fsNewFs = fs.NewFs // for tests fsNewFs = fs.NewFs // for tests
expireRunning = false
cacheExpireDuration = 300 * time.Second // expire the cache entry when it is older than this
cacheExpireInterval = 60 * time.Second // interval to run the cache expire
) )
type cacheEntry struct {
f fs.Fs
fsString string
lastUsed time.Time
}
// GetCachedFs gets a fs.Fs named fsString either from the cache or creates it afresh // GetCachedFs gets a fs.Fs named fsString either from the cache or creates it afresh
func GetCachedFs(fsString string) (f fs.Fs, err error) { func GetCachedFs(fsString string) (f fs.Fs, err error) {
fsCacheMu.Lock() fsCacheMu.Lock()
defer fsCacheMu.Unlock() defer fsCacheMu.Unlock()
entry, ok := fsCache[fsString]
f = fsCache[fsString] if !ok {
if f == nil {
f, err = fsNewFs(fsString) f, err = fsNewFs(fsString)
if err == nil { if err != nil {
fsCache[fsString] = f return nil, err
}
entry = &cacheEntry{
f: f,
fsString: fsString,
}
fsCache[fsString] = entry
}
entry.lastUsed = time.Now()
if !expireRunning {
time.AfterFunc(cacheExpireInterval, cacheExpire)
expireRunning = true
}
return entry.f, err
}
// cacheExpire expires any entries that haven't been used recently
func cacheExpire() {
fsCacheMu.Lock()
defer fsCacheMu.Unlock()
now := time.Now()
for fsString, entry := range fsCache {
if now.Sub(entry.lastUsed) > cacheExpireDuration {
delete(fsCache, fsString)
} }
} }
return f, err if len(fsCache) != 0 {
time.AfterFunc(cacheExpireInterval, cacheExpire)
expireRunning = true
} else {
expireRunning = false
}
} }
// GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh // GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh

View File

@ -2,6 +2,7 @@ package rc
import ( import (
"testing" "testing"
"time"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fstest/mockfs" "github.com/ncw/rclone/fstest/mockfs"
@ -22,7 +23,10 @@ func mockNewFs(t *testing.T) func() {
} }
return func() { return func() {
fsNewFs = oldFsNewFs fsNewFs = oldFsNewFs
fsCache = map[string]fs.Fs{} fsCacheMu.Lock()
fsCache = map[string]*cacheEntry{}
expireRunning = false
fsCacheMu.Unlock()
} }
} }
@ -42,6 +46,33 @@ func TestGetCachedFs(t *testing.T) {
assert.Equal(t, f, f2) assert.Equal(t, f, f2)
} }
func TestCacheExpire(t *testing.T) {
defer mockNewFs(t)()
cacheExpireInterval = time.Millisecond
assert.Equal(t, false, expireRunning)
_, err := GetCachedFs("/")
require.NoError(t, err)
fsCacheMu.Lock()
entry := fsCache["/"]
assert.Equal(t, 1, len(fsCache))
fsCacheMu.Unlock()
cacheExpire()
fsCacheMu.Lock()
assert.Equal(t, 1, len(fsCache))
entry.lastUsed = time.Now().Add(-cacheExpireDuration - 60*time.Second)
assert.Equal(t, true, expireRunning)
fsCacheMu.Unlock()
time.Sleep(10 * time.Millisecond)
fsCacheMu.Lock()
assert.Equal(t, false, expireRunning)
assert.Equal(t, 0, len(fsCache))
fsCacheMu.Unlock()
}
func TestGetFsNamed(t *testing.T) { func TestGetFsNamed(t *testing.T) {
defer mockNewFs(t)() defer mockNewFs(t)()

View File

@ -1,6 +1,7 @@
package rc package rc
import ( import (
"runtime"
"testing" "testing"
"time" "time"
@ -19,9 +20,13 @@ func TestJobsKickExpire(t *testing.T) {
jobs.expireInterval = time.Millisecond jobs.expireInterval = time.Millisecond
assert.Equal(t, false, jobs.expireRunning) assert.Equal(t, false, jobs.expireRunning)
jobs.kickExpire() jobs.kickExpire()
jobs.mu.Lock()
assert.Equal(t, true, jobs.expireRunning) assert.Equal(t, true, jobs.expireRunning)
jobs.mu.Unlock()
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
jobs.mu.Lock()
assert.Equal(t, false, jobs.expireRunning) assert.Equal(t, false, jobs.expireRunning)
jobs.mu.Unlock()
} }
func TestJobsExpire(t *testing.T) { func TestJobsExpire(t *testing.T) {
@ -37,11 +42,15 @@ func TestJobsExpire(t *testing.T) {
assert.Equal(t, 1, len(jobs.jobs)) assert.Equal(t, 1, len(jobs.jobs))
jobs.Expire() jobs.Expire()
assert.Equal(t, 1, len(jobs.jobs)) assert.Equal(t, 1, len(jobs.jobs))
jobs.mu.Lock()
job.EndTime = time.Now().Add(-expireDuration - 60*time.Second) job.EndTime = time.Now().Add(-expireDuration - 60*time.Second)
assert.Equal(t, true, jobs.expireRunning) assert.Equal(t, true, jobs.expireRunning)
jobs.mu.Unlock()
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
jobs.mu.Lock()
assert.Equal(t, false, jobs.expireRunning) assert.Equal(t, false, jobs.expireRunning)
assert.Equal(t, 0, len(jobs.jobs)) assert.Equal(t, 0, len(jobs.jobs))
jobs.mu.Unlock()
} }
var noopFn = func(in Params) (Params, error) { var noopFn = func(in Params) (Params, error) {
@ -127,13 +136,27 @@ func TestJobRunPanic(t *testing.T) {
jobs := newJobs() jobs := newJobs()
job := jobs.NewJob(boom, Params{}) job := jobs.NewJob(boom, Params{})
<-wait <-wait
runtime.Gosched() // yield to make sure job is updated
// Wait a short time for the panic to propagate
for i := uint(0); i < 10; i++ {
job.mu.Lock()
e := job.Error
job.mu.Unlock()
if e != "" {
break
}
time.Sleep(time.Millisecond << i)
}
job.mu.Lock()
assert.Equal(t, false, job.EndTime.IsZero()) assert.Equal(t, false, job.EndTime.IsZero())
assert.Equal(t, Params{}, job.Output) assert.Equal(t, Params{}, job.Output)
assert.NotEqual(t, 0.0, job.Duration) assert.NotEqual(t, 0.0, job.Duration)
assert.Equal(t, "panic received: boom", job.Error) assert.Equal(t, "panic received: boom", job.Error)
assert.Equal(t, false, job.Success) assert.Equal(t, false, job.Success)
assert.Equal(t, true, job.Finished) assert.Equal(t, true, job.Finished)
job.mu.Unlock()
} }
func TestJobsNewJob(t *testing.T) { func TestJobsNewJob(t *testing.T) {