From 11332a19a019bdec81f12fa81dcb5422d002701c Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 1 Dec 2017 15:15:44 +0000 Subject: [PATCH] fs: make an in memory object for short transfers --- fs/object.go | 187 +++++++++++++++++++++++++++++++++++++++++++++- fs/object_test.go | 178 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 364 insertions(+), 1 deletion(-) create mode 100644 fs/object_test.go diff --git a/fs/object.go b/fs/object.go index 5cb64a3ac..18c3d8f39 100644 --- a/fs/object.go +++ b/fs/object.go @@ -1,6 +1,12 @@ package fs -import "time" +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "time" +) // NewStaticObjectInfo returns a static ObjectInfo // If hashes is nil and fs is not nil, the hash map will be replaced with @@ -48,3 +54,182 @@ func (i *staticObjectInfo) Hash(h HashType) (string, error) { } return "", ErrHashUnsupported } + +// MemoryFs is an in memory Fs, it only supports FsInfo and Put +var MemoryFs memoryFs + +// memoryFs is an in memory fs +type memoryFs struct{} + +// Name of the remote (as passed into NewFs) +func (memoryFs) Name() string { return "memory" } + +// Root of the remote (as passed into NewFs) +func (memoryFs) Root() string { return "" } + +// String returns a description of the FS +func (memoryFs) String() string { return "memory" } + +// Precision of the ModTimes in this Fs +func (memoryFs) Precision() time.Duration { return time.Nanosecond } + +// Returns the supported hash types of the filesystem +func (memoryFs) Hashes() HashSet { return SupportedHashes } + +// Features returns the optional features of this Fs +func (memoryFs) Features() *Features { return &Features{} } + +// List the objects and directories in dir into entries. The +// entries can be returned in any order but should be for a +// complete directory. +// +// dir should be "" to list the root, and should not have +// trailing slashes. +// +// This should return ErrDirNotFound if the directory isn't +// found. +func (memoryFs) List(dir string) (entries DirEntries, err error) { + return nil, nil +} + +// NewObject finds the Object at remote. If it can't be found +// it returns the error ErrorObjectNotFound. +func (memoryFs) NewObject(remote string) (Object, error) { + return nil, ErrorObjectNotFound +} + +// Put in to the remote path with the modTime given of the given size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (memoryFs) Put(in io.Reader, src ObjectInfo, options ...OpenOption) (Object, error) { + o := NewMemoryObject(src.Remote(), src.ModTime(), nil) + return o, o.Update(in, src, options...) +} + +// Mkdir makes the directory (container, bucket) +// +// Shouldn't return an error if it already exists +func (memoryFs) Mkdir(dir string) error { + return errors.New("memoryFs: can't make directory") +} + +// Rmdir removes the directory (container, bucket) if empty +// +// Return an error if it doesn't exist or isn't empty +func (memoryFs) Rmdir(dir string) error { + return ErrorDirNotFound +} + +var _ Fs = MemoryFs + +// MemoryObject is an in memory object +type MemoryObject struct { + remote string + modTime time.Time + content []byte +} + +// NewMemoryObject returns an in memory Object with the modTime and content passed in +func NewMemoryObject(remote string, modTime time.Time, content []byte) *MemoryObject { + return &MemoryObject{ + remote: remote, + modTime: modTime, + content: content, + } +} + +// Content returns the underlying buffer +func (o *MemoryObject) Content() []byte { + return o.content +} + +// Fs returns read only access to the Fs that this object is part of +func (o *MemoryObject) Fs() Info { + return MemoryFs +} + +// Remote returns the remote path +func (o *MemoryObject) Remote() string { + return o.remote +} + +// String returns a description of the Object +func (o *MemoryObject) String() string { + return o.remote +} + +// ModTime returns the modification date of the file +func (o *MemoryObject) ModTime() time.Time { + return o.modTime +} + +// Size returns the size of the file +func (o *MemoryObject) Size() int64 { + return int64(len(o.content)) +} + +// Storable says whether this object can be stored +func (o *MemoryObject) Storable() bool { + return true +} + +// Hash returns the requested hash of the contents +func (o *MemoryObject) Hash(h HashType) (string, error) { + hash, err := NewMultiHasherTypes(HashSet(h)) + if err != nil { + return "", err + } + _, err = hash.Write(o.content) + if err != nil { + return "", err + } + return hash.Sums()[h], nil +} + +// SetModTime sets the metadata on the object to set the modification date +func (o *MemoryObject) SetModTime(modTime time.Time) error { + o.modTime = modTime + return nil +} + +// Open opens the file for read. Call Close() on the returned io.ReadCloser +func (o *MemoryObject) Open(options ...OpenOption) (io.ReadCloser, error) { + content := o.content + for _, option := range options { + switch x := option.(type) { + case *RangeOption: + content = o.content[x.Start:x.End] + case *SeekOption: + content = o.content[x.Offset:] + default: + if option.Mandatory() { + Logf(o, "Unsupported mandatory option: %v", option) + } + } + } + return ioutil.NopCloser(bytes.NewBuffer(content)), nil +} + +// Update in to the object with the modTime given of the given size +// +// This re-uses the internal buffer if at all possible. +func (o *MemoryObject) Update(in io.Reader, src ObjectInfo, options ...OpenOption) (err error) { + size := src.Size() + if size == 0 { + o.content = nil + } else if size < 0 || int64(cap(o.content)) < size { + o.content, err = ioutil.ReadAll(in) + } else { + o.content = o.content[:size] + _, err = io.ReadFull(in, o.content) + } + o.modTime = src.ModTime() + return err +} + +// Remove this object +func (o *MemoryObject) Remove() error { + return errors.New("memoryObject.Remove not supported") +} diff --git a/fs/object_test.go b/fs/object_test.go new file mode 100644 index 000000000..852709598 --- /dev/null +++ b/fs/object_test.go @@ -0,0 +1,178 @@ +package fs_test + +import ( + "bytes" + "io" + "io/ioutil" + "testing" + "time" + + "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fstest" + "github.com/stretchr/testify/assert" +) + +func TestStaticObject(t *testing.T) { + r := fstest.NewRun(t) + defer r.Finalise() + + now := time.Now() + remote := "path/to/object" + size := int64(1024) + + o := fs.NewStaticObjectInfo(remote, now, size, true, nil, r.Flocal) + + assert.Equal(t, r.Flocal, o.Fs()) + assert.Equal(t, remote, o.Remote()) + assert.Equal(t, remote, o.String()) + assert.Equal(t, now, o.ModTime()) + assert.Equal(t, size, o.Size()) + assert.Equal(t, true, o.Storable()) + + hash, err := o.Hash(fs.HashMD5) + assert.NoError(t, err) + assert.Equal(t, "", hash) + + o = fs.NewStaticObjectInfo(remote, now, size, true, nil, nil) + _, err = o.Hash(fs.HashMD5) + assert.Equal(t, fs.ErrHashUnsupported, err) + + hs := map[fs.HashType]string{ + fs.HashMD5: "potato", + } + o = fs.NewStaticObjectInfo(remote, now, size, true, hs, nil) + hash, err = o.Hash(fs.HashMD5) + assert.NoError(t, err) + assert.Equal(t, "potato", hash) + _, err = o.Hash(fs.HashSHA1) + assert.Equal(t, fs.ErrHashUnsupported, err) + +} + +func TestMemoryFs(t *testing.T) { + f := fs.MemoryFs + assert.Equal(t, "memory", f.Name()) + assert.Equal(t, "", f.Root()) + assert.Equal(t, "memory", f.String()) + assert.Equal(t, time.Nanosecond, f.Precision()) + assert.Equal(t, fs.SupportedHashes, f.Hashes()) + assert.Equal(t, &fs.Features{}, f.Features()) + + entries, err := f.List("") + assert.NoError(t, err) + assert.Nil(t, entries) + + o, err := f.NewObject("obj") + assert.Equal(t, fs.ErrorObjectNotFound, err) + assert.Nil(t, o) + + buf := bytes.NewBufferString("potato") + now := time.Now() + src := fs.NewStaticObjectInfo("remote", now, int64(buf.Len()), true, nil, nil) + o, err = f.Put(buf, src) + assert.NoError(t, err) + hash, err := o.Hash(fs.HashSHA1) + assert.NoError(t, err) + assert.Equal(t, "3e2e95f5ad970eadfa7e17eaf73da97024aa5359", hash) + + err = f.Mkdir("dir") + assert.Error(t, err) + + err = f.Rmdir("dir") + assert.Error(t, fs.ErrorDirNotFound) +} + +func TestMemoryObject(t *testing.T) { + remote := "path/to/object" + now := time.Now() + content := []byte("potatoXXXXXXXXXXXXX") + content = content[:6] // make some extra cap + + o := fs.NewMemoryObject(remote, now, content) + + assert.Equal(t, content, o.Content()) + assert.Equal(t, fs.MemoryFs, o.Fs()) + assert.Equal(t, remote, o.Remote()) + assert.Equal(t, remote, o.String()) + assert.Equal(t, now, o.ModTime()) + assert.Equal(t, int64(len(content)), o.Size()) + assert.Equal(t, true, o.Storable()) + + hash, err := o.Hash(fs.HashMD5) + assert.NoError(t, err) + assert.Equal(t, "8ee2027983915ec78acc45027d874316", hash) + + hash, err = o.Hash(fs.HashSHA1) + assert.NoError(t, err) + assert.Equal(t, "3e2e95f5ad970eadfa7e17eaf73da97024aa5359", hash) + + newNow := now.Add(time.Minute) + err = o.SetModTime(newNow) + assert.NoError(t, err) + assert.Equal(t, newNow, o.ModTime()) + + checkOpen := func(rc io.ReadCloser, expected string) { + actual, err := ioutil.ReadAll(rc) + assert.NoError(t, err) + err = rc.Close() + assert.NoError(t, err) + assert.Equal(t, expected, string(actual)) + } + + checkContent := func(o fs.Object, expected string) { + rc, err := o.Open() + assert.NoError(t, err) + checkOpen(rc, expected) + } + + checkContent(o, string(content)) + + rc, err := o.Open(&fs.RangeOption{Start: 1, End: 3}) + assert.NoError(t, err) + checkOpen(rc, "ot") + + rc, err = o.Open(&fs.SeekOption{Offset: 3}) + assert.NoError(t, err) + checkOpen(rc, "ato") + + // check it fits within the buffer + newNow = now.Add(2 * time.Minute) + newContent := bytes.NewBufferString("Rutabaga") + assert.True(t, newContent.Len() < cap(content)) // fits within cap(content) + src := fs.NewStaticObjectInfo(remote, newNow, int64(newContent.Len()), true, nil, nil) + err = o.Update(newContent, src) + assert.NoError(t, err) + checkContent(o, "Rutabaga") + assert.Equal(t, newNow, o.ModTime()) + assert.Equal(t, "Rutaba", string(content)) // check we re-used the buffer + + // not within the buffer + newStr := "0123456789" + newStr = newStr + newStr + newStr + newStr + newStr + newStr + newStr + newStr + newStr + newStr + newContent = bytes.NewBufferString(newStr) + assert.True(t, newContent.Len() > cap(content)) // does not fit within cap(content) + src = fs.NewStaticObjectInfo(remote, newNow, int64(newContent.Len()), true, nil, nil) + err = o.Update(newContent, src) + assert.NoError(t, err) + checkContent(o, newStr) + assert.Equal(t, "Rutaba", string(content)) // check we didn't re-use the buffer + + // now try streaming + newStr = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + newContent = bytes.NewBufferString(newStr) + src = fs.NewStaticObjectInfo(remote, newNow, -1, true, nil, nil) + err = o.Update(newContent, src) + assert.NoError(t, err) + checkContent(o, newStr) + + // and zero length + newStr = "" + newContent = bytes.NewBufferString(newStr) + src = fs.NewStaticObjectInfo(remote, newNow, 0, true, nil, nil) + err = o.Update(newContent, src) + assert.NoError(t, err) + checkContent(o, newStr) + + err = o.Remove() + assert.Error(t, err) +}