From 9698a2babb1a3fdb69be60af81b3fe48cdcc0f73 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Wed, 9 May 2018 14:27:21 +0100
Subject: [PATCH] gcs: low level retry all operations if necessary

Google Cloud Storage doesn't normally need retries; however, certain
operations (e.g. bucket creation and removal) are rate limited and do
generate 429 errors.

Before this change, the integration tests would regularly blow up with
errors from GCS rate limiting bucket creation and removal.

After this change, we low-level retry all operations using the same
exponential backoff strategy as the Google Drive backend.
---
 .../googlecloudstorage/googlecloudstorage.go | 114 +++++++++++++++---
 fstest/test_all/test_all.go | 1 -
 2 files changed, 97 insertions(+), 18 deletions(-)

diff --git a/backend/googlecloudstorage/googlecloudstorage.go b/backend/googlecloudstorage/googlecloudstorage.go index c18cab6cb..62ee84e41 100644 --- a/backend/googlecloudstorage/googlecloudstorage.go +++ b/backend/googlecloudstorage/googlecloudstorage.go @@ -31,10 +31,12 @@ import ( "github.com/ncw/rclone/fs/config" "github.com/ncw/rclone/fs/config/flags" "github.com/ncw/rclone/fs/config/obscure" + "github.com/ncw/rclone/fs/fserrors" "github.com/ncw/rclone/fs/fshttp" "github.com/ncw/rclone/fs/hash" "github.com/ncw/rclone/fs/walk" "github.com/ncw/rclone/lib/oauthutil" + "github.com/ncw/rclone/lib/pacer" "github.com/pkg/errors" "golang.org/x/oauth2" "golang.org/x/oauth2/google" @@ -49,6 +51,7 @@ const ( timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00" metaMtime = "mtime" // key to store mtime under in metadata listChunks = 1000 // chunk size to read directory listings + minSleep = 10 * time.Millisecond ) var ( @@ -219,6 +222,7 @@ type Fs struct { bucketACL string // used when creating new buckets location string // location of new buckets storageClass string // storage class of new buckets + pacer *pacer.Pacer // To pace the API calls } // Object describes a storage object @@ -262,6 +266,30 @@ func (f *Fs) Features() *fs.Features { return f.features } +// shouldRetry determines whether a given err rates being retried +func shouldRetry(err error) (again bool, errOut error) { + again = false + if err != nil { + if fserrors.ShouldRetry(err) { + again = true + } else { + switch gerr := err.(type) { + case *googleapi.Error: + if gerr.Code >= 500 && gerr.Code < 600 { + // All 5xx errors should be retried + again = true + } else if len(gerr.Errors) > 0 { + reason := gerr.Errors[0].Reason + if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" { + again = true + } + } + } + } + } + return again, err +} + // Pattern to match a storage path var matcher = regexp.MustCompile(`^([^/]*)(.*)$`) @@ -327,6 +355,7 @@ func NewFs(name, root string) (fs.Fs, error) { bucketACL: config.FileGet(name, "bucket_acl"), location: config.FileGet(name, "location"), storageClass: config.FileGet(name, "storage_class"), + pacer: pacer.New().SetMinSleep(minSleep).SetPacer(pacer.GoogleDrivePacer), } f.features = (&fs.Features{ ReadMimeType: true, @@ -356,7 +385,10 @@ func NewFs(name, root string) (fs.Fs, error) { if f.root != "" { f.root += "/" // Check to see if the object exists - _, err = f.svc.Objects.Get(bucket, directory).Do() + err = f.pacer.Call(func() (bool, error) { + _, err = f.svc.Objects.Get(bucket, directory).Do() + return shouldRetry(err) + }) if err == nil { f.root = path.Dir(directory) if f.root == "."
{ @@ -404,7 +436,7 @@ type listFn func(remote string, object *storage.Object, isDirectory bool) error // dir is the starting directory, "" for root // // Set recurse to read sub directories -func (f *Fs) list(dir string, recurse bool, fn listFn) error { +func (f *Fs) list(dir string, recurse bool, fn listFn) (err error) { root := f.root rootLength := len(root) if dir != "" { @@ -415,7 +447,11 @@ func (f *Fs) list(dir string, recurse bool, fn listFn) error { list = list.Delimiter("/") } for { - objects, err := list.Do() + var objects *storage.Objects + err = f.pacer.Call(func() (bool, error) { + objects, err = list.Do() + return shouldRetry(err) + }) if err != nil { if gErr, ok := err.(*googleapi.Error); ok { if gErr.Code == http.StatusNotFound { @@ -519,7 +555,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { } listBuckets := f.svc.Buckets.List(f.projectNumber).MaxResults(listChunks) for { - buckets, err := listBuckets.Do() + var buckets *storage.Buckets + err = f.pacer.Call(func() (bool, error) { + buckets, err = listBuckets.Do() + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -607,7 +647,7 @@ func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption } // Mkdir creates the bucket if it doesn't exist -func (f *Fs) Mkdir(dir string) error { +func (f *Fs) Mkdir(dir string) (err error) { f.bucketOKMu.Lock() defer f.bucketOKMu.Unlock() if f.bucketOK { @@ -615,7 +655,11 @@ func (f *Fs) Mkdir(dir string) error { } // List something from the bucket to see if it exists. Doing it like this enables the use of a // service account that only has the "Storage Object Admin" role. See #2193 for details. - _, err := f.svc.Objects.List(f.bucket).MaxResults(1).Do() + + err = f.pacer.Call(func() (bool, error) { + _, err = f.svc.Objects.List(f.bucket).MaxResults(1).Do() + return shouldRetry(err) + }) if err == nil { // Bucket already exists f.bucketOK = true @@ -637,7 +681,10 @@ func (f *Fs) Mkdir(dir string) error { Location: f.location, StorageClass: f.storageClass, } - _, err = f.svc.Buckets.Insert(f.projectNumber, &bucket).PredefinedAcl(f.bucketACL).Do() + err = f.pacer.Call(func() (bool, error) { + _, err = f.svc.Buckets.Insert(f.projectNumber, &bucket).PredefinedAcl(f.bucketACL).Do() + return shouldRetry(err) + }) if err == nil { f.bucketOK = true } @@ -648,13 +695,16 @@ func (f *Fs) Mkdir(dir string) error { // // Returns an error if it isn't empty: Error 409: The bucket you tried // to delete was not empty. 
-func (f *Fs) Rmdir(dir string) error { +func (f *Fs) Rmdir(dir string) (err error) { f.bucketOKMu.Lock() defer f.bucketOKMu.Unlock() if f.root != "" || dir != "" { return nil } - err := f.svc.Buckets.Delete(f.bucket).Do() + err = f.pacer.Call(func() (bool, error) { + err = f.svc.Buckets.Delete(f.bucket).Do() + return shouldRetry(err) + }) if err == nil { f.bucketOK = false } @@ -696,7 +746,11 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { srcObject := srcObj.fs.root + srcObj.remote dstBucket := f.bucket dstObject := f.root + remote - newObject, err := f.svc.Objects.Copy(srcBucket, srcObject, dstBucket, dstObject, nil).Do() + var newObject *storage.Object + err = f.pacer.Call(func() (bool, error) { + newObject, err = f.svc.Objects.Copy(srcBucket, srcObject, dstBucket, dstObject, nil).Do() + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -784,7 +838,11 @@ func (o *Object) readMetaData() (err error) { if !o.modTime.IsZero() { return nil } - object, err := o.fs.svc.Objects.Get(o.fs.bucket, o.fs.root+o.remote).Do() + var object *storage.Object + err = o.fs.pacer.Call(func() (bool, error) { + object, err = o.fs.svc.Objects.Get(o.fs.bucket, o.fs.root+o.remote).Do() + return shouldRetry(err) + }) if err != nil { if gErr, ok := err.(*googleapi.Error); ok { if gErr.Code == http.StatusNotFound { @@ -818,14 +876,18 @@ func metadataFromModTime(modTime time.Time) map[string]string { } // SetModTime sets the modification time of the local fs object -func (o *Object) SetModTime(modTime time.Time) error { +func (o *Object) SetModTime(modTime time.Time) (err error) { // This only adds metadata so will perserve other metadata object := storage.Object{ Bucket: o.fs.bucket, Name: o.fs.root + o.remote, Metadata: metadataFromModTime(modTime), } - newObject, err := o.fs.svc.Objects.Patch(o.fs.bucket, o.fs.root+o.remote, &object).Do() + var newObject *storage.Object + err = o.fs.pacer.Call(func() (bool, error) { + newObject, err = o.fs.svc.Objects.Patch(o.fs.bucket, o.fs.root+o.remote, &object).Do() + return shouldRetry(err) + }) if err != nil { return err } @@ -845,7 +907,17 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { return nil, err } fs.OpenOptionAddHTTPHeaders(req.Header, options) - res, err := o.fs.client.Do(req) + var res *http.Response + err = o.fs.pacer.Call(func() (bool, error) { + res, err = o.fs.client.Do(req) + if err == nil { + err = googleapi.CheckResponse(res) + if err != nil { + _ = res.Body.Close() // ignore error + } + } + return shouldRetry(err) + }) if err != nil { return nil, err } @@ -874,7 +946,11 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio Updated: modTime.Format(timeFormatOut), // Doesn't get set Metadata: metadataFromModTime(modTime), } - newObject, err := o.fs.svc.Objects.Insert(o.fs.bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name).PredefinedAcl(o.fs.objectACL).Do() + var newObject *storage.Object + err = o.fs.pacer.CallNoRetry(func() (bool, error) { + newObject, err = o.fs.svc.Objects.Insert(o.fs.bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name).PredefinedAcl(o.fs.objectACL).Do() + return shouldRetry(err) + }) if err != nil { return err } @@ -884,8 +960,12 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio } // Remove an object -func (o *Object) Remove() error { - return o.fs.svc.Objects.Delete(o.fs.bucket, o.fs.root+o.remote).Do() +func (o *Object) Remove() (err error) { + err = 
o.fs.pacer.Call(func() (bool, error) { + err = o.fs.svc.Objects.Delete(o.fs.bucket, o.fs.root+o.remote).Do() + return shouldRetry(err) + }) + return err } // MimeType of an Object if known, "" otherwise diff --git a/fstest/test_all/test_all.go b/fstest/test_all/test_all.go index 161b40ee6..50d841915 100644 --- a/fstest/test_all/test_all.go +++ b/fstest/test_all/test_all.go @@ -277,7 +277,6 @@ func (t *test) cleanFs() error { remote := dir.Remote() if fstest.MatchTestRemote.MatchString(remote) { log.Printf("Purging %s%s", t.remote, remote) - time.Sleep(2500 * time.Millisecond) // sleep to rate limit bucket deletes for gcs dir, err := fs.NewFs(t.remote + remote) if err != nil { return err
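
The patch routes every GCS API call through the pacer (f.pacer.Call) and classifies errors with the new shouldRetry helper: any 5xx, or a rate-limit reason such as the 429s seen during bucket create/delete, earns another attempt. As a minimal standalone illustration of that idea -- not rclone's actual pacer API -- the sketch below retries a rate-limited call with a doubling sleep. All names in it (callWithBackoff, errRateLimited, createBucket) are hypothetical, and it omits the pacing and concurrency limiting that the real pacer also provides.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errRateLimited stands in for a googleapi.Error with code 429 and
// reason "rateLimitExceeded".
var errRateLimited = errors.New("429: rateLimitExceeded")

// callWithBackoff retries fn while it reports its error as retriable,
// doubling the sleep between attempts (minSleep, 2*minSleep, 4*minSleep, ...),
// which is the exponential backoff shape the commit message refers to.
func callWithBackoff(attempts int, minSleep time.Duration, fn func() (retry bool, err error)) error {
	sleep := minSleep
	var err error
	for i := 0; i < attempts; i++ {
		var retry bool
		retry, err = fn()
		if err == nil || !retry {
			return err
		}
		fmt.Printf("attempt %d failed (%v), sleeping %v\n", i+1, err, sleep)
		time.Sleep(sleep)
		sleep *= 2 // exponential backoff
	}
	return err
}

func main() {
	// Simulate a bucket creation that is rate limited twice before succeeding.
	calls := 0
	createBucket := func() (bool, error) {
		calls++
		if calls <= 2 {
			return true, errRateLimited // retriable: ask for another attempt
		}
		return false, nil // success
	}

	if err := callWithBackoff(5, 10*time.Millisecond, createBucket); err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Println("bucket created after", calls, "attempts")
}

One detail of the diff worth noting: every operation uses pacer.Call except Update, which goes through pacer.CallNoRetry -- presumably because the upload body is a plain io.Reader that cannot be rewound for a second attempt, so only the pacing, not the low-level retry, is wanted there.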