mirror of https://github.com/rclone/rclone synced 2024-10-17 03:01:13 +02:00
Nick Craig-Wood d178233e74 sync,march: check the cancel context on every channel send and receive
This fixes a deadlock on sync when all the copying channels receive a
Fatal Error.
2018-05-05 12:58:28 +01:00

419 lines
11 KiB

// Package march traverses two directories in lock step
package march
import (
	"context"
	"path"
	"sort"
	"strings"
	"sync"

	// NOTE(review): the non-stdlib imports this file relies on (the packages
	// referred to as fs, filter, list, walk and norm) are not visible in this
	// listing - restore them from the upstream file.
)
// March holds the data used to traverse two Fs simultaneously,
// calling callback for each match
type March struct {
// parameters
ctx context.Context
fdst fs.Fs
fsrc fs.Fs
dir string
callback Marcher
// internal state
srcListDir listDirFn // function to call to list a directory in the src
dstListDir listDirFn // function to call to list a directory in the dst
transforms []matchTransformFn
// Marcher is called on each match
type Marcher interface {
// SrcOnly is called for a DirEntry found only in the source
SrcOnly(src fs.DirEntry) (recurse bool)
// DstOnly is called for a DirEntry found only in the destination
DstOnly(dst fs.DirEntry) (recurse bool)
// Match is called for a DirEntry found both in the source and destination
Match(dst, src fs.DirEntry) (recurse bool)
// New sets up a march over fsrc, and fdst calling back callback for each match
func New(ctx context.Context, fdst, fsrc fs.Fs, dir string, callback Marcher) *March {
m := &March{
ctx: ctx,
fdst: fdst,
fsrc: fsrc,
dir: dir,
callback: callback,
m.srcListDir = m.makeListDir(fsrc, false)
m.dstListDir = m.makeListDir(fdst, filter.Active.Opt.DeleteExcluded)
// Now create the matching transform
// ..normalise the UTF8 first
m.transforms = append(m.transforms, norm.NFC.String)
// ..if destination is caseInsensitive then make it lower case
// case Insensitive | src | dst | lower case compare |
// | No | No | No |
// | Yes | No | No |
// | No | Yes | Yes |
// | Yes | Yes | Yes |
if fdst.Features().CaseInsensitive {
m.transforms = append(m.transforms, strings.ToLower)
return m
// listDirFn lists the directory dir, returning its entries or an error
type listDirFn func(dir string) (entries fs.DirEntries, err error)
// makeListDir makes a listing function for the given fs and includeAll flags
func (m *March) makeListDir(f fs.Fs, includeAll bool) listDirFn {
if !fs.Config.UseListR || f.Features().ListR == nil {
return func(dir string) (entries fs.DirEntries, err error) {
return list.DirSorted(f, includeAll, dir)
var (
mu sync.Mutex
started bool
dirs walk.DirTree
dirsErr error
return func(dir string) (entries fs.DirEntries, err error) {
defer mu.Unlock()
if !started {
dirs, dirsErr = walk.NewDirTree(f, m.dir, includeAll, fs.Config.MaxDepth)
started = true
if dirsErr != nil {
return nil, dirsErr
entries, ok := dirs[dir]
if !ok {
err = fs.ErrorDirNotFound
} else {
delete(dirs, dir)
return entries, err
// listDirJob describes a directory listing that needs to be done
type listDirJob struct {
	srcRemote string // directory to list in the source
	dstRemote string // directory to list in the destination
	srcDepth  int    // levels of recursion still allowed in the source
	dstDepth  int    // levels of recursion still allowed in the destination
	noSrc     bool   // if set the source is not listed
	noDst     bool   // if set the destination is not listed
}
// Run starts the matching process off
func (m *March) Run() {
srcDepth := fs.Config.MaxDepth
if srcDepth < 0 {
srcDepth = fs.MaxLevel
dstDepth := srcDepth
if filter.Active.Opt.DeleteExcluded {
dstDepth = fs.MaxLevel
// Start some directory listing go routines
var wg sync.WaitGroup // sync closing of go routines
var traversing sync.WaitGroup // running directory traversals
in := make(chan listDirJob, fs.Config.Checkers)
for i := 0; i < fs.Config.Checkers; i++ {
go func() {
defer wg.Done()
for {
select {
case <-m.ctx.Done():
case job, ok := <-in:
if !ok {
jobs := m.processJob(job)
if len(jobs) > 0 {
go func() {
// Now we have traversed this directory, send these
// jobs off for traversal in the background
for _, newJob := range jobs {
select {
case <-m.ctx.Done():
// discard job if finishing
case in <- newJob:
// Start the process
in <- listDirJob{
srcRemote: m.dir,
srcDepth: srcDepth - 1,
dstRemote: m.dir,
dstDepth: dstDepth - 1,
go func() {
// when the context is cancelled discard the remaining jobs
for range in {
// Check to see if the context has been cancelled
func (m *March) aborting() bool {
select {
case <-m.ctx.Done():
return true
return false
// matchEntry is an entry plus transformed name
type matchEntry struct {
entry fs.DirEntry
leaf string
name string
// matchEntries is a slice of matchEntry which can be sorted by
// (name, leaf, remote) using its Len, Swap and Less methods
type matchEntries []matchEntry
// Len returns the number of entries.
//
// It is part of sort.Interface.
func (es matchEntries) Len() int {
	return len(es)
}
// Swap exchanges the entries at positions i and j.
//
// It is part of sort.Interface.
func (es matchEntries) Swap(i, j int) {
	tmp := es[i]
	es[i] = es[j]
	es[j] = tmp
}
// Less is part of sort.Interface.
// Compare in order (name, leaf, remote)
func (es matchEntries) Less(i, j int) bool {
ei, ej := &es[i], &es[j]
if ei.name == ej.name {
if ei.leaf == ej.leaf {
return ei.entry.Remote() < ej.entry.Remote()
return ei.leaf < ej.leaf
return ei.name < ej.name
// Sort the directory entries by (name, leaf, remote)
// We use a stable sort here just in case there are
// duplicates. Assuming the remote delivers the entries in a
// consistent order, this will give the best user experience
// in syncing as it will use the first entry for the sync
// comparison.
func (es matchEntries) sort() {
// make a matchEntries from a newMatch entries
func newMatchEntries(entries fs.DirEntries, transforms []matchTransformFn) matchEntries {
es := make(matchEntries, len(entries))
for i := range es {
es[i].entry = entries[i]
name := path.Base(entries[i].Remote())
es[i].leaf = name
for _, transform := range transforms {
name = transform(name)
es[i].name = name
return es
// matchPair is a matched pair of direntries returned by matchListings
type matchPair struct {
src, dst fs.DirEntry
// matchTransformFn converts a name into a form which is used for
// comparison in matchListings. Several may be chained, each one being
// applied to the output of the previous one.
type matchTransformFn func(name string) string
// Process the two listings, matching up the items in the two slices
// using the transform function on each name first.
// Into srcOnly go Entries which only exist in the srcList
// Into dstOnly go Entries which only exist in the dstList
// Into matches go matchPair's of src and dst which have the same name
// This checks for duplicates and checks the list is sorted.
func matchListings(srcListEntries, dstListEntries fs.DirEntries, transforms []matchTransformFn) (srcOnly fs.DirEntries, dstOnly fs.DirEntries, matches []matchPair) {
srcList := newMatchEntries(srcListEntries, transforms)
dstList := newMatchEntries(dstListEntries, transforms)
for iSrc, iDst := 0, 0; ; iSrc, iDst = iSrc+1, iDst+1 {
var src, dst fs.DirEntry
var srcName, dstName string
if iSrc < len(srcList) {
src = srcList[iSrc].entry
srcName = srcList[iSrc].name
if iDst < len(dstList) {
dst = dstList[iDst].entry
dstName = dstList[iDst].name
if src == nil && dst == nil {
if src != nil && iSrc > 0 {
prev := srcList[iSrc-1].name
if srcName == prev {
fs.Logf(src, "Duplicate %s found in source - ignoring", fs.DirEntryType(src))
iDst-- // ignore the src and retry the dst
} else if srcName < prev {
// this should never happen since we sort the listings
panic("Out of order listing in source")
if dst != nil && iDst > 0 {
prev := dstList[iDst-1].name
if dstName == prev {
fs.Logf(dst, "Duplicate %s found in destination - ignoring", fs.DirEntryType(dst))
iSrc-- // ignore the dst and retry the src
} else if dstName < prev {
// this should never happen since we sort the listings
panic("Out of order listing in destination")
if src != nil && dst != nil {
if srcName < dstName {
dst = nil
iDst-- // retry the dst
} else if srcName > dstName {
src = nil
iSrc-- // retry the src
// Debugf(nil, "src = %v, dst = %v", src, dst)
switch {
case src == nil && dst == nil:
// do nothing
case src == nil:
dstOnly = append(dstOnly, dst)
case dst == nil:
srcOnly = append(srcOnly, src)
matches = append(matches, matchPair{src: src, dst: dst})
// processJob processes a listDirJob listing the source and
// destination directories, comparing them and returning a slice of
// more jobs
// returns errors using processError
func (m *March) processJob(job listDirJob) (jobs []listDirJob) {
var (
srcList, dstList fs.DirEntries
srcListErr, dstListErr error
wg sync.WaitGroup
// List the src and dst directories
if !job.noSrc {
go func() {
defer wg.Done()
srcList, srcListErr = m.srcListDir(job.srcRemote)
if !job.noDst {
go func() {
defer wg.Done()
dstList, dstListErr = m.dstListDir(job.dstRemote)
// Wait for listings to complete and report errors
if srcListErr != nil {
fs.Errorf(job.srcRemote, "error reading source directory: %v", srcListErr)
return nil
if dstListErr == fs.ErrorDirNotFound {
// Copy the stuff anyway
} else if dstListErr != nil {
fs.Errorf(job.dstRemote, "error reading destination directory: %v", dstListErr)
return nil
// Work out what to do and do it
srcOnly, dstOnly, matches := matchListings(srcList, dstList, m.transforms)
for _, src := range srcOnly {
if m.aborting() {
return nil
recurse := m.callback.SrcOnly(src)
if recurse && job.srcDepth > 0 {
jobs = append(jobs, listDirJob{
srcRemote: src.Remote(),
srcDepth: job.srcDepth - 1,
noDst: true,
for _, dst := range dstOnly {
if m.aborting() {
return nil
recurse := m.callback.DstOnly(dst)
if recurse && job.dstDepth > 0 {
jobs = append(jobs, listDirJob{
dstRemote: dst.Remote(),
dstDepth: job.dstDepth - 1,
noSrc: true,
for _, match := range matches {
if m.aborting() {
return nil
recurse := m.callback.Match(match.dst, match.src)
if recurse && job.srcDepth > 0 && job.dstDepth > 0 {
jobs = append(jobs, listDirJob{
srcRemote: match.src.Remote(),
dstRemote: match.dst.Remote(),
srcDepth: job.srcDepth - 1,
dstDepth: job.dstDepth - 1,
return jobs