mirror of
https://github.com/rclone/rclone
synced 2025-01-25 07:47:29 +01:00
209 lines
4.4 KiB
Go
209 lines
4.4 KiB
Go
package sync
|
|
|
|
import (
|
|
"container/heap"
|
|
"context"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/fserrors"
|
|
)
|
|
|
|
// compare two items for order by
|
|
type lessFn func(a, b fs.ObjectPair) bool
|
|
|
|
// pipe provides an unbounded channel like experience
|
|
//
|
|
// Note unlike channels these aren't strictly ordered.
|
|
type pipe struct {
|
|
mu sync.Mutex
|
|
c chan struct{}
|
|
queue []fs.ObjectPair
|
|
closed bool
|
|
totalSize int64
|
|
stats func(items int, totalSize int64)
|
|
less lessFn
|
|
}
|
|
|
|
func newPipe(orderBy string, stats func(items int, totalSize int64), maxBacklog int) (*pipe, error) {
|
|
less, err := newLess(orderBy)
|
|
if err != nil {
|
|
return nil, fserrors.FatalError(err)
|
|
}
|
|
p := &pipe{
|
|
c: make(chan struct{}, maxBacklog),
|
|
stats: stats,
|
|
less: less,
|
|
}
|
|
if p.less != nil {
|
|
heap.Init(p)
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
// Len satisfy heap.Interface - must be called with lock held
|
|
func (p *pipe) Len() int {
|
|
return len(p.queue)
|
|
}
|
|
|
|
// Len satisfy heap.Interface - must be called with lock held
|
|
func (p *pipe) Less(i, j int) bool {
|
|
return p.less(p.queue[i], p.queue[j])
|
|
}
|
|
|
|
// Swap satisfy heap.Interface - must be called with lock held
|
|
func (p *pipe) Swap(i, j int) {
|
|
p.queue[i], p.queue[j] = p.queue[j], p.queue[i]
|
|
}
|
|
|
|
// Push satisfy heap.Interface - must be called with lock held
|
|
func (p *pipe) Push(item interface{}) {
|
|
p.queue = append(p.queue, item.(fs.ObjectPair))
|
|
}
|
|
|
|
// Pop satisfy heap.Interface - must be called with lock held
|
|
func (p *pipe) Pop() interface{} {
|
|
old := p.queue
|
|
n := len(old)
|
|
item := old[n-1]
|
|
old[n-1] = fs.ObjectPair{} // avoid memory leak
|
|
p.queue = old[0 : n-1]
|
|
return item
|
|
}
|
|
|
|
// Check interface satisfied
|
|
var _ heap.Interface = (*pipe)(nil)
|
|
|
|
// Put an pair into the pipe
|
|
//
|
|
// It returns ok = false if the context was cancelled
|
|
//
|
|
// It will panic if you call it after Close()
|
|
func (p *pipe) Put(ctx context.Context, pair fs.ObjectPair) (ok bool) {
|
|
if ctx.Err() != nil {
|
|
return false
|
|
}
|
|
p.mu.Lock()
|
|
if p.less == nil {
|
|
// no order-by
|
|
p.queue = append(p.queue, pair)
|
|
} else {
|
|
heap.Push(p, pair)
|
|
}
|
|
size := pair.Src.Size()
|
|
if size > 0 {
|
|
p.totalSize += size
|
|
}
|
|
p.stats(len(p.queue), p.totalSize)
|
|
p.mu.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
case p.c <- struct{}{}:
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Get a pair from the pipe
|
|
//
|
|
// It returns ok = false if the context was cancelled or Close() has
|
|
// been called.
|
|
func (p *pipe) Get(ctx context.Context) (pair fs.ObjectPair, ok bool) {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case _, ok = <-p.c:
|
|
if !ok {
|
|
return
|
|
}
|
|
}
|
|
p.mu.Lock()
|
|
if p.less == nil {
|
|
// no order-by
|
|
pair = p.queue[0]
|
|
p.queue[0] = fs.ObjectPair{} // avoid memory leak
|
|
p.queue = p.queue[1:]
|
|
} else {
|
|
pair = heap.Pop(p).(fs.ObjectPair)
|
|
}
|
|
size := pair.Src.Size()
|
|
if size > 0 {
|
|
p.totalSize -= size
|
|
}
|
|
if p.totalSize < 0 {
|
|
p.totalSize = 0
|
|
}
|
|
p.stats(len(p.queue), p.totalSize)
|
|
p.mu.Unlock()
|
|
return pair, true
|
|
}
|
|
|
|
// Stats reads the number of items in the queue and the totalSize
|
|
func (p *pipe) Stats() (items int, totalSize int64) {
|
|
p.mu.Lock()
|
|
items, totalSize = len(p.queue), p.totalSize
|
|
p.mu.Unlock()
|
|
return items, totalSize
|
|
}
|
|
|
|
// Close the pipe
|
|
//
|
|
// Writes to a closed pipe will panic as will double closing a pipe
|
|
func (p *pipe) Close() {
|
|
p.mu.Lock()
|
|
close(p.c)
|
|
p.closed = true
|
|
p.mu.Unlock()
|
|
}
|
|
|
|
// newLess returns a less function for the heap comparison or nil if
|
|
// one is not required
|
|
func newLess(orderBy string) (less lessFn, err error) {
|
|
if orderBy == "" {
|
|
return nil, nil
|
|
}
|
|
parts := strings.Split(strings.ToLower(orderBy), ",")
|
|
if len(parts) > 2 {
|
|
return nil, errors.Errorf("bad --order-by string %q", orderBy)
|
|
}
|
|
switch parts[0] {
|
|
case "name":
|
|
less = func(a, b fs.ObjectPair) bool {
|
|
return a.Src.Remote() < b.Src.Remote()
|
|
}
|
|
case "size":
|
|
less = func(a, b fs.ObjectPair) bool {
|
|
return a.Src.Size() < b.Src.Size()
|
|
}
|
|
case "modtime":
|
|
less = func(a, b fs.ObjectPair) bool {
|
|
ctx := context.Background()
|
|
return a.Src.ModTime(ctx).Before(b.Src.ModTime(ctx))
|
|
}
|
|
default:
|
|
return nil, errors.Errorf("unknown --order-by comparison %q", parts[0])
|
|
}
|
|
descending := false
|
|
if len(parts) > 1 {
|
|
switch parts[1] {
|
|
case "ascending", "asc":
|
|
case "descending", "desc":
|
|
descending = true
|
|
default:
|
|
return nil, errors.Errorf("unknown --order-by sort direction %q", parts[1])
|
|
}
|
|
}
|
|
if descending {
|
|
oldLess := less
|
|
less = func(a, b fs.ObjectPair) bool {
|
|
return !oldLess(a, b)
|
|
}
|
|
}
|
|
return less, nil
|
|
}
|