diff --git a/docs/content/docs.md b/docs/content/docs.md index 27c96a700..2489c9cda 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -536,6 +536,13 @@ files not recursed through are considered excluded and will be deleted on the destination. Test first with `--dry-run` if you are not sure what will happen. +### --max-transfer=SIZE ### + +Rclone will stop transferring when it has reached the size specified. +Defaults to off. + +When the limit is reached all transfers will stop immediately. + ### --modify-window=TIME ### When checking whether a file has been modified, this is the maximum diff --git a/fs/accounting/accounting.go b/fs/accounting/accounting.go index cf0340f93..5d51d85aa 100644 --- a/fs/accounting/accounting.go +++ b/fs/accounting/accounting.go @@ -10,8 +10,14 @@ import ( "github.com/VividCortex/ewma" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/asyncreader" + "github.com/ncw/rclone/fs/fserrors" + "github.com/pkg/errors" ) +// ErrorMaxTransferLimitReached is returned from Read when the max +// transfer limit is reached. +var ErrorMaxTransferLimitReached = fserrors.FatalError(errors.New("Max transfer limit reached as set by --max-transfer")) + // Account limits and accounts for one transfer type Account struct { // The mutex is to make sure Read() and Close() aren't called @@ -27,6 +33,7 @@ type Account struct { name string statmu sync.Mutex // Separate mutex for stat values. bytes int64 // Total number of bytes read + max int64 // if >=0 the max number of bytes to transfer start time.Time // Start time of first read lpTime time.Time // Time of last average measurement lpBytes int // Number of bytes read since last measurement @@ -48,6 +55,7 @@ func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { exit: make(chan struct{}), avg: ewma.NewMovingAverage(), lpTime: time.Now(), + max: int64(fs.Config.MaxTransfer), } go acc.averageLoop() Stats.inProgress.set(acc.name, acc) @@ -131,8 +139,12 @@ func (acc *Account) averageLoop() { // read bytes from the io.Reader passed in and account them func (acc *Account) read(in io.Reader, p []byte) (n int, err error) { - // Set start time. acc.statmu.Lock() + if acc.max >= 0 && Stats.GetBytes() >= acc.max { + acc.statmu.Unlock() + return 0, ErrorMaxTransferLimitReached + } + // Set start time. if acc.start.IsZero() { acc.start = time.Now() } diff --git a/fs/accounting/accounting_test.go b/fs/accounting/accounting_test.go index 554a49623..b197a4a7d 100644 --- a/fs/accounting/accounting_test.go +++ b/fs/accounting/accounting_test.go @@ -7,7 +7,9 @@ import ( "strings" "testing" + "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs/asyncreader" + "github.com/ncw/rclone/fs/fserrors" "github.com/ncw/rclone/fstest/mockobject" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -181,3 +183,28 @@ func TestAccountAccounter(t *testing.T) { assert.True(t, wrap(in3) == in3) } + +func TestAccountMaxTransfer(t *testing.T) { + old := fs.Config.MaxTransfer + fs.Config.MaxTransfer = 15 + defer func() { + fs.Config.MaxTransfer = old + }() + Stats.ResetCounters() + + in := ioutil.NopCloser(bytes.NewBuffer(make([]byte, 100))) + acc := NewAccountSizeName(in, 1, "test") + + var b = make([]byte, 10) + + n, err := acc.Read(b) + assert.Equal(t, 10, n) + assert.NoError(t, err) + n, err = acc.Read(b) + assert.Equal(t, 10, n) + assert.NoError(t, err) + n, err = acc.Read(b) + assert.Equal(t, 0, n) + assert.Equal(t, ErrorMaxTransferLimitReached, err) + assert.True(t, fserrors.IsFatalError(err)) +} diff --git a/fs/accounting/stats.go b/fs/accounting/stats.go index cd2557247..c666e39ff 100644 --- a/fs/accounting/stats.go +++ b/fs/accounting/stats.go @@ -95,6 +95,13 @@ func (s *StatsInfo) Bytes(bytes int64) { s.bytes += bytes } +// GetBytes returns the number of bytes transferred so far +func (s *StatsInfo) GetBytes() int64 { + s.lock.Lock() + defer s.lock.Unlock() + return s.bytes +} + // Errors updates the stats for errors func (s *StatsInfo) Errors(errors int64) { s.lock.Lock() diff --git a/fs/config.go b/fs/config.go index fb948cdf2..543923981 100644 --- a/fs/config.go +++ b/fs/config.go @@ -71,6 +71,7 @@ type ConfigInfo struct { StatsFileNameLength int AskPassword bool UseServerModTime bool + MaxTransfer SizeSuffix } // NewConfig creates a new config with everything set to the default @@ -98,6 +99,7 @@ func NewConfig() *ConfigInfo { c.StatsFileNameLength = 40 c.AskPassword = true c.TPSLimitBurst = 1 + c.MaxTransfer = -1 return c } diff --git a/fs/config/configflags/configflags.go b/fs/config/configflags/configflags.go index 9d22dea85..c3a1531a7 100644 --- a/fs/config/configflags/configflags.go +++ b/fs/config/configflags/configflags.go @@ -82,7 +82,7 @@ func AddFlags(flagSet *pflag.FlagSet) { flags.FVarP(flagSet, &fs.Config.BufferSize, "buffer-size", "", "Buffer size when copying files.") flags.FVarP(flagSet, &fs.Config.StreamingUploadCutoff, "streaming-upload-cutoff", "", "Cutoff for switching to chunked upload if file size is unknown. Upload starts after reaching cutoff or when file ends.") flags.FVarP(flagSet, &fs.Config.Dump, "dump", "", "List of items to dump from: "+fs.DumpFlagsList) - + flags.FVarP(flagSet, &fs.Config.MaxTransfer, "max-transfer", "", "Maximum size of data to transfer.") } // SetFlags converts any flags into config which weren't straight foward