diff --git a/swift/swift.go b/swift/swift.go index f052fa2ad..a6d94f820 100644 --- a/swift/swift.go +++ b/swift/swift.go @@ -66,10 +66,11 @@ func init() { // FsSwift represents a remote swift server type FsSwift struct { - name string // name of this remote - c swift.Connection // the connection to the swift server - container string // the container we are working on - root string // the path we are working on if any + name string // name of this remote + c swift.Connection // the connection to the swift server + container string // the container we are working on + segmentsContainer string // container to store the segments (if any) in + root string // the path we are working on if any } // FsObjectSwift describes a swift object @@ -163,10 +164,11 @@ func NewFs(name, root string) (fs.Fs, error) { return nil, err } f := &FsSwift{ - name: name, - c: *c, - container: container, - root: directory, + name: name, + c: *c, + container: container, + segmentsContainer: container + "_segments", + root: directory, } if f.root != "" { f.root += "/" @@ -216,21 +218,25 @@ func (f *FsSwift) NewFsObject(remote string) fs.Object { return f.newFsObjectWithInfo(remote, nil) } -// list the objects into the function supplied +// listFn is called from list and listContainerRoot to handle an object +type listFn func(string, *swift.Object) error + +// listContainerRoot lists the objects into the function supplied from +// the container and root supplied // // If directories is set it only sends directories -func (f *FsSwift) list(directories bool, fn func(string, *swift.Object)) { +func (f *FsSwift) listContainerRoot(container, root string, directories bool, fn listFn) error { // Options for ObjectsWalk opts := swift.ObjectsOpts{ - Prefix: f.root, + Prefix: root, Limit: 256, } if directories { opts.Delimiter = '/' } - rootLength := len(f.root) - err := f.c.ObjectsWalk(f.container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) { - objects, err := f.c.Objects(f.container, opts) + rootLength := len(root) + return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) { + objects, err := f.c.Objects(container, opts) if err == nil { for i := range objects { object := &objects[i] @@ -241,16 +247,26 @@ func (f *FsSwift) list(directories bool, fn func(string, *swift.Object)) { } object.Name = object.Name[:len(object.Name)-1] } - if !strings.HasPrefix(object.Name, f.root) { + if !strings.HasPrefix(object.Name, root) { fs.Log(f, "Odd name received %q", object.Name) continue } remote := object.Name[rootLength:] - fn(remote, object) + err = fn(remote, object) + if err != nil { + break + } } } return objects, err }) +} + +// list the objects into the function supplied +// +// If directories is set it only sends directories +func (f *FsSwift) list(directories bool, fn listFn) { + err := f.listContainerRoot(f.container, f.root, directories, fn) if err != nil { fs.Stats.Error() fs.ErrorLog(f, "Couldn't read container %q: %s", f.container, err) @@ -269,10 +285,18 @@ func (f *FsSwift) List() fs.ObjectsChan { // List the objects go func() { defer close(out) - f.list(false, func(remote string, object *swift.Object) { - if fs := f.newFsObjectWithInfo(remote, object); fs != nil { - out <- fs + f.list(false, func(remote string, object *swift.Object) error { + if o := f.newFsObjectWithInfo(remote, object); o != nil { + // Do full metadata read on 0 size objects which might be manifest files + if o.Size() == 0 { + err := o.(*FsObjectSwift).readMetaData() + if err != nil { + fs.Debug(o, "Failed to read metadata: %v", err) + } + } + out <- o } + return nil }) }() } @@ -304,12 +328,13 @@ func (f *FsSwift) ListDir() fs.DirChan { // List the directories in the path in the container go func() { defer close(out) - f.list(true, func(remote string, object *swift.Object) { + f.list(true, func(remote string, object *swift.Object) error { out <- &fs.Dir{ Name: remote, Bytes: object.Bytes, Count: 0, } + return nil }) }() } @@ -394,7 +419,7 @@ func (o *FsObjectSwift) Md5sum() (string, error) { return "", err } if isManifest { - fs.Debug(o, "Return empty md5 for swift manifest file. Md5 of manifest file calculate as md5 of md5 of it's parts, so it's not original md5") + fs.Debug(o, "Returning empty Md5sum for swift manifest file") return "", nil } return strings.ToLower(o.info.Hash), nil @@ -404,6 +429,9 @@ func (o *FsObjectSwift) Md5sum() (string, error) { func (o *FsObjectSwift) isManifestFile() (bool, error) { err := o.readMetaData() if err != nil { + if err == swift.ObjectNotFound { + return false, nil + } return false, err } _, isManifestFile := (*o.headers)["X-Object-Manifest"] @@ -491,76 +519,100 @@ func min(x, y int64) int64 { return y } -// nsToSwiftFloatString turns a number of ns into a floating point -// string in seconds the same way as the "swift" tool -func nsToSwiftFloatString(ns int64) string { - if ns < 0 { - return "-" + nsToSwiftFloatString(-ns) +// removeSegments removes any old segments from o +// +// if except is passed in then segments with that prefix won't be deleted +func (o *FsObjectSwift) removeSegments(except string) error { + segmentsRoot := o.swift.root + o.remote + "/" + err := o.swift.listContainerRoot(o.swift.segmentsContainer, segmentsRoot, false, func(remote string, object *swift.Object) error { + if except != "" && strings.HasPrefix(remote, except) { + // fs.Debug(o, "Ignoring current segment file %q in container %q", segmentsRoot+remote, o.swift.segmentsContainer) + return nil + } + segmentPath := segmentsRoot + remote + fs.Debug(o, "Removing segment file %q in container %q", segmentPath, o.swift.segmentsContainer) + return o.swift.c.ObjectDelete(o.swift.segmentsContainer, segmentPath) + }) + if err != nil { + return err } - result := fmt.Sprintf("%010d", ns) - split := len(result) - 9 - result, decimals := result[:split], result[split:split+2] - if decimals != "" { - result += "." - result += decimals + // remove the segments container if empty, ignore errors + err = o.swift.c.ContainerDelete(o.swift.segmentsContainer) + if err == nil { + fs.Debug(o, "Removed empty container %q", o.swift.segmentsContainer) } - return result + return nil +} + +// updateChunks updates the existing object using chunks to a separate +// container. It returns a string which prefixes current segments. +func (o *FsObjectSwift) updateChunks(in io.Reader, headers swift.Headers, size int64) (string, error) { + // Create the segmentsContainer if it doesn't exist + err := o.swift.c.ContainerCreate(o.swift.segmentsContainer, nil) + if err != nil { + return "", err + } + // Upload the chunks + left := size + i := 0 + uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size) + segmentsPath := fmt.Sprintf("%s%s/%s", o.swift.root, o.remote, uniquePrefix) + for left > 0 { + n := min(left, int64(chunkSize)) + segmentReader := io.LimitReader(in, n) + segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i) + fs.Debug(o, "Uploading segment file %q into %q", segmentPath, o.swift.segmentsContainer) + _, err := o.swift.c.ObjectPut(o.swift.segmentsContainer, segmentPath, segmentReader, true, "", "", headers) + if err != nil { + return "", err + } + left -= n + i++ + } + // Upload the manifest + headers["X-Object-Manifest"] = fmt.Sprintf("%s/%s", o.swift.segmentsContainer, segmentsPath) + emptyReader := bytes.NewReader(nil) + manifestName := o.swift.root + o.remote + _, err = o.swift.c.ObjectPut(o.swift.container, manifestName, emptyReader, true, "", "", headers) + return uniquePrefix + "/", err } // Update the object with the contents of the io.Reader, modTime and size // // The new object may have been created if an error is returned func (o *FsObjectSwift) Update(in io.Reader, modTime time.Time, size int64) error { + // Note whether this has a manifest before starting + isManifest, err := o.isManifestFile() + if err != nil { + return err + } + // Set the mtime m := swift.Metadata{} m.SetModTime(modTime) + headers := m.ObjectHeaders() + uniquePrefix := "" if size > int64(chunkSize) { - segmentsContainerName := o.swift.container + "_segments" - left := size - i := 0 - nowFloat := nsToSwiftFloatString(time.Now().UnixNano()) - for left > 0 { - n := min(left, int64(chunkSize)) - segmentReader := io.LimitReader(in, n) - segmentPath := fmt.Sprintf("%s%s/%s/%d/%08d", o.swift.root, o.remote, nowFloat, size, i) - _, err := o.swift.c.ObjectPut(segmentsContainerName, segmentPath, segmentReader, true, "", "", m.ObjectHeaders()) - if err != nil { - return err - } - left -= n - i++ - } - manifestHeaders := swift.Headers{"X-Object-Manifest": fmt.Sprintf("%s/%s%s/%s/%d", segmentsContainerName, o.swift.root, o.remote, nowFloat, size)} - for k, v := range m.ObjectHeaders() { - manifestHeaders[k] = v - } - emptyReader := bytes.NewReader(nil) - manifestName := o.swift.root + o.remote - _, err := o.swift.c.ObjectPut(o.swift.container, manifestName, emptyReader, true, "", "", manifestHeaders) + uniquePrefix, err = o.updateChunks(in, headers, size) if err != nil { return err } - // remove old segments - segmentsPath := fmt.Sprintf("%s/%s%s/", segmentsContainerName, o.swift.root, o.remote) - segmentsFs, err := NewFs(o.swift.name, segmentsPath) - if err != nil { - return err - } - for o := range segmentsFs.List() { - if !strings.HasPrefix(o.Remote(), nowFloat) { - fs.Log(o, "Remove old file segment '%s'", o.Remote()) - err := o.Remove() - if err != nil { - return err - } - } - } } else { + headers["X-Object-Manifest"] = "" // remove manifest _, err := o.swift.c.ObjectPut(o.swift.container, o.swift.root+o.remote, in, true, "", "", m.ObjectHeaders()) if err != nil { return err } } + + // If file was a manifest then remove old/all segments + if isManifest { + err = o.removeSegments(uniquePrefix) + if err != nil { + fs.Log(o, "Failed to remove old segments - carrying on with upload: %v", err) + } + } + // Read the metadata from the newly created object o.headers = nil // wipe old metadata return o.readMetaData() @@ -572,22 +624,19 @@ func (o *FsObjectSwift) Remove() error { if err != nil { return err } + // Remove file/manifest first + err = o.swift.c.ObjectDelete(o.swift.container, o.swift.root+o.remote) + if err != nil { + return err + } + // ...then segments if required if isManifestFile { - // remove segments - segmentsContainerName := o.swift.container + "_segments" - segmentsPath := fmt.Sprintf("%s/%s%s/", segmentsContainerName, o.swift.root, o.remote) - segmentsFs, err := NewFs(o.swift.name, segmentsPath) + err = o.removeSegments("") if err != nil { return err } - for o := range segmentsFs.List() { - err := o.Remove() - if err != nil { - return err - } - } } - return o.swift.c.ObjectDelete(o.swift.container, o.swift.root+o.remote) + return nil } // Check the interfaces are satisfied