mirror of
https://github.com/rclone/rclone
synced 2024-11-24 01:26:25 +01:00
serve restic: implement object cache
This caches all the objects returned from the List call. This makes opening them much quicker so speeds up prune and restores. It also uses fewer transactions. It can be disabled with `--cache-objects=false`. This was discovered when using the B2 backend when the budget was being blown on list object calls which can be avoided with a bit of caching. For a typical 1 million file backup for a laptop or server this will only use a small amount more memory.
This commit is contained in:
parent
83d48f65b6
commit
ceeac84cfe
75
cmd/serve/restic/cache.go
Normal file
75
cmd/serve/restic/cache.go
Normal file
@ -0,0 +1,75 @@
|
||||
package restic
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
// cache implements a simple object cache
|
||||
type cache struct {
|
||||
mu sync.RWMutex // protects the cache
|
||||
items map[string]fs.Object // cache of objects
|
||||
}
|
||||
|
||||
// create a new cache
|
||||
func newCache() *cache {
|
||||
return &cache{
|
||||
items: map[string]fs.Object{},
|
||||
}
|
||||
}
|
||||
|
||||
// find the object at remote or return nil
|
||||
func (c *cache) find(remote string) fs.Object {
|
||||
if !cacheObjects {
|
||||
return nil
|
||||
}
|
||||
c.mu.RLock()
|
||||
o := c.items[remote]
|
||||
c.mu.RUnlock()
|
||||
return o
|
||||
}
|
||||
|
||||
// add the object to the cache
|
||||
func (c *cache) add(remote string, o fs.Object) {
|
||||
if !cacheObjects {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.items[remote] = o
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// remove the object from the cache
|
||||
func (c *cache) remove(remote string) {
|
||||
if !cacheObjects {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
delete(c.items, remote)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// remove all the items with prefix from the cache
|
||||
func (c *cache) removePrefix(prefix string) {
|
||||
if !cacheObjects {
|
||||
return
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if !strings.HasSuffix(prefix, "/") {
|
||||
prefix += "/"
|
||||
}
|
||||
if prefix == "/" {
|
||||
c.items = map[string]fs.Object{}
|
||||
return
|
||||
}
|
||||
for key := range c.items {
|
||||
if strings.HasPrefix(key, prefix) {
|
||||
delete(c.items, key)
|
||||
}
|
||||
}
|
||||
}
|
55
cmd/serve/restic/cache_test.go
Normal file
55
cmd/serve/restic/cache_test.go
Normal file
@ -0,0 +1,55 @@
|
||||
package restic
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/rclone/rclone/fstest/mockobject"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func (c *cache) String() string {
|
||||
keys := []string{}
|
||||
c.mu.Lock()
|
||||
for k := range c.items {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
sort.Strings(keys)
|
||||
return strings.Join(keys, ",")
|
||||
}
|
||||
|
||||
func TestCacheCRUD(t *testing.T) {
|
||||
c := newCache()
|
||||
assert.Equal(t, "", c.String())
|
||||
assert.Nil(t, c.find("potato"))
|
||||
o := mockobject.New("potato")
|
||||
c.add(o.Remote(), o)
|
||||
assert.Equal(t, "potato", c.String())
|
||||
assert.Equal(t, o, c.find("potato"))
|
||||
c.remove("potato")
|
||||
assert.Equal(t, "", c.String())
|
||||
assert.Nil(t, c.find("potato"))
|
||||
c.remove("notfound")
|
||||
}
|
||||
|
||||
func TestCacheRemovePrefix(t *testing.T) {
|
||||
c := newCache()
|
||||
for _, remote := range []string{
|
||||
"a",
|
||||
"b",
|
||||
"b/1",
|
||||
"b/2/3",
|
||||
"b/2/4",
|
||||
"b/2",
|
||||
"c",
|
||||
} {
|
||||
c.add(remote, mockobject.New(remote))
|
||||
}
|
||||
assert.Equal(t, "a,b,b/1,b/2,b/2/3,b/2/4,c", c.String())
|
||||
c.removePrefix("b")
|
||||
assert.Equal(t, "a,b,c", c.String())
|
||||
c.removePrefix("/")
|
||||
assert.Equal(t, "", c.String())
|
||||
}
|
@ -2,6 +2,7 @@
|
||||
package restic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
@ -30,6 +31,7 @@ var (
|
||||
stdio bool
|
||||
appendOnly bool
|
||||
privateRepos bool
|
||||
cacheObjects bool
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -38,6 +40,7 @@ func init() {
|
||||
flags.BoolVarP(flagSet, &stdio, "stdio", "", false, "run an HTTP2 server on stdin/stdout")
|
||||
flags.BoolVarP(flagSet, &appendOnly, "append-only", "", false, "disallow deletion of repository data")
|
||||
flags.BoolVarP(flagSet, &privateRepos, "private-repos", "", false, "users can only access their private repo")
|
||||
flags.BoolVarP(flagSet, &cacheObjects, "cache-objects", "", true, "cache listed objects")
|
||||
}
|
||||
|
||||
// Command definition for cobra
|
||||
@ -77,6 +80,10 @@ with use of the "--addr" flag.
|
||||
|
||||
You might wish to start this server on boot.
|
||||
|
||||
Adding --cache-objects=false will cause rclone to stop caching objects
|
||||
returned from the List call. Caching is normally desirable as it speeds
|
||||
up downloading objects, saves transactions and uses very little memory.
|
||||
|
||||
### Setting up restic to use rclone ###
|
||||
|
||||
Now you can [follow the restic
|
||||
@ -161,7 +168,8 @@ const (
|
||||
// Server contains everything to run the Server
|
||||
type Server struct {
|
||||
*httplib.Server
|
||||
f fs.Fs
|
||||
f fs.Fs
|
||||
cache *cache
|
||||
}
|
||||
|
||||
// NewServer returns an HTTP server that speaks the rest protocol
|
||||
@ -170,6 +178,7 @@ func NewServer(f fs.Fs, opt *httplib.Options) *Server {
|
||||
s := &Server{
|
||||
Server: httplib.NewServer(mux, opt),
|
||||
f: f,
|
||||
cache: newCache(),
|
||||
}
|
||||
mux.HandleFunc(s.Opt.BaseURL+"/", s.ServeHTTP)
|
||||
return s
|
||||
@ -248,9 +257,24 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// newObject returns an object with the remote given either from the
|
||||
// cache or directly
|
||||
func (s *Server) newObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||
o := s.cache.find(remote)
|
||||
if o != nil {
|
||||
return o, nil
|
||||
}
|
||||
o, err := s.f.NewObject(ctx, remote)
|
||||
if err != nil {
|
||||
return o, err
|
||||
}
|
||||
s.cache.add(remote, o)
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// get the remote
|
||||
func (s *Server) serveObject(w http.ResponseWriter, r *http.Request, remote string) {
|
||||
o, err := s.f.NewObject(r.Context(), remote)
|
||||
o, err := s.newObject(r.Context(), remote)
|
||||
if err != nil {
|
||||
fs.Debugf(remote, "%s request error: %v", r.Method, err)
|
||||
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
|
||||
@ -263,7 +287,7 @@ func (s *Server) serveObject(w http.ResponseWriter, r *http.Request, remote stri
|
||||
func (s *Server) postObject(w http.ResponseWriter, r *http.Request, remote string) {
|
||||
if appendOnly {
|
||||
// make sure the file does not exist yet
|
||||
_, err := s.f.NewObject(r.Context(), remote)
|
||||
_, err := s.newObject(r.Context(), remote)
|
||||
if err == nil {
|
||||
fs.Errorf(remote, "Post request: file already exists, refusing to overwrite in append-only mode")
|
||||
http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
|
||||
@ -272,7 +296,7 @@ func (s *Server) postObject(w http.ResponseWriter, r *http.Request, remote strin
|
||||
}
|
||||
}
|
||||
|
||||
_, err := operations.RcatSize(r.Context(), s.f, remote, r.Body, r.ContentLength, time.Now())
|
||||
o, err := operations.RcatSize(r.Context(), s.f, remote, r.Body, r.ContentLength, time.Now())
|
||||
if err != nil {
|
||||
err = accounting.Stats(r.Context()).Error(err)
|
||||
fs.Errorf(remote, "Post request rcat error: %v", err)
|
||||
@ -280,6 +304,9 @@ func (s *Server) postObject(w http.ResponseWriter, r *http.Request, remote strin
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// if successfully uploaded add to cache
|
||||
s.cache.add(remote, o)
|
||||
}
|
||||
|
||||
// delete the remote
|
||||
@ -294,7 +321,7 @@ func (s *Server) deleteObject(w http.ResponseWriter, r *http.Request, remote str
|
||||
}
|
||||
}
|
||||
|
||||
o, err := s.f.NewObject(r.Context(), remote)
|
||||
o, err := s.newObject(r.Context(), remote)
|
||||
if err != nil {
|
||||
fs.Debugf(remote, "Delete request error: %v", err)
|
||||
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
|
||||
@ -310,6 +337,9 @@ func (s *Server) deleteObject(w http.ResponseWriter, r *http.Request, remote str
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// remove object from cache
|
||||
s.cache.remove(remote)
|
||||
}
|
||||
|
||||
// listItem is an element returned for the restic v2 list response
|
||||
@ -321,14 +351,12 @@ type listItem struct {
|
||||
// return type for list
|
||||
type listItems []listItem
|
||||
|
||||
// add a DirEntry to the listItems
|
||||
func (ls *listItems) add(entry fs.DirEntry) {
|
||||
if o, ok := entry.(fs.Object); ok {
|
||||
*ls = append(*ls, listItem{
|
||||
Name: path.Base(o.Remote()),
|
||||
Size: o.Size(),
|
||||
})
|
||||
}
|
||||
// add an fs.Object to the listItems
|
||||
func (ls *listItems) add(o fs.Object) {
|
||||
*ls = append(*ls, listItem{
|
||||
Name: path.Base(o.Remote()),
|
||||
Size: o.Size(),
|
||||
})
|
||||
}
|
||||
|
||||
// listObjects lists all Objects of a given type in an arbitrary order.
|
||||
@ -344,10 +372,16 @@ func (s *Server) listObjects(w http.ResponseWriter, r *http.Request, remote stri
|
||||
// make sure an empty list is returned, and not a 'nil' value
|
||||
ls := listItems{}
|
||||
|
||||
// Remove all existing values from the cache
|
||||
s.cache.removePrefix(remote)
|
||||
|
||||
// if remote supports ListR use that directly, otherwise use recursive Walk
|
||||
err := walk.ListR(r.Context(), s.f, remote, true, -1, walk.ListObjects, func(entries fs.DirEntries) error {
|
||||
for _, entry := range entries {
|
||||
ls.add(entry)
|
||||
if o, ok := entry.(fs.Object); ok {
|
||||
ls.add(o)
|
||||
s.cache.add(o.Remote(), o)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user