diff --git a/docs/content/docs.md b/docs/content/docs.md index 5432af724..5a253aa01 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -671,8 +671,8 @@ queue is in use. Note that it will use in the order of N kB of memory when the backlog is in use. Setting this large allows rclone to calculate how many files are -pending more accurately and give a more accurate estimated finish -time. +pending more accurately, give a more accurate estimated finish +time and make `--order-by` work more accurately. Setting this small will make rclone more synchronous to the listings of the remote which may be desirable. @@ -823,6 +823,48 @@ files if they are incorrect as it would normally. This can be used if the remote is being synced with another tool also (eg the Google Drive client). +### --order-by string ### + +The `--order-by` flag controls the order in which files in the backlog +are processed in `rclone sync`, `rclone copy` and `rclone move`. + +The order by string is constructed like this. The first part +describes what aspect is being measured: + +- `size` - order by the size of the files +- `name` - order by the full path of the files +- `modtime` - order by the modification date of the files + +This can have a modifier appended with a comma: + +- `ascending` or `asc` - order so that the smallest (or oldest) is processed first +- `descending` or `desc` - order so that the largest (or newest) is processed first + +If no modifier is supplied then the order is `ascending`. + +For example + +- `--order-by size,desc` - send the largest files first +- `--order-by modtime,ascending` - send the oldest files first +- `--order-by name` - send the files in alphabetical order of their paths + +If the `--order-by` flag is not supplied or it is supplied with an +empty string then the default ordering will be used which is as +scanned. With `--checkers 1` this is mostly alphabetical, however +with the default `--checkers 8` it is somewhat random. 
+ +#### Limitations + +The `--order-by` flag does not do a separate pass over the data. This +means that it may transfer some files out of the order specified if + +- there are no files in the backlog or the source has not been fully scanned yet +- there are more than [--max-backlog](#max-backlog-n) files in the backlog + +Rclone will do its best to transfer the best file it has so in +practice this should not cause a problem. Think of `--order-by` as +being more of a best efforts flag rather than a perfect ordering. + ### -P, --progress ### This flag makes rclone update the stats in a static block in the diff --git a/fs/config.go b/fs/config.go index 52df901dd..b497ac7dd 100644 --- a/fs/config.go +++ b/fs/config.go @@ -103,7 +103,8 @@ type ConfigInfo struct { ClientKey string // Client Side Key MultiThreadCutoff SizeSuffix MultiThreadStreams int - MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags) + MultiThreadSet bool // whether MultiThreadStreams was set (set in fs/config/configflags) + OrderBy string // instructions on how to order the transfer } // NewConfig creates a new config with everything set to the default diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index f515e0e3e..3ae0cbf64 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -105,6 +105,7 @@ func AddFlags(flagSet *pflag.FlagSet) { flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.") flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.") flags.BoolVarP(flagSet, &fs.Config.UseJSONLog, "use-json-log", "", fs.Config.UseJSONLog, "Use json log format.") + flags.StringVarP(flagSet, &fs.Config.OrderBy, "order-by", "", fs.Config.OrderBy, "Instructions on how to order the transfers, eg 
'size,descending'") } // SetFlags converts any flags into config which weren't straight forward diff --git a/fs/sync/pipe.go b/fs/sync/pipe.go index 1af0e684a..60da19841 100644 --- a/fs/sync/pipe.go +++ b/fs/sync/pipe.go @@ -1,12 +1,19 @@ package sync import ( + "container/heap" "context" + "strings" "sync" + "github.com/pkg/errors" "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/fserrors" ) +// compare two items for order by +type lessFn func(a, b fs.ObjectPair) bool + // pipe provides an unbounded channel like experience // // Note unlike channels these aren't strictly ordered. @@ -17,15 +24,58 @@ type pipe struct { closed bool totalSize int64 stats func(items int, totalSize int64) + less lessFn } -func newPipe(stats func(items int, totalSize int64), maxBacklog int) *pipe { - return &pipe{ +func newPipe(orderBy string, stats func(items int, totalSize int64), maxBacklog int) (*pipe, error) { + less, err := newLess(orderBy) + if err != nil { + return nil, fserrors.FatalError(err) + } + p := &pipe{ c: make(chan struct{}, maxBacklog), stats: stats, + less: less, } + if p.less != nil { + heap.Init(p) + } + return p, nil } +// Len satisfies heap.Interface - must be called with lock held +func (p *pipe) Len() int { + return len(p.queue) +} + +// Less satisfies heap.Interface - must be called with lock held +func (p *pipe) Less(i, j int) bool { + return p.less(p.queue[i], p.queue[j]) +} + +// Swap satisfies heap.Interface - must be called with lock held +func (p *pipe) Swap(i, j int) { + p.queue[i], p.queue[j] = p.queue[j], p.queue[i] +} + +// Push satisfies heap.Interface - must be called with lock held +func (p *pipe) Push(item interface{}) { + p.queue = append(p.queue, item.(fs.ObjectPair)) +} + +// Pop satisfies heap.Interface - must be called with lock held +func (p *pipe) Pop() interface{} { + old := p.queue + n := len(old) + item := old[n-1] + old[n-1] = fs.ObjectPair{} // avoid memory leak + p.queue = old[0 : n-1] + return item +} + +// Check interface 
satisfied +var _ heap.Interface = (*pipe)(nil) + // Put an pair into the pipe // // It returns ok = false if the context was cancelled @@ -36,7 +86,12 @@ func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) { return false } p.mu.Lock() - p.queue = append(p.queue, pair) + if p.less == nil { + // no order-by + p.queue = append(p.queue, pair) + } else { + heap.Push(p, pair) + } size := pair.Src.Size() if size > 0 { p.totalSize += size @@ -68,10 +123,14 @@ func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) { } } p.mu.Lock() - pair = p.queue[0] - p.queue[0].Src = nil - p.queue[0].Dst = nil - p.queue = p.queue[1:] + if p.less == nil { + // no order-by + pair = p.queue[0] + p.queue[0] = fs.ObjectPair{} // avoid memory leak + p.queue = p.queue[1:] + } else { + pair = heap.Pop(p).(fs.ObjectPair) + } size := pair.Src.Size() if size > 0 { p.totalSize -= size @@ -101,3 +160,49 @@ func (p *pipe) Close() { p.closed = true p.mu.Unlock() } + +// newLess returns a less function for the heap comparison or nil if +// one is not required +func newLess(orderBy string) (less lessFn, err error) { + if orderBy == "" { + return nil, nil + } + parts := strings.Split(strings.ToLower(orderBy), ",") + if len(parts) > 2 { + return nil, errors.Errorf("bad --order-by string %q", orderBy) + } + switch parts[0] { + case "name": + less = func(a, b fs.ObjectPair) bool { + return a.Src.Remote() < b.Src.Remote() + } + case "size": + less = func(a, b fs.ObjectPair) bool { + return a.Src.Size() < b.Src.Size() + } + case "modtime": + less = func(a, b fs.ObjectPair) bool { + ctx := context.Background() + return a.Src.ModTime(ctx).Before(b.Src.ModTime(ctx)) + } + default: + return nil, errors.Errorf("unknown --order-by comparison %q", parts[0]) + } + descending := false + if len(parts) > 1 { + switch parts[1] { + case "ascending", "asc": + case "descending", "desc": + descending = true + default: + return nil, errors.Errorf("unknown --order-by sort direction %q", 
parts[1]) + } + } + if descending { + oldLess := less + less = func(a, b fs.ObjectPair) bool { + return !oldLess(a, b) + } + } + return less, nil +} diff --git a/fs/sync/pipe_test.go b/fs/sync/pipe_test.go index 28ee2ca67..1ef34f27d 100644 --- a/fs/sync/pipe_test.go +++ b/fs/sync/pipe_test.go @@ -9,6 +9,7 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fstest/mockobject" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestPipe(t *testing.T) { @@ -19,7 +20,8 @@ func TestPipe(t *testing.T) { } // Make a new pipe - p := newPipe(stats, 10) + p, err := newPipe("", stats, 10) + require.NoError(t, err) checkStats := func(expectedN int, expectedSize int64) { n, size := p.Stats() @@ -60,7 +62,8 @@ func TestPipe(t *testing.T) { assert.Panics(t, func() { p.Put(ctx, pair1) }) // Make a new pipe - p = newPipe(stats, 10) + p, err = newPipe("", stats, 10) + require.NoError(t, err) ctx2, cancel := context.WithCancel(ctx) // cancel it in the background - check read ceases @@ -86,7 +89,8 @@ func TestPipeConcurrent(t *testing.T) { stats := func(n int, size int64) {} // Make a new pipe - p := newPipe(stats, 10) + p, err := newPipe("", stats, 10) + require.NoError(t, err) var wg sync.WaitGroup obj1 := mockobject.New("potato").WithContent([]byte("hello"), mockobject.SeekModeNone) @@ -120,3 +124,125 @@ func TestPipeConcurrent(t *testing.T) { assert.Equal(t, int64(0), count) } + +func TestPipeOrderBy(t *testing.T) { + var ( + stats = func(n int, size int64) {} + ctx = context.Background() + obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone) + obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone) + pair1 = fs.ObjectPair{Src: obj1} + pair2 = fs.ObjectPair{Src: obj2} + ) + + for _, test := range []struct { + orderBy string + swapped1 bool + swapped2 bool + }{ + {"", false, true}, + {"size", false, false}, + {"name", true, true}, + {"modtime", false, true}, + {"size,ascending", false, 
false}, + {"name,asc", true, true}, + {"modtime,ascending", false, true}, + {"size,descending", true, true}, + {"name,desc", false, false}, + {"modtime,descending", true, false}, + } { + t.Run(test.orderBy, func(t *testing.T) { + p, err := newPipe(test.orderBy, stats, 10) + require.NoError(t, err) + + ok := p.Put(ctx, pair1) + assert.True(t, ok) + ok = p.Put(ctx, pair2) + assert.True(t, ok) + + readAndCheck := func(swapped bool) { + readFirst, ok := p.Get(ctx) + assert.True(t, ok) + readSecond, ok := p.Get(ctx) + assert.True(t, ok) + + if swapped { + assert.True(t, readFirst == pair2 && readSecond == pair1) + } else { + assert.True(t, readFirst == pair1 && readSecond == pair2) + } + } + + readAndCheck(test.swapped1) + + // insert other way round + + ok = p.Put(ctx, pair2) + assert.True(t, ok) + ok = p.Put(ctx, pair1) + assert.True(t, ok) + + readAndCheck(test.swapped2) + }) + } +} + +func TestNewLess(t *testing.T) { + t.Run("blankOK", func(t *testing.T) { + less, err := newLess("") + require.NoError(t, err) + assert.Nil(t, less) + }) + + t.Run("tooManyParts", func(t *testing.T) { + _, err := newLess("too,many,parts") + require.Error(t, err) + assert.Contains(t, err.Error(), "bad --order-by string") + }) + + t.Run("unknownComparison", func(t *testing.T) { + _, err := newLess("potato") + require.Error(t, err) + assert.Contains(t, err.Error(), "unknown --order-by comparison") + }) + + t.Run("unknownSortDirection", func(t *testing.T) { + _, err := newLess("name,sideways") + require.Error(t, err) + assert.Contains(t, err.Error(), "unknown --order-by sort direction") + }) + + var ( + obj1 = mockobject.New("b").WithContent([]byte("1"), mockobject.SeekModeNone) + obj2 = mockobject.New("a").WithContent([]byte("22"), mockobject.SeekModeNone) + pair1 = fs.ObjectPair{Src: obj1} + pair2 = fs.ObjectPair{Src: obj2} + ) + + for _, test := range []struct { + orderBy string + pair1LessPair2 bool + pair2LessPair1 bool + }{ + {"size", true, false}, + {"name", false, true}, + 
{"modtime", false, false}, + {"size,ascending", true, false}, + {"name,asc", false, true}, + {"modtime,ascending", false, false}, + {"size,descending", false, true}, + {"name,desc", true, false}, + {"modtime,descending", true, true}, + } { + t.Run(test.orderBy, func(t *testing.T) { + less, err := newLess(test.orderBy) + require.NoError(t, err) + require.NotNil(t, less) + pair1LessPair2 := less(pair1, pair2) + assert.Equal(t, test.pair1LessPair2, pair1LessPair2) + pair2LessPair1 := less(pair2, pair1) + assert.Equal(t, test.pair2LessPair1, pair2LessPair1) + }) + } + +} diff --git a/fs/sync/sync.go b/fs/sync/sync.go index ca89533c1..7c3fc7230 100644 --- a/fs/sync/sync.go +++ b/fs/sync/sync.go @@ -84,14 +84,24 @@ func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.Delete srcEmptyDirs: make(map[string]fs.DirEntry), noTraverse: fs.Config.NoTraverse, noCheckDest: fs.Config.NoCheckDest, - toBeChecked: newPipe(accounting.Stats(ctx).SetCheckQueue, fs.Config.MaxBacklog), - toBeUploaded: newPipe(accounting.Stats(ctx).SetTransferQueue, fs.Config.MaxBacklog), deleteFilesCh: make(chan fs.Object, fs.Config.Checkers), trackRenames: fs.Config.TrackRenames, commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(), - toBeRenamed: newPipe(accounting.Stats(ctx).SetRenameQueue, fs.Config.MaxBacklog), trackRenamesCh: make(chan fs.Object, fs.Config.Checkers), } + var err error + s.toBeChecked, err = newPipe(fs.Config.OrderBy, accounting.Stats(ctx).SetCheckQueue, fs.Config.MaxBacklog) + if err != nil { + return nil, err + } + s.toBeUploaded, err = newPipe(fs.Config.OrderBy, accounting.Stats(ctx).SetTransferQueue, fs.Config.MaxBacklog) + if err != nil { + return nil, err + } + s.toBeRenamed, err = newPipe(fs.Config.OrderBy, accounting.Stats(ctx).SetRenameQueue, fs.Config.MaxBacklog) + if err != nil { + return nil, err + } s.ctx, s.cancel = context.WithCancel(ctx) if s.noTraverse && s.deleteMode != fs.DeleteModeOff { fs.Errorf(nil, "Ignoring --no-traverse with sync")