mirror of
https://github.com/rclone/rclone
synced 2025-01-01 01:06:24 +01:00
1135 lines
31 KiB
Go
1135 lines
31 KiB
Go
// Package s3 provides an interface to Amazon S3 object storage
|
|
package s3
|
|
|
|
// FIXME need to prevent anything but ListDir working for s3://
|
|
|
|
/*
|
|
Progress of port to aws-sdk
|
|
|
|
* Don't really need o.meta at all?
|
|
|
|
What happens if you CTRL-C a multipart upload
|
|
* get an incomplete upload
|
|
* disappears when you delete the bucket
|
|
*/
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"path"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
"github.com/aws/aws-sdk-go/aws/corehandlers"
|
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
|
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
|
|
"github.com/aws/aws-sdk-go/aws/defaults"
|
|
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
|
"github.com/aws/aws-sdk-go/aws/request"
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
|
"github.com/aws/aws-sdk-go/service/s3"
|
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
|
"github.com/ncw/rclone/fs"
|
|
"github.com/ncw/rclone/fs/config"
|
|
"github.com/ncw/rclone/fs/config/flags"
|
|
"github.com/ncw/rclone/fs/fshttp"
|
|
"github.com/ncw/rclone/fs/hash"
|
|
"github.com/ncw/rclone/fs/walk"
|
|
"github.com/ncw/rclone/lib/rest"
|
|
"github.com/ncw/swift"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
// Register with Fs
|
|
func init() {
|
|
fs.Register(&fs.RegInfo{
|
|
Name: "s3",
|
|
Description: "Amazon S3 (also Dreamhost, Ceph, Minio, IBM COS)",
|
|
NewFs: NewFs,
|
|
// AWS endpoints: http://docs.amazonwebservices.com/general/latest/gr/rande.html#s3_region
|
|
Options: []fs.Option{{
|
|
Name: "env_auth",
|
|
Help: "Get AWS credentials from runtime (environment variables or EC2/ECS meta data if no env vars). Only applies if access_key_id and secret_access_key is blank.",
|
|
Examples: []fs.OptionExample{
|
|
{
|
|
Value: "false",
|
|
Help: "Enter AWS credentials in the next step",
|
|
}, {
|
|
Value: "true",
|
|
Help: "Get AWS credentials from the environment (env vars or IAM)",
|
|
},
|
|
},
|
|
}, {
|
|
Name: "access_key_id",
|
|
Help: "AWS Access Key ID - leave blank for anonymous access or runtime credentials.",
|
|
}, {
|
|
Name: "secret_access_key",
|
|
Help: "AWS Secret Access Key (password) - leave blank for anonymous access or runtime credentials.",
|
|
}, {
|
|
Name: "region",
|
|
Help: "Region to connect to. Leave blank if you are using an S3 clone and you don't have a region.",
|
|
Examples: []fs.OptionExample{{
|
|
Value: "us-east-1",
|
|
Help: "The default endpoint - a good choice if you are unsure.\nUS Region, Northern Virginia or Pacific Northwest.\nLeave location constraint empty.",
|
|
}, {
|
|
Value: "us-east-2",
|
|
Help: "US East (Ohio) Region\nNeeds location constraint us-east-2.",
|
|
}, {
|
|
Value: "us-west-2",
|
|
Help: "US West (Oregon) Region\nNeeds location constraint us-west-2.",
|
|
}, {
|
|
Value: "us-west-1",
|
|
Help: "US West (Northern California) Region\nNeeds location constraint us-west-1.",
|
|
}, {
|
|
Value: "ca-central-1",
|
|
Help: "Canada (Central) Region\nNeeds location constraint ca-central-1.",
|
|
}, {
|
|
Value: "eu-west-1",
|
|
Help: "EU (Ireland) Region\nNeeds location constraint EU or eu-west-1.",
|
|
}, {
|
|
Value: "eu-west-2",
|
|
Help: "EU (London) Region\nNeeds location constraint eu-west-2.",
|
|
}, {
|
|
Value: "eu-central-1",
|
|
Help: "EU (Frankfurt) Region\nNeeds location constraint eu-central-1.",
|
|
}, {
|
|
Value: "ap-southeast-1",
|
|
Help: "Asia Pacific (Singapore) Region\nNeeds location constraint ap-southeast-1.",
|
|
}, {
|
|
Value: "ap-southeast-2",
|
|
Help: "Asia Pacific (Sydney) Region\nNeeds location constraint ap-southeast-2.",
|
|
}, {
|
|
Value: "ap-northeast-1",
|
|
Help: "Asia Pacific (Tokyo) Region\nNeeds location constraint ap-northeast-1.",
|
|
}, {
|
|
Value: "ap-northeast-2",
|
|
Help: "Asia Pacific (Seoul)\nNeeds location constraint ap-northeast-2.",
|
|
}, {
|
|
Value: "ap-south-1",
|
|
Help: "Asia Pacific (Mumbai)\nNeeds location constraint ap-south-1.",
|
|
}, {
|
|
Value: "sa-east-1",
|
|
Help: "South America (Sao Paulo) Region\nNeeds location constraint sa-east-1.",
|
|
}, {
|
|
Value: "other-v2-signature",
|
|
Help: "Use this only if v4 signatures don't work, eg pre Jewel/v10 CEPH.\nSet this and make sure you set the endpoint.",
|
|
}},
|
|
}, {
|
|
Name: "endpoint",
|
|
Help: "Endpoint for S3 API.\nLeave blank if using AWS to use the default endpoint for the region.\nSpecify if using an S3 clone such as Ceph.",
|
|
}, {
|
|
Name: "location_constraint",
|
|
Help: "Location constraint - must be set to match the Region. Used when creating buckets only.",
|
|
Examples: []fs.OptionExample{{
|
|
Value: "",
|
|
Help: "Empty for US Region, Northern Virginia or Pacific Northwest.",
|
|
}, {
|
|
Value: "us-east-2",
|
|
Help: "US East (Ohio) Region.",
|
|
}, {
|
|
Value: "us-west-2",
|
|
Help: "US West (Oregon) Region.",
|
|
}, {
|
|
Value: "us-west-1",
|
|
Help: "US West (Northern California) Region.",
|
|
}, {
|
|
Value: "ca-central-1",
|
|
Help: "Canada (Central) Region.",
|
|
}, {
|
|
Value: "eu-west-1",
|
|
Help: "EU (Ireland) Region.",
|
|
}, {
|
|
Value: "eu-west-2",
|
|
Help: "EU (London) Region.",
|
|
}, {
|
|
Value: "EU",
|
|
Help: "EU Region.",
|
|
}, {
|
|
Value: "ap-southeast-1",
|
|
Help: "Asia Pacific (Singapore) Region.",
|
|
}, {
|
|
Value: "ap-southeast-2",
|
|
Help: "Asia Pacific (Sydney) Region.",
|
|
}, {
|
|
Value: "ap-northeast-1",
|
|
Help: "Asia Pacific (Tokyo) Region.",
|
|
}, {
|
|
Value: "ap-northeast-2",
|
|
Help: "Asia Pacific (Seoul)",
|
|
}, {
|
|
Value: "ap-south-1",
|
|
Help: "Asia Pacific (Mumbai)",
|
|
}, {
|
|
Value: "sa-east-1",
|
|
Help: "South America (Sao Paulo) Region.",
|
|
}},
|
|
}, {
|
|
Name: "acl",
|
|
Help: "Canned ACL used when creating buckets and/or storing objects in S3.\nFor more info visit https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl",
|
|
Examples: []fs.OptionExample{{
|
|
Value: "private",
|
|
Help: "Owner gets FULL_CONTROL. No one else has access rights (default).",
|
|
}, {
|
|
Value: "public-read",
|
|
Help: "Owner gets FULL_CONTROL. The AllUsers group gets READ access.",
|
|
}, {
|
|
Value: "public-read-write",
|
|
Help: "Owner gets FULL_CONTROL. The AllUsers group gets READ and WRITE access.\nGranting this on a bucket is generally not recommended.",
|
|
}, {
|
|
Value: "authenticated-read",
|
|
Help: "Owner gets FULL_CONTROL. The AuthenticatedUsers group gets READ access.",
|
|
}, {
|
|
Value: "bucket-owner-read",
|
|
Help: "Object owner gets FULL_CONTROL. Bucket owner gets READ access.\nIf you specify this canned ACL when creating a bucket, Amazon S3 ignores it.",
|
|
}, {
|
|
Value: "bucket-owner-full-control",
|
|
Help: "Both the object owner and the bucket owner get FULL_CONTROL over the object.\nIf you specify this canned ACL when creating a bucket, Amazon S3 ignores it.",
|
|
}},
|
|
}, {
|
|
Name: "server_side_encryption",
|
|
Help: "The server-side encryption algorithm used when storing this object in S3.",
|
|
Examples: []fs.OptionExample{{
|
|
Value: "",
|
|
Help: "None",
|
|
}, {
|
|
Value: "AES256",
|
|
Help: "AES256",
|
|
}},
|
|
}, {
|
|
Name: "storage_class",
|
|
Help: "The storage class to use when storing objects in S3.",
|
|
Examples: []fs.OptionExample{{
|
|
Value: "",
|
|
Help: "Default",
|
|
}, {
|
|
Value: "STANDARD",
|
|
Help: "Standard storage class",
|
|
}, {
|
|
Value: "REDUCED_REDUNDANCY",
|
|
Help: "Reduced redundancy storage class",
|
|
}, {
|
|
Value: "STANDARD_IA",
|
|
Help: "Standard Infrequent Access storage class",
|
|
}},
|
|
}},
|
|
})
|
|
}
|
|
|
|
// Constants used throughout the s3 backend.
const (
	// metaMtime is the meta key to store mtime in - eg X-Amz-Meta-Mtime.
	metaMtime = "Mtime"
	// metaMD5Hash is the meta key to store the md5 hash in.
	metaMD5Hash = "Md5chksum"
	// listChunkSize is the number of items to read at once when listing.
	listChunkSize = 1000
	// maxRetries is the number of retries to make of operations.
	maxRetries = 10
	// maxSizeForCopy is the maximum size of object we can COPY (5 GiB).
	maxSizeForCopy = 5 << 30
	// maxFileSize is the largest possible upload file size (5 TiB).
	maxFileSize = 5 << 40
)
|
|
|
|
// Globals
|
|
var (
|
|
// Flags
|
|
s3ACL = flags.StringP("s3-acl", "", "", "Canned ACL used when creating buckets and/or storing objects in S3")
|
|
s3StorageClass = flags.StringP("s3-storage-class", "", "", "Storage class to use when uploading S3 objects (STANDARD|REDUCED_REDUNDANCY|STANDARD_IA)")
|
|
)
|
|
|
|
// Fs represents a remote s3 server
|
|
type Fs struct {
|
|
name string // the name of the remote
|
|
root string // root of the bucket - ignore all objects above this
|
|
features *fs.Features // optional features
|
|
c *s3.S3 // the connection to the s3 server
|
|
ses *session.Session // the s3 session
|
|
bucket string // the bucket we are working on
|
|
bucketOKMu sync.Mutex // mutex to protect bucket OK
|
|
bucketOK bool // true if we have created the bucket
|
|
bucketDeleted bool // true if we have deleted the bucket
|
|
acl string // ACL for new buckets / objects
|
|
locationConstraint string // location constraint of new buckets
|
|
sse string // the type of server-side encryption
|
|
storageClass string // storage class
|
|
}
|
|
|
|
// Object describes a s3 object
|
|
type Object struct {
|
|
// Will definitely have everything but meta which may be nil
|
|
//
|
|
// List will read everything but meta & mimeType - to fill
|
|
// that in you need to call readMetaData
|
|
fs *Fs // what this object is part of
|
|
remote string // The remote path
|
|
etag string // md5sum of the object
|
|
bytes int64 // size of the object
|
|
lastModified time.Time // Last modified
|
|
meta map[string]*string // The object metadata if known - may be nil
|
|
mimeType string // MimeType of object - may be ""
|
|
}
|
|
|
|
// ------------------------------------------------------------
|
|
|
|
// Name of the remote (as passed into NewFs)
|
|
func (f *Fs) Name() string {
|
|
return f.name
|
|
}
|
|
|
|
// Root of the remote (as passed into NewFs)
|
|
func (f *Fs) Root() string {
|
|
if f.root == "" {
|
|
return f.bucket
|
|
}
|
|
return f.bucket + "/" + f.root
|
|
}
|
|
|
|
// String converts this Fs to a string
|
|
func (f *Fs) String() string {
|
|
if f.root == "" {
|
|
return fmt.Sprintf("S3 bucket %s", f.bucket)
|
|
}
|
|
return fmt.Sprintf("S3 bucket %s path %s", f.bucket, f.root)
|
|
}
|
|
|
|
// Features returns the optional features of this Fs
|
|
func (f *Fs) Features() *fs.Features {
|
|
return f.features
|
|
}
|
|
|
|
// Pattern to match a s3 path
|
|
var matcher = regexp.MustCompile(`^([^/]*)(.*)$`)
|
|
|
|
// parseParse parses a s3 'url'
|
|
func s3ParsePath(path string) (bucket, directory string, err error) {
|
|
parts := matcher.FindStringSubmatch(path)
|
|
if parts == nil {
|
|
err = errors.Errorf("couldn't parse bucket out of s3 path %q", path)
|
|
} else {
|
|
bucket, directory = parts[1], parts[2]
|
|
directory = strings.Trim(directory, "/")
|
|
}
|
|
return
|
|
}
|
|
|
|
// s3Connection makes a connection to s3
|
|
func s3Connection(name string) (*s3.S3, *session.Session, error) {
|
|
// Make the auth
|
|
v := credentials.Value{
|
|
AccessKeyID: config.FileGet(name, "access_key_id"),
|
|
SecretAccessKey: config.FileGet(name, "secret_access_key"),
|
|
SessionToken: config.FileGet(name, "session_token"),
|
|
}
|
|
|
|
lowTimeoutClient := &http.Client{Timeout: 1 * time.Second} // low timeout to ec2 metadata service
|
|
def := defaults.Get()
|
|
def.Config.HTTPClient = lowTimeoutClient
|
|
|
|
// first provider to supply a credential set "wins"
|
|
providers := []credentials.Provider{
|
|
// use static credentials if they're present (checked by provider)
|
|
&credentials.StaticProvider{Value: v},
|
|
|
|
// * Access Key ID: AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY
|
|
// * Secret Access Key: AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY
|
|
&credentials.EnvProvider{},
|
|
|
|
// Pick up IAM role if we're in an ECS task
|
|
defaults.RemoteCredProvider(*def.Config, def.Handlers),
|
|
|
|
// Pick up IAM role in case we're on EC2
|
|
&ec2rolecreds.EC2RoleProvider{
|
|
Client: ec2metadata.New(session.New(), &aws.Config{
|
|
HTTPClient: lowTimeoutClient,
|
|
}),
|
|
ExpiryWindow: 3,
|
|
},
|
|
}
|
|
cred := credentials.NewChainCredentials(providers)
|
|
|
|
switch {
|
|
case config.FileGetBool(name, "env_auth", false):
|
|
// No need for empty checks if "env_auth" is true
|
|
case v.AccessKeyID == "" && v.SecretAccessKey == "":
|
|
// if no access key/secret and iam is explicitly disabled then fall back to anon interaction
|
|
cred = credentials.AnonymousCredentials
|
|
case v.AccessKeyID == "":
|
|
return nil, nil, errors.New("access_key_id not found")
|
|
case v.SecretAccessKey == "":
|
|
return nil, nil, errors.New("secret_access_key not found")
|
|
}
|
|
|
|
endpoint := config.FileGet(name, "endpoint")
|
|
region := config.FileGet(name, "region")
|
|
if region == "" && endpoint == "" {
|
|
endpoint = "https://s3.amazonaws.com/"
|
|
}
|
|
if region == "" {
|
|
region = "us-east-1"
|
|
}
|
|
awsConfig := aws.NewConfig().
|
|
WithRegion(region).
|
|
WithMaxRetries(maxRetries).
|
|
WithCredentials(cred).
|
|
WithEndpoint(endpoint).
|
|
WithHTTPClient(fshttp.NewClient(fs.Config)).
|
|
WithS3ForcePathStyle(true)
|
|
// awsConfig.WithLogLevel(aws.LogDebugWithSigning)
|
|
ses := session.New()
|
|
c := s3.New(ses, awsConfig)
|
|
if region == "other-v2-signature" {
|
|
fs.Debugf(name, "Using v2 auth")
|
|
signer := func(req *request.Request) {
|
|
// Ignore AnonymousCredentials object
|
|
if req.Config.Credentials == credentials.AnonymousCredentials {
|
|
return
|
|
}
|
|
sign(v.AccessKeyID, v.SecretAccessKey, req.HTTPRequest)
|
|
}
|
|
c.Handlers.Sign.Clear()
|
|
c.Handlers.Sign.PushBackNamed(corehandlers.BuildContentLengthHandler)
|
|
c.Handlers.Sign.PushBack(signer)
|
|
}
|
|
return c, ses, nil
|
|
}
|
|
|
|
// NewFs constructs an Fs from the path, bucket:path
|
|
func NewFs(name, root string) (fs.Fs, error) {
|
|
bucket, directory, err := s3ParsePath(root)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c, ses, err := s3Connection(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
f := &Fs{
|
|
name: name,
|
|
c: c,
|
|
bucket: bucket,
|
|
ses: ses,
|
|
acl: config.FileGet(name, "acl"),
|
|
root: directory,
|
|
locationConstraint: config.FileGet(name, "location_constraint"),
|
|
sse: config.FileGet(name, "server_side_encryption"),
|
|
storageClass: config.FileGet(name, "storage_class"),
|
|
}
|
|
f.features = (&fs.Features{
|
|
ReadMimeType: true,
|
|
WriteMimeType: true,
|
|
BucketBased: true,
|
|
}).Fill(f)
|
|
if *s3ACL != "" {
|
|
f.acl = *s3ACL
|
|
}
|
|
if *s3StorageClass != "" {
|
|
f.storageClass = *s3StorageClass
|
|
}
|
|
if f.root != "" {
|
|
f.root += "/"
|
|
// Check to see if the object exists
|
|
req := s3.HeadObjectInput{
|
|
Bucket: &f.bucket,
|
|
Key: &directory,
|
|
}
|
|
_, err = f.c.HeadObject(&req)
|
|
if err == nil {
|
|
f.root = path.Dir(directory)
|
|
if f.root == "." {
|
|
f.root = ""
|
|
} else {
|
|
f.root += "/"
|
|
}
|
|
// return an error with an fs which points to the parent
|
|
return f, fs.ErrorIsFile
|
|
}
|
|
}
|
|
// f.listMultipartUploads()
|
|
return f, nil
|
|
}
|
|
|
|
// Return an Object from a path
|
|
//
|
|
//If it can't be found it returns the error ErrorObjectNotFound.
|
|
func (f *Fs) newObjectWithInfo(remote string, info *s3.Object) (fs.Object, error) {
|
|
o := &Object{
|
|
fs: f,
|
|
remote: remote,
|
|
}
|
|
if info != nil {
|
|
// Set info but not meta
|
|
if info.LastModified == nil {
|
|
fs.Logf(o, "Failed to read last modified")
|
|
o.lastModified = time.Now()
|
|
} else {
|
|
o.lastModified = *info.LastModified
|
|
}
|
|
o.etag = aws.StringValue(info.ETag)
|
|
o.bytes = aws.Int64Value(info.Size)
|
|
} else {
|
|
err := o.readMetaData() // reads info and meta, returning an error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
// NewObject finds the Object at remote. If it can't be found
|
|
// it returns the error fs.ErrorObjectNotFound.
|
|
func (f *Fs) NewObject(remote string) (fs.Object, error) {
|
|
return f.newObjectWithInfo(remote, nil)
|
|
}
|
|
|
|
// listFn is called from list to handle an object.
|
|
type listFn func(remote string, object *s3.Object, isDirectory bool) error
|
|
|
|
// list the objects into the function supplied
|
|
//
|
|
// dir is the starting directory, "" for root
|
|
//
|
|
// Set recurse to read sub directories
|
|
func (f *Fs) list(dir string, recurse bool, fn listFn) error {
|
|
root := f.root
|
|
if dir != "" {
|
|
root += dir + "/"
|
|
}
|
|
maxKeys := int64(listChunkSize)
|
|
delimiter := ""
|
|
if !recurse {
|
|
delimiter = "/"
|
|
}
|
|
var marker *string
|
|
for {
|
|
// FIXME need to implement ALL loop
|
|
req := s3.ListObjectsInput{
|
|
Bucket: &f.bucket,
|
|
Delimiter: &delimiter,
|
|
Prefix: &root,
|
|
MaxKeys: &maxKeys,
|
|
Marker: marker,
|
|
}
|
|
resp, err := f.c.ListObjects(&req)
|
|
if err != nil {
|
|
if awsErr, ok := err.(awserr.RequestFailure); ok {
|
|
if awsErr.StatusCode() == http.StatusNotFound {
|
|
err = fs.ErrorDirNotFound
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
rootLength := len(f.root)
|
|
if !recurse {
|
|
for _, commonPrefix := range resp.CommonPrefixes {
|
|
if commonPrefix.Prefix == nil {
|
|
fs.Logf(f, "Nil common prefix received")
|
|
continue
|
|
}
|
|
remote := *commonPrefix.Prefix
|
|
if !strings.HasPrefix(remote, f.root) {
|
|
fs.Logf(f, "Odd name received %q", remote)
|
|
continue
|
|
}
|
|
remote = remote[rootLength:]
|
|
if strings.HasSuffix(remote, "/") {
|
|
remote = remote[:len(remote)-1]
|
|
}
|
|
err = fn(remote, &s3.Object{Key: &remote}, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
for _, object := range resp.Contents {
|
|
key := aws.StringValue(object.Key)
|
|
if !strings.HasPrefix(key, f.root) {
|
|
fs.Logf(f, "Odd name received %q", key)
|
|
continue
|
|
}
|
|
remote := key[rootLength:]
|
|
// is this a directory marker?
|
|
if (strings.HasSuffix(remote, "/") || remote == "") && *object.Size == 0 {
|
|
if recurse {
|
|
// add a directory in if --fast-list since will have no prefixes
|
|
remote = remote[:len(remote)-1]
|
|
err = fn(remote, &s3.Object{Key: &remote}, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
continue // skip directory marker
|
|
}
|
|
err = fn(remote, object, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if !aws.BoolValue(resp.IsTruncated) {
|
|
break
|
|
}
|
|
// Use NextMarker if set, otherwise use last Key
|
|
if resp.NextMarker == nil || *resp.NextMarker == "" {
|
|
if len(resp.Contents) == 0 {
|
|
return errors.New("s3 protocol error: received listing with IsTruncated set, no NextMarker and no Contents")
|
|
}
|
|
marker = resp.Contents[len(resp.Contents)-1].Key
|
|
} else {
|
|
marker = resp.NextMarker
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Convert a list item into a DirEntry
|
|
func (f *Fs) itemToDirEntry(remote string, object *s3.Object, isDirectory bool) (fs.DirEntry, error) {
|
|
if isDirectory {
|
|
size := int64(0)
|
|
if object.Size != nil {
|
|
size = *object.Size
|
|
}
|
|
d := fs.NewDir(remote, time.Time{}).SetSize(size)
|
|
return d, nil
|
|
}
|
|
o, err := f.newObjectWithInfo(remote, object)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return o, nil
|
|
}
|
|
|
|
// mark the bucket as being OK
|
|
func (f *Fs) markBucketOK() {
|
|
if f.bucket != "" {
|
|
f.bucketOKMu.Lock()
|
|
f.bucketOK = true
|
|
f.bucketDeleted = false
|
|
f.bucketOKMu.Unlock()
|
|
}
|
|
}
|
|
|
|
// listDir lists files and directories to out
|
|
func (f *Fs) listDir(dir string) (entries fs.DirEntries, err error) {
|
|
// List the objects and directories
|
|
err = f.list(dir, false, func(remote string, object *s3.Object, isDirectory bool) error {
|
|
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if entry != nil {
|
|
entries = append(entries, entry)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// bucket must be present if listing succeeded
|
|
f.markBucketOK()
|
|
return entries, nil
|
|
}
|
|
|
|
// listBuckets lists the buckets to out
|
|
func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) {
|
|
if dir != "" {
|
|
return nil, fs.ErrorListBucketRequired
|
|
}
|
|
req := s3.ListBucketsInput{}
|
|
resp, err := f.c.ListBuckets(&req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, bucket := range resp.Buckets {
|
|
d := fs.NewDir(aws.StringValue(bucket.Name), aws.TimeValue(bucket.CreationDate))
|
|
entries = append(entries, d)
|
|
}
|
|
return entries, nil
|
|
}
|
|
|
|
// List the objects and directories in dir into entries. The
|
|
// entries can be returned in any order but should be for a
|
|
// complete directory.
|
|
//
|
|
// dir should be "" to list the root, and should not have
|
|
// trailing slashes.
|
|
//
|
|
// This should return ErrDirNotFound if the directory isn't
|
|
// found.
|
|
func (f *Fs) List(dir string) (entries fs.DirEntries, err error) {
|
|
if f.bucket == "" {
|
|
return f.listBuckets(dir)
|
|
}
|
|
return f.listDir(dir)
|
|
}
|
|
|
|
// ListR lists the objects and directories of the Fs starting
|
|
// from dir recursively into out.
|
|
//
|
|
// dir should be "" to start from the root, and should not
|
|
// have trailing slashes.
|
|
//
|
|
// This should return ErrDirNotFound if the directory isn't
|
|
// found.
|
|
//
|
|
// It should call callback for each tranche of entries read.
|
|
// These need not be returned in any particular order. If
|
|
// callback returns an error then the listing will stop
|
|
// immediately.
|
|
//
|
|
// Don't implement this unless you have a more efficient way
|
|
// of listing recursively that doing a directory traversal.
|
|
func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
|
|
if f.bucket == "" {
|
|
return fs.ErrorListBucketRequired
|
|
}
|
|
list := walk.NewListRHelper(callback)
|
|
err = f.list(dir, true, func(remote string, object *s3.Object, isDirectory bool) error {
|
|
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return list.Add(entry)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// bucket must be present if listing succeeded
|
|
f.markBucketOK()
|
|
return list.Flush()
|
|
}
|
|
|
|
// Put the Object into the bucket
|
|
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
// Temporary Object under construction
|
|
fs := &Object{
|
|
fs: f,
|
|
remote: src.Remote(),
|
|
}
|
|
return fs, fs.Update(in, src, options...)
|
|
}
|
|
|
|
// PutStream uploads to the remote path with the modTime given of indeterminate size
|
|
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
|
return f.Put(in, src, options...)
|
|
}
|
|
|
|
// Check if the bucket exists
|
|
//
|
|
// NB this can return incorrect results if called immediately after bucket deletion
|
|
func (f *Fs) dirExists() (bool, error) {
|
|
req := s3.HeadBucketInput{
|
|
Bucket: &f.bucket,
|
|
}
|
|
_, err := f.c.HeadBucket(&req)
|
|
if err == nil {
|
|
return true, nil
|
|
}
|
|
if err, ok := err.(awserr.RequestFailure); ok {
|
|
if err.StatusCode() == http.StatusNotFound {
|
|
return false, nil
|
|
}
|
|
}
|
|
return false, err
|
|
}
|
|
|
|
// Mkdir creates the bucket if it doesn't exist
|
|
func (f *Fs) Mkdir(dir string) error {
|
|
f.bucketOKMu.Lock()
|
|
defer f.bucketOKMu.Unlock()
|
|
if f.bucketOK {
|
|
return nil
|
|
}
|
|
if !f.bucketDeleted {
|
|
exists, err := f.dirExists()
|
|
if err == nil {
|
|
f.bucketOK = exists
|
|
}
|
|
if err != nil || exists {
|
|
return err
|
|
}
|
|
}
|
|
req := s3.CreateBucketInput{
|
|
Bucket: &f.bucket,
|
|
ACL: &f.acl,
|
|
}
|
|
if f.locationConstraint != "" {
|
|
req.CreateBucketConfiguration = &s3.CreateBucketConfiguration{
|
|
LocationConstraint: &f.locationConstraint,
|
|
}
|
|
}
|
|
_, err := f.c.CreateBucket(&req)
|
|
if err, ok := err.(awserr.Error); ok {
|
|
if err.Code() == "BucketAlreadyOwnedByYou" {
|
|
err = nil
|
|
}
|
|
}
|
|
if err == nil {
|
|
f.bucketOK = true
|
|
f.bucketDeleted = false
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Rmdir deletes the bucket if the fs is at the root
|
|
//
|
|
// Returns an error if it isn't empty
|
|
func (f *Fs) Rmdir(dir string) error {
|
|
f.bucketOKMu.Lock()
|
|
defer f.bucketOKMu.Unlock()
|
|
if f.root != "" || dir != "" {
|
|
return nil
|
|
}
|
|
req := s3.DeleteBucketInput{
|
|
Bucket: &f.bucket,
|
|
}
|
|
_, err := f.c.DeleteBucket(&req)
|
|
if err == nil {
|
|
f.bucketOK = false
|
|
f.bucketDeleted = true
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Precision of the remote
|
|
func (f *Fs) Precision() time.Duration {
|
|
return time.Nanosecond
|
|
}
|
|
|
|
// pathEscape escapes s as for a URL path. It uses rest.URLPathEscape
|
|
// but also escapes '+' for S3 and Digital Ocean spaces compatibility
|
|
func pathEscape(s string) string {
|
|
return strings.Replace(rest.URLPathEscape(s), "+", "%2B", -1)
|
|
}
|
|
|
|
// Copy src to this remote using server side copy operations.
|
|
//
|
|
// This is stored with the remote path given
|
|
//
|
|
// It returns the destination Object and a possible error
|
|
//
|
|
// Will only be called if src.Fs().Name() == f.Name()
|
|
//
|
|
// If it isn't possible then return fs.ErrorCantCopy
|
|
func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
|
|
err := f.Mkdir("")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
srcObj, ok := src.(*Object)
|
|
if !ok {
|
|
fs.Debugf(src, "Can't copy - not same remote type")
|
|
return nil, fs.ErrorCantCopy
|
|
}
|
|
srcFs := srcObj.fs
|
|
key := f.root + remote
|
|
source := pathEscape(srcFs.bucket + "/" + srcFs.root + srcObj.remote)
|
|
req := s3.CopyObjectInput{
|
|
Bucket: &f.bucket,
|
|
Key: &key,
|
|
CopySource: &source,
|
|
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
|
|
}
|
|
_, err = f.c.CopyObject(&req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return f.NewObject(remote)
|
|
}
|
|
|
|
// Hashes returns the supported hash sets.
|
|
func (f *Fs) Hashes() hash.Set {
|
|
return hash.Set(hash.MD5)
|
|
}
|
|
|
|
// ------------------------------------------------------------
|
|
|
|
// Fs returns the parent Fs
|
|
func (o *Object) Fs() fs.Info {
|
|
return o.fs
|
|
}
|
|
|
|
// Return a string version
|
|
func (o *Object) String() string {
|
|
if o == nil {
|
|
return "<nil>"
|
|
}
|
|
return o.remote
|
|
}
|
|
|
|
// Remote returns the remote path
|
|
func (o *Object) Remote() string {
|
|
return o.remote
|
|
}
|
|
|
|
var matchMd5 = regexp.MustCompile(`^[0-9a-f]{32}$`)
|
|
|
|
// Hash returns the Md5sum of an object returning a lowercase hex string
|
|
func (o *Object) Hash(t hash.Type) (string, error) {
|
|
if t != hash.MD5 {
|
|
return "", hash.ErrUnsupported
|
|
}
|
|
hash := strings.Trim(strings.ToLower(o.etag), `"`)
|
|
// Check the etag is a valid md5sum
|
|
if !matchMd5.MatchString(hash) {
|
|
err := o.readMetaData()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if md5sum, ok := o.meta[metaMD5Hash]; ok {
|
|
md5sumBytes, err := base64.StdEncoding.DecodeString(*md5sum)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
hash = hex.EncodeToString(md5sumBytes)
|
|
} else {
|
|
hash = ""
|
|
}
|
|
}
|
|
return hash, nil
|
|
}
|
|
|
|
// Size returns the size of an object in bytes
|
|
func (o *Object) Size() int64 {
|
|
return o.bytes
|
|
}
|
|
|
|
// readMetaData gets the metadata if it hasn't already been fetched
|
|
//
|
|
// it also sets the info
|
|
func (o *Object) readMetaData() (err error) {
|
|
if o.meta != nil {
|
|
return nil
|
|
}
|
|
key := o.fs.root + o.remote
|
|
req := s3.HeadObjectInput{
|
|
Bucket: &o.fs.bucket,
|
|
Key: &key,
|
|
}
|
|
resp, err := o.fs.c.HeadObject(&req)
|
|
if err != nil {
|
|
if awsErr, ok := err.(awserr.RequestFailure); ok {
|
|
if awsErr.StatusCode() == http.StatusNotFound {
|
|
return fs.ErrorObjectNotFound
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
var size int64
|
|
// Ignore missing Content-Length assuming it is 0
|
|
// Some versions of ceph do this due their apache proxies
|
|
if resp.ContentLength != nil {
|
|
size = *resp.ContentLength
|
|
}
|
|
o.etag = aws.StringValue(resp.ETag)
|
|
o.bytes = size
|
|
o.meta = resp.Metadata
|
|
if resp.LastModified == nil {
|
|
fs.Logf(o, "Failed to read last modified from HEAD: %v", err)
|
|
o.lastModified = time.Now()
|
|
} else {
|
|
o.lastModified = *resp.LastModified
|
|
}
|
|
o.mimeType = aws.StringValue(resp.ContentType)
|
|
return nil
|
|
}
|
|
|
|
// ModTime returns the modification time of the object
|
|
//
|
|
// It attempts to read the objects mtime and if that isn't present the
|
|
// LastModified returned in the http headers
|
|
func (o *Object) ModTime() time.Time {
|
|
err := o.readMetaData()
|
|
if err != nil {
|
|
fs.Logf(o, "Failed to read metadata: %v", err)
|
|
return time.Now()
|
|
}
|
|
// read mtime out of metadata if available
|
|
d, ok := o.meta[metaMtime]
|
|
if !ok || d == nil {
|
|
// fs.Debugf(o, "No metadata")
|
|
return o.lastModified
|
|
}
|
|
modTime, err := swift.FloatStringToTime(*d)
|
|
if err != nil {
|
|
fs.Logf(o, "Failed to read mtime from object: %v", err)
|
|
return o.lastModified
|
|
}
|
|
return modTime
|
|
}
|
|
|
|
// SetModTime sets the modification time of the local fs object
|
|
func (o *Object) SetModTime(modTime time.Time) error {
|
|
err := o.readMetaData()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
o.meta[metaMtime] = aws.String(swift.TimeToFloatString(modTime))
|
|
|
|
if o.bytes >= maxSizeForCopy {
|
|
fs.Debugf(o, "SetModTime is unsupported for objects bigger than %v bytes", fs.SizeSuffix(maxSizeForCopy))
|
|
return nil
|
|
}
|
|
|
|
// Guess the content type
|
|
mimeType := fs.MimeType(o)
|
|
|
|
// Copy the object to itself to update the metadata
|
|
key := o.fs.root + o.remote
|
|
sourceKey := o.fs.bucket + "/" + key
|
|
directive := s3.MetadataDirectiveReplace // replace metadata with that passed in
|
|
req := s3.CopyObjectInput{
|
|
Bucket: &o.fs.bucket,
|
|
ACL: &o.fs.acl,
|
|
Key: &key,
|
|
ContentType: &mimeType,
|
|
CopySource: aws.String(pathEscape(sourceKey)),
|
|
Metadata: o.meta,
|
|
MetadataDirective: &directive,
|
|
}
|
|
_, err = o.fs.c.CopyObject(&req)
|
|
return err
|
|
}
|
|
|
|
// Storable raturns a boolean indicating if this object is storable
|
|
func (o *Object) Storable() bool {
|
|
return true
|
|
}
|
|
|
|
// Open an object for read
|
|
func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
|
key := o.fs.root + o.remote
|
|
req := s3.GetObjectInput{
|
|
Bucket: &o.fs.bucket,
|
|
Key: &key,
|
|
}
|
|
for _, option := range options {
|
|
switch option.(type) {
|
|
case *fs.RangeOption, *fs.SeekOption:
|
|
_, value := option.Header()
|
|
req.Range = &value
|
|
default:
|
|
if option.Mandatory() {
|
|
fs.Logf(o, "Unsupported mandatory option: %v", option)
|
|
}
|
|
}
|
|
}
|
|
resp, err := o.fs.c.GetObject(&req)
|
|
if err, ok := err.(awserr.RequestFailure); ok {
|
|
if err.Code() == "InvalidObjectState" {
|
|
return nil, errors.Errorf("Object in GLACIER, restore first: %v", key)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return resp.Body, nil
|
|
}
|
|
|
|
// Update the Object from in with modTime and size
|
|
func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
|
err := o.fs.Mkdir("")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
modTime := src.ModTime()
|
|
size := src.Size()
|
|
|
|
uploader := s3manager.NewUploader(o.fs.ses, func(u *s3manager.Uploader) {
|
|
u.Concurrency = 2
|
|
u.LeavePartsOnError = false
|
|
u.S3 = o.fs.c
|
|
u.PartSize = s3manager.MinUploadPartSize
|
|
|
|
if size == -1 {
|
|
// Make parts as small as possible while still being able to upload to the
|
|
// S3 file size limit. Rounded up to nearest MB.
|
|
u.PartSize = (((maxFileSize / s3manager.MaxUploadParts) >> 20) + 1) << 20
|
|
return
|
|
}
|
|
// Adjust PartSize until the number of parts is small enough.
|
|
if size/u.PartSize >= s3manager.MaxUploadParts {
|
|
// Calculate partition size rounded up to the nearest MB
|
|
u.PartSize = (((size / s3manager.MaxUploadParts) >> 20) + 1) << 20
|
|
}
|
|
})
|
|
|
|
// Set the mtime in the meta data
|
|
metadata := map[string]*string{
|
|
metaMtime: aws.String(swift.TimeToFloatString(modTime)),
|
|
}
|
|
|
|
if size > uploader.PartSize {
|
|
hash, err := src.Hash(hash.MD5)
|
|
|
|
if err == nil && matchMd5.MatchString(hash) {
|
|
hashBytes, err := hex.DecodeString(hash)
|
|
|
|
if err == nil {
|
|
metadata[metaMD5Hash] = aws.String(base64.StdEncoding.EncodeToString(hashBytes))
|
|
}
|
|
}
|
|
}
|
|
|
|
// Guess the content type
|
|
mimeType := fs.MimeType(src)
|
|
|
|
key := o.fs.root + o.remote
|
|
req := s3manager.UploadInput{
|
|
Bucket: &o.fs.bucket,
|
|
ACL: &o.fs.acl,
|
|
Key: &key,
|
|
Body: in,
|
|
ContentType: &mimeType,
|
|
Metadata: metadata,
|
|
//ContentLength: &size,
|
|
}
|
|
if o.fs.sse != "" {
|
|
req.ServerSideEncryption = &o.fs.sse
|
|
}
|
|
if o.fs.storageClass != "" {
|
|
req.StorageClass = &o.fs.storageClass
|
|
}
|
|
_, err = uploader.Upload(&req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Read the metadata from the newly created object
|
|
o.meta = nil // wipe old metadata
|
|
err = o.readMetaData()
|
|
return err
|
|
}
|
|
|
|
// Remove an object
|
|
func (o *Object) Remove() error {
|
|
key := o.fs.root + o.remote
|
|
req := s3.DeleteObjectInput{
|
|
Bucket: &o.fs.bucket,
|
|
Key: &key,
|
|
}
|
|
_, err := o.fs.c.DeleteObject(&req)
|
|
return err
|
|
}
|
|
|
|
// MimeType of an Object if known, "" otherwise
|
|
func (o *Object) MimeType() string {
|
|
err := o.readMetaData()
|
|
if err != nil {
|
|
fs.Logf(o, "Failed to read metadata: %v", err)
|
|
return ""
|
|
}
|
|
return o.mimeType
|
|
}
|
|
|
|
// Check the interfaces are satisfied
|
|
var (
|
|
_ fs.Fs = &Fs{}
|
|
_ fs.Copier = &Fs{}
|
|
_ fs.PutStreamer = &Fs{}
|
|
_ fs.ListRer = &Fs{}
|
|
_ fs.Object = &Object{}
|
|
_ fs.MimeTyper = &Object{}
|
|
)
|