Mirror of https://github.com/rclone/rclone
azureblob: remove uncommitted blocks on InvalidBlobOrBlock error
When doing a multipart upload or copy, if an InvalidBlobOrBlock error is received, it can mean that there are uncommitted blocks left over from a previous failed attempt with a different ID length. This patch makes rclone attempt to clear the uncommitted blocks and retry if it receives this error.
This commit is contained in:
parent
3a5ddfcd3c
commit
8e955c6b13
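
For context, the fix hinges on clearing the leftover uncommitted blocks before retrying. The diff below calls an existing helper, clearUncommittedBlocks, whose body is not part of this change. As a rough illustration of the idea only: with the azblob SDK, one way to discard uncommitted blocks is to re-commit the blob's current committed block list, since the service garbage-collects any staged blocks not named in that list. The sketch below is a minimal, hypothetical illustration under that assumption, not the rclone implementation; the function name clearUncommittedBlocksSketch is made up for this example.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
)

// clearUncommittedBlocksSketch is a hypothetical illustration, not the rclone
// helper used by this patch. Re-committing the blob's current committed block
// list makes the service discard any uncommitted blocks staged by a previous
// failed upload attempt.
func clearUncommittedBlocksSketch(ctx context.Context, blb *blockblob.Client) error {
	// List both committed and uncommitted blocks.
	resp, err := blb.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
	if err != nil {
		return err
	}
	if len(resp.UncommittedBlocks) == 0 {
		return nil // nothing to clear
	}
	// Keep only the block IDs that are already committed.
	committed := make([]string, 0, len(resp.CommittedBlocks))
	for _, block := range resp.CommittedBlocks {
		committed = append(committed, *block.Name)
	}
	// Committing that list drops the uncommitted blocks. Note: if there are no
	// committed blocks yet, this would create a zero-length blob, which a real
	// implementation would need to handle (for example by deleting the blob).
	_, err = blb.CommitBlockList(ctx, committed, nil)
	return err
}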
backend/azureblob
@@ -1751,6 +1751,7 @@ func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath st
		numParts = (srcSize-1)/partSize + 1
		blockIDs = make([]string, numParts) // list of blocks for finalize
		g, gCtx  = errgroup.WithContext(ctx)
		checker  = newCheckForInvalidBlockOrBlob("copy", o)
	)
	g.SetLimit(f.opt.CopyConcurrency)

@@ -1780,8 +1781,13 @@ func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath st
		}
		fs.Debugf(o, "multipart copy: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(options.Range.Count), fs.SizeSuffix(options.Range.Offset), fs.SizeSuffix(srcSize))
		err := f.pacer.Call(func() (bool, error) {
			checker.start()
			_, err := dstBlockBlobSVC.StageBlockFromURL(ctx, blockID, srcURL, &options)
			checker.stop()
			if err != nil {
				if checker.checkErr(ctx, err) {
					return true, err
				}
				return f.shouldRetry(ctx, err)
			}
			return false, nil
@@ -2393,6 +2399,7 @@ type azChunkWriter struct {
	blocks  []azBlock // list of blocks for finalize
	o       *Object
	bic     *blockIDCreator
	checker *checkForInvalidBlockOrBlob
}

// OpenChunkWriter returns the chunk size and a ChunkWriter
@@ -2449,6 +2456,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
		f:       f,
		ui:      ui,
		o:       o,
		checker: newCheckForInvalidBlockOrBlob("upload", o),
	}
	info = fs.ChunkWriterInfo{
		ChunkSize: int64(partSize),
@@ -2463,6 +2471,81 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
	return info, chunkWriter, nil
}

// isInvalidBlockOrBlob looks for the InvalidBlockOrBlob error in err
// returning true if it is found
func isInvalidBlockOrBlob(err error) bool {
	var storageErr *azcore.ResponseError
	if errors.As(err, &storageErr) {
		return storageErr.ErrorCode == string(bloberror.InvalidBlobOrBlock)
	}
	return false
}

// Struct to hold state for checking for InvalidBlockOrBlob
type checkForInvalidBlockOrBlob struct {
	startMu  sync.Mutex     // hold when starting transactions
	inFlight sync.WaitGroup // transactions in flight
	what     string         // "copy" or "upload"
	o        *Object        // object we are working on
	cleared  bool           // set if we have cleared the uncommitted blocks - we only do this once
}

// Make InvalidBlockOrBlob checker
func newCheckForInvalidBlockOrBlob(what string, o *Object) *checkForInvalidBlockOrBlob {
	return &checkForInvalidBlockOrBlob{
		what: what,
		o:    o,
	}
}

// start marks that there is a transaction in progress
func (c *checkForInvalidBlockOrBlob) start() {
	c.startMu.Lock()
	defer c.startMu.Unlock()
	c.inFlight.Add(1)
}

// stop marks that this transaction has finished
func (c *checkForInvalidBlockOrBlob) stop() {
	c.inFlight.Done()
}

// checkErr looks for the InvalidBlockOrBlob error in err, and if it
// is found, it clears uncommitted blocks in o to clear the error.
//
// It returns a bool indicating whether the error was found or not.
//
// See https://gauravmantri.com/2013/05/18/windows-azure-blob-storage-dealing-with-the-specified-blob-or-block-content-is-invalid-error/
func (c *checkForInvalidBlockOrBlob) checkErr(ctx context.Context, err error) (result bool) {
	// defer log.Trace(c.o, "err=%#v, what=%q", err, c.what)("result=%v", &result)
	if !isInvalidBlockOrBlob(err) {
		return false
	}

	// Prevent more transactions starting
	c.startMu.Lock()
	defer c.startMu.Unlock()

	if c.cleared {
		fs.Debugf(c.o, "multipart %s: received %s error: already cleared", c.what, bloberror.InvalidBlobOrBlock)
		return true
	}

	// Wait for any other outstanding transactions to finish
	c.inFlight.Wait()

	// Clear uncommitted blocks
	fs.Debugf(c.o, "multipart %s: received %s error: clearing uncommitted blocks and retrying", c.what, bloberror.InvalidBlobOrBlock)
	clearErr := c.o.clearUncommittedBlocks(ctx)
	if clearErr != nil {
		fs.Debugf(c.o, "multipart %s: error fixing %s: %v", c.what, bloberror.InvalidBlobOrBlock, clearErr)
	}
	fs.Debugf(c.o, "multipart %s: fixed %s", c.what, bloberror.InvalidBlobOrBlock)
	c.cleared = true

	return true
}

// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
func (w *azChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
	if chunkNumber < 0 {
@@ -2503,8 +2586,13 @@ func (w *azChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader
			// Specify the transactional md5 for the body, to be validated by the service.
			TransactionalValidation: blob.TransferValidationTypeMD5(md5sum),
		}
		w.checker.start()
		_, err = w.ui.blb.StageBlock(ctx, blockID, &readSeekCloser{Reader: reader, Seeker: reader}, &options)
		w.checker.stop()
		if err != nil {
			if w.checker.checkErr(ctx, err) {
				return true, err
			}
			if chunkNumber <= 8 {
				return w.f.shouldRetry(ctx, err)
			}
@@ -3,22 +3,20 @@
package azureblob

import (
	"context"
	"encoding/base64"
	"strings"
	"testing"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fstest"
	"github.com/rclone/rclone/fstest/fstests"
	"github.com/rclone/rclone/lib/random"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func (f *Fs) InternalTest(t *testing.T) {
	// Check first feature flags are set on this
	// remote
	enabled := f.Features().SetTier
	assert.True(t, enabled)
	enabled = f.Features().GetTier
	assert.True(t, enabled)
}

func TestBlockIDCreator(t *testing.T) {
	// Check creation and random number
	bic, err := newBlockIDCreator()
@@ -46,3 +44,108 @@ func TestBlockIDCreator(t *testing.T) {
	assert.ErrorContains(t, bic.checkID(chunkNumber+1, got), "expecting decoded")
	assert.ErrorContains(t, bic2.checkID(chunkNumber, got), "random bytes")
}

func (f *Fs) testFeatures(t *testing.T) {
	// Check first feature flags are set on this remote
	enabled := f.Features().SetTier
	assert.True(t, enabled)
	enabled = f.Features().GetTier
	assert.True(t, enabled)
}

type ReadSeekCloser struct {
	*strings.Reader
}

func (r *ReadSeekCloser) Close() error {
	return nil
}

// Stage a block at remote but don't commit it
func (f *Fs) stageBlockWithoutCommit(ctx context.Context, t *testing.T, remote string) {
	var (
		containerName, blobPath = f.split(remote)
		containerClient         = f.cntSVC(containerName)
		blobClient              = containerClient.NewBlockBlobClient(blobPath)
		data                    = "uncommitted data"
		blockID                 = "1"
		blockIDBase64           = base64.StdEncoding.EncodeToString([]byte(blockID))
	)
	r := &ReadSeekCloser{strings.NewReader(data)}
	_, err := blobClient.StageBlock(ctx, blockIDBase64, r, nil)
	require.NoError(t, err)

	// Verify the block is staged but not committed
	blockList, err := blobClient.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
	require.NoError(t, err)
	found := false
	for _, block := range blockList.UncommittedBlocks {
		if *block.Name == blockIDBase64 {
			found = true
			break
		}
	}
	require.True(t, found, "Block ID not found in uncommitted blocks")
}

// This tests uploading a blob where it has uncommitted blocks with a different ID size.
//
// https://gauravmantri.com/2013/05/18/windows-azure-blob-storage-dealing-with-the-specified-blob-or-block-content-is-invalid-error/
//
// TestIntegration/FsMkdir/FsPutFiles/Internal/WriteUncommittedBlocks
func (f *Fs) testWriteUncommittedBlocks(t *testing.T) {
	var (
		ctx    = context.Background()
		remote = "testBlob"
	)

	// Multipart copy the blob please
	oldUseCopyBlob, oldCopyCutoff := f.opt.UseCopyBlob, f.opt.CopyCutoff
	f.opt.UseCopyBlob = false
	f.opt.CopyCutoff = f.opt.ChunkSize
	defer func() {
		f.opt.UseCopyBlob, f.opt.CopyCutoff = oldUseCopyBlob, oldCopyCutoff
	}()

	// Create a blob with uncommitted blocks
	f.stageBlockWithoutCommit(ctx, t, remote)

	// Now attempt to overwrite the block with a different sized block ID to provoke this error

	// Check the object does not exist
	_, err := f.NewObject(ctx, remote)
	require.Equal(t, fs.ErrorObjectNotFound, err)

	// Upload a multipart file over the block with uncommitted chunks of a different ID size
	size := 4*int(f.opt.ChunkSize) - 1
	contents := random.String(size)
	item := fstest.NewItem(remote, contents, fstest.Time("2001-05-06T04:05:06.499Z"))
	o := fstests.PutTestContents(ctx, t, f, &item, contents, true)

	// Check size
	assert.Equal(t, int64(size), o.Size())

	// Create a new blob with uncommitted blocks
	newRemote := "testBlob2"
	f.stageBlockWithoutCommit(ctx, t, newRemote)

	// Copy over that block
	dst, err := f.Copy(ctx, o, newRemote)
	require.NoError(t, err)

	// Check basics
	assert.Equal(t, int64(size), dst.Size())
	assert.Equal(t, newRemote, dst.Remote())

	// Check contents
	gotContents := fstests.ReadObject(ctx, t, dst, -1)
	assert.Equal(t, contents, gotContents)

	// Remove the object
	require.NoError(t, dst.Remove(ctx))
}

func (f *Fs) InternalTest(t *testing.T) {
	t.Run("Features", f.testFeatures)
	t.Run("WriteUncommittedBlocks", f.testWriteUncommittedBlocks)
}