mirror of
https://github.com/rclone/rclone
synced 2025-03-31 17:09:19 +02:00

In this commit we discovered a problem with objects being uploaded to the incorrect object name. It added an integration test for the problem. 65b2e378e0992d65 drive: fix incorrect remote after Update on object This test was tripped by the hdfs backend and this patch fixes the problem.
179 lines
3.3 KiB
Go
179 lines
3.3 KiB
Go
//go:build !plan9
|
|
// +build !plan9
|
|
|
|
package hdfs
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/lib/readers"
|
|
)
|
|
|
|
// Object describes an HDFS file
|
|
type Object struct {
|
|
fs *Fs
|
|
remote string
|
|
size int64
|
|
modTime time.Time
|
|
}
|
|
|
|
// Fs returns the parent Fs
|
|
func (o *Object) Fs() fs.Info {
|
|
return o.fs
|
|
}
|
|
|
|
// Remote returns the remote path
|
|
func (o *Object) Remote() string {
|
|
return o.remote
|
|
}
|
|
|
|
// Size returns the size of an object in bytes
|
|
func (o *Object) Size() int64 {
|
|
return o.size
|
|
}
|
|
|
|
// ModTime returns the modification time of the object
|
|
func (o *Object) ModTime(ctx context.Context) time.Time {
|
|
return o.modTime
|
|
}
|
|
|
|
// SetModTime sets the modification time of the local fs object
|
|
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
|
realpath := o.fs.realpath(o.Remote())
|
|
err := o.fs.client.Chtimes(realpath, modTime, modTime)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
o.modTime = modTime
|
|
return nil
|
|
}
|
|
|
|
// Storable returns whether this object is storable
|
|
func (o *Object) Storable() bool {
|
|
return true
|
|
}
|
|
|
|
// Return a string version
|
|
func (o *Object) String() string {
|
|
if o == nil {
|
|
return "<nil>"
|
|
}
|
|
return o.Remote()
|
|
}
|
|
|
|
// Hash is not supported
|
|
func (o *Object) Hash(ctx context.Context, r hash.Type) (string, error) {
|
|
return "", hash.ErrUnsupported
|
|
}
|
|
|
|
// Open an object for read
|
|
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
|
realpath := o.realpath()
|
|
fs.Debugf(o.fs, "open [%s]", realpath)
|
|
f, err := o.fs.client.Open(realpath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var offset, limit int64 = 0, -1
|
|
for _, option := range options {
|
|
switch x := option.(type) {
|
|
case *fs.SeekOption:
|
|
offset = x.Offset
|
|
case *fs.RangeOption:
|
|
offset, limit = x.Decode(o.Size())
|
|
}
|
|
}
|
|
|
|
_, err = f.Seek(offset, io.SeekStart)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if limit != -1 {
|
|
in = readers.NewLimitedReadCloser(f, limit)
|
|
} else {
|
|
in = f
|
|
}
|
|
|
|
return in, err
|
|
}
|
|
|
|
// Update object
|
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
|
realpath := o.fs.realpath(o.remote)
|
|
dirname := path.Dir(realpath)
|
|
fs.Debugf(o.fs, "update [%s]", realpath)
|
|
|
|
err := o.fs.client.MkdirAll(dirname, 0755)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = o.fs.client.Stat(realpath)
|
|
if err == nil {
|
|
err = o.fs.client.Remove(realpath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
out, err := o.fs.client.Create(realpath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cleanup := func() {
|
|
rerr := o.fs.client.Remove(realpath)
|
|
if rerr != nil {
|
|
fs.Errorf(o.fs, "failed to remove [%v]: %v", realpath, rerr)
|
|
}
|
|
}
|
|
|
|
_, err = io.Copy(out, in)
|
|
if err != nil {
|
|
cleanup()
|
|
return err
|
|
}
|
|
|
|
err = out.Close()
|
|
if err != nil {
|
|
cleanup()
|
|
return err
|
|
}
|
|
|
|
info, err := o.fs.client.Stat(realpath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = o.SetModTime(ctx, src.ModTime(ctx))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
o.size = info.Size()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Remove an object
|
|
func (o *Object) Remove(ctx context.Context) error {
|
|
realpath := o.fs.realpath(o.remote)
|
|
fs.Debugf(o.fs, "remove [%s]", realpath)
|
|
return o.fs.client.Remove(realpath)
|
|
}
|
|
|
|
func (o *Object) realpath() string {
|
|
return o.fs.opt.Enc.FromStandardPath(xPath(o.Fs().Root(), o.remote))
|
|
}
|
|
|
|
// Check the interfaces are satisfied
|
|
var (
|
|
_ fs.Object = (*Object)(nil)
|
|
)
|