1
mirror of https://github.com/rclone/rclone synced 2024-11-01 21:49:35 +01:00

amazonclouddrive: implement DirChangeNotify

Use the Changes API to invalidate cache entries.
The latest retrieved checkpoint is stored in the config file to allow
fast resumption after restart.
This commit is contained in:
Fabian Möller 2017-10-16 21:54:53 +02:00 committed by Nick Craig-Wood
parent 62b74d06ff
commit 115d24e1f7

View File

@ -19,6 +19,7 @@ import (
"net/http"
"path"
"regexp"
"sort"
"strings"
"time"
@ -1206,14 +1207,110 @@ func (o *Object) MimeType() string {
return ""
}
// DirChangeNotify polls for changes from the remote and hands the path to the
// given function. Only changes that can be resolved to a path through the
// DirCache will handled.
//
// Automatically restarts itself in case of unexpected behaviour of the remote.
//
// Close the returned channel to stop being notified.
func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration) chan bool {
checkpoint := config.FileGet(f.name, "checkpoint")
quit := make(chan bool)
go func() {
for {
checkpoint = f.dirchangeNotifyRunner(notifyFunc, checkpoint)
if err := config.SetValueAndSave(f.name, "checkpoint", checkpoint); err != nil {
fs.Debugf(f, "Unable to save checkpoint: %v", err)
}
select {
case <-quit:
return
case <-time.After(pollInterval):
}
}
}()
return quit
}
func (f *Fs) dirchangeNotifyRunner(notifyFunc func(string), checkpoint string) string {
var err error
var resp *http.Response
var reachedEnd bool
var csCount int
var nodeCount int
fs.Debugf(f, "Checking for changes on remote (Checkpoint %q)", checkpoint)
err = f.pacer.CallNoRetry(func() (bool, error) {
resp, err = f.c.Changes.GetChangesFunc(&acd.ChangesOptions{
Checkpoint: checkpoint,
IncludePurged: true,
}, func(changeSet *acd.ChangeSet, err error) error {
if err != nil {
return err
}
pathsToClear := make([]string, 0)
csCount++
nodeCount += len(changeSet.Nodes)
if changeSet.End {
reachedEnd = true
}
if changeSet.Checkpoint != "" {
checkpoint = changeSet.Checkpoint
}
for _, node := range changeSet.Nodes {
if path, ok := f.dirCache.GetInv(*node.Id); ok {
pathsToClear = append(pathsToClear, path)
}
for _, parent := range node.Parents {
if path, ok := f.dirCache.GetInv(parent); ok {
pathsToClear = append(pathsToClear, path)
}
}
}
lastNotifiedPath := ""
sort.Strings(pathsToClear)
for _, path := range pathsToClear {
if lastNotifiedPath != "" && (path == lastNotifiedPath || strings.HasPrefix(path+"/", lastNotifiedPath)) {
continue
}
lastNotifiedPath = path
notifyFunc(path)
}
return nil
})
return false, err
})
fs.Debugf(f, "Got %d ChangeSets with %d Nodes", csCount, nodeCount)
if err != nil && err != io.ErrUnexpectedEOF {
fs.Debugf(f, "Failed to get Changes: %v", err)
return checkpoint
}
if reachedEnd {
reachedEnd = false
fs.Debugf(f, "All changes were processed. Waiting for more.")
} else if checkpoint == "" {
fs.Debugf(f, "Did not get any checkpoint, something went wrong! %+v", resp)
}
return checkpoint
}
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
// _ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.Object = (*Object)(nil)
_ fs.MimeTyper = &Object{}
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.DirChangeNotifier = (*Fs)(nil)
_ fs.Object = (*Object)(nil)
_ fs.MimeTyper = &Object{}
)