mirror of
https://github.com/rclone/rclone
synced 2025-01-25 07:47:29 +01:00
vfs: use atomic types
This commit is contained in:
parent
552b6c47ff
commit
91b8152321
16
vfs/file.go
16
vfs/file.go
@ -37,7 +37,7 @@ import (
|
|||||||
// File represents a file
|
// File represents a file
|
||||||
type File struct {
|
type File struct {
|
||||||
inode uint64 // inode number - read only
|
inode uint64 // inode number - read only
|
||||||
size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned
|
size atomic.Int64 // size of file
|
||||||
|
|
||||||
muRW sync.Mutex // synchronize RWFileHandle.openPending(), RWFileHandle.close() and File.Remove
|
muRW sync.Mutex // synchronize RWFileHandle.openPending(), RWFileHandle.close() and File.Remove
|
||||||
|
|
||||||
@ -51,7 +51,7 @@ type File struct {
|
|||||||
pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written
|
pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written
|
||||||
pendingRenameFun func(ctx context.Context) error // will be run/renamed after all writers close
|
pendingRenameFun func(ctx context.Context) error // will be run/renamed after all writers close
|
||||||
sys atomic.Value // user defined info to be attached here
|
sys atomic.Value // user defined info to be attached here
|
||||||
nwriters int32 // len(writers) which is read/updated with atomic
|
nwriters atomic.Int32 // len(writers)
|
||||||
appendMode bool // file was opened with O_APPEND
|
appendMode bool // file was opened with O_APPEND
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,7 +67,7 @@ func newFile(d *Dir, dPath string, o fs.Object, leaf string) *File {
|
|||||||
inode: newInode(),
|
inode: newInode(),
|
||||||
}
|
}
|
||||||
if o != nil {
|
if o != nil {
|
||||||
f.size = o.Size()
|
f.size.Store(o.Size())
|
||||||
}
|
}
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
@ -266,7 +266,7 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error {
|
|||||||
func (f *File) addWriter(h Handle) {
|
func (f *File) addWriter(h Handle) {
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
f.writers = append(f.writers, h)
|
f.writers = append(f.writers, h)
|
||||||
atomic.AddInt32(&f.nwriters, 1)
|
f.nwriters.Add(1)
|
||||||
f.mu.Unlock()
|
f.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,7 +284,7 @@ func (f *File) delWriter(h Handle) {
|
|||||||
}
|
}
|
||||||
if found >= 0 {
|
if found >= 0 {
|
||||||
f.writers = append(f.writers[:found], f.writers[found+1:]...)
|
f.writers = append(f.writers[:found], f.writers[found+1:]...)
|
||||||
atomic.AddInt32(&f.nwriters, -1)
|
f.nwriters.Add(-1)
|
||||||
} else {
|
} else {
|
||||||
fs.Debugf(f._path(), "File.delWriter couldn't find handle")
|
fs.Debugf(f._path(), "File.delWriter couldn't find handle")
|
||||||
}
|
}
|
||||||
@ -295,7 +295,7 @@ func (f *File) delWriter(h Handle) {
|
|||||||
// Note that we don't take the mutex here. If we do then we can get a
|
// Note that we don't take the mutex here. If we do then we can get a
|
||||||
// deadlock.
|
// deadlock.
|
||||||
func (f *File) activeWriters() int {
|
func (f *File) activeWriters() int {
|
||||||
return int(atomic.LoadInt32(&f.nwriters))
|
return int(f.nwriters.Load())
|
||||||
}
|
}
|
||||||
|
|
||||||
// _roundModTime rounds the time passed in to the Precision of the
|
// _roundModTime rounds the time passed in to the Precision of the
|
||||||
@ -383,7 +383,7 @@ func (f *File) Size() int64 {
|
|||||||
|
|
||||||
// if o is nil it isn't valid yet or there are writers, so return the size so far
|
// if o is nil it isn't valid yet or there are writers, so return the size so far
|
||||||
if f._writingInProgress() {
|
if f._writingInProgress() {
|
||||||
return atomic.LoadInt64(&f.size)
|
return f.size.Load()
|
||||||
}
|
}
|
||||||
return nonNegative(f.o.Size())
|
return nonNegative(f.o.Size())
|
||||||
}
|
}
|
||||||
@ -473,7 +473,7 @@ func (f *File) writingInProgress() bool {
|
|||||||
|
|
||||||
// Update the size while writing
|
// Update the size while writing
|
||||||
func (f *File) setSize(n int64) {
|
func (f *File) setSize(n int64) {
|
||||||
atomic.StoreInt64(&f.size, n)
|
f.size.Store(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the object when written and add it to the directory
|
// Update the object when written and add it to the directory
|
||||||
|
@ -223,7 +223,7 @@ func waitSequential(what string, remote string, cond *sync.Cond, maxWait time.Du
|
|||||||
var (
|
var (
|
||||||
timeout = time.NewTimer(maxWait)
|
timeout = time.NewTimer(maxWait)
|
||||||
done = make(chan struct{})
|
done = make(chan struct{})
|
||||||
abort = int32(0)
|
abort atomic.Int32
|
||||||
)
|
)
|
||||||
go func() {
|
go func() {
|
||||||
select {
|
select {
|
||||||
@ -232,14 +232,14 @@ func waitSequential(what string, remote string, cond *sync.Cond, maxWait time.Du
|
|||||||
// cond.Broadcast. NB cond.L == mu
|
// cond.Broadcast. NB cond.L == mu
|
||||||
cond.L.Lock()
|
cond.L.Lock()
|
||||||
// set abort flag and give all the waiting goroutines a kick on timeout
|
// set abort flag and give all the waiting goroutines a kick on timeout
|
||||||
atomic.StoreInt32(&abort, 1)
|
abort.Store(1)
|
||||||
fs.Debugf(remote, "aborting in-sequence %s wait, off=%d", what, off)
|
fs.Debugf(remote, "aborting in-sequence %s wait, off=%d", what, off)
|
||||||
cond.Broadcast()
|
cond.Broadcast()
|
||||||
cond.L.Unlock()
|
cond.L.Unlock()
|
||||||
case <-done:
|
case <-done:
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
for *poff != off && atomic.LoadInt32(&abort) == 0 {
|
for *poff != off && abort.Load() == 0 {
|
||||||
fs.Debugf(remote, "waiting for in-sequence %s to %d for %v", what, off, maxWait)
|
fs.Debugf(remote, "waiting for in-sequence %s to %d for %v", what, off, maxWait)
|
||||||
cond.Wait()
|
cond.Wait()
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,7 @@ var (
|
|||||||
number = flag.Int("n", 4, "Number of tests to run simultaneously")
|
number = flag.Int("n", 4, "Number of tests to run simultaneously")
|
||||||
iterations = flag.Int("i", 100, "Iterations of the test")
|
iterations = flag.Int("i", 100, "Iterations of the test")
|
||||||
timeout = flag.Duration("timeout", 10*time.Second, "Inactivity time to detect a deadlock")
|
timeout = flag.Duration("timeout", 10*time.Second, "Inactivity time to detect a deadlock")
|
||||||
testNumber int32
|
testNumber atomic.Int32
|
||||||
)
|
)
|
||||||
|
|
||||||
// Seed the random number generator
|
// Seed the random number generator
|
||||||
@ -55,7 +55,7 @@ func NewTest(Dir string) *Test {
|
|||||||
dir: Dir,
|
dir: Dir,
|
||||||
name: random.String(*nameLength),
|
name: random.String(*nameLength),
|
||||||
isDir: rand.Intn(2) == 0,
|
isDir: rand.Intn(2) == 0,
|
||||||
number: atomic.AddInt32(&testNumber, 1),
|
number: testNumber.Add(1),
|
||||||
timer: time.NewTimer(*timeout),
|
timer: time.NewTimer(*timeout),
|
||||||
}
|
}
|
||||||
width := int(math.Floor(math.Log10(float64(*number)))) + 1
|
width := int(math.Floor(math.Log10(float64(*number)))) + 1
|
||||||
|
14
vfs/vfs.go
14
vfs/vfs.go
@ -167,7 +167,7 @@ type VFS struct {
|
|||||||
usageTime time.Time
|
usageTime time.Time
|
||||||
usage *fs.Usage
|
usage *fs.Usage
|
||||||
pollChan chan time.Duration
|
pollChan chan time.Duration
|
||||||
inUse int32 // count of number of opens accessed with atomic
|
inUse atomic.Int32 // count of number of opens
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keep track of active VFS keyed on fs.ConfigString(f)
|
// Keep track of active VFS keyed on fs.ConfigString(f)
|
||||||
@ -182,8 +182,8 @@ func New(f fs.Fs, opt *vfscommon.Options) *VFS {
|
|||||||
fsDir := fs.NewDir("", time.Now())
|
fsDir := fs.NewDir("", time.Now())
|
||||||
vfs := &VFS{
|
vfs := &VFS{
|
||||||
f: f,
|
f: f,
|
||||||
inUse: int32(1),
|
|
||||||
}
|
}
|
||||||
|
vfs.inUse.Store(1)
|
||||||
|
|
||||||
// Make a copy of the options
|
// Make a copy of the options
|
||||||
if opt != nil {
|
if opt != nil {
|
||||||
@ -202,7 +202,7 @@ func New(f fs.Fs, opt *vfscommon.Options) *VFS {
|
|||||||
for _, activeVFS := range active[configName] {
|
for _, activeVFS := range active[configName] {
|
||||||
if vfs.Opt == activeVFS.Opt {
|
if vfs.Opt == activeVFS.Opt {
|
||||||
fs.Debugf(f, "Re-using VFS from active cache")
|
fs.Debugf(f, "Re-using VFS from active cache")
|
||||||
atomic.AddInt32(&activeVFS.inUse, 1)
|
activeVFS.inUse.Add(1)
|
||||||
return activeVFS
|
return activeVFS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -243,7 +243,7 @@ func (vfs *VFS) Stats() (out rc.Params) {
|
|||||||
out = make(rc.Params)
|
out = make(rc.Params)
|
||||||
out["fs"] = fs.ConfigString(vfs.f)
|
out["fs"] = fs.ConfigString(vfs.f)
|
||||||
out["opt"] = vfs.Opt
|
out["opt"] = vfs.Opt
|
||||||
out["inUse"] = atomic.LoadInt32(&vfs.inUse)
|
out["inUse"] = vfs.inUse.Load()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
dirs int
|
dirs int
|
||||||
@ -313,7 +313,7 @@ func (vfs *VFS) shutdownCache() {
|
|||||||
// Shutdown stops any background go-routines and removes the VFS from
|
// Shutdown stops any background go-routines and removes the VFS from
|
||||||
// the active ache.
|
// the active ache.
|
||||||
func (vfs *VFS) Shutdown() {
|
func (vfs *VFS) Shutdown() {
|
||||||
if atomic.AddInt32(&vfs.inUse, -1) > 0 {
|
if vfs.inUse.Add(-1) > 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -386,11 +386,11 @@ func (vfs *VFS) Root() (*Dir, error) {
|
|||||||
return vfs.root, nil
|
return vfs.root, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var inodeCount uint64
|
var inodeCount atomic.Uint64
|
||||||
|
|
||||||
// newInode creates a new unique inode number
|
// newInode creates a new unique inode number
|
||||||
func newInode() (inode uint64) {
|
func newInode() (inode uint64) {
|
||||||
return atomic.AddUint64(&inodeCount, 1)
|
return inodeCount.Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat finds the Node by path starting from the root
|
// Stat finds the Node by path starting from the root
|
||||||
|
Loading…
Reference in New Issue
Block a user