From 05050d53adc506ee06f5f384fc8f1cbe33ea48af Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Thu, 17 Sep 2015 18:12:37 +0100
Subject: [PATCH] Implement compliant pacing scheme for Amazon Cloud Drive

* Implement switchable pacing algorithm
* Add tests for pacer
---
 amazonclouddrive/amazonclouddrive.go |   5 +-
 pacer/pacer.go                       | 151 ++++++++++---
 pacer/pacer_test.go                  | 307 +++++++++++++++++++++++++++
 3 files changed, 433 insertions(+), 30 deletions(-)
 create mode 100644 pacer/pacer_test.go

diff --git a/amazonclouddrive/amazonclouddrive.go b/amazonclouddrive/amazonclouddrive.go
index f44cb0716..43a9eb2ed 100644
--- a/amazonclouddrive/amazonclouddrive.go
+++ b/amazonclouddrive/amazonclouddrive.go
@@ -40,8 +40,6 @@ const (
 	statusAvailable = "AVAILABLE"
 	timeFormat      = time.RFC3339 // 2014-03-07T22:31:12.173Z
 	minSleep        = 20 * time.Millisecond
-	maxSleep        = 15 * time.Second
-	decayConstant   = 2 // bigger for slower decay, exponential
 )
 
 // Globals
@@ -129,6 +127,7 @@ var retryErrorCodes = []int{
 	429, // Rate exceeded.
 	500, // Get occasional 500 Internal Server Error
 	409, // Conflict - happens in the unit tests a lot
+	503, // Service Unavailable
 }
 
 // shouldRetry returns a boolean as to whether this resp and err
@@ -172,7 +171,7 @@ func NewFs(name, root string) (fs.Fs, error) {
 		name:  name,
 		root:  root,
 		c:     c,
-		pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
+		pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.AmazonCloudDrivePacer),
 	}
 
 	// Update endpoints
diff --git a/pacer/pacer.go b/pacer/pacer.go
index d51da5dbb..e133786fa 100644
--- a/pacer/pacer.go
+++ b/pacer/pacer.go
@@ -1,25 +1,54 @@
-// pacer is a utility package to make pacing and retrying API calls easy
+// Package pacer makes pacing and retrying API calls easy
 package pacer
 
 import (
+	"math/rand"
 	"sync"
 	"time"
 
 	"github.com/ncw/rclone/fs"
 )
 
+// Pacer state
 type Pacer struct {
-	minSleep       time.Duration // minimum sleep time
-	maxSleep       time.Duration // maximum sleep time
-	decayConstant  uint          // decay constant
-	pacer          chan struct{} // To pace the operations
-	sleepTime      time.Duration // Time to sleep for each transaction
-	retries        int           // Max number of retries
-	mu             sync.Mutex    // Protecting read/writes
-	maxConnections int           // Maximum number of concurrent connections
-	connTokens     chan struct{} // Connection tokens
+	mu                 sync.Mutex    // Protecting read/writes
+	minSleep           time.Duration // minimum sleep time
+	maxSleep           time.Duration // maximum sleep time
+	decayConstant      uint          // decay constant
+	pacer              chan struct{} // To pace the operations
+	sleepTime          time.Duration // Time to sleep for each transaction
+	retries            int           // Max number of retries
+	maxConnections     int           // Maximum number of concurrent connections
+	connTokens         chan struct{} // Connection tokens
+	calculatePace      func(bool)    // switchable pacing algorithm - call with mu held
+	consecutiveRetries int           // number of consecutive retries
 }
 
+// Type is for selecting different pacing algorithms
+type Type int
+
+const (
+	// DefaultPacer is a truncated exponential attack and decay.
+	//
+	// On retries the sleep time is doubled, on non errors then
+	// sleeptime decays according to the decay constant as set
+	// with SetDecayConstant.
+	//
+	// The sleep never goes below that set with SetMinSleep or
+	// above that set with SetMaxSleep.
+	DefaultPacer = Type(iota)
+
+	// AmazonCloudDrivePacer is a specialised pacer for Amazon Cloud Drive
+	//
+	// It implements a truncated exponential backoff strategy with
+	// randomization. Normally operations are paced at the
+	// interval set with SetMinSleep. On errors the sleep timer
+	// is set to 0..2**retries seconds.
+	//
+	// See https://developer.amazon.com/public/apis/experience/cloud-drive/content/restful-api-best-practices
+	AmazonCloudDrivePacer
+)
+
 // Paced is a function which is called by the Call and CallNoRetry
 // methods. It should return a boolean, true if it would like to be
 // retried, and an error. This error may be returned or returned
@@ -36,7 +65,9 @@ func New() *Pacer {
 		pacer:         make(chan struct{}, 1),
 	}
 	p.sleepTime = p.minSleep
-	p.SetMaxConnections(fs.Config.Checkers)
+	p.SetPacer(DefaultPacer)
+	p.SetMaxConnections(fs.Config.Checkers + fs.Config.Transfers)
+
 	// Put the first pacing token in
 	p.pacer <- struct{}{}
 
@@ -101,6 +132,22 @@ func (p *Pacer) SetRetries(retries int) *Pacer {
 	return p
 }
 
+// SetPacer sets the pacing algorithm
+//
+// It will choose the default algorithm if an incorrect value is
+// passed in.
+func (p *Pacer) SetPacer(t Type) *Pacer {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	switch t {
+	case AmazonCloudDrivePacer:
+		p.calculatePace = p.acdPacer
+	default:
+		p.calculatePace = p.defaultPacer
+	}
+	return p
+}
+
 // Start a call to the API
 //
 // This must be called as a pair with endCall
@@ -126,17 +173,18 @@ func (p *Pacer) beginCall() {
 	p.mu.Unlock()
 }
 
-// End a call to the API
+// defaultPacer implements an exponential up
+// and down pacing algorithm
 //
-// Refresh the pace given an error that was returned. It returns a
-// boolean as to whether the operation should be retried.
-func (p *Pacer) endCall(again bool) {
-	if p.maxConnections > 0 {
-		p.connTokens <- struct{}{}
-	}
-	p.mu.Lock()
+// See the description for DefaultPacer
+//
+// This should calculate a new sleepTime. It takes a boolean as to
+// whether the operation should be retried or not.
+//
+// Call with p.mu held
+func (p *Pacer) defaultPacer(retry bool) {
 	oldSleepTime := p.sleepTime
-	if again {
+	if retry {
 		p.sleepTime *= 2
 		if p.sleepTime > p.maxSleep {
 			p.sleepTime = p.maxSleep
@@ -153,21 +201,70 @@ func (p *Pacer) endCall(again bool) {
 			fs.Debug("pacer", "Reducing sleep to %v", p.sleepTime)
 		}
 	}
+}
+
+// acdPacer implements a truncated exponential backoff
+// strategy with randomization for Amazon Cloud Drive
+//
+// See the description for AmazonCloudDrivePacer
+//
+// This should calculate a new sleepTime. It takes a boolean as to
+// whether the operation should be retried or not.
+//
+// Call with p.mu held
+func (p *Pacer) acdPacer(retry bool) {
+	consecutiveRetries := p.consecutiveRetries
+	if consecutiveRetries == 0 {
+		if p.sleepTime != p.minSleep {
+			p.sleepTime = p.minSleep
+			fs.Debug("pacer", "Resetting sleep to minimum %v on success", p.sleepTime)
+		}
+	} else {
+		if consecutiveRetries > 9 {
+			consecutiveRetries = 9
+		}
+		// consecutiveRetries starts at 1 so
+		// maxSleep is 2**(consecutiveRetries-1) seconds
+		maxSleep := time.Second << uint(consecutiveRetries-1)
+		// actual sleep is random from 0..maxSleep
+		p.sleepTime = time.Duration(rand.Int63n(int64(maxSleep)))
+		if p.sleepTime < p.minSleep {
+			p.sleepTime = p.minSleep
+		}
+		fs.Debug("pacer", "Rate limited, sleeping for %v (%d retries)", p.sleepTime, consecutiveRetries)
+	}
+}
+
+// endCall implements the pacing algorithm
+//
+// This should calculate a new sleepTime. It takes a boolean as to
+// whether the operation should be retried or not.
+func (p *Pacer) endCall(retry bool) {
+	if p.maxConnections > 0 {
+		p.connTokens <- struct{}{}
+	}
+	p.mu.Lock()
+	if retry {
+		p.consecutiveRetries++
+	} else {
+		p.consecutiveRetries = 0
+	}
+	p.calculatePace(retry)
 	p.mu.Unlock()
 }
 
 // call implements Call but with settable retries
 func (p *Pacer) call(fn Paced, retries int) (err error) {
-	var again bool
+	var retry bool
 	for i := 0; i < retries; i++ {
 		p.beginCall()
-		again, err = fn()
-		p.endCall(again)
-		if !again {
+		retry, err = fn()
+		p.endCall(retry)
+		if !retry {
 			break
 		}
 	}
-	if again {
+	if retry {
 		err = fs.RetryError(err)
 	}
 	return err
@@ -186,8 +283,8 @@ func (p *Pacer) Call(fn Paced) (err error) {
 	return p.call(fn, retries)
 }
 
-// Pace the remote operations to not exceed Amazon's limits and return
-// a retry error on rate limit exceeded
+// CallNoRetry paces the remote operations to not exceed the limits
+// and returns a retry error if the rate limit is exceeded
 //
 // This calls fn and wraps the output in a RetryError if it would like
 // it to be retried
diff --git a/pacer/pacer_test.go b/pacer/pacer_test.go
new file mode 100644
index 000000000..e0c1c5728
--- /dev/null
+++ b/pacer/pacer_test.go
@@ -0,0 +1,307 @@
+package pacer
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/ncw/rclone/fs"
+)
+
+func TestNew(t *testing.T) {
+	p := New()
+	if p.minSleep != 10*time.Millisecond {
+		t.Errorf("minSleep")
+	}
+	if p.maxSleep != 2*time.Second {
+		t.Errorf("maxSleep")
+	}
+	if p.sleepTime != p.minSleep {
+		t.Errorf("sleepTime")
+	}
+	if p.retries != 10 {
+		t.Errorf("retries")
+	}
+	if p.decayConstant != 2 {
+		t.Errorf("decayConstant")
+	}
+	if cap(p.pacer) != 1 {
+		t.Errorf("pacer 1")
+	}
+	if len(p.pacer) != 1 {
+		t.Errorf("pacer 2")
+	}
+	if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) {
+		t.Errorf("calculatePace")
+	}
+	if p.maxConnections != fs.Config.Checkers+fs.Config.Transfers {
+		t.Errorf("maxConnections")
+	}
+	if cap(p.connTokens) != fs.Config.Checkers+fs.Config.Transfers {
+		t.Errorf("connTokens")
+	}
+	if p.consecutiveRetries != 0 {
+		t.Errorf("consecutiveRetries")
+	}
+}
+
+func TestSetMinSleep(t *testing.T) {
+	p := New().SetMinSleep(1 * time.Millisecond)
+	if p.minSleep != 1*time.Millisecond {
+		t.Errorf("didn't set")
+	}
+}
+
+func TestSetMaxSleep(t *testing.T) {
+	p := New().SetMaxSleep(100 * time.Second)
+	if p.maxSleep != 100*time.Second {
+		t.Errorf("didn't set")
+	}
+}
+
+func TestMaxConnections(t *testing.T) {
+	p := New().SetMaxConnections(20)
+	if p.maxConnections != 20 {
+		t.Errorf("maxConnections")
+	}
+	if cap(p.connTokens) != 20 {
+		t.Errorf("connTokens")
+	}
+	p.SetMaxConnections(0)
+	if p.maxConnections != 0 {
+		t.Errorf("maxConnections is not 0")
+	}
+	if p.connTokens != nil {
+		t.Errorf("connTokens is not nil")
+	}
+}
+
+func TestSetDecayConstant(t *testing.T) {
+	p := New().SetDecayConstant(17)
+	if p.decayConstant != 17 {
+		t.Errorf("didn't set")
+	}
+}
+
+func TestSetRetries(t *testing.T) {
+	p := New().SetRetries(18)
+	if p.retries != 18 {
+		t.Errorf("didn't set")
+	}
+}
+
+func TestSetPacer(t *testing.T) {
+	p := New().SetPacer(AmazonCloudDrivePacer)
+	if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.acdPacer) {
+		t.Errorf("calculatePace is not acdPacer")
+	}
+	p.SetPacer(DefaultPacer)
+	if fmt.Sprintf("%p", p.calculatePace) != fmt.Sprintf("%p", p.defaultPacer) {
+		t.Errorf("calculatePace is not defaultPacer")
+	}
+}
+
+// emptyTokens empties the pacer of all its tokens
+func emptyTokens(p *Pacer) {
+	for len(p.pacer) != 0 {
+		<-p.pacer
+	}
+	for len(p.connTokens) != 0 {
+		<-p.connTokens
+	}
+}
+
+func TestBeginCall(t *testing.T) {
+	p := New().SetMaxConnections(10).SetMinSleep(1 * time.Millisecond)
+	emptyTokens(p)
+	go p.beginCall()
+	time.Sleep(2 * p.minSleep)
+	if len(p.pacer) != 0 {
+		t.Errorf("beginSleep fired too early #1")
+	}
+	p.pacer <- struct{}{}
+	time.Sleep(2 * p.minSleep)
+	if len(p.pacer) != 0 {
+		t.Errorf("beginSleep fired too early #2")
+	}
+	p.connTokens <- struct{}{}
+	time.Sleep(2 * p.minSleep)
+	if len(p.pacer) == 0 {
+		t.Errorf("beginSleep didn't fire")
+	}
+}
+
+func TestBeginCallZeroConnections(t *testing.T) {
+	p := New().SetMaxConnections(0).SetMinSleep(1 * time.Millisecond)
+	emptyTokens(p)
+	go p.beginCall()
+	time.Sleep(2 * p.minSleep)
+	if len(p.pacer) != 0 {
+		t.Errorf("beginSleep fired too early #1")
+	}
+	p.pacer <- struct{}{}
+	time.Sleep(2 * p.minSleep)
+	if len(p.pacer) == 0 {
+		t.Errorf("beginSleep didn't fire")
+	}
+}
+
+func TestDefaultPacer(t *testing.T) {
+	p := New().SetMinSleep(time.Millisecond).SetPacer(DefaultPacer).SetMaxSleep(time.Second).SetDecayConstant(2)
+	for _, test := range []struct {
+		in    time.Duration
+		retry bool
+		want  time.Duration
+	}{
+		{time.Millisecond, true, 2 * time.Millisecond},
+		{time.Second, true, time.Second},
+		{(3 * time.Second) / 4, true, time.Second},
+		{time.Second, false, 750 * time.Millisecond},
+		{1000 * time.Microsecond, false, time.Millisecond},
+		{1200 * time.Microsecond, false, time.Millisecond},
+	} {
+		p.sleepTime = test.in
+		p.defaultPacer(test.retry)
+		got := p.sleepTime
+		if got != test.want {
+			t.Errorf("bad sleep want %v got %v", test.want, got)
+		}
+	}
+
+}
+
+func TestAmazonCloudDrivePacer(t *testing.T) {
+	p := New().SetMinSleep(time.Millisecond).SetPacer(AmazonCloudDrivePacer).SetMaxSleep(time.Second).SetDecayConstant(2)
+	// Do lots of times because of the random number!
+	for _, test := range []struct {
+		in                 time.Duration
+		consecutiveRetries int
+		retry              bool
+		want               time.Duration
+	}{
+		{time.Millisecond, 0, true, time.Millisecond},
+		{10 * time.Millisecond, 0, true, time.Millisecond},
+		{1 * time.Second, 1, true, 500 * time.Millisecond},
+		{1 * time.Second, 2, true, 1 * time.Second},
+		{1 * time.Second, 3, true, 2 * time.Second},
+		{1 * time.Second, 4, true, 4 * time.Second},
+		{1 * time.Second, 5, true, 8 * time.Second},
+		{1 * time.Second, 6, true, 16 * time.Second},
+		{1 * time.Second, 7, true, 32 * time.Second},
+		{1 * time.Second, 8, true, 64 * time.Second},
+		{1 * time.Second, 9, true, 128 * time.Second},
+		{1 * time.Second, 10, true, 128 * time.Second},
+		{1 * time.Second, 11, true, 128 * time.Second},
+	} {
+		const n = 1000
+		var sum time.Duration
+		// measure average time over n cycles
+		for i := 0; i < n; i++ {
+			p.sleepTime = test.in
+			p.consecutiveRetries = test.consecutiveRetries
+			p.acdPacer(test.retry)
+			sum += p.sleepTime
+		}
+		got := sum / n
+		//t.Logf("%+v: got = %v", test, got)
+		if got < (test.want*9)/10 || got > (test.want*11)/10 {
+			t.Fatalf("%+v: bad sleep want %v+/-10%% got %v", test, test.want, got)
+		}
+	}
+}
+
+func TestEndCall(t *testing.T) {
+	p := New().SetMaxConnections(5)
+	emptyTokens(p)
+	p.consecutiveRetries = 1
+	p.endCall(true)
+	if len(p.connTokens) != 1 {
+		t.Errorf("Expecting 1 token")
+	}
+	if p.consecutiveRetries != 2 {
+		t.Errorf("Bad consecutive retries")
+	}
+}
+
+func TestEndCallZeroConnections(t *testing.T) {
+	p := New().SetMaxConnections(0)
+	emptyTokens(p)
+	p.consecutiveRetries = 1
+	p.endCall(false)
+	if len(p.connTokens) != 0 {
+		t.Errorf("Expecting 0 token")
+	}
+	if p.consecutiveRetries != 0 {
+		t.Errorf("Bad consecutive retries")
+	}
+}
+
+var errFoo = fmt.Errorf("Foo")
+
+type dummyPaced struct {
+	retry  bool
+	called int
+}
+
+func (dp *dummyPaced) fn() (bool, error) {
+	dp.called++
+	return dp.retry, errFoo
+}
+
+func Test_callNoRetry(t *testing.T) {
+	p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond)
+
+	dp := &dummyPaced{retry: false}
+	err := p.call(dp.fn, 10)
+	if dp.called != 1 {
+		t.Errorf("called want %d got %d", 1, dp.called)
+	}
+	if err != errFoo {
+		t.Errorf("err want %v got %v", errFoo, err)
+	}
+}
+
+func Test_callRetry(t *testing.T) {
+	p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond)
+
+	dp := &dummyPaced{retry: true}
+	err := p.call(dp.fn, 10)
+	if dp.called != 10 {
+		t.Errorf("called want %d got %d", 10, dp.called)
+	}
+	if err == errFoo {
+		t.Errorf("err didn't want %v got %v", errFoo, err)
+	}
+	_, ok := err.(fs.Retry)
+	if !ok {
+		t.Errorf("didn't return a retry error")
+	}
+}
+
+func TestCall(t *testing.T) {
+	p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20)
+
+	dp := &dummyPaced{retry: true}
+	err := p.Call(dp.fn)
+	if dp.called != 20 {
+		t.Errorf("called want %d got %d", 20, dp.called)
+	}
+	_, ok := err.(fs.Retry)
+	if !ok {
+		t.Errorf("didn't return a retry error")
+	}
+}
+
+func TestCallNoRetry(t *testing.T) {
+	p := New().SetMinSleep(time.Millisecond).SetMaxSleep(2 * time.Millisecond).SetRetries(20)
+
+	dp := &dummyPaced{retry: true}
+	err := p.CallNoRetry(dp.fn)
+	if dp.called != 1 {
+		t.Errorf("called want %d got %d", 1, dp.called)
+	}
+	_, ok := err.(fs.Retry)
+	if !ok {
+		t.Errorf("didn't return a retry error")
+	}
+}
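
Note (not part of the patch): a minimal usage sketch, for reviewers who want to see the calling convention the patch relies on. It uses only names introduced or kept by this patch (pacer.New, SetMinSleep, SetPacer, AmazonCloudDrivePacer, Call, and the Paced signature func() (bool, error)); the attempt counter and the simulated "429 Rate exceeded" failure are hypothetical and only stand in for a real backend request such as the ones amazonclouddrive.go wraps.

package main

import (
	"fmt"
	"time"

	"github.com/ncw/rclone/pacer"
)

func main() {
	// Configure an Amazon Cloud Drive style pacer: at least 20ms between
	// calls, randomized truncated exponential backoff on retries.
	p := pacer.New().SetMinSleep(20 * time.Millisecond).SetPacer(pacer.AmazonCloudDrivePacer)

	attempts := 0
	err := p.Call(func() (bool, error) {
		attempts++
		// A real backend would issue its API request here and return
		// true for retryable failures (e.g. the 429/500/503 codes in
		// retryErrorCodes above); errors are passed back unchanged.
		if attempts < 3 {
			return true, fmt.Errorf("simulated 429 Rate exceeded")
		}
		return false, nil
	})
	fmt.Printf("finished after %d attempts, err=%v\n", attempts, err)
}

Because the Amazon pacer draws each retry sleep at random from a window that doubles with consecutive retries (capped, as in acdPacer above), this example may pause for a second or two before succeeding; once a call succeeds the sleep drops back to the minimum set with SetMinSleep.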