1
mirror of https://github.com/rclone/rclone synced 2024-12-21 11:45:56 +01:00

azureblob: implement multipart server side copy

This implements multipart server side copy to improve copying from one
azure region to another by orders of magnitude (from 30s for a 100M
file to 10s for a 10G file with --azureblob-upload-concurrency 500).

- Add `--azureblob-copy-cutoff` to control the cutoff from single to multipart copy
- Add ServerSideAcrossConfigs flag as this now works properly
- Implement multipart copy using put block list API

Fixes #8249
This commit is contained in:
Nick Craig-Wood 2024-12-18 15:29:52 +00:00
parent bf5fdf9491
commit afbb80a8fb
2 changed files with 306 additions and 66 deletions

View File

@ -42,11 +42,13 @@ import (
"github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/env"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pacer"
"golang.org/x/sync/errgroup"
)
const (
@ -310,6 +312,16 @@ Note that chunks are stored in memory and there may be up to
in memory.`,
Default: 16,
Advanced: true,
}, {
Name: "copy_cutoff",
Help: `Cutoff for switching to multipart copy.
Any files larger than this that need to be server-side copied will be
copied in chunks of chunk_size using the put block list API.
Files smaller than this limit will be copied with the Copy Blob API.`,
Default: 8 * fs.Mebi,
Advanced: true,
}, {
Name: "list_chunk",
Help: `Size of blob list.
@ -476,6 +488,7 @@ type Options struct {
UseAZ bool `config:"use_az"`
Endpoint string `config:"endpoint"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
CopyCutoff fs.SizeSuffix `config:"copy_cutoff"`
UploadConcurrency int `config:"upload_concurrency"`
ListChunkSize uint `config:"list_chunk"`
AccessTier string `config:"access_tier"`
@ -500,6 +513,9 @@ type Fs struct {
cntSVCcacheMu sync.Mutex // mutex to protect cntSVCcache
cntSVCcache map[string]*container.Client // reference to containerClient per container
svc *service.Client // client to access azblob
cred azcore.TokenCredential // how to generate tokens (may be nil)
sharedKeyCred *service.SharedKeyCredential // shared key credentials (may be nil)
anonymous bool // if this is anonymous access
rootContainer string // container part of root (if any)
rootDirectory string // directory part of root (if any)
isLimited bool // if limited to one container
@ -638,6 +654,14 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
return
}
func (f *Fs) setCopyCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
err = checkUploadChunkSize(cs)
if err == nil {
old, f.opt.CopyCutoff = f.opt.CopyCutoff, cs
}
return
}
type servicePrincipalCredentials struct {
AppID string `json:"appId"`
Password string `json:"password"`
@ -723,12 +747,13 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
f.publicAccess = container.PublicAccessType(opt.PublicAccess)
f.setRoot(root)
f.features = (&fs.Features{
ReadMimeType: true,
WriteMimeType: true,
BucketBased: true,
BucketBasedRootOK: true,
SetTier: true,
GetTier: true,
ReadMimeType: true,
WriteMimeType: true,
BucketBased: true,
BucketBasedRootOK: true,
SetTier: true,
GetTier: true,
ServerSideAcrossConfigs: true,
}).Fill(ctx, f)
if opt.DirectoryMarkers {
f.features.CanHaveEmptyDirectories = true
@ -743,12 +768,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ClientOptions: policyClientOptions,
}
// Here we auth by setting one of cred, sharedKeyCred, f.svc or anonymous
var (
cred azcore.TokenCredential
sharedKeyCred *service.SharedKeyCredential
anonymous = false
)
// Here we auth by setting one of f.cred, f.sharedKeyCred, f.svc or f.anonymous
switch {
case opt.EnvAuth:
// Read account from environment if needed
@ -760,7 +780,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ClientOptions: policyClientOptions,
DisableInstanceDiscovery: opt.DisableInstanceDiscovery,
}
cred, err = azidentity.NewDefaultAzureCredential(&options)
f.cred, err = azidentity.NewDefaultAzureCredential(&options)
if err != nil {
return nil, fmt.Errorf("create azure environment credential failed: %w", err)
}
@ -774,12 +794,12 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
if opt.Endpoint == "" {
opt.Endpoint = emulatorBlobEndpoint
}
sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
f.sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
if err != nil {
return nil, fmt.Errorf("create new shared key credential for emulator failed: %w", err)
}
case opt.Account != "" && opt.Key != "":
sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
f.sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
if err != nil {
return nil, fmt.Errorf("create new shared key credential failed: %w", err)
}
@ -814,7 +834,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
options := azidentity.ClientSecretCredentialOptions{
ClientOptions: policyClientOptions,
}
cred, err = azidentity.NewClientSecretCredential(opt.Tenant, opt.ClientID, opt.ClientSecret, &options)
f.cred, err = azidentity.NewClientSecretCredential(opt.Tenant, opt.ClientID, opt.ClientSecret, &options)
if err != nil {
return nil, fmt.Errorf("error creating a client secret credential: %w", err)
}
@ -848,7 +868,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
ClientOptions: policyClientOptions,
SendCertificateChain: opt.ClientSendCertificateChain,
}
cred, err = azidentity.NewClientCertificateCredential(
f.cred, err = azidentity.NewClientCertificateCredential(
opt.Tenant, opt.ClientID, certs, key, &options,
)
if err != nil {
@ -863,7 +883,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
if err != nil {
return nil, fmt.Errorf("user password decode failed - did you obscure it?: %w", err)
}
cred, err = azidentity.NewUsernamePasswordCredential(
f.cred, err = azidentity.NewUsernamePasswordCredential(
opt.Tenant, opt.ClientID, opt.Username, password, &options,
)
if err != nil {
@ -882,7 +902,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
options := azidentity.ClientSecretCredentialOptions{
ClientOptions: policyClientOptions,
}
cred, err = azidentity.NewClientSecretCredential(parsedCreds.Tenant, parsedCreds.AppID, parsedCreds.Password, &options)
f.cred, err = azidentity.NewClientSecretCredential(parsedCreds.Tenant, parsedCreds.AppID, parsedCreds.Password, &options)
if err != nil {
return nil, fmt.Errorf("error creating a client secret credential: %w", err)
}
@ -904,19 +924,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
case opt.MSIResourceID != "":
options.ID = azidentity.ResourceID(opt.MSIResourceID)
}
cred, err = azidentity.NewManagedIdentityCredential(&options)
f.cred, err = azidentity.NewManagedIdentityCredential(&options)
if err != nil {
return nil, fmt.Errorf("failed to acquire MSI token: %w", err)
}
case opt.UseAZ:
var options = azidentity.AzureCLICredentialOptions{}
cred, err = azidentity.NewAzureCLICredential(&options)
f.cred, err = azidentity.NewAzureCLICredential(&options)
if err != nil {
return nil, fmt.Errorf("failed to create Azure CLI credentials: %w", err)
}
case opt.Account != "":
// Anonymous access
anonymous = true
f.anonymous = true
default:
return nil, errors.New("no authentication method configured")
}
@ -934,19 +954,19 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
}
opt.Endpoint = u.String()
}
if sharedKeyCred != nil {
if f.sharedKeyCred != nil {
// Shared key cred
f.svc, err = service.NewClientWithSharedKeyCredential(opt.Endpoint, sharedKeyCred, &clientOpt)
f.svc, err = service.NewClientWithSharedKeyCredential(opt.Endpoint, f.sharedKeyCred, &clientOpt)
if err != nil {
return nil, fmt.Errorf("create client with shared key failed: %w", err)
}
} else if cred != nil {
} else if f.cred != nil {
// Azidentity cred
f.svc, err = service.NewClient(opt.Endpoint, cred, &clientOpt)
f.svc, err = service.NewClient(opt.Endpoint, f.cred, &clientOpt)
if err != nil {
return nil, fmt.Errorf("create client failed: %w", err)
}
} else if anonymous {
} else if f.anonymous {
// Anonymous public access
f.svc, err = service.NewClientWithNoCredential(opt.Endpoint, &clientOpt)
if err != nil {
@ -1500,7 +1520,7 @@ func (f *Fs) makeContainer(ctx context.Context, container string) error {
// When a container is deleted, a container with the same name cannot be created
// for at least 30 seconds; the container may not be available for more than 30
// seconds if the service is still processing the request.
time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
time.Sleep(12 * time.Second) // default 10 retries will be 120 seconds
f.cache.MarkDeleted(container)
return true, err
case bloberror.AuthorizationFailure:
@ -1608,6 +1628,220 @@ func (f *Fs) Purge(ctx context.Context, dir string) error {
return f.deleteContainer(ctx, container)
}
// getAuth gets auth to copy o.
//
// tokenOK is used to signal that token based auth (Microsoft Entra
// ID) is acceptable.
//
// This will return srcURL to read the object, which may be a SAS URL.
//
// If noAuth is set then the srcURL returned will be a plain object
// URL (not a SAS) and token will be empty.
//
// If tokenOK is true it may also return a token for the auth.
func (o *Object) getAuth(ctx context.Context, tokenOK bool, noAuth bool) (srcURL string, token *string, err error) {
f := o.fs
srcBlobSVC := o.getBlobSVC()
srcURL = srcBlobSVC.URL()
switch {
case noAuth:
// If same storage account then no auth needed
case f.cred != nil:
if !tokenOK {
return srcURL, token, errors.New("not supported: Microsoft Entra ID")
}
options := policy.TokenRequestOptions{}
accessToken, err := f.cred.GetToken(ctx, options)
if err != nil {
return srcURL, token, fmt.Errorf("failed to create access token: %w", err)
}
token = &accessToken.Token
case f.sharedKeyCred != nil:
// Generate a short lived SAS URL if using shared key credentials
expiry := time.Now().Add(time.Hour)
sasOptions := blob.GetSASURLOptions{}
srcURL, err = srcBlobSVC.GetSASURL(sas.BlobPermissions{Read: true}, expiry, &sasOptions)
if err != nil {
return srcURL, token, fmt.Errorf("failed to create SAS URL: %w", err)
}
case f.anonymous || f.opt.SASURL != "":
// If using a SASURL or anonymous, no need for any extra auth
default:
return srcURL, token, errors.New("unknown authentication type")
}
return srcURL, token, nil
}
// Do multipart parallel copy.
//
// This uses these APIs:
//
// - PutBlockFromURL - https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-from-url
// - PutBlockList - https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list
func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath string, src *Object) (dst fs.Object, err error) {
srcProperties, err := src.readMetaDataAlways(ctx)
if err != nil {
return nil, fmt.Errorf("multipart copy: failed to read source object: %w", err)
}
// Create the dst object by altering a copy of the src object
obj := *src
o := &obj
o.fs = f
o.remote = remote
srcURL, token, err := src.getAuth(ctx, true, false)
if err != nil {
return nil, fmt.Errorf("multipart copy: %w", err)
}
dstBlockBlobSVC := f.getBlockBlobSVC(dstContainer, dstPath)
defer atexit.OnError(&err, func() {
// Try to abort the upload, but ignore the error.
fs.Debugf(o, "Cancelling multipart copy")
// FIXME abort by deleting uncommitted blocks?
// _ = f.pacer.Call(func() (bool, error) {
// _, err := f.c.AbortMultipartUpload(context.Background(), &s3.AbortMultipartUploadInput{
// Bucket: &dstContainer,
// Key: &dstPath,
// UploadId: uid,
// RequestPayer: req.RequestPayer,
// })
// return f.shouldRetry(ctx, err)
// })
})()
var (
srcSize = src.size
partSize = int64(chunksize.Calculator(o, src.size, blockblob.MaxBlocks, f.opt.ChunkSize))
numParts = (srcSize-1)/partSize + 1
blockIDs = make([]string, numParts) // list of blocks for finalize
g, gCtx = errgroup.WithContext(ctx)
)
g.SetLimit(f.opt.UploadConcurrency)
fs.Debugf(o, "Starting multipart copy with %d parts of size %v", numParts, fs.SizeSuffix(partSize))
for partNum := uint64(0); partNum < uint64(numParts); partNum++ {
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
break
}
partNum := partNum // for closure
g.Go(func() error {
var binaryBlockID [8]byte // block counter as LSB first 8 bytes
binary.LittleEndian.PutUint64(binaryBlockID[:], partNum)
blockID := base64.StdEncoding.EncodeToString(binaryBlockID[:])
options := blockblob.StageBlockFromURLOptions{
Range: blob.HTTPRange{
Offset: int64(partNum) * partSize,
Count: partSize,
},
// Specifies the authorization scheme and signature for the copy source.
CopySourceAuthorization: token,
// CPKInfo *blob.CPKInfo
// CPKScopeInfo *blob.CPKScopeInfo
}
// Partial last block
if remaining := srcSize - options.Range.Offset; remaining < options.Range.Count {
options.Range.Count = remaining
}
fs.Debugf(o, "multipart copy: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(options.Range.Count), fs.SizeSuffix(options.Range.Offset), fs.SizeSuffix(srcSize))
err := f.pacer.Call(func() (bool, error) {
_, err = dstBlockBlobSVC.StageBlockFromURL(ctx, blockID, srcURL, &options)
if err != nil {
return f.shouldRetry(ctx, err)
}
return false, nil
})
if err != nil {
return fmt.Errorf("multipart copy: failed to copy chunk %d with %v bytes: %w", partNum+1, -1, err)
}
blockIDs[partNum] = blockID
return nil
})
}
err = g.Wait()
if err != nil {
return nil, err
}
// Convert metadata from source object
options := blockblob.CommitBlockListOptions{
Metadata: srcProperties.Metadata,
Tier: parseTier(f.opt.AccessTier),
HTTPHeaders: &blob.HTTPHeaders{
BlobCacheControl: srcProperties.CacheControl,
BlobContentDisposition: srcProperties.ContentDisposition,
BlobContentEncoding: srcProperties.ContentEncoding,
BlobContentLanguage: srcProperties.ContentLanguage,
BlobContentMD5: srcProperties.ContentMD5,
BlobContentType: srcProperties.ContentType,
},
}
// Finalise the upload session
err = f.pacer.Call(func() (bool, error) {
_, err := dstBlockBlobSVC.CommitBlockList(ctx, blockIDs, &options)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, fmt.Errorf("failed to complete multipart copy: %w", err)
}
fs.Debugf(o, "multipart copy finished")
return f.NewObject(ctx, remote)
}
// Do single part copy.
//
// This uses these APIs:
//
// - Copy Blob - https://docs.microsoft.com/rest/api/storageservices/copy-blob
// - Get Blob Properties - https://docs.microsoft.com/rest/api/storageservices/get-blob-properties
func (f *Fs) copySinglepart(ctx context.Context, remote, dstContainer, dstPath string, src *Object) (dst fs.Object, err error) {
dstBlobSVC := f.getBlobSVC(dstContainer, dstPath)
// Get the source auth - none needed for same storage account
srcURL, _, err := src.getAuth(ctx, false, f == src.fs)
if err != nil {
return nil, fmt.Errorf("single part copy: source auth: %w", err)
}
// Start the copy
options := blob.StartCopyFromURLOptions{
Tier: parseTier(f.opt.AccessTier),
}
var startCopy blob.StartCopyFromURLResponse
err = f.pacer.Call(func() (bool, error) {
startCopy, err = dstBlobSVC.StartCopyFromURL(ctx, srcURL, &options)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, fmt.Errorf("single part copy: copy blob: %w", err)
}
// Poll for completion if necessary
//
// The for loop is never executed for same storage account copies.
copyStatus := startCopy.CopyStatus
getOptions := blob.GetPropertiesOptions{}
pollTime := 100 * time.Millisecond
for copyStatus != nil && string(*copyStatus) == string(container.CopyStatusTypePending) {
time.Sleep(pollTime)
getMetadata, err := dstBlobSVC.GetProperties(ctx, &getOptions)
if err != nil {
return nil, err
}
copyStatus = getMetadata.CopyStatus
pollTime = min(2*pollTime, time.Second)
}
return f.NewObject(ctx, remote)
}
// Copy src to this remote using server-side copy operations.
//
// This is stored with the remote path given.
@ -1628,36 +1862,19 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
dstBlobSVC := f.getBlobSVC(dstContainer, dstPath)
srcBlobSVC := srcObj.getBlobSVC()
srcURL := srcBlobSVC.URL()
options := blob.StartCopyFromURLOptions{
Tier: parseTier(f.opt.AccessTier),
}
var startCopy blob.StartCopyFromURLResponse
err = f.pacer.Call(func() (bool, error) {
startCopy, err = dstBlobSVC.StartCopyFromURL(ctx, srcURL, &options)
return f.shouldRetry(ctx, err)
})
if err != nil {
return nil, err
}
// Assume we are copying to a different storage account if we
// are copying across configs.
sameStorageAccount := f == srcObj.fs
copyStatus := startCopy.CopyStatus
getOptions := blob.GetPropertiesOptions{}
pollTime := 100 * time.Millisecond
for copyStatus != nil && string(*copyStatus) == string(container.CopyStatusTypePending) {
time.Sleep(pollTime)
getMetadata, err := dstBlobSVC.GetProperties(ctx, &getOptions)
if err != nil {
return nil, err
}
copyStatus = getMetadata.CopyStatus
pollTime = min(2*pollTime, time.Second)
}
// If we are using Microsoft Entra ID token based auth then
// copySinglepart does not work
usingEntraID := f.cred != nil
return f.NewObject(ctx, remote)
if srcObj.size >= int64(f.opt.CopyCutoff) || (usingEntraID && !sameStorageAccount) {
return f.copyMultipart(ctx, remote, dstContainer, dstPath, srcObj)
}
return f.copySinglepart(ctx, remote, dstContainer, dstPath, srcObj)
}
// ------------------------------------------------------------
@ -1890,24 +2107,46 @@ func (o *Object) clearMetaData() {
}
// readMetaData gets the metadata if it hasn't already been fetched
func (f *Fs) readMetaData(ctx context.Context, container, containerPath string) (blobProperties blob.GetPropertiesResponse, err error) {
func (f *Fs) readMetaData(ctx context.Context, container, containerPath string) (blobProperties *blob.GetPropertiesResponse, err error) {
if !f.containerOK(container) {
return blobProperties, fs.ErrorObjectNotFound
return nil, fs.ErrorObjectNotFound
}
blb := f.getBlobSVC(container, containerPath)
// Read metadata (this includes metadata)
options := blob.GetPropertiesOptions{}
var resp blob.GetPropertiesResponse
err = f.pacer.Call(func() (bool, error) {
blobProperties, err = blb.GetProperties(ctx, &options)
resp, err = blb.GetProperties(ctx, &options)
return f.shouldRetry(ctx, err)
})
if err != nil {
// On directories - GetProperties does not work and current SDK does not populate service code correctly hence check regular http response as well
if storageErr, ok := err.(*azcore.ResponseError); ok && (storageErr.ErrorCode == string(bloberror.BlobNotFound) || storageErr.StatusCode == http.StatusNotFound) {
return blobProperties, fs.ErrorObjectNotFound
return nil, fs.ErrorObjectNotFound
}
return blobProperties, err
return nil, err
}
return &resp, nil
}
// readMetaDataAlways gets the metadata unconditionally and also the blob properties.
//
// Sets
//
// o.id
// o.modTime
// o.size
// o.md5
func (o *Object) readMetaDataAlways(ctx context.Context) (blobProperties *blob.GetPropertiesResponse, err error) {
container, containerPath := o.split()
blobProperties, err = o.fs.readMetaData(ctx, container, containerPath)
if err != nil {
return nil, err
}
err = o.decodeMetaDataFromPropertiesResponse(blobProperties)
if err != nil {
return nil, err
}
return blobProperties, nil
}
@ -1924,12 +2163,8 @@ func (o *Object) readMetaData(ctx context.Context) (err error) {
if !o.modTime.IsZero() {
return nil
}
container, containerPath := o.split()
blobProperties, err := o.fs.readMetaData(ctx, container, containerPath)
if err != nil {
return err
}
return o.decodeMetaDataFromPropertiesResponse(&blobProperties)
_, err = o.readMetaDataAlways(ctx)
return err
}
// ModTime returns the modification time of the object

View File

@ -48,8 +48,13 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setUploadChunkSize(cs)
}
func (f *Fs) SetCopyCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
return f.setCopyCutoff(cs)
}
var (
_ fstests.SetUploadChunkSizer = (*Fs)(nil)
_ fstests.SetCopyCutoffer = (*Fs)(nil)
)
func TestValidateAccessTier(t *testing.T) {