diff --git a/b2/b2.go b/b2/b2.go index 18510e1ee..553de9e15 100644 --- a/b2/b2.go +++ b/b2/b2.go @@ -86,6 +86,8 @@ type Fs struct { endpoint string // name of the starting api endpoint srv *rest.Client // the connection to the b2 server bucket string // the bucket we are working on + bucketOKMu sync.Mutex // mutex to protect bucket OK + bucketOK bool // true if we have created the bucket bucketIDMutex sync.Mutex // mutex to protect _bucketID _bucketID string // the ID of the bucket we are working on info api.AuthorizeAccountResponse // result of authorize call @@ -671,8 +673,9 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs. // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(dir string) error { - // Can't create subdirs - if dir != "" { + f.bucketOKMu.Lock() + defer f.bucketOKMu.Unlock() + if f.bucketOK { return nil } opts := rest.Opts{ @@ -697,6 +700,7 @@ func (f *Fs) Mkdir(dir string) error { _, getBucketErr := f.getBucketID() if getBucketErr == nil { // found so it is our bucket + f.bucketOK = true return nil } if getBucketErr != fs.ErrorDirNotFound { @@ -707,6 +711,7 @@ func (f *Fs) Mkdir(dir string) error { return errors.Wrap(err, "failed to create bucket") } f.setBucketID(response.ID) + f.bucketOK = true return nil } @@ -714,6 +719,8 @@ func (f *Fs) Mkdir(dir string) error { // // Returns an error if it isn't empty func (f *Fs) Rmdir(dir string) error { + f.bucketOKMu.Lock() + defer f.bucketOKMu.Unlock() if f.root != "" || dir != "" { return nil } @@ -737,6 +744,7 @@ func (f *Fs) Rmdir(dir string) error { if err != nil { return errors.Wrap(err, "failed to delete bucket") } + f.bucketOK = false f.clearBucketID() f.clearUploadURL() return nil @@ -1165,6 +1173,10 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio if *b2Versions { return errNotWithVersions } + err = o.fs.Mkdir("") + if err != nil { + return err + } size := src.Size() // If a large file upload in chunks - see upload.go diff --git a/googlecloudstorage/googlecloudstorage.go b/googlecloudstorage/googlecloudstorage.go index 128b708c7..c72f5dcd8 100644 --- a/googlecloudstorage/googlecloudstorage.go +++ b/googlecloudstorage/googlecloudstorage.go @@ -24,6 +24,7 @@ import ( "path" "regexp" "strings" + "sync" "time" "github.com/ncw/rclone/fs" @@ -135,6 +136,8 @@ type Fs struct { svc *storage.Service // the connection to the storage server client *http.Client // authorized client bucket string // the bucket we are working on + bucketOKMu sync.Mutex // mutex to protect bucket OK + bucketOK bool // true if we have created the bucket projectNumber string // used for finding buckets objectACL string // used when creating new objects bucketACL string // used when creating new buckets @@ -456,13 +459,15 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs. // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(dir string) error { - // Can't create subdirs - if dir != "" { + f.bucketOKMu.Lock() + defer f.bucketOKMu.Unlock() + if f.bucketOK { return nil } _, err := f.svc.Buckets.Get(f.bucket).Do() if err == nil { // Bucket already exists + f.bucketOK = true return nil } @@ -474,6 +479,9 @@ func (f *Fs) Mkdir(dir string) error { Name: f.bucket, } _, err = f.svc.Buckets.Insert(f.projectNumber, &bucket).PredefinedAcl(f.bucketACL).Do() + if err == nil { + f.bucketOK = true + } return err } @@ -482,10 +490,16 @@ func (f *Fs) Mkdir(dir string) error { // Returns an error if it isn't empty: Error 409: The bucket you tried // to delete was not empty. func (f *Fs) Rmdir(dir string) error { + f.bucketOKMu.Lock() + defer f.bucketOKMu.Unlock() if f.root != "" || dir != "" { return nil } - return f.svc.Buckets.Delete(f.bucket).Do() + err := f.svc.Buckets.Delete(f.bucket).Do() + if err == nil { + f.bucketOK = false + } + return err } // Precision returns the precision @@ -684,6 +698,10 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { // // The new object may have been created if an error is returned func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + err := o.fs.Mkdir("") + if err != nil { + return err + } size := src.Size() modTime := src.ModTime() diff --git a/s3/s3.go b/s3/s3.go index 173a95361..44729038f 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -21,6 +21,7 @@ import ( "path" "regexp" "strings" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -239,6 +240,8 @@ type Fs struct { c *s3.S3 // the connection to the s3 server ses *session.Session // the s3 session bucket string // the bucket we are working on + bucketOKMu sync.Mutex // mutex to protect bucket OK + bucketOK bool // true if we have created the bucket acl string // ACL for new buckets / objects locationConstraint string // location constraint of new buckets sse string // the type of server-side encryption @@ -652,11 +655,15 @@ func (f *Fs) dirExists() (bool, error) { // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(dir string) error { - // Can't create subdirs - if dir != "" { + f.bucketOKMu.Lock() + defer f.bucketOKMu.Unlock() + if f.bucketOK { return nil } exists, err := f.dirExists() + if err == nil { + f.bucketOK = exists + } if err != nil || exists { return err } @@ -672,9 +679,12 @@ func (f *Fs) Mkdir(dir string) error { _, err = f.c.CreateBucket(&req) if err, ok := err.(awserr.Error); ok { if err.Code() == "BucketAlreadyOwnedByYou" { - return nil + err = nil } } + if err == nil { + f.bucketOK = true + } return err } @@ -682,6 +692,8 @@ func (f *Fs) Mkdir(dir string) error { // // Returns an error if it isn't empty func (f *Fs) Rmdir(dir string) error { + f.bucketOKMu.Lock() + defer f.bucketOKMu.Unlock() if f.root != "" || dir != "" { return nil } @@ -689,6 +701,9 @@ func (f *Fs) Rmdir(dir string) error { Bucket: &f.bucket, } _, err := f.c.DeleteBucket(&req) + if err == nil { + f.bucketOK = false + } return err } @@ -903,6 +918,10 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) { // Update the Object from in with modTime and size func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + err := o.fs.Mkdir("") + if err != nil { + return err + } modTime := src.ModTime() uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) { @@ -943,7 +962,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio if o.fs.storageClass != "" { req.StorageClass = &o.fs.storageClass } - _, err := uploader.Upload(&req) + _, err = uploader.Upload(&req) if err != nil { return err } diff --git a/swift/swift.go b/swift/swift.go index c1b6af14e..63933d2b2 100644 --- a/swift/swift.go +++ b/swift/swift.go @@ -9,6 +9,7 @@ import ( "regexp" "strconv" "strings" + "sync" "time" "github.com/ncw/rclone/fs" @@ -92,6 +93,8 @@ type Fs struct { features *fs.Features // optional features c *swift.Connection // the connection to the swift server container string // the container we are working on + containerOKMu sync.Mutex // mutex to protect container OK + containerOK bool // true if we have created the container segmentsContainer string // container to store the segments (if any) in } @@ -410,30 +413,36 @@ func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs. // Mkdir creates the container if it doesn't exist func (f *Fs) Mkdir(dir string) error { - // Can't create subdirs - if dir != "" { + f.containerOKMu.Lock() + defer f.containerOKMu.Unlock() + if f.containerOK { return nil } // Check to see if container exists first _, _, err := f.c.Container(f.container) - if err == nil { - return nil - } if err == swift.ContainerNotFound { - return f.c.ContainerCreate(f.container, nil) + err = f.c.ContainerCreate(f.container, nil) + } + if err == nil { + f.containerOK = true } return err - } // Rmdir deletes the container if the fs is at the root // // Returns an error if it isn't empty func (f *Fs) Rmdir(dir string) error { + f.containerOKMu.Lock() + defer f.containerOKMu.Unlock() if f.root != "" || dir != "" { return nil } - return f.c.ContainerDelete(f.container) + err := f.c.ContainerDelete(f.container) + if err == nil { + f.containerOK = false + } + return err } // Precision of the remote @@ -738,6 +747,10 @@ func (o *Object) updateChunks(in io.Reader, headers swift.Headers, size int64, c // // The new object may have been created if an error is returned func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + err := o.fs.Mkdir("") + if err != nil { + return err + } size := src.Size() modTime := src.ModTime()