From 78a9e7440aca190c609f8d8127cc9be1f9fe6202 Mon Sep 17 00:00:00 2001 From: Max Sum Date: Sat, 30 Nov 2019 22:41:39 +0800 Subject: [PATCH] union: Implement multiple writable remotes --- backend/union/entry.go | 167 +++++++ backend/union/errors.go | 68 +++ backend/union/policy/all.go | 44 ++ backend/union/policy/epall.go | 99 +++++ backend/union/policy/epff.go | 115 +++++ backend/union/policy/eplfs.go | 116 +++++ backend/union/policy/eplno.go | 116 +++++ backend/union/policy/eplus.go | 116 +++++ backend/union/policy/epmfs.go | 115 +++++ backend/union/policy/eprand.go | 86 ++++ backend/union/policy/ff.go | 32 ++ backend/union/policy/lfs.go | 33 ++ backend/union/policy/lno.go | 33 ++ backend/union/policy/lus.go | 33 ++ backend/union/policy/mfs.go | 33 ++ backend/union/policy/newest.go | 149 +++++++ backend/union/policy/policy.go | 129 ++++++ backend/union/policy/rand.go | 83 ++++ backend/union/union.go | 673 ++++++++++++++++++++++------- backend/union/union_test.go | 143 +++++- backend/union/upstream/upstream.go | 348 +++++++++++++++ docs/content/union.md | 157 ++++++- 22 files changed, 2717 insertions(+), 171 deletions(-) create mode 100644 backend/union/entry.go create mode 100644 backend/union/errors.go create mode 100644 backend/union/policy/all.go create mode 100644 backend/union/policy/epall.go create mode 100644 backend/union/policy/epff.go create mode 100644 backend/union/policy/eplfs.go create mode 100644 backend/union/policy/eplno.go create mode 100644 backend/union/policy/eplus.go create mode 100644 backend/union/policy/epmfs.go create mode 100644 backend/union/policy/eprand.go create mode 100644 backend/union/policy/ff.go create mode 100644 backend/union/policy/lfs.go create mode 100644 backend/union/policy/lno.go create mode 100644 backend/union/policy/lus.go create mode 100644 backend/union/policy/mfs.go create mode 100644 backend/union/policy/newest.go create mode 100644 backend/union/policy/policy.go create mode 100644 backend/union/policy/rand.go create mode 100644 backend/union/upstream/upstream.go diff --git a/backend/union/entry.go b/backend/union/entry.go new file mode 100644 index 000000000..232241f28 --- /dev/null +++ b/backend/union/entry.go @@ -0,0 +1,167 @@ +package union + +import ( + "bufio" + "context" + "io" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +// Object describes a union Object +// +// This is a wrapped object which returns the Union Fs as its parent +type Object struct { + *upstream.Object + fs *Fs // what this object is part of + co []upstream.Entry +} + +// Directory describes a union Directory +// +// This is a wrapped object contains all candidates +type Directory struct { + *upstream.Directory + cd []upstream.Entry +} + +type entry interface { + upstream.Entry + candidates() []upstream.Entry +} + +// UnWrap returns the Object that this Object is wrapping or +// nil if it isn't wrapping anything +func (o *Object) UnWrap() *upstream.Object { + return o.Object +} + +// Fs returns the union Fs as the parent +func (o *Object) Fs() fs.Info { + return o.fs +} + +func (o *Object) candidates() []upstream.Entry { + return o.co +} + +func (d *Directory) candidates() []upstream.Entry { + return d.cd +} + +// Update in to the object with the modTime given of the given size +// +// When called from outside a Fs by rclone, src.Size() will always be >= 0. +// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either +// return an error or update the object properly (rather than e.g. calling panic). +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + entries, err := o.fs.actionEntries(o.candidates()...) + if err != nil { + return err + } + if len(entries) == 1 { + obj := entries[0].(*upstream.Object) + return obj.Update(ctx, in, src, options...) + } + // Get multiple reader + readers := make([]io.Reader, len(entries)) + writers := make([]io.Writer, len(entries)) + errs := Errors(make([]error, len(entries)+1)) + for i := range entries { + r, w := io.Pipe() + bw := bufio.NewWriter(w) + readers[i], writers[i] = r, bw + defer func() { + err := w.Close() + if err != nil { + panic(err) + } + }() + } + go func() { + mw := io.MultiWriter(writers...) + es := make([]error, len(writers)+1) + _, es[len(es)-1] = io.Copy(mw, in) + for i, bw := range writers { + es[i] = bw.(*bufio.Writer).Flush() + } + errs[len(entries)] = Errors(es).Err() + }() + // Multi-threading + multithread(len(entries), func(i int) { + if o, ok := entries[i].(*upstream.Object); ok { + err := o.Update(ctx, readers[i], src, options...) + errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) + } else { + errs[i] = fs.ErrorNotAFile + } + }) + return errs.Err() +} + +// Remove candidate objects selected by ACTION policy +func (o *Object) Remove(ctx context.Context) error { + entries, err := o.fs.actionEntries(o.candidates()...) + if err != nil { + return err + } + errs := Errors(make([]error, len(entries))) + multithread(len(entries), func(i int) { + if o, ok := entries[i].(*upstream.Object); ok { + err := o.Remove(ctx) + errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) + } else { + errs[i] = fs.ErrorNotAFile + } + }) + return errs.Err() +} + +// SetModTime sets the metadata on the object to set the modification date +func (o *Object) SetModTime(ctx context.Context, t time.Time) error { + entries, err := o.fs.actionEntries(o.candidates()...) + if err != nil { + return err + } + var wg sync.WaitGroup + errs := Errors(make([]error, len(entries))) + multithread(len(entries), func(i int) { + if o, ok := entries[i].(*upstream.Object); ok { + err := o.SetModTime(ctx, t) + errs[i] = errors.Wrap(err, o.UpstreamFs().Name()) + } else { + errs[i] = fs.ErrorNotAFile + } + }) + wg.Wait() + return errs.Err() +} + +// ModTime returns the modification date of the directory +// It returns the latest ModTime of all candidates +func (d *Directory) ModTime(ctx context.Context) (t time.Time) { + entries := d.candidates() + times := make([]time.Time, len(entries)) + multithread(len(entries), func(i int) { + times[i] = entries[i].ModTime(ctx) + }) + for _, ti := range times { + if t.Before(ti) { + t = ti + } + } + return t +} + +// Size returns the size of the directory +// It returns the sum of all candidates +func (d *Directory) Size() (s int64) { + for _, e := range d.candidates() { + s += e.Size() + } + return s +} diff --git a/backend/union/errors.go b/backend/union/errors.go new file mode 100644 index 000000000..1249dd540 --- /dev/null +++ b/backend/union/errors.go @@ -0,0 +1,68 @@ +package union + +import ( + "bytes" + "fmt" +) + +// The Errors type wraps a slice of errors +type Errors []error + +// Map returns a copy of the error slice with all its errors modified +// according to the mapping function. If mapping returns nil, +// the error is dropped from the error slice with no replacement. +func (e Errors) Map(mapping func(error) error) Errors { + s := make([]error, len(e)) + i := 0 + for _, err := range e { + nerr := mapping(err) + if nerr == nil { + continue + } + s[i] = nerr + i++ + } + return Errors(s[:i]) +} + +// FilterNil returns the Errors without nil +func (e Errors) FilterNil() Errors { + ne := e.Map(func(err error) error { + return err + }) + return ne +} + +// Err returns a error interface that filtered nil, +// or nil if no non-nil Error is presented. +func (e Errors) Err() error { + ne := e.FilterNil() + if len(ne) == 0 { + return nil + } + return ne +} + +// Error returns a concatenated string of the contained errors +func (e Errors) Error() string { + var buf bytes.Buffer + + if len(e) == 0 { + buf.WriteString("no error") + } + if len(e) == 1 { + buf.WriteString("1 error: ") + } else { + fmt.Fprintf(&buf, "%d errors: ", len(e)) + } + + for i, err := range e { + if i != 0 { + buf.WriteString("; ") + } + + buf.WriteString(err.Error()) + } + + return buf.String() +} diff --git a/backend/union/policy/all.go b/backend/union/policy/all.go new file mode 100644 index 000000000..06589220d --- /dev/null +++ b/backend/union/policy/all.go @@ -0,0 +1,44 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("all", &All{}) +} + +// All policy behaves the same as EpAll except for the CREATE category +// Action category: same as epall. +// Create category: apply to all branches. +// Search category: same as epall. +type All struct { + EpAll +} + +// Create category policy, governing the creation of files and directories +func (p *All) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + return upstreams, nil +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *All) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterNCEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + return entries, nil +} diff --git a/backend/union/policy/epall.go b/backend/union/policy/epall.go new file mode 100644 index 000000000..666820a29 --- /dev/null +++ b/backend/union/policy/epall.go @@ -0,0 +1,99 @@ +package policy + +import ( + "context" + "path" + "sync" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("epall", &EpAll{}) +} + +// EpAll stands for existing path, all +// Action category: apply to all found. +// Create category: apply to all found. +// Search category: same as epff. +type EpAll struct { + EpFF +} + +func (p *EpAll) epall(ctx context.Context, upstreams []*upstream.Fs, filePath string) ([]*upstream.Fs, error) { + var wg sync.WaitGroup + ufs := make([]*upstream.Fs, len(upstreams)) + for i, u := range upstreams { + wg.Add(1) + i, u := i, u // Closure + go func() { + rfs := u.RootFs + remote := path.Join(u.RootPath, filePath) + if findEntry(ctx, rfs, remote) != nil { + ufs[i] = u + } + wg.Done() + }() + } + wg.Wait() + var results []*upstream.Fs + for _, f := range ufs { + if f != nil { + results = append(results, f) + } + } + if len(results) == 0 { + return nil, fs.ErrorObjectNotFound + } + return results, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpAll) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterRO(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + return p.epall(ctx, upstreams, path) +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpAll) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterROEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + return entries, nil +} + +// Create category policy, governing the creation of files and directories +func (p *EpAll) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + upstreams, err := p.epall(ctx, upstreams, path+"/..") + return upstreams, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpAll) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterNCEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + return entries, nil +} diff --git a/backend/union/policy/epff.go b/backend/union/policy/epff.go new file mode 100644 index 000000000..e294cdadb --- /dev/null +++ b/backend/union/policy/epff.go @@ -0,0 +1,115 @@ +package policy + +import ( + "context" + "path" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("epff", &EpFF{}) +} + +// EpFF stands for existing path, first found +// Given the order of the candidates, act on the first one found where the relative path exists. +type EpFF struct{} + +func (p *EpFF) epff(ctx context.Context, upstreams []*upstream.Fs, filePath string) (*upstream.Fs, error) { + ch := make(chan *upstream.Fs) + for _, u := range upstreams { + u := u // Closure + go func() { + rfs := u.RootFs + remote := path.Join(u.RootPath, filePath) + if findEntry(ctx, rfs, remote) == nil { + u = nil + } + ch <- u + }() + } + var u *upstream.Fs + for i := 0; i < len(upstreams); i++ { + u = <-ch + if u != nil { + // close remaining goroutines + go func(num int) { + defer close(ch) + for i := 0; i < num; i++ { + <-ch + } + }(len(upstreams) - 1 - i) + } + } + if u == nil { + return nil, fs.ErrorObjectNotFound + } + return u, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpFF) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterRO(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.epff(ctx, upstreams, path) + return []*upstream.Fs{u}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpFF) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterROEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + return entries[:1], nil +} + +// Create category policy, governing the creation of files and directories +func (p *EpFF) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.epff(ctx, upstreams, path+"/..") + return []*upstream.Fs{u}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpFF) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterNCEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + return entries[:1], nil +} + +// Search category policy, governing the access to files and directories +func (p *EpFF) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.epff(ctx, upstreams, path) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpFF) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return entries[0], nil +} diff --git a/backend/union/policy/eplfs.go b/backend/union/policy/eplfs.go new file mode 100644 index 000000000..9b6107eb2 --- /dev/null +++ b/backend/union/policy/eplfs.go @@ -0,0 +1,116 @@ +package policy + +import ( + "context" + "math" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("eplfs", &EpLfs{}) +} + +// EpLfs stands for existing path, least free space +// Of all the candidates on which the path exists choose the one with the least free space. +type EpLfs struct { + EpAll +} + +func (p *EpLfs) lfs(upstreams []*upstream.Fs) (*upstream.Fs, error) { + var minFreeSpace int64 = math.MaxInt64 + var lfsupstream *upstream.Fs + for _, u := range upstreams { + space, err := u.GetFreeSpace() + if err != nil { + fs.LogPrintf(fs.LogLevelNotice, nil, + "Free Space is not supported for upstream %s, treating as infinite", u.Name()) + } + if space < minFreeSpace { + minFreeSpace = space + lfsupstream = u + } + } + if lfsupstream == nil { + return nil, fs.ErrorObjectNotFound + } + return lfsupstream, nil +} + +func (p *EpLfs) lfsEntries(entries []upstream.Entry) (upstream.Entry, error) { + var minFreeSpace int64 + var lfsEntry upstream.Entry + for _, e := range entries { + space, err := e.UpstreamFs().GetFreeSpace() + if err != nil { + fs.LogPrintf(fs.LogLevelNotice, nil, + "Free Space is not supported for upstream %s, treating as infinite", e.UpstreamFs().Name()) + } + if space < minFreeSpace { + minFreeSpace = space + lfsEntry = e + } + } + return lfsEntry, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpLfs) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lfs(upstreams) + return []*upstream.Fs{u}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpLfs) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.ActionEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lfsEntries(entries) + return []upstream.Entry{e}, err +} + +// Create category policy, governing the creation of files and directories +func (p *EpLfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lfs(upstreams) + return []*upstream.Fs{u}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpLfs) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.CreateEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lfsEntries(entries) + return []upstream.Entry{e}, err +} + +// Search category policy, governing the access to files and directories +func (p *EpLfs) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.lfs(upstreams) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpLfs) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.lfsEntries(entries) +} diff --git a/backend/union/policy/eplno.go b/backend/union/policy/eplno.go new file mode 100644 index 000000000..b81f85c31 --- /dev/null +++ b/backend/union/policy/eplno.go @@ -0,0 +1,116 @@ +package policy + +import ( + "context" + "math" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("eplno", &EpLno{}) +} + +// EpLno stands for existing path, least number of objects +// Of all the candidates on which the path exists choose the one with the least number of objects +type EpLno struct { + EpAll +} + +func (p *EpLno) lno(upstreams []*upstream.Fs) (*upstream.Fs, error) { + var minNumObj int64 = math.MaxInt64 + var lnoUpstream *upstream.Fs + for _, u := range upstreams { + numObj, err := u.GetNumObjects() + if err != nil { + fs.LogPrintf(fs.LogLevelNotice, nil, + "Number of Objects is not supported for upstream %s, treating as 0", u.Name()) + } + if minNumObj > numObj { + minNumObj = numObj + lnoUpstream = u + } + } + if lnoUpstream == nil { + return nil, fs.ErrorObjectNotFound + } + return lnoUpstream, nil +} + +func (p *EpLno) lnoEntries(entries []upstream.Entry) (upstream.Entry, error) { + var minNumObj int64 = math.MaxInt64 + var lnoEntry upstream.Entry + for _, e := range entries { + numObj, err := e.UpstreamFs().GetNumObjects() + if err != nil { + fs.LogPrintf(fs.LogLevelNotice, nil, + "Number of Objects is not supported for upstream %s, treating as 0", e.UpstreamFs().Name()) + } + if minNumObj > numObj { + minNumObj = numObj + lnoEntry = e + } + } + return lnoEntry, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpLno) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lno(upstreams) + return []*upstream.Fs{u}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpLno) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.ActionEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lnoEntries(entries) + return []upstream.Entry{e}, err +} + +// Create category policy, governing the creation of files and directories +func (p *EpLno) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lno(upstreams) + return []*upstream.Fs{u}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpLno) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.CreateEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lnoEntries(entries) + return []upstream.Entry{e}, err +} + +// Search category policy, governing the access to files and directories +func (p *EpLno) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.lno(upstreams) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpLno) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.lnoEntries(entries) +} diff --git a/backend/union/policy/eplus.go b/backend/union/policy/eplus.go new file mode 100644 index 000000000..ef8a963b0 --- /dev/null +++ b/backend/union/policy/eplus.go @@ -0,0 +1,116 @@ +package policy + +import ( + "context" + "math" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("eplus", &EpLus{}) +} + +// EpLus stands for existing path, least used space +// Of all the candidates on which the path exists choose the one with the least used space. +type EpLus struct { + EpAll +} + +func (p *EpLus) lus(upstreams []*upstream.Fs) (*upstream.Fs, error) { + var minUsedSpace int64 = math.MaxInt64 + var lusupstream *upstream.Fs + for _, u := range upstreams { + space, err := u.GetUsedSpace() + if err != nil { + fs.LogPrintf(fs.LogLevelNotice, nil, + "Used Space is not supported for upstream %s, treating as 0", u.Name()) + } + if space < minUsedSpace { + minUsedSpace = space + lusupstream = u + } + } + if lusupstream == nil { + return nil, fs.ErrorObjectNotFound + } + return lusupstream, nil +} + +func (p *EpLus) lusEntries(entries []upstream.Entry) (upstream.Entry, error) { + var minUsedSpace int64 + var lusEntry upstream.Entry + for _, e := range entries { + space, err := e.UpstreamFs().GetFreeSpace() + if err != nil { + fs.LogPrintf(fs.LogLevelNotice, nil, + "Used Space is not supported for upstream %s, treating as 0", e.UpstreamFs().Name()) + } + if space < minUsedSpace { + minUsedSpace = space + lusEntry = e + } + } + return lusEntry, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpLus) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lus(upstreams) + return []*upstream.Fs{u}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpLus) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.ActionEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lusEntries(entries) + return []upstream.Entry{e}, err +} + +// Create category policy, governing the creation of files and directories +func (p *EpLus) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.lus(upstreams) + return []*upstream.Fs{u}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpLus) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.CreateEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.lusEntries(entries) + return []upstream.Entry{e}, err +} + +// Search category policy, governing the access to files and directories +func (p *EpLus) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.lus(upstreams) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpLus) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.lusEntries(entries) +} diff --git a/backend/union/policy/epmfs.go b/backend/union/policy/epmfs.go new file mode 100644 index 000000000..1370411c0 --- /dev/null +++ b/backend/union/policy/epmfs.go @@ -0,0 +1,115 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("epmfs", &EpMfs{}) +} + +// EpMfs stands for existing path, most free space +// Of all the candidates on which the path exists choose the one with the most free space. +type EpMfs struct { + EpAll +} + +func (p *EpMfs) mfs(upstreams []*upstream.Fs) (*upstream.Fs, error) { + var maxFreeSpace int64 + var mfsupstream *upstream.Fs + for _, u := range upstreams { + space, err := u.GetFreeSpace() + if err != nil { + fs.LogPrintf(fs.LogLevelNotice, nil, + "Free Space is not supported for upstream %s, treating as infinite", u.Name()) + } + if maxFreeSpace < space { + maxFreeSpace = space + mfsupstream = u + } + } + if mfsupstream == nil { + return nil, fs.ErrorObjectNotFound + } + return mfsupstream, nil +} + +func (p *EpMfs) mfsEntries(entries []upstream.Entry) (upstream.Entry, error) { + var maxFreeSpace int64 + var mfsEntry upstream.Entry + for _, e := range entries { + space, err := e.UpstreamFs().GetFreeSpace() + if err != nil { + fs.LogPrintf(fs.LogLevelNotice, nil, + "Free Space is not supported for upstream %s, treating as infinite", e.UpstreamFs().Name()) + } + if maxFreeSpace < space { + maxFreeSpace = space + mfsEntry = e + } + } + return mfsEntry, nil +} + +// Action category policy, governing the modification of files and directories +func (p *EpMfs) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.mfs(upstreams) + return []*upstream.Fs{u}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpMfs) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.ActionEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.mfsEntries(entries) + return []upstream.Entry{e}, err +} + +// Create category policy, governing the creation of files and directories +func (p *EpMfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + u, err := p.mfs(upstreams) + return []*upstream.Fs{u}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpMfs) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.CreateEntries(entries...) + if err != nil { + return nil, err + } + e, err := p.mfsEntries(entries) + return []upstream.Entry{e}, err +} + +// Search category policy, governing the access to files and directories +func (p *EpMfs) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.mfs(upstreams) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpMfs) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.mfsEntries(entries) +} diff --git a/backend/union/policy/eprand.go b/backend/union/policy/eprand.go new file mode 100644 index 000000000..bc137ac78 --- /dev/null +++ b/backend/union/policy/eprand.go @@ -0,0 +1,86 @@ +package policy + +import ( + "context" + "math/rand" + "time" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("eprand", &EpRand{}) +} + +// EpRand stands for existing path, random +// Calls epall and then randomizes. Returns one candidate. +type EpRand struct { + EpAll +} + +func (p *EpRand) rand(upstreams []*upstream.Fs) *upstream.Fs { + rand.Seed(time.Now().Unix()) + return upstreams[rand.Intn(len(upstreams))] +} + +func (p *EpRand) randEntries(entries []upstream.Entry) upstream.Entry { + rand.Seed(time.Now().Unix()) + return entries[rand.Intn(len(entries))] +} + +// Action category policy, governing the modification of files and directories +func (p *EpRand) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + return []*upstream.Fs{p.rand(upstreams)}, nil +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *EpRand) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.ActionEntries(entries...) + if err != nil { + return nil, err + } + return []upstream.Entry{p.randEntries(entries)}, nil +} + +// Create category policy, governing the creation of files and directories +func (p *EpRand) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.EpAll.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + return []*upstream.Fs{p.rand(upstreams)}, nil +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *EpRand) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.EpAll.CreateEntries(entries...) + if err != nil { + return nil, err + } + return []upstream.Entry{p.randEntries(entries)}, nil +} + +// Search category policy, governing the access to files and directories +func (p *EpRand) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.rand(upstreams), nil +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *EpRand) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.randEntries(entries), nil +} diff --git a/backend/union/policy/ff.go b/backend/union/policy/ff.go new file mode 100644 index 000000000..280c81cc2 --- /dev/null +++ b/backend/union/policy/ff.go @@ -0,0 +1,32 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("ff", &FF{}) +} + +// FF stands for first found +// Search category: same as epff. +// Action category: same as epff. +// Create category: Given the order of the candiates, act on the first one found. +type FF struct { + EpFF +} + +// Create category policy, governing the creation of files and directories +func (p *FF) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return upstreams, fs.ErrorPermissionDenied + } + return upstreams[:1], nil +} diff --git a/backend/union/policy/lfs.go b/backend/union/policy/lfs.go new file mode 100644 index 000000000..972906ee1 --- /dev/null +++ b/backend/union/policy/lfs.go @@ -0,0 +1,33 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("lfs", &Lfs{}) +} + +// Lfs stands for least free space +// Search category: same as eplfs. +// Action category: same as eplfs. +// Create category: Pick the drive with the least free space. +type Lfs struct { + EpLfs +} + +// Create category policy, governing the creation of files and directories +func (p *Lfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.lfs(upstreams) + return []*upstream.Fs{u}, err +} diff --git a/backend/union/policy/lno.go b/backend/union/policy/lno.go new file mode 100644 index 000000000..7d9a99d8a --- /dev/null +++ b/backend/union/policy/lno.go @@ -0,0 +1,33 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("lno", &Lno{}) +} + +// Lno stands for least number of objects +// Search category: same as eplno. +// Action category: same as eplno. +// Create category: Pick the drive with the least number of objects. +type Lno struct { + EpLno +} + +// Create category policy, governing the creation of files and directories +func (p *Lno) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.lno(upstreams) + return []*upstream.Fs{u}, err +} diff --git a/backend/union/policy/lus.go b/backend/union/policy/lus.go new file mode 100644 index 000000000..763df8edf --- /dev/null +++ b/backend/union/policy/lus.go @@ -0,0 +1,33 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("lus", &Lus{}) +} + +// Lus stands for least used space +// Search category: same as eplus. +// Action category: same as eplus. +// Create category: Pick the drive with the least used space. +type Lus struct { + EpLus +} + +// Create category policy, governing the creation of files and directories +func (p *Lus) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.lus(upstreams) + return []*upstream.Fs{u}, err +} diff --git a/backend/union/policy/mfs.go b/backend/union/policy/mfs.go new file mode 100644 index 000000000..162545a97 --- /dev/null +++ b/backend/union/policy/mfs.go @@ -0,0 +1,33 @@ +package policy + +import ( + "context" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("mfs", &Mfs{}) +} + +// Mfs stands for most free space +// Search category: same as epmfs. +// Action category: same as epmfs. +// Create category: Pick the drive with the most free space. +type Mfs struct { + EpMfs +} + +// Create category policy, governing the creation of files and directories +func (p *Mfs) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.mfs(upstreams) + return []*upstream.Fs{u}, err +} diff --git a/backend/union/policy/newest.go b/backend/union/policy/newest.go new file mode 100644 index 000000000..8d4fa8049 --- /dev/null +++ b/backend/union/policy/newest.go @@ -0,0 +1,149 @@ +package policy + +import ( + "context" + "path" + "sync" + "time" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("newest", &Newest{}) +} + +// Newest policy picks the file / directory with the largest mtime +// It implies the existance of a path +type Newest struct { + EpAll +} + +func (p *Newest) newest(ctx context.Context, upstreams []*upstream.Fs, filePath string) (*upstream.Fs, error) { + var wg sync.WaitGroup + ufs := make([]*upstream.Fs, len(upstreams)) + mtimes := make([]time.Time, len(upstreams)) + for i, u := range upstreams { + wg.Add(1) + i, u := i, u // Closure + go func() { + defer wg.Done() + rfs := u.RootFs + remote := path.Join(u.RootPath, filePath) + if e := findEntry(ctx, rfs, remote); e != nil { + ufs[i] = u + mtimes[i] = e.ModTime(ctx) + } + }() + } + wg.Wait() + maxMtime := time.Time{} + var newestFs *upstream.Fs + for i, u := range ufs { + if u != nil && mtimes[i].After(maxMtime) { + maxMtime = mtimes[i] + newestFs = u + } + } + if newestFs == nil { + return nil, fs.ErrorObjectNotFound + } + return newestFs, nil +} + +func (p *Newest) newestEntries(entries []upstream.Entry) (upstream.Entry, error) { + var wg sync.WaitGroup + mtimes := make([]time.Time, len(entries)) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + for i, e := range entries { + wg.Add(1) + i, e := i, e // Closure + go func() { + defer wg.Done() + mtimes[i] = e.ModTime(ctx) + }() + } + wg.Wait() + maxMtime := time.Time{} + var newestEntry upstream.Entry + for i, t := range mtimes { + if t.After(maxMtime) { + maxMtime = t + newestEntry = entries[i] + } + } + if newestEntry == nil { + return nil, fs.ErrorObjectNotFound + } + return newestEntry, nil +} + +// Action category policy, governing the modification of files and directories +func (p *Newest) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterRO(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.newest(ctx, upstreams, path) + return []*upstream.Fs{u}, err +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *Newest) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterROEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + e, err := p.newestEntries(entries) + return []upstream.Entry{e}, err +} + +// Create category policy, governing the creation of files and directories +func (p *Newest) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams = filterNC(upstreams) + if len(upstreams) == 0 { + return nil, fs.ErrorPermissionDenied + } + u, err := p.newest(ctx, upstreams, path+"/..") + return []*upstream.Fs{u}, err +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *Newest) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + entries = filterNCEntries(entries) + if len(entries) == 0 { + return nil, fs.ErrorPermissionDenied + } + e, err := p.newestEntries(entries) + return []upstream.Entry{e}, err +} + +// Search category policy, governing the access to files and directories +func (p *Newest) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.newest(ctx, upstreams, path) +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *Newest) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.newestEntries(entries) +} diff --git a/backend/union/policy/policy.go b/backend/union/policy/policy.go new file mode 100644 index 000000000..0cb2d0007 --- /dev/null +++ b/backend/union/policy/policy.go @@ -0,0 +1,129 @@ +package policy + +import ( + "context" + "math/rand" + "path" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +var policies = make(map[string]Policy) + +// Policy is the interface of a set of defined behavior choosing +// the upstream Fs to operate on +type Policy interface { + // Action category policy, governing the modification of files and directories + Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) + + // Create category policy, governing the creation of files and directories + Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) + + // Search category policy, governing the access to files and directories + Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) + + // ActionEntries is ACTION category policy but receving a set of candidate entries + ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) + + // CreateEntries is CREATE category policy but receving a set of candidate entries + CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) + + // SearchEntries is SEARCH category policy but receving a set of candidate entries + SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) +} + +func registerPolicy(name string, p Policy) { + policies[strings.ToLower(name)] = p +} + +// Get a Policy from the list +func Get(name string) (Policy, error) { + p, ok := policies[strings.ToLower(name)] + if !ok { + return nil, errors.Errorf("didn't find policy called %q", name) + } + return p, nil +} + +func filterRO(ufs []*upstream.Fs) (wufs []*upstream.Fs) { + for _, u := range ufs { + if u.IsWritable() { + wufs = append(wufs, u) + } + } + return wufs +} + +func filterROEntries(ue []upstream.Entry) (wue []upstream.Entry) { + for _, e := range ue { + if e.UpstreamFs().IsWritable() { + wue = append(wue, e) + } + } + return wue +} + +func filterNC(ufs []*upstream.Fs) (wufs []*upstream.Fs) { + for _, u := range ufs { + if u.IsCreatable() { + wufs = append(wufs, u) + } + } + return wufs +} + +func filterNCEntries(ue []upstream.Entry) (wue []upstream.Entry) { + for _, e := range ue { + if e.UpstreamFs().IsCreatable() { + wue = append(wue, e) + } + } + return wue +} + +func parentDir(absPath string) string { + parent := path.Dir(strings.TrimRight(absPath, "/")) + if parent == "." { + parent = "" + } + return parent +} + +func clean(absPath string) string { + cleanPath := path.Clean(absPath) + if cleanPath == "." { + cleanPath = "" + } + return cleanPath +} + +func findEntry(ctx context.Context, f fs.Fs, remote string) fs.DirEntry { + remote = clean(remote) + dir := parentDir(remote) + entries, err := f.List(ctx, dir) + if remote == dir { + if err != nil { + return nil + } + // random modtime for root + randomNow := time.Unix(time.Now().Unix()-rand.Int63n(10000), 0) + return fs.NewDir("", randomNow) + } + found := false + for _, e := range entries { + eRemote := e.Remote() + if f.Features().CaseInsensitive { + found = strings.EqualFold(remote, eRemote) + } else { + found = (remote == eRemote) + } + if found { + return e + } + } + return nil +} diff --git a/backend/union/policy/rand.go b/backend/union/policy/rand.go new file mode 100644 index 000000000..6e3128ed7 --- /dev/null +++ b/backend/union/policy/rand.go @@ -0,0 +1,83 @@ +package policy + +import ( + "context" + "math/rand" + + "github.com/rclone/rclone/backend/union/upstream" + "github.com/rclone/rclone/fs" +) + +func init() { + registerPolicy("rand", &Rand{}) +} + +// Rand stands for random +// Calls all and then randomizes. Returns one candidate. +type Rand struct { + All +} + +func (p *Rand) rand(upstreams []*upstream.Fs) *upstream.Fs { + return upstreams[rand.Intn(len(upstreams))] +} + +func (p *Rand) randEntries(entries []upstream.Entry) upstream.Entry { + return entries[rand.Intn(len(entries))] +} + +// Action category policy, governing the modification of files and directories +func (p *Rand) Action(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.All.Action(ctx, upstreams, path) + if err != nil { + return nil, err + } + return []*upstream.Fs{p.rand(upstreams)}, nil +} + +// ActionEntries is ACTION category policy but receving a set of candidate entries +func (p *Rand) ActionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.All.ActionEntries(entries...) + if err != nil { + return nil, err + } + return []upstream.Entry{p.randEntries(entries)}, nil +} + +// Create category policy, governing the creation of files and directories +func (p *Rand) Create(ctx context.Context, upstreams []*upstream.Fs, path string) ([]*upstream.Fs, error) { + upstreams, err := p.All.Create(ctx, upstreams, path) + if err != nil { + return nil, err + } + return []*upstream.Fs{p.rand(upstreams)}, nil +} + +// CreateEntries is CREATE category policy but receving a set of candidate entries +func (p *Rand) CreateEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + entries, err := p.All.CreateEntries(entries...) + if err != nil { + return nil, err + } + return []upstream.Entry{p.randEntries(entries)}, nil +} + +// Search category policy, governing the access to files and directories +func (p *Rand) Search(ctx context.Context, upstreams []*upstream.Fs, path string) (*upstream.Fs, error) { + if len(upstreams) == 0 { + return nil, fs.ErrorObjectNotFound + } + upstreams, err := p.epall(ctx, upstreams, path) + if err != nil { + return nil, err + } + return p.rand(upstreams), nil +} + +// SearchEntries is SEARCH category policy but receving a set of candidate entries +func (p *Rand) SearchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + return p.randEntries(entries), nil +} diff --git a/backend/union/union.go b/backend/union/union.go index 8934a2526..12ea869ff 100644 --- a/backend/union/union.go +++ b/backend/union/union.go @@ -1,17 +1,20 @@ package union import ( + "bufio" "context" "fmt" "io" "path" "path/filepath" "strings" + "sync" "time" "github.com/pkg/errors" + "github.com/rclone/rclone/backend/union/policy" + "github.com/rclone/rclone/backend/union/upstream" "github.com/rclone/rclone/fs" - "github.com/rclone/rclone/fs/cache" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" "github.com/rclone/rclone/fs/hash" @@ -21,12 +24,32 @@ import ( func init() { fsi := &fs.RegInfo{ Name: "union", - Description: "Union merges the contents of several remotes", + Description: "Union merges the contents of several upstream fs", NewFs: NewFs, Options: []fs.Option{{ - Name: "remotes", - Help: "List of space separated remotes.\nCan be 'remotea:test/dir remoteb:', '\"remotea:test/space dir\" remoteb:', etc.\nThe last remote is used to write to.", + Name: "upstreams", + Help: "List of space separated upstreams.\nCan be 'upstreama:test/dir upstreamb:', '\"upstreama:test/space:ro dir\" upstreamb:', etc.\n", Required: true, + }, { + Name: "action_policy", + Help: "Policy to choose upstream on ACTION category.", + Required: true, + Default: "epall", + }, { + Name: "create_policy", + Help: "Policy to choose upstream on CREATE category.", + Required: true, + Default: "epmfs", + }, { + Name: "search_policy", + Help: "Policy to choose upstream on SEARCH category.", + Required: true, + Default: "ff", + }, { + Name: "cache_time", + Help: "Cache time of usage and free space (in seconds). This option is only useful when a path preserving policy is used.", + Required: true, + Default: 120, }}, } fs.Register(fsi) @@ -34,39 +57,48 @@ func init() { // Options defines the configuration for this backend type Options struct { - Remotes fs.SpaceSepList `config:"remotes"` + Upstreams fs.SpaceSepList `config:"upstreams"` + Remotes fs.SpaceSepList `config:"remotes"` // Depreated + ActionPolicy string `config:"action_policy"` + CreatePolicy string `config:"create_policy"` + SearchPolicy string `config:"search_policy"` + CacheTime int `config:"cache_time"` } -// Fs represents a union of remotes +// Fs represents a union of upstreams type Fs struct { - name string // name of this remote - features *fs.Features // optional features - opt Options // options for this Fs - root string // the path we are working on - remotes []fs.Fs // slice of remotes - wr fs.Fs // writable remote - hashSet hash.Set // intersection of hash types + name string // name of this remote + features *fs.Features // optional features + opt Options // options for this Fs + root string // the path we are working on + upstreams []*upstream.Fs // slice of upstreams + hashSet hash.Set // intersection of hash types + actionPolicy policy.Policy // policy for ACTION + createPolicy policy.Policy // policy for CREATE + searchPolicy policy.Policy // policy for SEARCH } -// Object describes a union Object -// -// This is a wrapped object which returns the Union Fs as its parent -type Object struct { - fs.Object - fs *Fs // what this object is part of -} - -// Wrap an existing object in the union Object -func (f *Fs) wrapObject(o fs.Object) *Object { - return &Object{ - Object: o, - fs: f, +// Wrap candidate objects in to an union Object +func (f *Fs) wrapEntries(entries ...upstream.Entry) (entry, error) { + e, err := f.searchEntries(entries...) + if err != nil { + return nil, err + } + switch e.(type) { + case *upstream.Object: + return &Object{ + Object: e.(*upstream.Object), + fs: f, + co: entries, + }, nil + case *upstream.Directory: + return &Directory{ + Directory: e.(*upstream.Directory), + cd: entries, + }, nil + default: + return nil, errors.Errorf("unknown object type %T", e) } -} - -// Fs returns the union Fs as the parent -func (o *Object) Fs() fs.Info { - return o.fs } // Name of the remote (as passed into NewFs) @@ -91,7 +123,16 @@ func (f *Fs) Features() *fs.Features { // Rmdir removes the root directory of the Fs object func (f *Fs) Rmdir(ctx context.Context, dir string) error { - return f.wr.Rmdir(ctx, dir) + upstreams, err := f.action(ctx, dir) + if err != nil { + return err + } + errs := Errors(make([]error, len(upstreams))) + multithread(len(upstreams), func(i int) { + err := upstreams[i].Rmdir(ctx, dir) + errs[i] = errors.Wrap(err, upstreams[i].Name()) + }) + return errs.Err() } // Hashes returns hash.HashNone to indicate remote hashing is unavailable @@ -101,7 +142,22 @@ func (f *Fs) Hashes() hash.Set { // Mkdir makes the root directory of the Fs object func (f *Fs) Mkdir(ctx context.Context, dir string) error { - return f.wr.Mkdir(ctx, dir) + upstreams, err := f.create(ctx, dir) + if err == fs.ErrorObjectNotFound && dir != parentDir(dir) { + if err := f.Mkdir(ctx, parentDir(dir)); err != nil { + return err + } + upstreams, err = f.create(ctx, dir) + } + if err != nil { + return err + } + errs := Errors(make([]error, len(upstreams))) + multithread(len(upstreams), func(i int) { + err := upstreams[i].Mkdir(ctx, dir) + errs[i] = errors.Wrap(err, upstreams[i].Name()) + }) + return errs.Err() } // Purge all files in the root and the root directory @@ -111,7 +167,21 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error { // // Return an error if it doesn't exist func (f *Fs) Purge(ctx context.Context) error { - return f.wr.Features().Purge(ctx) + for _, r := range f.upstreams { + if r.Features().Purge == nil { + return fs.ErrorCantPurge + } + } + upstreams, err := f.action(ctx, "") + if err != nil { + return err + } + errs := Errors(make([]error, len(upstreams))) + multithread(len(upstreams), func(i int) { + err := upstreams[i].Features().Purge(ctx) + errs[i] = errors.Wrap(err, upstreams[i].Name()) + }) + return errs.Err() } // Copy src to this remote using server side copy operations. @@ -124,15 +194,26 @@ func (f *Fs) Purge(ctx context.Context) error { // // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { - if src.Fs() != f.wr { + srcObj, ok := src.(*Object) + if !ok { fs.Debugf(src, "Can't copy - not same remote type") return nil, fs.ErrorCantCopy } - o, err := f.wr.Features().Copy(ctx, src, remote) - if err != nil { + o := srcObj.UnWrap() + u := o.UpstreamFs() + do := u.Features().Copy + if do == nil { + return nil, fs.ErrorCantCopy + } + if !u.IsCreatable() { + return nil, fs.ErrorPermissionDenied + } + co, err := do(ctx, o, remote) + if err != nil || co == nil { return nil, err } - return f.wrapObject(o), nil + wo, err := f.wrapEntries(u.WrapObject(co)) + return wo.(*Object), err } // Move src to this remote using server side move operations. @@ -145,15 +226,47 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, // // If it isn't possible then return fs.ErrorCantMove func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { - if src.Fs() != f.wr { + o, ok := src.(*Object) + if !ok { fs.Debugf(src, "Can't move - not same remote type") return nil, fs.ErrorCantMove } - o, err := f.wr.Features().Move(ctx, src, remote) + entries, err := f.actionEntries(o.candidates()...) if err != nil { return nil, err } - return f.wrapObject(o), err + for _, e := range entries { + if e.UpstreamFs().Features().Move == nil { + return nil, fs.ErrorCantMove + } + } + objs := make([]*upstream.Object, len(entries)) + errs := Errors(make([]error, len(entries))) + multithread(len(entries), func(i int) { + u := entries[i].UpstreamFs() + o, ok := entries[i].(*upstream.Object) + if !ok { + errs[i] = errors.Wrap(fs.ErrorNotAFile, u.Name()) + return + } + mo, err := u.Features().Move(ctx, o.UnWrap(), remote) + if err != nil || mo == nil { + errs[i] = errors.Wrap(err, u.Name()) + return + } + objs[i] = u.WrapObject(mo) + }) + var en []upstream.Entry + for _, o := range objs { + if o != nil { + en = append(en, o) + } + } + e, err := f.wrapEntries(en...) + if err != nil { + return nil, err + } + return e.(*Object), errs.Err() } // DirMove moves src, srcRemote to this remote at dstRemote @@ -165,12 +278,46 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, // // If destination exists then return fs.ErrorDirExists func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { - srcFs, ok := src.(*Fs) + sfs, ok := src.(*Fs) if !ok { - fs.Debugf(srcFs, "Can't move directory - not same remote type") + fs.Debugf(src, "Can't move directory - not same remote type") return fs.ErrorCantDirMove } - return f.wr.Features().DirMove(ctx, srcFs.wr, srcRemote, dstRemote) + upstreams, err := sfs.action(ctx, srcRemote) + if err != nil { + return err + } + for _, u := range upstreams { + if u.Features().DirMove == nil { + return fs.ErrorCantDirMove + } + } + errs := Errors(make([]error, len(upstreams))) + multithread(len(upstreams), func(i int) { + su := upstreams[i] + var du *upstream.Fs + for _, u := range f.upstreams { + if u.RootFs.Root() == su.RootFs.Root() { + du = u + } + } + if du == nil { + errs[i] = errors.Wrap(fs.ErrorCantDirMove, su.Name()+":"+su.Root()) + return + } + err := du.Features().DirMove(ctx, su.Fs, srcRemote, dstRemote) + errs[i] = errors.Wrap(err, du.Name()+":"+du.Root()) + }) + errs = errs.FilterNil() + if len(errs) == 0 { + return nil + } + for _, e := range errs { + if errors.Cause(e) != fs.ErrorDirExists { + return errs + } + } + return fs.ErrorDirExists } // ChangeNotify calls the passed function with a path @@ -183,23 +330,23 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string // regularly. When the channel gets closed, the implementation // should stop polling and release resources. func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch <-chan time.Duration) { - var remoteChans []chan time.Duration + var uChans []chan time.Duration - for _, remote := range f.remotes { - if ChangeNotify := remote.Features().ChangeNotify; ChangeNotify != nil { + for _, u := range f.upstreams { + if ChangeNotify := u.Features().ChangeNotify; ChangeNotify != nil { ch := make(chan time.Duration) - remoteChans = append(remoteChans, ch) + uChans = append(uChans, ch) ChangeNotify(ctx, fn, ch) } } go func() { for i := range ch { - for _, c := range remoteChans { + for _, c := range uChans { c <- i } } - for _, c := range remoteChans { + for _, c := range uChans { close(c) } }() @@ -208,10 +355,103 @@ func (f *Fs) ChangeNotify(ctx context.Context, fn func(string, fs.EntryType), ch // DirCacheFlush resets the directory cache - used in testing // as an optional interface func (f *Fs) DirCacheFlush() { - for _, remote := range f.remotes { - if DirCacheFlush := remote.Features().DirCacheFlush; DirCacheFlush != nil { - DirCacheFlush() + multithread(len(f.upstreams), func(i int) { + if do := f.upstreams[i].Features().DirCacheFlush; do != nil { + do() } + }) +} + +func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) { + srcPath := src.Remote() + upstreams, err := f.create(ctx, srcPath) + if err == fs.ErrorObjectNotFound { + if err := f.Mkdir(ctx, parentDir(srcPath)); err != nil { + return nil, err + } + upstreams, err = f.create(ctx, srcPath) + } + if err != nil { + return nil, err + } + if len(upstreams) == 1 { + u := upstreams[0] + var o fs.Object + var err error + if stream { + o, err = u.Features().PutStream(ctx, in, src, options...) + } else { + o, err = u.Put(ctx, in, src, options...) + } + if err != nil { + return nil, err + } + e, err := f.wrapEntries(u.WrapObject(o)) + return e.(*Object), err + } + errs := Errors(make([]error, len(upstreams)+1)) + // Get multiple reader + readers := make([]io.Reader, len(upstreams)) + writers := make([]io.Writer, len(upstreams)) + for i := range writers { + r, w := io.Pipe() + bw := bufio.NewWriter(w) + readers[i], writers[i] = r, bw + defer func() { + err := w.Close() + if err != nil { + panic(err) + } + }() + } + go func() { + mw := io.MultiWriter(writers...) + es := make([]error, len(writers)+1) + _, es[len(es)-1] = io.Copy(mw, in) + for i, bw := range writers { + es[i] = bw.(*bufio.Writer).Flush() + } + errs[len(upstreams)] = Errors(es).Err() + }() + // Multi-threading + objs := make([]upstream.Entry, len(upstreams)) + multithread(len(upstreams), func(i int) { + u := upstreams[i] + var o fs.Object + var err error + if stream { + o, err = u.Features().PutStream(ctx, readers[i], src, options...) + } else { + o, err = u.Put(ctx, readers[i], src, options...) + } + if err != nil { + errs[i] = errors.Wrap(err, u.Name()) + return + } + objs[i] = u.WrapObject(o) + }) + err = errs.Err() + if err != nil { + return nil, err + } + e, err := f.wrapEntries(objs...) + return e.(*Object), err +} + +// Put in to the remote path with the modTime given of the given size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + o, err := f.NewObject(ctx, src.Remote()) + switch err { + case nil: + return o, o.Update(ctx, in, src, options...) + case fs.ErrorObjectNotFound: + return f.put(ctx, in, src, false, options...) + default: + return nil, err } } @@ -221,29 +461,64 @@ func (f *Fs) DirCacheFlush() { // will return the object and the error, otherwise will return // nil and the error func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - o, err := f.wr.Features().PutStream(ctx, in, src, options...) - if err != nil { + o, err := f.NewObject(ctx, src.Remote()) + switch err { + case nil: + return o, o.Update(ctx, in, src, options...) + case fs.ErrorObjectNotFound: + return f.put(ctx, in, src, true, options...) + default: return nil, err } - return f.wrapObject(o), err } // About gets quota information from the Fs func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { - return f.wr.Features().About(ctx) -} - -// Put in to the remote path with the modTime given of the given size -// -// May create the object even if it returns an error - if so -// will return the object and the error, otherwise will return -// nil and the error -func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - o, err := f.wr.Put(ctx, in, src, options...) - if err != nil { - return nil, err + usage := &fs.Usage{ + Total: new(int64), + Used: new(int64), + Trashed: new(int64), + Other: new(int64), + Free: new(int64), + Objects: new(int64), } - return f.wrapObject(o), err + for _, u := range f.upstreams { + usg, err := u.About(ctx) + if err != nil { + return nil, err + } + if usg.Total != nil && usage.Total != nil { + *usage.Total += *usg.Total + } else { + usage.Total = nil + } + if usg.Used != nil && usage.Used != nil { + *usage.Used += *usg.Used + } else { + usage.Used = nil + } + if usg.Trashed != nil && usage.Trashed != nil { + *usage.Trashed += *usg.Trashed + } else { + usage.Trashed = nil + } + if usg.Other != nil && usage.Other != nil { + *usage.Other += *usg.Other + } else { + usage.Other = nil + } + if usg.Free != nil && usage.Free != nil { + *usage.Free += *usg.Free + } else { + usage.Free = nil + } + if usg.Objects != nil && usage.Objects != nil { + *usage.Objects += *usg.Objects + } else { + usage.Objects = nil + } + } + return usage, nil } // List the objects and directories in dir into entries. The @@ -256,60 +531,125 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options . // This should return ErrDirNotFound if the directory isn't // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - set := make(map[string]fs.DirEntry) - found := false - for _, remote := range f.remotes { - var remoteEntries, err = remote.List(ctx, dir) - if err == fs.ErrorDirNotFound { - continue - } + entriess := make([][]upstream.Entry, len(f.upstreams)) + errs := Errors(make([]error, len(f.upstreams))) + multithread(len(f.upstreams), func(i int) { + u := f.upstreams[i] + entries, err := u.List(ctx, dir) if err != nil { - return nil, errors.Wrapf(err, "List failed on %v", remote) + errs[i] = errors.Wrap(err, u.Name()) + return } - found = true - for _, remoteEntry := range remoteEntries { - set[remoteEntry.Remote()] = remoteEntry + uEntries := make([]upstream.Entry, len(entries)) + for j, e := range entries { + uEntries[j], _ = u.WrapEntry(e) } - } - if !found { - return nil, fs.ErrorDirNotFound - } - for _, entry := range set { - if o, ok := entry.(fs.Object); ok { - entry = f.wrapObject(o) + entriess[i] = uEntries + }) + if len(errs) == len(errs.FilterNil()) { + errs = errs.Map(func(e error) error { + if errors.Cause(e) == fs.ErrorDirNotFound { + return nil + } + return e + }) + if len(errs) == 0 { + return nil, fs.ErrorDirNotFound } - entries = append(entries, entry) + return nil, errs.Err() } - return entries, nil + return f.mergeDirEntries(entriess) } -// NewObject creates a new remote union file object based on the first Object it finds (reverse remote order) -func (f *Fs) NewObject(ctx context.Context, path string) (fs.Object, error) { - for i := range f.remotes { - var remote = f.remotes[len(f.remotes)-i-1] - var obj, err = remote.NewObject(ctx, path) - if err == fs.ErrorObjectNotFound { - continue +// NewObject creates a new remote union file object +func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { + objs := make([]*upstream.Object, len(f.upstreams)) + errs := Errors(make([]error, len(f.upstreams))) + multithread(len(f.upstreams), func(i int) { + u := f.upstreams[i] + o, err := u.NewObject(ctx, remote) + if err != nil && err != fs.ErrorObjectNotFound { + errs[i] = errors.Wrap(err, u.Name()) + return } - if err != nil { - return nil, errors.Wrapf(err, "NewObject failed on %v", remote) + objs[i] = u.WrapObject(o) + }) + var entries []upstream.Entry + for _, o := range objs { + if o != nil { + entries = append(entries, o) } - return f.wrapObject(obj), nil } - return nil, fs.ErrorObjectNotFound + if len(entries) == 0 { + return nil, fs.ErrorObjectNotFound + } + e, err := f.wrapEntries(entries...) + if err != nil { + return nil, err + } + return e.(*Object), errs.Err() } -// Precision is the greatest Precision of all remotes +// Precision is the greatest Precision of all upstreams func (f *Fs) Precision() time.Duration { var greatestPrecision time.Duration - for _, remote := range f.remotes { - if remote.Precision() > greatestPrecision { - greatestPrecision = remote.Precision() + for _, u := range f.upstreams { + if u.Precision() > greatestPrecision { + greatestPrecision = u.Precision() } } return greatestPrecision } +func (f *Fs) action(ctx context.Context, path string) ([]*upstream.Fs, error) { + return f.actionPolicy.Action(ctx, f.upstreams, path) +} + +func (f *Fs) actionEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + return f.actionPolicy.ActionEntries(entries...) +} + +func (f *Fs) create(ctx context.Context, path string) ([]*upstream.Fs, error) { + return f.createPolicy.Create(ctx, f.upstreams, path) +} + +func (f *Fs) createEntries(entries ...upstream.Entry) ([]upstream.Entry, error) { + return f.createPolicy.CreateEntries(entries...) +} + +func (f *Fs) search(ctx context.Context, path string) (*upstream.Fs, error) { + return f.searchPolicy.Search(ctx, f.upstreams, path) +} + +func (f *Fs) searchEntries(entries ...upstream.Entry) (upstream.Entry, error) { + return f.searchPolicy.SearchEntries(entries...) +} + +func (f *Fs) mergeDirEntries(entriess [][]upstream.Entry) (fs.DirEntries, error) { + entryMap := make(map[string]([]upstream.Entry)) + for _, en := range entriess { + if en == nil { + continue + } + for _, entry := range en { + remote := entry.Remote() + if f.Features().CaseInsensitive { + remote = strings.ToLower(remote) + } + entryMap[remote] = append(entryMap[remote], entry) + } + } + var entries fs.DirEntries + for path := range entryMap { + e, err := f.wrapEntries(entryMap[path]...) + if err != nil { + return nil, err + } + entries = append(entries, e) + } + return entries, nil +} + // NewFs constructs an Fs from the path. // // The returned Fs is the actual Fs, referenced by remote in the config @@ -320,51 +660,64 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { if err != nil { return nil, err } - if len(opt.Remotes) == 0 { - return nil, errors.New("union can't point to an empty remote - check the value of the remotes setting") + // Backward compatible to old config + if len(opt.Upstreams) == 0 && len(opt.Remotes) > 0 { + for i := 0; i < len(opt.Remotes)-1; i++ { + opt.Remotes[i] = opt.Remotes[i] + ":ro" + } + opt.Upstreams = opt.Remotes } - if len(opt.Remotes) == 1 { - return nil, errors.New("union can't point to a single remote - check the value of the remotes setting") + if len(opt.Upstreams) == 0 { + return nil, errors.New("union can't point to an empty upstream - check the value of the upstreams setting") } - for _, remote := range opt.Remotes { - if strings.HasPrefix(remote, name+":") { - return nil, errors.New("can't point union remote at itself - check the value of the remote setting") + if len(opt.Upstreams) == 1 { + return nil, errors.New("union can't point to a single upstream - check the value of the upstreams setting") + } + for _, u := range opt.Upstreams { + if strings.HasPrefix(u, name+":") { + return nil, errors.New("can't point union remote at itself - check the value of the upstreams setting") } } - var remotes []fs.Fs - for i := range opt.Remotes { - // Last remote first so we return the correct (last) matching fs in case of fs.ErrorIsFile - var remote = opt.Remotes[len(opt.Remotes)-i-1] - _, configName, fsPath, err := fs.ParseRemote(remote) - if err != nil { + upstreams := make([]*upstream.Fs, len(opt.Upstreams)) + errs := Errors(make([]error, len(opt.Upstreams))) + multithread(len(opt.Upstreams), func(i int) { + u := opt.Upstreams[i] + upstreams[i], errs[i] = upstream.New(u, root, time.Duration(opt.CacheTime)*time.Second) + }) + var usedUpstreams []*upstream.Fs + var fserr error + for i, err := range errs { + if err != nil && err != fs.ErrorIsFile { return nil, err } - var rootString = path.Join(fsPath, filepath.ToSlash(root)) - if configName != "local" { - rootString = configName + ":" + rootString + // Only the upstreams returns ErrorIsFile would be used if any + if err == fs.ErrorIsFile { + usedUpstreams = append(usedUpstreams, upstreams[i]) + fserr = fs.ErrorIsFile } - myFs, err := cache.Get(rootString) - if err != nil { - if err == fs.ErrorIsFile { - return myFs, err - } - return nil, err - } - remotes = append(remotes, myFs) } - - // Reverse the remotes again so they are in the order as before - for i, j := 0, len(remotes)-1; i < j; i, j = i+1, j-1 { - remotes[i], remotes[j] = remotes[j], remotes[i] + if fserr == nil { + usedUpstreams = upstreams } f := &Fs{ - name: name, - root: root, - opt: *opt, - remotes: remotes, - wr: remotes[len(remotes)-1], + name: name, + root: root, + opt: *opt, + upstreams: usedUpstreams, + } + f.actionPolicy, err = policy.Get(opt.ActionPolicy) + if err != nil { + return nil, err + } + f.createPolicy, err = policy.Get(opt.CreatePolicy) + if err != nil { + return nil, err + } + f.searchPolicy, err = policy.Get(opt.SearchPolicy) + if err != nil { + return nil, err } var features = (&fs.Features{ CaseInsensitive: true, @@ -376,9 +729,14 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { SetTier: true, GetTier: true, }).Fill(f) - features = features.Mask(f.wr) // mask the features just on the writable fs + for _, f := range upstreams { + if !f.IsWritable() { + continue + } + features = features.Mask(f) // Mask all writable upstream fs + } - // Really need the union of all remotes for these, so + // Really need the union of all upstreams for these, so // re-instate and calculate separately. features.ChangeNotify = f.ChangeNotify features.DirCacheFlush = f.DirCacheFlush @@ -388,12 +746,12 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { // Clear ChangeNotify and DirCacheFlush if all are nil clearChangeNotify := true clearDirCacheFlush := true - for _, remote := range f.remotes { - remoteFeatures := remote.Features() - if remoteFeatures.ChangeNotify != nil { + for _, u := range f.upstreams { + uFeatures := u.Features() + if uFeatures.ChangeNotify != nil { clearChangeNotify = false } - if remoteFeatures.DirCacheFlush != nil { + if uFeatures.DirCacheFlush != nil { clearDirCacheFlush = false } } @@ -407,13 +765,34 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { f.features = features // Get common intersection of hashes - hashSet := f.remotes[0].Hashes() - for _, remote := range f.remotes[1:] { - hashSet = hashSet.Overlap(remote.Hashes()) + hashSet := f.upstreams[0].Hashes() + for _, u := range f.upstreams[1:] { + hashSet = hashSet.Overlap(u.Hashes()) } f.hashSet = hashSet - return f, nil + return f, fserr +} + +func parentDir(absPath string) string { + parent := path.Dir(strings.TrimRight(filepath.ToSlash(absPath), "/")) + if parent == "." { + parent = "" + } + return parent +} + +func multithread(num int, fn func(int)) { + var wg sync.WaitGroup + for i := 0; i < num; i++ { + wg.Add(1) + i := i + go func() { + defer wg.Done() + fn(i) + }() + } + wg.Wait() } // Check the interfaces are satisfied diff --git a/backend/union/union_test.go b/backend/union/union_test.go index b3c59573f..98d34a68b 100644 --- a/backend/union/union_test.go +++ b/backend/union/union_test.go @@ -2,17 +2,154 @@ package union_test import ( + "os" + "path/filepath" "testing" _ "github.com/rclone/rclone/backend/local" + "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest/fstests" + "github.com/stretchr/testify/require" ) // TestIntegration runs integration tests against the remote func TestIntegration(t *testing.T) { + if *fstest.RemoteName == "" { + t.Skip("Skipping as -remote not set") + } fstests.Run(t, &fstests.Opt{ - RemoteName: "TestUnion:", - NilObject: nil, - SkipFsMatch: true, + RemoteName: *fstest.RemoteName, + UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"}, + UnimplementableObjectMethods: []string{"MimeType"}, + }) +} + +func TestStandard(t *testing.T) { + if *fstest.RemoteName != "" { + t.Skip("Skipping as -remote set") + } + tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-standard1") + tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-standard2") + tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-standard3") + require.NoError(t, os.MkdirAll(tempdir1, 0744)) + require.NoError(t, os.MkdirAll(tempdir2, 0744)) + require.NoError(t, os.MkdirAll(tempdir3, 0744)) + upstreams := tempdir1 + " " + tempdir2 + " " + tempdir3 + name := "TestUnion" + fstests.Run(t, &fstests.Opt{ + RemoteName: name + ":", + ExtraConfig: []fstests.ExtraConfigItem{ + {Name: name, Key: "type", Value: "union"}, + {Name: name, Key: "upstreams", Value: upstreams}, + {Name: name, Key: "action_policy", Value: "epall"}, + {Name: name, Key: "create_policy", Value: "epmfs"}, + {Name: name, Key: "search_policy", Value: "ff"}, + }, + UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"}, + UnimplementableObjectMethods: []string{"MimeType"}, + }) +} + +func TestRO(t *testing.T) { + if *fstest.RemoteName != "" { + t.Skip("Skipping as -remote set") + } + tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-ro1") + tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-ro2") + tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-ro3") + require.NoError(t, os.MkdirAll(tempdir1, 0744)) + require.NoError(t, os.MkdirAll(tempdir2, 0744)) + require.NoError(t, os.MkdirAll(tempdir3, 0744)) + upstreams := tempdir1 + " " + tempdir2 + ":ro " + tempdir3 + ":ro" + name := "TestUnionRO" + fstests.Run(t, &fstests.Opt{ + RemoteName: name + ":", + ExtraConfig: []fstests.ExtraConfigItem{ + {Name: name, Key: "type", Value: "union"}, + {Name: name, Key: "upstreams", Value: upstreams}, + {Name: name, Key: "action_policy", Value: "epall"}, + {Name: name, Key: "create_policy", Value: "epmfs"}, + {Name: name, Key: "search_policy", Value: "ff"}, + }, + UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"}, + UnimplementableObjectMethods: []string{"MimeType"}, + }) +} + +func TestNC(t *testing.T) { + if *fstest.RemoteName != "" { + t.Skip("Skipping as -remote set") + } + tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-nc1") + tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-nc2") + tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-nc3") + require.NoError(t, os.MkdirAll(tempdir1, 0744)) + require.NoError(t, os.MkdirAll(tempdir2, 0744)) + require.NoError(t, os.MkdirAll(tempdir3, 0744)) + upstreams := tempdir1 + " " + tempdir2 + ":nc " + tempdir3 + ":nc" + name := "TestUnionNC" + fstests.Run(t, &fstests.Opt{ + RemoteName: name + ":", + ExtraConfig: []fstests.ExtraConfigItem{ + {Name: name, Key: "type", Value: "union"}, + {Name: name, Key: "upstreams", Value: upstreams}, + {Name: name, Key: "action_policy", Value: "epall"}, + {Name: name, Key: "create_policy", Value: "epmfs"}, + {Name: name, Key: "search_policy", Value: "ff"}, + }, + UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"}, + UnimplementableObjectMethods: []string{"MimeType"}, + }) +} + +func TestPolicy1(t *testing.T) { + if *fstest.RemoteName != "" { + t.Skip("Skipping as -remote set") + } + tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-policy11") + tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-policy12") + tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-policy13") + require.NoError(t, os.MkdirAll(tempdir1, 0744)) + require.NoError(t, os.MkdirAll(tempdir2, 0744)) + require.NoError(t, os.MkdirAll(tempdir3, 0744)) + upstreams := tempdir1 + " " + tempdir2 + " " + tempdir3 + name := "TestUnionPolicy1" + fstests.Run(t, &fstests.Opt{ + RemoteName: name + ":", + ExtraConfig: []fstests.ExtraConfigItem{ + {Name: name, Key: "type", Value: "union"}, + {Name: name, Key: "upstreams", Value: upstreams}, + {Name: name, Key: "action_policy", Value: "all"}, + {Name: name, Key: "create_policy", Value: "lus"}, + {Name: name, Key: "search_policy", Value: "all"}, + }, + UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"}, + UnimplementableObjectMethods: []string{"MimeType"}, + }) +} + +func TestPolicy2(t *testing.T) { + if *fstest.RemoteName != "" { + t.Skip("Skipping as -remote set") + } + tempdir1 := filepath.Join(os.TempDir(), "rclone-union-test-policy21") + tempdir2 := filepath.Join(os.TempDir(), "rclone-union-test-policy22") + tempdir3 := filepath.Join(os.TempDir(), "rclone-union-test-policy23") + require.NoError(t, os.MkdirAll(tempdir1, 0744)) + require.NoError(t, os.MkdirAll(tempdir2, 0744)) + require.NoError(t, os.MkdirAll(tempdir3, 0744)) + upstreams := tempdir1 + " " + tempdir2 + " " + tempdir3 + name := "TestUnionPolicy2" + fstests.Run(t, &fstests.Opt{ + RemoteName: name + ":", + ExtraConfig: []fstests.ExtraConfigItem{ + {Name: name, Key: "type", Value: "union"}, + {Name: name, Key: "upstreams", Value: upstreams}, + {Name: name, Key: "action_policy", Value: "all"}, + {Name: name, Key: "create_policy", Value: "rand"}, + {Name: name, Key: "search_policy", Value: "ff"}, + }, + UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"}, + UnimplementableObjectMethods: []string{"MimeType"}, }) } diff --git a/backend/union/upstream/upstream.go b/backend/union/upstream/upstream.go new file mode 100644 index 000000000..4538f6685 --- /dev/null +++ b/backend/union/upstream/upstream.go @@ -0,0 +1,348 @@ +package upstream + +import ( + "context" + "io" + "math" + "path" + "path/filepath" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/cache" +) + +var ( + // ErrUsageFieldNotSupported stats the usage field is not supported by the backend + ErrUsageFieldNotSupported = errors.New("this usage field is not supported") +) + +// Fs is a wrap of any fs and its configs +type Fs struct { + fs.Fs + RootFs fs.Fs + RootPath string + writable bool + creatable bool + usage *fs.Usage // Cache the usage + cacheTime time.Duration // cache duration + cacheExpiry int64 // usage cache expiry time + cacheMutex sync.RWMutex + cacheOnce sync.Once + cacheUpdate bool // if the cache is updating +} + +// Directory describes a wrapped Directory +// +// This is a wrapped Directory which contains the upstream Fs +type Directory struct { + fs.Directory + f *Fs +} + +// Object describes a wrapped Object +// +// This is a wrapped Object which contains the upstream Fs +type Object struct { + fs.Object + f *Fs +} + +// Entry describe a warpped fs.DirEntry interface with the +// information of upstream Fs +type Entry interface { + fs.DirEntry + UpstreamFs() *Fs +} + +// New creates a new Fs based on the +// string formatted `type:root_path(:ro/:nc)` +func New(remote, root string, cacheTime time.Duration) (*Fs, error) { + _, configName, fsPath, err := fs.ParseRemote(remote) + if err != nil { + return nil, err + } + f := &Fs{ + RootPath: root, + writable: true, + creatable: true, + cacheExpiry: time.Now().Unix(), + cacheTime: cacheTime, + usage: &fs.Usage{}, + } + if strings.HasSuffix(fsPath, ":ro") { + f.writable = false + f.creatable = false + fsPath = fsPath[0 : len(fsPath)-3] + } else if strings.HasSuffix(fsPath, ":nc") { + f.writable = true + f.creatable = false + fsPath = fsPath[0 : len(fsPath)-3] + } + if configName != "local" { + fsPath = configName + ":" + fsPath + } + rFs, err := cache.Get(fsPath) + if err != nil && err != fs.ErrorIsFile { + return nil, err + } + f.RootFs = rFs + rootString := path.Join(fsPath, filepath.ToSlash(root)) + myFs, err := cache.Get(rootString) + if err != nil && err != fs.ErrorIsFile { + return nil, err + } + f.Fs = myFs + return f, err +} + +// WrapDirectory wraps a fs.Directory to include the info +// of the upstream Fs +func (f *Fs) WrapDirectory(e fs.Directory) *Directory { + if e == nil { + return nil + } + return &Directory{ + Directory: e, + f: f, + } +} + +// WrapObject wraps a fs.Object to include the info +// of the upstream Fs +func (f *Fs) WrapObject(o fs.Object) *Object { + if o == nil { + return nil + } + return &Object{ + Object: o, + f: f, + } +} + +// WrapEntry wraps a fs.DirEntry to include the info +// of the upstream Fs +func (f *Fs) WrapEntry(e fs.DirEntry) (Entry, error) { + switch e.(type) { + case fs.Object: + return f.WrapObject(e.(fs.Object)), nil + case fs.Directory: + return f.WrapDirectory(e.(fs.Directory)), nil + default: + return nil, errors.Errorf("unknown object type %T", e) + } +} + +// UpstreamFs get the upstream Fs the entry is stored in +func (e *Directory) UpstreamFs() *Fs { + return e.f +} + +// UpstreamFs get the upstream Fs the entry is stored in +func (o *Object) UpstreamFs() *Fs { + return o.f +} + +// UnWrap returns the Object that this Object is wrapping or +// nil if it isn't wrapping anything +func (o *Object) UnWrap() fs.Object { + return o.Object +} + +// IsCreatable return if the fs is allowed to create new objects +func (f *Fs) IsCreatable() bool { + return f.creatable +} + +// IsWritable return if the fs is allowed to write +func (f *Fs) IsWritable() bool { + return f.writable +} + +// Put in to the remote path with the modTime given of the given size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + o, err := f.Fs.Put(ctx, in, src, options...) + if err != nil { + return o, err + } + f.cacheMutex.Lock() + defer f.cacheMutex.Unlock() + size := src.Size() + if f.usage.Used != nil { + *f.usage.Used += size + } + if f.usage.Free != nil { + *f.usage.Free -= size + } + if f.usage.Objects != nil { + *f.usage.Objects++ + } + return o, nil +} + +// PutStream uploads to the remote path with the modTime given of indeterminate size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + do := f.Features().PutStream + if do == nil { + return nil, fs.ErrorNotImplemented + } + o, err := do(ctx, in, src, options...) + if err != nil { + return o, err + } + f.cacheMutex.Lock() + defer f.cacheMutex.Unlock() + size := o.Size() + if f.usage.Used != nil { + *f.usage.Used += size + } + if f.usage.Free != nil { + *f.usage.Free -= size + } + if f.usage.Objects != nil { + *f.usage.Objects++ + } + return o, nil +} + +// Update in to the object with the modTime given of the given size +// +// When called from outside a Fs by rclone, src.Size() will always be >= 0. +// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either +// return an error or update the object properly (rather than e.g. calling panic). +func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + size := o.Size() + err := o.Object.Update(ctx, in, src, options...) + if err != nil { + return err + } + o.f.cacheMutex.Lock() + defer o.f.cacheMutex.Unlock() + delta := o.Size() - size + if delta <= 0 { + return nil + } + if o.f.usage.Used != nil { + *o.f.usage.Used += size + } + if o.f.usage.Free != nil { + *o.f.usage.Free -= size + } + return nil +} + +// About gets quota information from the Fs +func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { + if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() { + err := f.updateUsage() + if err != nil { + return nil, ErrUsageFieldNotSupported + } + } + f.cacheMutex.RLock() + defer f.cacheMutex.RUnlock() + return f.usage, nil +} + +// GetFreeSpace get the free space of the fs +func (f *Fs) GetFreeSpace() (int64, error) { + if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() { + err := f.updateUsage() + if err != nil { + return math.MaxInt64, ErrUsageFieldNotSupported + } + } + f.cacheMutex.RLock() + defer f.cacheMutex.RUnlock() + if f.usage.Free == nil { + return math.MaxInt64, ErrUsageFieldNotSupported + } + return *f.usage.Free, nil +} + +// GetUsedSpace get the used space of the fs +func (f *Fs) GetUsedSpace() (int64, error) { + if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() { + err := f.updateUsage() + if err != nil { + return 0, ErrUsageFieldNotSupported + } + } + f.cacheMutex.RLock() + defer f.cacheMutex.RUnlock() + if f.usage.Used == nil { + return 0, ErrUsageFieldNotSupported + } + return *f.usage.Used, nil +} + +// GetNumObjects get the number of objects of the fs +func (f *Fs) GetNumObjects() (int64, error) { + if atomic.LoadInt64(&f.cacheExpiry) <= time.Now().Unix() { + err := f.updateUsage() + if err != nil { + return 0, ErrUsageFieldNotSupported + } + } + f.cacheMutex.RLock() + defer f.cacheMutex.RUnlock() + if f.usage.Objects == nil { + return 0, ErrUsageFieldNotSupported + } + return *f.usage.Objects, nil +} + +func (f *Fs) updateUsage() (err error) { + if do := f.RootFs.Features().About; do == nil { + return ErrUsageFieldNotSupported + } + done := false + f.cacheOnce.Do(func() { + f.cacheMutex.Lock() + err = f.updateUsageCore(false) + f.cacheMutex.Unlock() + done = true + }) + if done { + return err + } + if !f.cacheUpdate { + f.cacheUpdate = true + go func() { + _ = f.updateUsageCore(true) + f.cacheUpdate = false + }() + } + return nil +} + +func (f *Fs) updateUsageCore(lock bool) error { + // Run in background, should not be cancelled by user + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + usage, err := f.RootFs.Features().About(ctx) + if err != nil { + f.cacheUpdate = false + return err + } + if lock { + f.cacheMutex.Lock() + defer f.cacheMutex.Unlock() + } + // Store usage + atomic.StoreInt64(&f.cacheExpiry, time.Now().Add(f.cacheTime).Unix()) + f.usage = usage + return nil +} diff --git a/docs/content/union.md b/docs/content/union.md index 49db11f9c..47138c1ac 100644 --- a/docs/content/union.md +++ b/docs/content/union.md @@ -1,7 +1,7 @@ --- title: "Union" description: "Remote Unification" -date: "2018-08-29" +date: "2020-01-25" --- Union @@ -12,22 +12,90 @@ The `union` remote provides a unification similar to UnionFS using other remotes Paths may be as deep as required or a local path, eg `remote:directory/subdirectory` or `/directory/subdirectory`. -During the initial setup with `rclone config` you will specify the target -remotes as a space separated list. The target remotes can either be a local paths or other remotes. +During the initial setup with `rclone config` you will specify the upstream +remotes as a space separated list. The upstream remotes can either be a local paths or other remotes. -The order of the remotes is important as it defines which remotes take precedence over others if there are files with the same name in the same logical path. -The last remote is the topmost remote and replaces files with the same name from previous remotes. +Attribute `:ro` and `:nc` can be attach to the end of path to tag the remote as **read only** or **no create**, +eg `remote:directory/subdirectory:ro` or `remote:directory/subdirectory:nc`. -Only the last remote is used to write to and delete from, all other remotes are read-only. - -Subfolders can be used in target remote. Assume a union remote named `backup` -with the remotes `mydrive:private/backup mydrive2:/backup`. Invoking `rclone mkdir backup:desktop` +Subfolders can be used in upstream remotes. Assume a union remote named `backup` +with the remotes `mydrive:private/backup`. Invoking `rclone mkdir backup:desktop` is exactly the same as invoking `rclone mkdir mydrive2:/backup/desktop`. There will be no special handling of paths containing `..` segments. Invoking `rclone mkdir backup:../desktop` is exactly the same as invoking `rclone mkdir mydrive2:/backup/../desktop`. +### Behavior / Policies + +The behavior of union backend is inspired by [trapexit/mergerfs](https://github.com/trapexit/mergerfs). All functions are grouped into 3 categories: **action**, **create** and **search**. These functions and categories can be assigned a policy which dictates what file or directory is chosen when performing that behavior. Any policy can be assigned to a function or category though some may not be very useful in practice. For instance: **rand** (random) may be useful for file creation (create) but could lead to very odd behavior if used for `delete` if there were more than one copy of the file. + +#### Function / Category classifications + +| Category | Description | Functions | +|----------|--------------------------|-------------------------------------------------------------------------------------| +| action | Writing Existing file | move, rmdir, rmdirs, delete, purge and copy, sync (as destination when file exist) | +| create | Create non-existing file | copy, sync (as destination when file not exist) | +| search | Reading and listing file | ls, lsd, lsl, cat, md5sum, sha1sum and copy, sync (as source) | +| N/A | | size, about | + +#### Path Preservation + +Policies, as described below, are of two basic types. `path preserving` and `non-path preserving`. + +All policies which start with `ep` (**epff**, **eplfs**, **eplus**, **epmfs**, **eprand**) are `path preserving`. `ep` stands for `existing path`. + +A path preserving policy will only consider upstreams where the relative path being accessed already exists. + +When using non-path preserving policies paths will be created in target upstreams as necessary. + +#### Quota Relevant Policies + +Some policies rely on quota information. These policies should be used only if your upstreams support the respective quota fields. + +| Policy | Required Field | +|------------|----------------| +| lfs, eplfs | Free | +| mfs, epmfs | Free | +| lus, eplus | Used | +| lno, eplno | Objects | + +To check if your upstream support the field, run `rclone about remote: [flags]` and see if the reuqired field exists. + +#### Filters + +Policies basically search upstream remotes and create a list of files / paths for functions to work on. The policy is responsible for filtering and sorting. The policy type defines the sorting but filtering is mostly uniform as described below. + +* No **search** policies filter. +* All **action** policies will filter out remotes which are tagged as **read-only**. +* All **create** policies will filter out remotes which are tagged **read-only** or **no-create**. + +If all remotes are filtered an error will be returned. + +#### Policy descriptions + +THe policies definition are inspired by [trapexit/mergerfs](https://github.com/trapexit/mergerfs) but not exactly the same. Some policy definition could be different due to the much larger latency of remote file systems. + +| Policy | Description | +|------------------|------------------------------------------------------------| +| all | Search category: same as **epall**. Action category: same as **epall**. Create category: act on all upstreams. | +| epall (existing path, all) | Search category: Given this order configured, act on the first one found where the relative path exists. Action category: apply to all found. Create category: act on all upstreams where the relative path exists. | +| epff (existing path, first found) | Act on the first one found, by the time upstreams reply, where the relative path exists. | +| eplfs (existing path, least free space) | Of all the upstreams on which the relative path exists choose the one with the least free space. | +| eplus (existing path, least used space) | Of all the upstreams on which the relative path exists choose the one with the least used space. | +| eplno (existing path, least number of objects) | Of all the upstreams on which the relative path exists choose the one with the least number of objects. | +| epmfs (existing path, most free space) | Of all the upstreams on which the relative path exists choose the one with the most free space. | +| eprand (existing path, random) | Calls **epall** and then randomizes. Returns only one upstream. | +| ff (first found) | Search category: same as **epff**. Action category: same as **epff**. Create category: Act on the first one found by the time upstreams reply. | +| lfs (least free space) | Search category: same as **eplfs**. Action category: same as **eplfs**. Create category: Pick the upstream with the least available free space. | +| lus (least used space) | Search category: same as **eplus**. Action category: same as **eplus**. Create category: Pick the upstream with the least used space. | +| lno (least number of objects) | Search category: same as **eplno**. Action category: same as **eplno**. Create category: Pick the upstream with the least number of objects. | +| mfs (most free space) | Search category: same as **epmfs**. Action category: same as **epmfs**. Create category: Pick the upstream with the most available free space. | +| newest | Pick the file / directory with the largest mtime. | +| rand (random) | Calls **all** and then randomizes. Returns only one upstream. | + +### Setup + Here is an example of how to make a union called `remote` for local folders. First run: @@ -49,16 +117,27 @@ XX / Union merges the contents of several remotes \ "union" [snip] Storage> union -List of space separated remotes. -Can be 'remotea:test/dir remoteb:', '"remotea:test/space dir" remoteb:', etc. -The last remote is used to write to. +List of space separated upstreams. +Can be 'upstreama:test/dir upstreamb:', '\"upstreama:test/space:ro dir\" upstreamb:', etc. Enter a string value. Press Enter for the default (""). -remotes> +upstreams> +Policy to choose upstream on ACTION class. +Enter a string value. Press Enter for the default ("epall"). +action_policy> +Policy to choose upstream on CREATE class. +Enter a string value. Press Enter for the default ("epmfs"). +create_policy> +Policy to choose upstream on SEARCH class. +Enter a string value. Press Enter for the default ("ff"). +search_policy> +Cache time of usage and free space (in seconds). This option is only useful when a path preserving policy is used. +Enter a signed integer. Press Enter for the default ("120"). +cache_time> Remote config -------------------- [remote] type = union -remotes = C:\dir1 C:\dir2 C:\dir3 +upstreams = C:\dir1 C:\dir2 C:\dir3 -------------------- y) Yes this is OK e) Edit this remote @@ -97,17 +176,53 @@ Copy another local directory to the union directory called source, which will be ### Standard Options -Here are the standard options specific to union (Union merges the contents of several remotes). +Here are the standard options specific to union (Union merges the contents of several upstream fs). -#### --union-remotes +#### --union-upstreams -List of space separated remotes. -Can be 'remotea:test/dir remoteb:', '"remotea:test/space dir" remoteb:', etc. -The last remote is used to write to. +List of space separated upstreams. +Can be 'upstreama:test/dir upstreamb:', '"upstreama:test/space:ro dir" upstreamb:', etc. -- Config: remotes -- Env Var: RCLONE_UNION_REMOTES + +- Config: upstreams +- Env Var: RCLONE_UNION_UPSTREAMS - Type: string - Default: "" +#### --union-action-policy + +Policy to choose upstream on ACTION class. + +- Config: action_policy +- Env Var: RCLONE_UNION_ACTION_POLICY +- Type: string +- Default: "epall" + +#### --union-create-policy + +Policy to choose upstream on CREATE class. + +- Config: create_policy +- Env Var: RCLONE_UNION_CREATE_POLICY +- Type: string +- Default: "epmfs" + +#### --union-search-policy + +Policy to choose upstream on SEARCH class. + +- Config: search_policy +- Env Var: RCLONE_UNION_SEARCH_POLICY +- Type: string +- Default: "ff" + +#### --union-cache-time + +Cache time of usage and free space (in seconds). This option is only useful when a path preserving policy is used. + +- Config: cache_time +- Env Var: RCLONE_UNION_CACHE_TIME +- Type: int +- Default: 120 +