From e0e0e0c7bddc6a3fba4a49b1479def4017d3fa14 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 9 Aug 2019 15:19:02 +0100 Subject: [PATCH] b2: make all operations work from the root #3421 --- backend/b2/b2.go | 546 +++++++++++++++++++++++-------------------- backend/b2/upload.go | 5 +- 2 files changed, 299 insertions(+), 252 deletions(-) diff --git a/backend/b2/b2.go b/backend/b2/b2.go index a973c5e03..4db5dc692 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -14,7 +14,6 @@ import ( "io" "net/http" "path" - "regexp" "strconv" "strings" "sync" @@ -30,6 +29,7 @@ import ( "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/walk" + "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/rest" ) @@ -164,24 +164,24 @@ type Options struct { // Fs represents a remote b2 server type Fs struct { - name string // name of this remote - root string // the path we are working on if any - opt Options // parsed config options - features *fs.Features // optional features - srv *rest.Client // the connection to the b2 server - bucket string // the bucket we are working on - bucketOKMu sync.Mutex // mutex to protect bucket OK - bucketOK bool // true if we have created the bucket - bucketIDMutex sync.Mutex // mutex to protect _bucketID - _bucketID string // the ID of the bucket we are working on - bucketTypeMutex sync.Mutex // mutex to protect _bucketType - _bucketType string // the Type of the bucket we are working on - info api.AuthorizeAccountResponse // result of authorize call - uploadMu sync.Mutex // lock for upload variable - uploads []*api.GetUploadURLResponse // result of get upload URL calls - authMu sync.Mutex // lock for authorizing the account - pacer *fs.Pacer // To pace and retry the API calls - bufferTokens chan []byte // control concurrency of multipart uploads + name string // name of this remote + root string // the path we are working on if any + opt Options // parsed config options + features *fs.Features // optional features + srv *rest.Client // the connection to the b2 server + rootBucket string // bucket part of root (if any) + rootDirectory string // directory part of root (if any) + cache *bucket.Cache // cache for bucket creation status + bucketIDMutex sync.Mutex // mutex to protect _bucketID + _bucketID map[string]string // the ID of the bucket we are working on + bucketTypeMutex sync.Mutex // mutex to protect _bucketType + _bucketType map[string]string // the Type of the bucket we are working on + info api.AuthorizeAccountResponse // result of authorize call + uploadMu sync.Mutex // lock for upload variable + uploads map[string][]*api.GetUploadURLResponse // Upload URLs by buckedID + authMu sync.Mutex // lock for authorizing the account + pacer *fs.Pacer // To pace and retry the API calls + bufferTokens chan []byte // control concurrency of multipart uploads } // Object describes a b2 object @@ -204,18 +204,18 @@ func (f *Fs) Name() string { // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { - if f.root == "" { - return f.bucket - } - return f.bucket + "/" + f.root + return f.root } // String converts this Fs to a string func (f *Fs) String() string { - if f.root == "" { - return fmt.Sprintf("B2 bucket %s", f.bucket) + if f.rootBucket == "" { + return fmt.Sprintf("B2 root") } - return fmt.Sprintf("B2 bucket %s path %s", f.bucket, f.root) + if f.rootDirectory == "" { + return fmt.Sprintf("B2 bucket %s", f.rootBucket) + } + return fmt.Sprintf("B2 bucket %s path %s", f.rootBucket, f.rootDirectory) } // Features returns the optional features of this Fs @@ -223,21 +223,23 @@ func (f *Fs) Features() *fs.Features { return f.features } -// Pattern to match a b2 path -var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`) - -// parseParse parses a b2 'url' -func parsePath(path string) (bucket, directory string, err error) { - parts := matcher.FindStringSubmatch(path) - if parts == nil { - err = errors.Errorf("couldn't find bucket in b2 path %q", path) - } else { - bucket, directory = parts[1], parts[2] - directory = strings.Trim(directory, "/") - } +// parsePath parses a remote 'url' +func parsePath(path string) (root string) { + root = strings.Trim(path, "/") return } +// split returns bucket and bucketPath from the rootRelativePath +// relative to f.root +func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) { + return bucket.Split(path.Join(f.root, rootRelativePath)) +} + +// split returns bucket and bucketPath from the object +func (o *Object) split() (bucket, bucketPath string) { + return o.fs.split(o.remote) +} + // retryErrorCodes is a slice of error codes that we will retry var retryErrorCodes = []int{ 401, // Unauthorized (eg "Token has expired") @@ -335,6 +337,12 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) { return } +// setRoot changes the root of the Fs +func (f *Fs) setRoot(root string) { + f.root = parsePath(root) + f.rootBucket, f.rootDirectory = bucket.Split(f.root) +} + // NewFs constructs an Fs from the path, bucket:path func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { ctx := context.Background() @@ -352,10 +360,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { if err != nil { return nil, errors.Wrap(err, "b2: chunk size") } - bucket, directory, err := parsePath(root) - if err != nil { - return nil, err - } if opt.Account == "" { return nil, errors.New("account not found") } @@ -366,17 +370,21 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { opt.Endpoint = defaultEndpoint } f := &Fs{ - name: name, - opt: *opt, - bucket: bucket, - root: directory, - srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler), - pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), + name: name, + opt: *opt, + srv: rest.NewClient(fshttp.NewClient(fs.Config)).SetErrorHandler(errorHandler), + cache: bucket.NewCache(), + _bucketID: make(map[string]string, 1), + _bucketType: make(map[string]string, 1), + uploads: make(map[string][]*api.GetUploadURLResponse), + pacer: fs.NewPacer(pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), } + f.setRoot(root) f.features = (&fs.Features{ - ReadMimeType: true, - WriteMimeType: true, - BucketBased: true, + ReadMimeType: true, + WriteMimeType: true, + BucketBased: true, + BucketBasedRootOK: true, }).Fill(f) // Set the test flag if required if opt.TestMode != "" { @@ -390,33 +398,27 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { return nil, errors.Wrap(err, "failed to authorize account") } // If this is a key limited to a single bucket, it must exist already - if f.bucket != "" && f.info.Allowed.BucketID != "" { + if f.rootBucket != "" && f.info.Allowed.BucketID != "" { allowedBucket := f.info.Allowed.BucketName if allowedBucket == "" { return nil, errors.New("bucket that application key is restricted to no longer exists") } - if allowedBucket != f.bucket { + if allowedBucket != f.rootBucket { return nil, errors.Errorf("you must use bucket %q with this application key", allowedBucket) } - f.markBucketOK() - f.setBucketID(f.info.Allowed.BucketID) + f.cache.MarkOK(f.rootBucket) + f.setBucketID(f.rootBucket, f.info.Allowed.BucketID) } - if f.root != "" { - f.root += "/" + if f.rootBucket != "" && f.rootDirectory != "" { // Check to see if the (bucket,directory) is actually an existing file oldRoot := f.root - remote := path.Base(directory) - f.root = path.Dir(directory) - if f.root == "." { - f.root = "" - } else { - f.root += "/" - } - _, err := f.NewObject(ctx, remote) + newRoot, leaf := path.Split(oldRoot) + f.setRoot(newRoot) + _, err := f.NewObject(ctx, leaf) if err != nil { if err == fs.ErrorObjectNotFound { // File doesn't exist so return old f - f.root = oldRoot + f.setRoot(oldRoot) return f, nil } return nil, err @@ -464,30 +466,34 @@ func (f *Fs) hasPermission(permission string) bool { // getUploadURL returns the upload info with the UploadURL and the AuthorizationToken // // This should be returned with returnUploadURL when finished -func (f *Fs) getUploadURL() (upload *api.GetUploadURLResponse, err error) { +func (f *Fs) getUploadURL(bucket string) (upload *api.GetUploadURLResponse, err error) { f.uploadMu.Lock() defer f.uploadMu.Unlock() - bucketID, err := f.getBucketID() + bucketID, err := f.getBucketID(bucket) if err != nil { return nil, err } - if len(f.uploads) == 0 { - opts := rest.Opts{ - Method: "POST", - Path: "/b2_get_upload_url", - } - var request = api.GetUploadURLRequest{ - BucketID: bucketID, - } - err := f.pacer.Call(func() (bool, error) { - resp, err := f.srv.CallJSON(&opts, &request, &upload) - return f.shouldRetry(resp, err) - }) - if err != nil { - return nil, errors.Wrap(err, "failed to get upload URL") - } - } else { - upload, f.uploads = f.uploads[0], f.uploads[1:] + // look for a stored upload URL for the correct bucketID + uploads := f.uploads[bucketID] + if len(uploads) > 0 { + upload, uploads = uploads[0], uploads[1:] + f.uploads[bucketID] = uploads + return upload, nil + } + // get a new upload URL since not found + opts := rest.Opts{ + Method: "POST", + Path: "/b2_get_upload_url", + } + var request = api.GetUploadURLRequest{ + BucketID: bucketID, + } + err = f.pacer.Call(func() (bool, error) { + resp, err := f.srv.CallJSON(&opts, &request, &upload) + return f.shouldRetry(resp, err) + }) + if err != nil { + return nil, errors.Wrap(err, "failed to get upload URL") } return upload, nil } @@ -498,14 +504,14 @@ func (f *Fs) returnUploadURL(upload *api.GetUploadURLResponse) { return } f.uploadMu.Lock() - f.uploads = append(f.uploads, upload) + f.uploads[upload.BucketID] = append(f.uploads[upload.BucketID], upload) f.uploadMu.Unlock() } // clearUploadURL clears the current UploadURL and the AuthorizationToken -func (f *Fs) clearUploadURL() { +func (f *Fs) clearUploadURL(bucketID string) { f.uploadMu.Lock() - f.uploads = nil + delete(f.uploads, bucketID) f.uploadMu.Unlock() } @@ -575,27 +581,35 @@ var errEndList = errors.New("end list") // list lists the objects into the function supplied from // the bucket and root supplied // -// dir is the starting directory, "" for root +// (bucket, directory) is the starting directory // -// level is the depth to search to +// If prefix is set then it is removed from all file names // -// If prefix is set then startFileName is used as a prefix which all -// files must have +// If addBucket is set then it adds the bucket to the start of the +// remotes generated +// +// If recurse is set the function will recursively list // // If limit is > 0 then it limits to that many files (must be less // than 1000) // // If hidden is set then it will list the hidden (deleted) files too. -func (f *Fs) list(ctx context.Context, dir string, recurse bool, prefix string, limit int, hidden bool, fn listFn) error { - root := f.root - if dir != "" { - root += dir + "/" +// +// if findFile is set it will look for files called (bucket, directory) +func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, limit int, hidden bool, findFile bool, fn listFn) error { + if !findFile { + if prefix != "" { + prefix += "/" + } + if directory != "" { + directory += "/" + } } delimiter := "" if !recurse { delimiter = "/" } - bucketID, err := f.getBucketID() + bucketID, err := f.getBucketID(bucket) if err != nil { return err } @@ -606,12 +620,11 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, prefix string, var request = api.ListFileNamesRequest{ BucketID: bucketID, MaxFileCount: chunkSize, - Prefix: root, + Prefix: directory, Delimiter: delimiter, } - prefix = root + prefix - if prefix != "" { - request.StartFileName = prefix + if directory != "" { + request.StartFileName = directory } opts := rest.Opts{ Method: "POST", @@ -635,16 +648,19 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, prefix string, if prefix != "" && !strings.HasPrefix(file.Name, prefix) { return nil } - if !strings.HasPrefix(file.Name, f.root) { + if !strings.HasPrefix(file.Name, prefix) { fs.Debugf(f, "Odd name received %q", file.Name) continue } - remote := file.Name[len(f.root):] + remote := file.Name[len(prefix):] // Check for directory isDirectory := strings.HasSuffix(remote, "/") if isDirectory { remote = remote[:len(remote)-1] } + if addBucket { + remote = path.Join(bucket, remote) + } // Send object err = fn(remote, file, isDirectory) if err != nil { @@ -688,19 +704,10 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *api.File return o, nil } -// mark the bucket as being OK -func (f *Fs) markBucketOK() { - if f.bucket != "" { - f.bucketOKMu.Lock() - f.bucketOK = true - f.bucketOKMu.Unlock() - } -} - // listDir lists a single directory -func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { last := "" - err = f.list(ctx, dir, false, "", 0, f.opt.Versions, func(remote string, object *api.File, isDirectory bool) error { + err = f.list(ctx, bucket, directory, prefix, f.rootBucket == "", false, 0, f.opt.Versions, false, func(remote string, object *api.File, isDirectory bool) error { entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory, &last) if err != nil { return err @@ -714,7 +721,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er return nil, err } // bucket must be present if listing succeeded - f.markBucketOK() + f.cache.MarkOK(bucket) return entries, nil } @@ -744,10 +751,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { // This should return ErrDirNotFound if the directory isn't // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - if f.bucket == "" { - return f.listBuckets(dir) + bucket, directory := f.split(dir) + if bucket == "" { + return f.listBuckets(directory) } - return f.listDir(ctx, dir) + return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") } // ListR lists the objects and directories of the Fs starting @@ -767,23 +775,42 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { - if f.bucket == "" { - return fs.ErrorListBucketRequired - } + bucket, directory := f.split(dir) list := walk.NewListRHelper(callback) - last := "" - err = f.list(ctx, dir, true, "", 0, f.opt.Versions, func(remote string, object *api.File, isDirectory bool) error { - entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory, &last) + listR := func(bucket, directory, prefix string, addBucket bool) error { + last := "" + return f.list(ctx, bucket, directory, prefix, addBucket, true, 0, f.opt.Versions, false, func(remote string, object *api.File, isDirectory bool) error { + entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory, &last) + if err != nil { + return err + } + return list.Add(entry) + }) + } + if bucket == "" { + entries, err := f.listBuckets("") + if err != nil { + return err + } + for _, entry := range entries { + err = list.Add(entry) + if err != nil { + return err + } + bucket := entry.Remote() + err = listR(bucket, "", f.rootDirectory, true) + if err != nil { + return err + } + } + } else { + err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") if err != nil { return err } - return list.Add(entry) - }) - if err != nil { - return err } // bucket must be present if listing succeeded - f.markBucketOK() + f.cache.MarkOK(bucket) return list.Flush() } @@ -809,8 +836,21 @@ func (f *Fs) listBucketsToFn(fn listBucketFn) error { if err != nil { return err } + f.bucketIDMutex.Lock() + f.bucketTypeMutex.Lock() + f._bucketID = make(map[string]string, 1) + f._bucketType = make(map[string]string, 1) for i := range response.Buckets { - err = fn(&response.Buckets[i]) + bucket := &response.Buckets[i] + f.cache.MarkOK(bucket.Name) + f._bucketID[bucket.Name] = bucket.ID + f._bucketType[bucket.Name] = bucket.Type + } + f.bucketTypeMutex.Unlock() + f.bucketIDMutex.Unlock() + for i := range response.Buckets { + bucket := &response.Buckets[i] + err = fn(bucket) if err != nil { return err } @@ -820,72 +860,72 @@ func (f *Fs) listBucketsToFn(fn listBucketFn) error { // getbucketType finds the bucketType for the current bucket name // can be one of allPublic. allPrivate, or snapshot -func (f *Fs) getbucketType() (bucketType string, err error) { +func (f *Fs) getbucketType(bucket string) (bucketType string, err error) { f.bucketTypeMutex.Lock() - defer f.bucketTypeMutex.Unlock() - if f._bucketType != "" { - return f._bucketType, nil + bucketType = f._bucketType[bucket] + f.bucketTypeMutex.Unlock() + if bucketType != "" { + return bucketType, nil } err = f.listBucketsToFn(func(bucket *api.Bucket) error { - if bucket.Name == f.bucket { - bucketType = bucket.Type - } + // listBucketsToFn reads bucket Types return nil - }) + f.bucketTypeMutex.Lock() + bucketType = f._bucketType[bucket] + f.bucketTypeMutex.Unlock() if bucketType == "" { err = fs.ErrorDirNotFound } - f._bucketType = bucketType return bucketType, err } // setBucketType sets the Type for the current bucket name -func (f *Fs) setBucketType(Type string) { +func (f *Fs) setBucketType(bucket string, Type string) { f.bucketTypeMutex.Lock() - f._bucketType = Type + f._bucketType[bucket] = Type f.bucketTypeMutex.Unlock() } // clearBucketType clears the Type for the current bucket name -func (f *Fs) clearBucketType() { +func (f *Fs) clearBucketType(bucket string) { f.bucketTypeMutex.Lock() - f._bucketType = "" + delete(f._bucketType, bucket) f.bucketTypeMutex.Unlock() } // getBucketID finds the ID for the current bucket name -func (f *Fs) getBucketID() (bucketID string, err error) { +func (f *Fs) getBucketID(bucket string) (bucketID string, err error) { f.bucketIDMutex.Lock() - defer f.bucketIDMutex.Unlock() - if f._bucketID != "" { - return f._bucketID, nil + bucketID = f._bucketID[bucket] + f.bucketIDMutex.Unlock() + if bucketID != "" { + return bucketID, nil } err = f.listBucketsToFn(func(bucket *api.Bucket) error { - if bucket.Name == f.bucket { - bucketID = bucket.ID - } + // listBucketsToFn sets IDs return nil - }) + f.bucketIDMutex.Lock() + bucketID = f._bucketID[bucket] + f.bucketIDMutex.Unlock() if bucketID == "" { err = fs.ErrorDirNotFound } - f._bucketID = bucketID return bucketID, err } // setBucketID sets the ID for the current bucket name -func (f *Fs) setBucketID(ID string) { +func (f *Fs) setBucketID(bucket, ID string) { f.bucketIDMutex.Lock() - f._bucketID = ID + f._bucketID[bucket] = ID f.bucketIDMutex.Unlock() } // clearBucketID clears the ID for the current bucket name -func (f *Fs) clearBucketID() { +func (f *Fs) clearBucketID(bucket string) { f.bucketIDMutex.Lock() - f._bucketID = "" + delete(f._bucketID, bucket) f.bucketIDMutex.Unlock() } @@ -910,83 +950,79 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { - f.bucketOKMu.Lock() - defer f.bucketOKMu.Unlock() - if f.bucketOK { - return nil - } - opts := rest.Opts{ - Method: "POST", - Path: "/b2_create_bucket", - } - var request = api.CreateBucketRequest{ - AccountID: f.info.AccountID, - Name: f.bucket, - Type: "allPrivate", - } - var response api.Bucket - err := f.pacer.Call(func() (bool, error) { - resp, err := f.srv.CallJSON(&opts, &request, &response) - return f.shouldRetry(resp, err) - }) - if err != nil { - if apiErr, ok := err.(*api.Error); ok { - if apiErr.Code == "duplicate_bucket_name" { - // Check this is our bucket - buckets are globally unique and this - // might be someone elses. - _, getBucketErr := f.getBucketID() - if getBucketErr == nil { - // found so it is our bucket - f.bucketOK = true - return nil - } - if getBucketErr != fs.ErrorDirNotFound { - fs.Debugf(f, "Error checking bucket exists: %v", getBucketErr) + bucket, _ := f.split(dir) + return f.cache.Create(bucket, func() error { + opts := rest.Opts{ + Method: "POST", + Path: "/b2_create_bucket", + } + var request = api.CreateBucketRequest{ + AccountID: f.info.AccountID, + Name: bucket, + Type: "allPrivate", + } + var response api.Bucket + err := f.pacer.Call(func() (bool, error) { + resp, err := f.srv.CallJSON(&opts, &request, &response) + return f.shouldRetry(resp, err) + }) + if err != nil { + if apiErr, ok := err.(*api.Error); ok { + if apiErr.Code == "duplicate_bucket_name" { + // Check this is our bucket - buckets are globally unique and this + // might be someone elses. + _, getBucketErr := f.getBucketID(bucket) + if getBucketErr == nil { + // found so it is our bucket + return nil + } + if getBucketErr != fs.ErrorDirNotFound { + fs.Debugf(f, "Error checking bucket exists: %v", getBucketErr) + } } } + return errors.Wrap(err, "failed to create bucket") } - return errors.Wrap(err, "failed to create bucket") - } - f.setBucketID(response.ID) - f.setBucketType(response.Type) - f.bucketOK = true - return nil + f.setBucketID(bucket, response.ID) + f.setBucketType(bucket, response.Type) + return nil + }, nil) } // Rmdir deletes the bucket if the fs is at the root // // Returns an error if it isn't empty func (f *Fs) Rmdir(ctx context.Context, dir string) error { - f.bucketOKMu.Lock() - defer f.bucketOKMu.Unlock() - if f.root != "" || dir != "" { + bucket, directory := f.split(dir) + if bucket == "" || directory != "" { return nil } - opts := rest.Opts{ - Method: "POST", - Path: "/b2_delete_bucket", - } - bucketID, err := f.getBucketID() - if err != nil { - return err - } - var request = api.DeleteBucketRequest{ - ID: bucketID, - AccountID: f.info.AccountID, - } - var response api.Bucket - err = f.pacer.Call(func() (bool, error) { - resp, err := f.srv.CallJSON(&opts, &request, &response) - return f.shouldRetry(resp, err) + return f.cache.Remove(bucket, func() error { + opts := rest.Opts{ + Method: "POST", + Path: "/b2_delete_bucket", + } + bucketID, err := f.getBucketID(bucket) + if err != nil { + return err + } + var request = api.DeleteBucketRequest{ + ID: bucketID, + AccountID: f.info.AccountID, + } + var response api.Bucket + err = f.pacer.Call(func() (bool, error) { + resp, err := f.srv.CallJSON(&opts, &request, &response) + return f.shouldRetry(resp, err) + }) + if err != nil { + return errors.Wrap(err, "failed to delete bucket") + } + f.clearBucketID(bucket) + f.clearBucketType(bucket) + f.clearUploadURL(bucketID) + return nil }) - if err != nil { - return errors.Wrap(err, "failed to delete bucket") - } - f.bucketOK = false - f.clearBucketID() - f.clearBucketType() - f.clearUploadURL() - return nil } // Precision of the remote @@ -995,8 +1031,8 @@ func (f *Fs) Precision() time.Duration { } // hide hides a file on the remote -func (f *Fs) hide(Name string) error { - bucketID, err := f.getBucketID() +func (f *Fs) hide(bucket, bucketPath string) error { + bucketID, err := f.getBucketID(bucket) if err != nil { return err } @@ -1006,7 +1042,7 @@ func (f *Fs) hide(Name string) error { } var request = api.HideFileRequest{ BucketID: bucketID, - Name: Name, + Name: bucketPath, } var response api.File err = f.pacer.Call(func() (bool, error) { @@ -1021,7 +1057,7 @@ func (f *Fs) hide(Name string) error { return nil } } - return errors.Wrapf(err, "failed to hide %q", Name) + return errors.Wrapf(err, "failed to hide %q", bucketPath) } return nil } @@ -1052,7 +1088,10 @@ func (f *Fs) deleteByID(ID, Name string) error { // if oldOnly is true then it deletes only non current files. // // Implemented here so we can make sure we delete old versions. -func (f *Fs) purge(ctx context.Context, oldOnly bool) error { +func (f *Fs) purge(ctx context.Context, bucket, directory string, oldOnly bool) error { + if bucket == "" { + return errors.New("can't purge from root") + } var errReturn error var checkErrMutex sync.Mutex var checkErr = func(err error) { @@ -1093,7 +1132,7 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error { }() } last := "" - checkErr(f.list(ctx, "", true, "", 0, true, func(remote string, object *api.File, isDirectory bool) error { + checkErr(f.list(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "", true, 0, true, false, func(remote string, object *api.File, isDirectory bool) error { if !isDirectory { oi, err := f.newObjectWithInfo(ctx, object.Name, object) if err != nil { @@ -1101,6 +1140,7 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error { } tr := accounting.Stats(ctx).NewCheckingTransfer(oi) if oldOnly && last != remote { + // Check current version of the file if object.Action == "hide" { fs.Debugf(remote, "Deleting current version (id %q) as it is a hide marker", object.ID) toBeDeleted <- object @@ -1130,12 +1170,12 @@ func (f *Fs) purge(ctx context.Context, oldOnly bool) error { // Purge deletes all the files and directories including the old versions. func (f *Fs) Purge(ctx context.Context) error { - return f.purge(ctx, false) + return f.purge(ctx, f.rootBucket, f.rootDirectory, false) } // CleanUp deletes all the hidden files. func (f *Fs) CleanUp(ctx context.Context) error { - return f.purge(ctx, true) + return f.purge(ctx, f.rootBucket, f.rootDirectory, true) } // Copy src to this remote using server side copy operations. @@ -1148,6 +1188,7 @@ func (f *Fs) CleanUp(ctx context.Context) error { // // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { + dstBucket, dstPath := f.split(remote) err := f.Mkdir(ctx, "") if err != nil { return nil, err @@ -1157,7 +1198,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, fs.Debugf(src, "Can't copy - not same remote type") return nil, fs.ErrorCantCopy } - destBucketID, err := f.getBucketID() + destBucketID, err := f.getBucketID(dstBucket) if err != nil { return nil, err } @@ -1167,7 +1208,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, } var request = api.CopyFileRequest{ SourceID: srcObj.id, - Name: f.root + remote, + Name: dstPath, MetadataDirective: "COPY", DestBucketID: destBucketID, } @@ -1196,8 +1237,8 @@ func (f *Fs) Hashes() hash.Set { } // getDownloadAuthorization returns authorization token for downloading -// without accout. -func (f *Fs) getDownloadAuthorization(remote string) (authorization string, err error) { +// without account. +func (f *Fs) getDownloadAuthorization(bucket, remote string) (authorization string, err error) { validDurationInSeconds := time.Duration(f.opt.DownloadAuthorizationDuration).Nanoseconds() / 1e9 if validDurationInSeconds <= 0 || validDurationInSeconds > 604800 { return "", errors.New("--b2-download-auth-duration must be between 1 sec and 1 week") @@ -1205,7 +1246,7 @@ func (f *Fs) getDownloadAuthorization(remote string) (authorization string, err if !f.hasPermission("shareFiles") { return "", errors.New("sharing a file link requires the shareFiles permission") } - bucketID, err := f.getBucketID() + bucketID, err := f.getBucketID(bucket) if err != nil { return "", err } @@ -1229,8 +1270,9 @@ func (f *Fs) getDownloadAuthorization(remote string) (authorization string, err return response.AuthorizationToken, nil } -// PublicLink returns a link for downloading without accout. +// PublicLink returns a link for downloading without account func (f *Fs) PublicLink(ctx context.Context, remote string) (link string, err error) { + bucket, bucketPath := f.split(remote) var RootURL string if f.opt.DownloadURL == "" { RootURL = f.info.DownloadURL @@ -1239,7 +1281,7 @@ func (f *Fs) PublicLink(ctx context.Context, remote string) (link string, err er } _, err = f.NewObject(ctx, remote) if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile { - err2 := f.list(ctx, remote, false, "", 1, f.opt.Versions, func(remote string, object *api.File, isDirectory bool) error { + err2 := f.list(ctx, bucket, bucketPath, f.rootDirectory, f.rootBucket == "", false, 1, f.opt.Versions, false, func(remote string, object *api.File, isDirectory bool) error { err = nil return nil }) @@ -1250,14 +1292,14 @@ func (f *Fs) PublicLink(ctx context.Context, remote string) (link string, err er if err != nil { return "", err } - absPath := "/" + path.Join(f.root, remote) - link = RootURL + "/file/" + urlEncode(f.bucket) + absPath - bucketType, err := f.getbucketType() + absPath := "/" + bucketPath + link = RootURL + "/file/" + urlEncode(bucket) + absPath + bucketType, err := f.getbucketType(bucket) if err != nil { return "", err } if bucketType == "allPrivate" || bucketType == "snapshot" { - AuthorizationToken, err := f.getDownloadAuthorization(remote) + AuthorizationToken, err := f.getDownloadAuthorization(bucket, remote) if err != nil { return "", err } @@ -1351,19 +1393,19 @@ func (o *Object) decodeMetaDataFileInfo(info *api.FileInfo) (err error) { // getMetaData gets the metadata from the object unconditionally func (o *Object) getMetaData(ctx context.Context) (info *api.File, err error) { + bucket, bucketPath := o.split() maxSearched := 1 var timestamp api.Timestamp - baseRemote := o.remote if o.fs.opt.Versions { - timestamp, baseRemote = api.RemoveVersion(baseRemote) + timestamp, bucketPath = api.RemoveVersion(bucketPath) maxSearched = maxVersions } - err = o.fs.list(ctx, "", true, baseRemote, maxSearched, o.fs.opt.Versions, func(remote string, object *api.File, isDirectory bool) error { + err = o.fs.list(ctx, bucket, bucketPath, "", false, true, maxSearched, o.fs.opt.Versions, true, func(remote string, object *api.File, isDirectory bool) error { if isDirectory { return nil } - if remote == baseRemote { + if remote == bucketPath { if !timestamp.IsZero() && !timestamp.Equal(object.UploadTimestamp) { return nil } @@ -1441,6 +1483,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error { if err != nil { return err } + _, bucketPath := o.split() info.Info[timeKey] = timeString(modTime) opts := rest.Opts{ Method: "POST", @@ -1448,7 +1491,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error { } var request = api.CopyFileRequest{ SourceID: o.id, - Name: o.fs.root + o.remote, // copy to same name + Name: bucketPath, // copy to same name MetadataDirective: "REPLACE", ContentType: info.ContentType, Info: info.Info, @@ -1549,7 +1592,8 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read if o.id != "" { opts.Path += "/b2api/v1/b2_download_file_by_id?fileId=" + urlEncode(o.id) } else { - opts.Path += "/file/" + urlEncode(o.fs.bucket) + "/" + urlEncode(o.fs.root+o.remote) + bucket, bucketPath := o.split() + opts.Path += "/file/" + urlEncode(bucket) + "/" + urlEncode(bucketPath) } var resp *http.Response err = o.fs.pacer.Call(func() (bool, error) { @@ -1632,6 +1676,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } size := src.Size() + bucket, bucketPath := o.split() if size == -1 { // Check if the file is large enough for a chunked upload (needs to be at least two chunks) buf := o.fs.getUploadBlock() @@ -1677,7 +1722,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } // Get upload URL - upload, err := o.fs.getUploadURL() + upload, err := o.fs.getUploadURL(bucket) if err != nil { return err } @@ -1745,7 +1790,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op Body: in, ExtraHeaders: map[string]string{ "Authorization": upload.AuthorizationToken, - "X-Bz-File-Name": urlEncode(o.fs.root + o.remote), + "X-Bz-File-Name": urlEncode(bucketPath), "Content-Type": fs.MimeType(ctx, src), sha1Header: calculatedSha1, timeHeader: timeString(modTime), @@ -1772,13 +1817,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Remove an object func (o *Object) Remove(ctx context.Context) error { + bucket, bucketPath := o.split() if o.fs.opt.Versions { return errNotWithVersions } if o.fs.opt.HardDelete { - return o.fs.deleteByID(o.id, o.fs.root+o.remote) + return o.fs.deleteByID(o.id, bucketPath) } - return o.fs.hide(o.fs.root + o.remote) + return o.fs.hide(bucket, bucketPath) } // MimeType of an Object if known, "" otherwise diff --git a/backend/b2/upload.go b/backend/b2/upload.go index af01f971a..f891a5561 100644 --- a/backend/b2/upload.go +++ b/backend/b2/upload.go @@ -104,13 +104,14 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs Method: "POST", Path: "/b2_start_large_file", } - bucketID, err := f.getBucketID() + bucket, bucketPath := o.split() + bucketID, err := f.getBucketID(bucket) if err != nil { return nil, err } var request = api.StartLargeFileRequest{ BucketID: bucketID, - Name: o.fs.root + remote, + Name: bucketPath, ContentType: fs.MimeType(ctx, src), Info: map[string]string{ timeKey: timeString(modTime),