1
mirror of https://github.com/rclone/rclone synced 2024-11-29 07:55:12 +01:00
rclone/backend/oracleobjectstorage/object.go

627 lines
18 KiB
Go

//go:build !plan9 && !solaris && !js
// +build !plan9,!solaris,!js
package oracleobjectstorage
import (
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"net/http"
"regexp"
"strconv"
"strings"
"time"
"github.com/ncw/swift/v2"
"github.com/oracle/oci-go-sdk/v65/common"
"github.com/oracle/oci-go-sdk/v65/objectstorage"
"github.com/oracle/oci-go-sdk/v65/objectstorage/transfer"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/atexit"
)
// ------------------------------------------------------------
// Object Interface Implementation
// ------------------------------------------------------------
const (
metaMtime = "mtime" // the meta key to store mtime in - e.g. X-Amz-Meta-Mtime
metaMD5Hash = "md5chksum" // the meta key to store md5hash in
// StandardTier object storage tier
ociMetaPrefix = "opc-meta-"
)
var archive = "archive"
var infrequentAccess = "infrequentaccess"
var standard = "standard"
var storageTierMap = map[string]*string{
archive: &archive,
infrequentAccess: &infrequentAccess,
standard: &standard,
}
var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
// Object describes a oci bucket object
type Object struct {
fs *Fs // what this object is part of
remote string // The remote path
md5 string // MD5 hash if known
bytes int64 // Size of the object
lastModified time.Time // The modified time of the object if known
meta map[string]string // The object metadata if known - may be nil
mimeType string // Content-Type of the object
// Metadata as pointers to strings as they often won't be present
storageTier *string // e.g. Standard
}
// split returns bucket and bucketPath from the object
func (o *Object) split() (bucket, bucketPath string) {
return o.fs.split(o.remote)
}
// readMetaData gets the metadata if it hasn't already been fetched
func (o *Object) readMetaData(ctx context.Context) (err error) {
fs.Debugf(o, "trying to read metadata %v", o.remote)
if o.meta != nil {
return nil
}
info, err := o.headObject(ctx)
if err != nil {
return err
}
return o.decodeMetaDataHead(info)
}
// headObject gets the metadata from the object unconditionally
func (o *Object) headObject(ctx context.Context) (info *objectstorage.HeadObjectResponse, err error) {
bucketName, objectPath := o.split()
req := objectstorage.HeadObjectRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(objectPath),
}
useBYOKHeadObject(o.fs, &req)
var response objectstorage.HeadObjectResponse
err = o.fs.pacer.Call(func() (bool, error) {
var err error
response, err = o.fs.srv.HeadObject(ctx, req)
return shouldRetry(ctx, response.HTTPResponse(), err)
})
if err != nil {
if svcErr, ok := err.(common.ServiceError); ok {
if svcErr.GetHTTPStatusCode() == http.StatusNotFound {
return nil, fs.ErrorObjectNotFound
}
}
fs.Errorf(o, "Failed to head object: %v", err)
return nil, err
}
o.fs.cache.MarkOK(bucketName)
return &response, err
}
func (o *Object) decodeMetaDataHead(info *objectstorage.HeadObjectResponse) (err error) {
return o.setMetaData(
info.ContentLength,
info.ContentMd5,
info.ContentType,
info.LastModified,
info.StorageTier,
info.OpcMeta)
}
func (o *Object) decodeMetaDataObject(info *objectstorage.GetObjectResponse) (err error) {
return o.setMetaData(
info.ContentLength,
info.ContentMd5,
info.ContentType,
info.LastModified,
info.StorageTier,
info.OpcMeta)
}
func (o *Object) setMetaData(
contentLength *int64,
contentMd5 *string,
contentType *string,
lastModified *common.SDKTime,
storageTier interface{},
meta map[string]string) error {
if contentLength != nil {
o.bytes = *contentLength
}
if contentMd5 != nil {
md5, err := o.base64ToMd5(*contentMd5)
if err == nil {
o.md5 = md5
}
}
o.meta = meta
if o.meta == nil {
o.meta = map[string]string{}
}
// Read MD5 from metadata if present
if md5sumBase64, ok := o.meta[metaMD5Hash]; ok {
md5, err := o.base64ToMd5(md5sumBase64)
if err != nil {
o.md5 = md5
}
}
if lastModified == nil {
o.lastModified = time.Now()
fs.Logf(o, "Failed to read last modified")
} else {
o.lastModified = lastModified.Time
}
if contentType != nil {
o.mimeType = *contentType
}
if storageTier == nil || storageTier == "" {
o.storageTier = storageTierMap[standard]
} else {
tier := strings.ToLower(fmt.Sprintf("%v", storageTier))
o.storageTier = storageTierMap[tier]
}
return nil
}
func (o *Object) base64ToMd5(md5sumBase64 string) (md5 string, err error) {
md5sumBytes, err := base64.StdEncoding.DecodeString(md5sumBase64)
if err != nil {
fs.Debugf(o, "Failed to read md5sum from metadata %q: %v", md5sumBase64, err)
return "", err
} else if len(md5sumBytes) != 16 {
fs.Debugf(o, "failed to read md5sum from metadata %q: wrong length", md5sumBase64)
return "", fmt.Errorf("failed to read md5sum from metadata %q: wrong length", md5sumBase64)
}
return hex.EncodeToString(md5sumBytes), nil
}
// Fs returns the parent Fs
func (o *Object) Fs() fs.Info {
return o.fs
}
// Remote returns the remote path
func (o *Object) Remote() string {
return o.remote
}
// Return a string version
func (o *Object) String() string {
if o == nil {
return "<nil>"
}
return o.remote
}
// Size returns the size of an object in bytes
func (o *Object) Size() int64 {
return o.bytes
}
// GetTier returns storage class as string
func (o *Object) GetTier() string {
if o.storageTier == nil || *o.storageTier == "" {
return standard
}
return *o.storageTier
}
// SetTier performs changing storage class
func (o *Object) SetTier(tier string) (err error) {
ctx := context.TODO()
tier = strings.ToLower(tier)
bucketName, bucketPath := o.split()
tierEnum, ok := objectstorage.GetMappingStorageTierEnum(tier)
if !ok {
return fmt.Errorf("not a valid storage tier %v ", tier)
}
req := objectstorage.UpdateObjectStorageTierRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
UpdateObjectStorageTierDetails: objectstorage.UpdateObjectStorageTierDetails{
ObjectName: common.String(bucketPath),
StorageTier: tierEnum,
},
}
_, err = o.fs.srv.UpdateObjectStorageTier(ctx, req)
if err != nil {
return err
}
o.storageTier = storageTierMap[tier]
return err
}
// MimeType of an Object if known, "" otherwise
func (o *Object) MimeType(ctx context.Context) string {
err := o.readMetaData(ctx)
if err != nil {
fs.Logf(o, "Failed to read metadata: %v", err)
return ""
}
return o.mimeType
}
// Hash returns the MD5 of an object returning a lowercase hex string
func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
if t != hash.MD5 {
return "", hash.ErrUnsupported
}
// Convert base64 encoded md5 into lower case hex
if o.md5 == "" {
err := o.readMetaData(ctx)
if err != nil {
return "", err
}
}
return o.md5, nil
}
// ModTime returns the modification time of the object
//
// It attempts to read the objects mtime and if that isn't present the
// LastModified returned to the http headers
func (o *Object) ModTime(ctx context.Context) (result time.Time) {
if o.fs.ci.UseServerModTime {
return o.lastModified
}
err := o.readMetaData(ctx)
if err != nil {
fs.Logf(o, "Failed to read metadata: %v", err)
return time.Now()
}
// read mtime out of metadata if available
d, ok := o.meta[metaMtime]
if !ok || d == "" {
return o.lastModified
}
modTime, err := swift.FloatStringToTime(d)
if err != nil {
fs.Logf(o, "Failed to read mtime from object: %v", err)
return o.lastModified
}
return modTime
}
// SetModTime sets the modification time of the local fs object
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
err := o.readMetaData(ctx)
if err != nil {
return err
}
o.meta[metaMtime] = swift.TimeToFloatString(modTime)
_, err = o.fs.Copy(ctx, o, o.remote)
return err
}
// Storable returns if this object is storable
func (o *Object) Storable() bool {
return true
}
// Remove an object
func (o *Object) Remove(ctx context.Context) error {
bucketName, bucketPath := o.split()
req := objectstorage.DeleteObjectRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
}
err := o.fs.pacer.Call(func() (bool, error) {
resp, err := o.fs.srv.DeleteObject(ctx, req)
return shouldRetry(ctx, resp.HTTPResponse(), err)
})
return err
}
// Open object file
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
bucketName, bucketPath := o.split()
req := objectstorage.GetObjectRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
}
o.applyGetObjectOptions(&req, options...)
useBYOKGetObject(o.fs, &req)
var resp objectstorage.GetObjectResponse
err := o.fs.pacer.Call(func() (bool, error) {
var err error
resp, err = o.fs.srv.GetObject(ctx, req)
return shouldRetry(ctx, resp.HTTPResponse(), err)
})
if err != nil {
return nil, err
}
// read size from ContentLength or ContentRange
bytes := resp.ContentLength
if resp.ContentRange != nil {
var contentRange = *resp.ContentRange
slash := strings.IndexRune(contentRange, '/')
if slash >= 0 {
i, err := strconv.ParseInt(contentRange[slash+1:], 10, 64)
if err == nil {
bytes = &i
} else {
fs.Debugf(o, "Failed to find parse integer from in %q: %v", contentRange, err)
}
} else {
fs.Debugf(o, "Failed to find length in %q", contentRange)
}
}
err = o.decodeMetaDataObject(&resp)
if err != nil {
return nil, err
}
o.bytes = *bytes
return resp.HTTPResponse().Body, nil
}
// Update an object if it has changed
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
bucketName, bucketPath := o.split()
err = o.fs.makeBucket(ctx, bucketName)
if err != nil {
return err
}
// determine if we like upload single or multipart.
size := src.Size()
multipart := size >= int64(o.fs.opt.UploadCutoff)
// Set the mtime in the metadata
modTime := src.ModTime(ctx)
metadata := map[string]string{
metaMtime: swift.TimeToFloatString(modTime),
}
// read the md5sum if available
// - for non-multipart
// - so we can add a ContentMD5
// - so we can add the md5sum in the metadata as metaMD5Hash if using SSE/SSE-C
// - for multipart provided checksums aren't disabled
// - so we can add the md5sum in the metadata as metaMD5Hash
var md5sumBase64 string
var md5sumHex string
if !multipart || !o.fs.opt.DisableChecksum {
md5sumHex, err = src.Hash(ctx, hash.MD5)
if err == nil && matchMd5.MatchString(md5sumHex) {
hashBytes, err := hex.DecodeString(md5sumHex)
if err == nil {
md5sumBase64 = base64.StdEncoding.EncodeToString(hashBytes)
if multipart && !o.fs.opt.DisableChecksum {
// Set the md5sum as metadata on the object if
// - a multipart upload
// - the ETag is not an MD5, e.g. when using SSE/SSE-C
// provided checksums aren't disabled
metadata[metaMD5Hash] = md5sumBase64
}
}
}
}
// Guess the content type
mimeType := fs.MimeType(ctx, src)
if multipart {
chunkSize := int64(o.fs.opt.ChunkSize)
uploadRequest := transfer.UploadRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
ContentType: common.String(mimeType),
PartSize: common.Int64(chunkSize),
AllowMultipartUploads: common.Bool(true),
AllowParrallelUploads: common.Bool(true),
ObjectStorageClient: o.fs.srv,
EnableMultipartChecksumVerification: common.Bool(!o.fs.opt.DisableChecksum),
NumberOfGoroutines: common.Int(o.fs.opt.UploadConcurrency),
Metadata: metadataWithOpcPrefix(metadata),
}
if o.fs.opt.StorageTier != "" {
storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier)
if !ok {
return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier)
}
uploadRequest.StorageTier = storageTier
}
o.applyMultiPutOptions(&uploadRequest, options...)
useBYOKUpload(o.fs, &uploadRequest)
uploadStreamRequest := transfer.UploadStreamRequest{
UploadRequest: uploadRequest,
StreamReader: in,
}
uploadMgr := transfer.NewUploadManager()
var uploadID = ""
defer atexit.OnError(&err, func() {
if uploadID == "" {
return
}
if o.fs.opt.LeavePartsOnError {
return
}
fs.Debugf(o, "Cancelling multipart upload")
errCancel := o.fs.abortMultiPartUpload(
context.Background(),
bucketName,
bucketPath,
uploadID)
if errCancel != nil {
fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
}
})()
err = o.fs.pacer.Call(func() (bool, error) {
uploadResponse, err := uploadMgr.UploadStream(ctx, uploadStreamRequest)
var httpResponse *http.Response
if err == nil {
if uploadResponse.Type == transfer.MultipartUpload {
if uploadResponse.MultipartUploadResponse != nil {
httpResponse = uploadResponse.MultipartUploadResponse.HTTPResponse()
}
} else {
if uploadResponse.SinglepartUploadResponse != nil {
httpResponse = uploadResponse.SinglepartUploadResponse.HTTPResponse()
}
}
}
if err != nil {
uploadID := ""
if uploadResponse.MultipartUploadResponse != nil && uploadResponse.MultipartUploadResponse.UploadID != nil {
uploadID = *uploadResponse.MultipartUploadResponse.UploadID
fs.Debugf(o, "multipart streaming upload failed, aborting uploadID: %v, may retry", uploadID)
_ = o.fs.abortMultiPartUpload(ctx, bucketName, bucketPath, uploadID)
}
}
return shouldRetry(ctx, httpResponse, err)
})
if err != nil {
fs.Errorf(o, "multipart streaming upload failed %v", err)
return err
}
} else {
req := objectstorage.PutObjectRequest{
NamespaceName: common.String(o.fs.opt.Namespace),
BucketName: common.String(bucketName),
ObjectName: common.String(bucketPath),
ContentType: common.String(mimeType),
PutObjectBody: io.NopCloser(in),
OpcMeta: metadata,
}
if size >= 0 {
req.ContentLength = common.Int64(size)
}
if o.fs.opt.StorageTier != "" {
storageTier, ok := objectstorage.GetMappingPutObjectStorageTierEnum(o.fs.opt.StorageTier)
if !ok {
return fmt.Errorf("not a valid storage tier: %v", o.fs.opt.StorageTier)
}
req.StorageTier = storageTier
}
o.applyPutOptions(&req, options...)
useBYOKPutObject(o.fs, &req)
var resp objectstorage.PutObjectResponse
err = o.fs.pacer.Call(func() (bool, error) {
resp, err = o.fs.srv.PutObject(ctx, req)
return shouldRetry(ctx, resp.HTTPResponse(), err)
})
if err != nil {
fs.Errorf(o, "put object failed %v", err)
return err
}
}
// Read the metadata from the newly created object
o.meta = nil // wipe old metadata
return o.readMetaData(ctx)
}
func (o *Object) applyPutOptions(req *objectstorage.PutObjectRequest, options ...fs.OpenOption) {
// Apply upload options
for _, option := range options {
key, value := option.Header()
lowerKey := strings.ToLower(key)
switch lowerKey {
case "":
// ignore
case "cache-control":
req.CacheControl = common.String(value)
case "content-disposition":
req.ContentDisposition = common.String(value)
case "content-encoding":
req.ContentEncoding = common.String(value)
case "content-language":
req.ContentLanguage = common.String(value)
case "content-type":
req.ContentType = common.String(value)
default:
if strings.HasPrefix(lowerKey, ociMetaPrefix) {
req.OpcMeta[lowerKey] = value
} else {
fs.Errorf(o, "Don't know how to set key %q on upload", key)
}
}
}
}
func (o *Object) applyGetObjectOptions(req *objectstorage.GetObjectRequest, options ...fs.OpenOption) {
fs.FixRangeOption(options, o.bytes)
for _, option := range options {
switch option.(type) {
case *fs.RangeOption, *fs.SeekOption:
_, value := option.Header()
req.Range = &value
default:
if option.Mandatory() {
fs.Logf(o, "Unsupported mandatory option: %v", option)
}
}
}
// Apply upload options
for _, option := range options {
key, value := option.Header()
lowerKey := strings.ToLower(key)
switch lowerKey {
case "":
// ignore
case "cache-control":
req.HttpResponseCacheControl = common.String(value)
case "content-disposition":
req.HttpResponseContentDisposition = common.String(value)
case "content-encoding":
req.HttpResponseContentEncoding = common.String(value)
case "content-language":
req.HttpResponseContentLanguage = common.String(value)
case "content-type":
req.HttpResponseContentType = common.String(value)
case "range":
// do nothing
default:
fs.Errorf(o, "Don't know how to set key %q on upload", key)
}
}
}
func (o *Object) applyMultiPutOptions(req *transfer.UploadRequest, options ...fs.OpenOption) {
// Apply upload options
for _, option := range options {
key, value := option.Header()
lowerKey := strings.ToLower(key)
switch lowerKey {
case "":
// ignore
case "content-encoding":
req.ContentEncoding = common.String(value)
case "content-language":
req.ContentLanguage = common.String(value)
case "content-type":
req.ContentType = common.String(value)
default:
if strings.HasPrefix(lowerKey, ociMetaPrefix) {
req.Metadata[lowerKey] = value
} else {
fs.Errorf(o, "Don't know how to set key %q on upload", key)
}
}
}
}
func metadataWithOpcPrefix(src map[string]string) map[string]string {
dst := make(map[string]string)
for lowerKey, value := range src {
if !strings.HasPrefix(lowerKey, ociMetaPrefix) {
dst[ociMetaPrefix+lowerKey] = value
}
}
return dst
}