azureblob: fix "fatal error: concurrent map writes"

Before this change, the metadata map could be accessed from multiple
goroutines at once, sometimes causing this error.

This fix adds a global mutex for adjusting the metadata map to make
all accesses safe.

See: https://forum.rclone.org/t/azure-blob-storage-with-vfs-cache-concurrent-map-writes-exception/41686
This commit is contained in:
Nick Craig-Wood 2023-09-13 15:36:44 +01:00
parent 29dd29b9f3
commit 04bba67cd5
1 changed files with 19 additions and 7 deletions

View File

@ -71,6 +71,12 @@ const (
var (
errCantUpdateArchiveTierBlobs = fserrors.NoRetryError(errors.New("can't update archive tier blob without --azureblob-archive-tier-delete"))
// Take this when changing or reading metadata.
//
// It acts as global metadata lock so we don't bloat Object
// with an extra lock that will only very rarely be contended.
metadataMu sync.Mutex
)
// Register with Fs
@ -461,7 +467,7 @@ type Object struct {
size int64 // Size of the object
mimeType string // Content-Type of the object
accessTier blob.AccessTier // Blob Access Tier
meta map[string]string // blob metadata
meta map[string]string // blob metadata - take metadataMu when accessing
}
// ------------------------------------------------------------
@ -955,6 +961,9 @@ func (f *Fs) getBlockBlobSVC(container, containerPath string) *blockblob.Client
// updateMetadataWithModTime adds the modTime passed in to o.meta.
func (o *Object) updateMetadataWithModTime(modTime time.Time) {
metadataMu.Lock()
defer metadataMu.Unlock()
// Make sure o.meta is not nil
if o.meta == nil {
o.meta = make(map[string]string, 1)
@ -1623,6 +1632,9 @@ func (o *Object) Size() int64 {
// Set o.metadata from metadata
func (o *Object) setMetadata(metadata map[string]*string) {
metadataMu.Lock()
defer metadataMu.Unlock()
if len(metadata) > 0 {
// Lower case the metadata
o.meta = make(map[string]string, len(metadata))
@ -1647,6 +1659,9 @@ func (o *Object) setMetadata(metadata map[string]*string) {
// Get metadata from o.meta
func (o *Object) getMetadata() (metadata map[string]*string) {
metadataMu.Lock()
defer metadataMu.Unlock()
if len(o.meta) == 0 {
return nil
}
@ -1858,12 +1873,7 @@ func (o *Object) ModTime(ctx context.Context) (result time.Time) {
// SetModTime sets the modification time of the local fs object
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
// Make sure o.meta is not nil
if o.meta == nil {
o.meta = make(map[string]string, 1)
}
// Set modTimeKey in it
o.meta[modTimeKey] = modTime.Format(timeFormatOut)
o.updateMetadataWithModTime(modTime)
blb := o.getBlobSVC()
opt := blob.SetMetadataOptions{}
@ -2233,7 +2243,9 @@ func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options [
return ui, fmt.Errorf("can't upload to root - need a container")
}
// Create parent dir/bucket if not saving directory marker
metadataMu.Lock()
_, ui.isDirMarker = o.meta[dirMetaKey]
metadataMu.Unlock()
if !ui.isDirMarker {
err = o.fs.mkdirParent(ctx, o.remote)
if err != nil {