1
mirror of https://github.com/rclone/rclone synced 2024-11-28 06:41:41 +01:00

Forward port 930ff266f2 to cmount branch

compare checksums on upload/download via FUSE
This commit is contained in:
Nick Craig-Wood 2017-05-08 17:47:22 +01:00
parent 855071cc19
commit bc9856b570
9 changed files with 137 additions and 23 deletions

View File

@ -41,6 +41,9 @@ func NewFS(f fs.Fs) *FS {
if noSeek { if noSeek {
fsys.FS.NoSeek() fsys.FS.NoSeek()
} }
if noChecksum {
fsys.FS.NoChecksum()
}
return fsys return fsys
} }

View File

@ -27,6 +27,7 @@ import (
// Globals // Globals
var ( var (
noModTime = false noModTime = false
noChecksum = false
debugFUSE = false debugFUSE = false
noSeek = false noSeek = false
dirCacheTime = 5 * 60 * time.Second dirCacheTime = 5 * 60 * time.Second
@ -50,6 +51,7 @@ var (
func init() { func init() {
cmd.Root.AddCommand(commandDefintion) cmd.Root.AddCommand(commandDefintion)
commandDefintion.Flags().BoolVarP(&noModTime, "no-modtime", "", noModTime, "Don't read/write the modification time (can speed things up).") commandDefintion.Flags().BoolVarP(&noModTime, "no-modtime", "", noModTime, "Don't read/write the modification time (can speed things up).")
commandDefintion.Flags().BoolVarP(&noChecksum, "no-checksum", "", noChecksum, "Don't compare checksums on up/download.")
commandDefintion.Flags().BoolVarP(&debugFUSE, "debug-fuse", "", debugFUSE, "Debug the FUSE internals - needs -v.") commandDefintion.Flags().BoolVarP(&debugFUSE, "debug-fuse", "", debugFUSE, "Debug the FUSE internals - needs -v.")
commandDefintion.Flags().BoolVarP(&noSeek, "no-seek", "", noSeek, "Don't allow seeking in files.") commandDefintion.Flags().BoolVarP(&noSeek, "no-seek", "", noSeek, "Don't allow seeking in files.")
commandDefintion.Flags().DurationVarP(&dirCacheTime, "dir-cache-time", "", dirCacheTime, "Time to cache directory entries for.") commandDefintion.Flags().DurationVarP(&dirCacheTime, "dir-cache-time", "", dirCacheTime, "Time to cache directory entries for.")
@ -147,10 +149,6 @@ like this:
* those which need to know the size in advance won't - eg B2 * those which need to know the size in advance won't - eg B2
* maybe should pass in size as -1 to mean work it out * maybe should pass in size as -1 to mean work it out
* Or put in an an upload cache to cache the files on disk first * Or put in an an upload cache to cache the files on disk first
### TODO ###
* Check hashes on upload/download
`, `,
Run: func(command *cobra.Command, args []string) { Run: func(command *cobra.Command, args []string) {
cmd.CheckArgs(2, 2, command, args) cmd.CheckArgs(2, 2, command, args)

View File

@ -24,6 +24,7 @@ func TestFileModTimeWithOpenWriters(t *testing.T) {} // FIXME mounttest.TestFile
func TestMount(t *testing.T) { mounttest.TestMount(t) } func TestMount(t *testing.T) { mounttest.TestMount(t) }
func TestRoot(t *testing.T) { mounttest.TestRoot(t) } func TestRoot(t *testing.T) { mounttest.TestRoot(t) }
func TestReadByByte(t *testing.T) { mounttest.TestReadByByte(t) } func TestReadByByte(t *testing.T) { mounttest.TestReadByByte(t) }
func TestReadChecksum(t *testing.T) { mounttest.TestReadChecksum(t) }
func TestReadFileDoubleClose(t *testing.T) { mounttest.TestReadFileDoubleClose(t) } func TestReadFileDoubleClose(t *testing.T) { mounttest.TestReadFileDoubleClose(t) }
func TestReadSeek(t *testing.T) { mounttest.TestReadSeek(t) } func TestReadSeek(t *testing.T) { mounttest.TestReadSeek(t) }
func TestWriteFileNoWrite(t *testing.T) { mounttest.TestWriteFileNoWrite(t) } func TestWriteFileNoWrite(t *testing.T) { mounttest.TestWriteFileNoWrite(t) }

View File

@ -31,6 +31,9 @@ func NewFS(f fs.Fs) *FS {
if noSeek { if noSeek {
fsys.FS.NoSeek() fsys.FS.NoSeek()
} }
if noChecksum {
fsys.FS.NoChecksum()
}
return fsys return fsys
} }

View File

@ -21,6 +21,7 @@ func TestFileModTimeWithOpenWriters(t *testing.T) { mounttest.TestFileModTimeWit
func TestMount(t *testing.T) { mounttest.TestMount(t) } func TestMount(t *testing.T) { mounttest.TestMount(t) }
func TestRoot(t *testing.T) { mounttest.TestRoot(t) } func TestRoot(t *testing.T) { mounttest.TestRoot(t) }
func TestReadByByte(t *testing.T) { mounttest.TestReadByByte(t) } func TestReadByByte(t *testing.T) { mounttest.TestReadByByte(t) }
func TestReadChecksum(t *testing.T) { mounttest.TestReadChecksum(t) }
func TestReadFileDoubleClose(t *testing.T) { mounttest.TestReadFileDoubleClose(t) } func TestReadFileDoubleClose(t *testing.T) { mounttest.TestReadFileDoubleClose(t) }
func TestReadSeek(t *testing.T) { mounttest.TestReadSeek(t) } func TestReadSeek(t *testing.T) { mounttest.TestReadSeek(t) }
func TestWriteFileNoWrite(t *testing.T) { mounttest.TestWriteFileNoWrite(t) } func TestWriteFileNoWrite(t *testing.T) { mounttest.TestWriteFileNoWrite(t) }

View File

@ -33,9 +33,10 @@ var (
// FS represents the top level filing system // FS represents the top level filing system
type FS struct { type FS struct {
f fs.Fs f fs.Fs
root *Dir root *Dir
noSeek bool // don't allow seeking if set noSeek bool // don't allow seeking if set
noChecksum bool // don't check checksums if set
} }
// NewFS creates a new filing system and root directory // NewFS creates a new filing system and root directory
@ -57,6 +58,12 @@ func (fsys *FS) NoSeek() *FS {
return fsys return fsys
} }
// NoChecksum disables checksum checking by setting the noChecksum flag
// on the filing system.
//
// Read and write file handles consult this flag when they are created
// and do not set up a hasher if it is set.
//
// It returns the receiver.
func (fsys *FS) NoChecksum() *FS {
	fsys.noChecksum = true
	return fsys
}
// Root returns the root node // Root returns the root node
func (fsys *FS) Root() (*Dir, error) { func (fsys *FS) Root() (*Dir, error) {
fs.Debugf(fsys.f, "Root()") fs.Debugf(fsys.f, "Root()")

View File

@ -34,6 +34,45 @@ func TestReadByByte(t *testing.T) {
run.rm(t, "testfile") run.rm(t, "testfile")
} }
// TestReadChecksum checks that closing a file opened for read succeeds
// when only part of the file has been read, both for a plain
// sequential read and for reads separated by a seek.
func TestReadChecksum(t *testing.T) {
	run.skipIfNoFUSE(t)

	// create file big enough so we exceed any single FUSE read
	// request
	b := make([]byte, 3*128*1024)
	for i := range b {
		b[i] = 'r'
	}
	run.createFile(t, "bigfile", string(b))

	// Read only the start of the file and close it. A hash
	// comparison at this point would fail as the whole file has
	// not been read, so the close must succeed without one.
	fd, err := os.Open(run.path("bigfile"))
	assert.NoError(t, err)
	buf := make([]byte, 10)
	_, err = io.ReadFull(fd, buf)
	assert.NoError(t, err)
	err = fd.Close()
	assert.NoError(t, err)

	// Read the start and the end of the file with a seek in
	// between. A hash comparison would fail here too, because we
	// only read parts of the file.
	fd, err = os.Open(run.path("bigfile"))
	assert.NoError(t, err)
	// read at start
	_, err = io.ReadFull(fd, buf)
	assert.NoError(t, err)
	// read at end (whence 0 means relative to the start of the file)
	_, err = fd.Seek(int64(len(b)-len(buf)), 0)
	assert.NoError(t, err)
	_, err = io.ReadFull(fd, buf)
	// check the read itself worked before the error is reused for Close
	assert.NoError(t, err)
	// ensure we don't compare hashes
	err = fd.Close()
	assert.NoError(t, err)

	run.rm(t, "bigfile")
}
// Test seeking // Test seeking
func TestReadSeek(t *testing.T) { func TestReadSeek(t *testing.T) {
run.skipIfNoFUSE(t) run.skipIfNoFUSE(t)

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/pkg/errors"
) )
// ReadFileHandle is an open for read file handle on a File // ReadFileHandle is an open for read file handle on a File
@ -17,6 +18,7 @@ type ReadFileHandle struct {
offset int64 offset int64
noSeek bool noSeek bool
file *File file *File
hash *fs.MultiHasher
} }
func newReadFileHandle(f *File, o fs.Object, noSeek bool) (*ReadFileHandle, error) { func newReadFileHandle(f *File, o fs.Object, noSeek bool) (*ReadFileHandle, error) {
@ -24,11 +26,21 @@ func newReadFileHandle(f *File, o fs.Object, noSeek bool) (*ReadFileHandle, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
var hash *fs.MultiHasher
if !f.d.fsys.noChecksum {
hash, err = fs.NewMultiHasherTypes(o.Fs().Hashes())
if err != nil {
fs.Errorf(o.Fs(), "newReadFileHandle hash error: %v", err)
}
}
fh := &ReadFileHandle{ fh := &ReadFileHandle{
o: o, o: o,
r: fs.NewAccount(r, o).WithBuffer(), // account the transfer r: fs.NewAccount(r, o).WithBuffer(), // account the transfer
noSeek: noSeek, noSeek: noSeek,
file: f, file: f,
hash: hash,
} }
fs.Stats.Transferring(fh.o.Remote()) fs.Stats.Transferring(fh.o.Remote())
return fh, nil return fh, nil
@ -49,6 +61,7 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
return ESPIPE return ESPIPE
} }
fh.r.StopBuffering() // stop the background reading first fh.r.StopBuffering() // stop the background reading first
fh.hash = nil
oldReader := fh.r.GetReader() oldReader := fh.r.GetReader()
r := oldReader r := oldReader
// Can we seek it directly? // Can we seek it directly?
@ -142,10 +155,36 @@ func (fh *ReadFileHandle) Read(reqSize, reqOffset int64) (respData []byte, err e
respData = buf[:n] respData = buf[:n]
fh.offset = newOffset fh.offset = newOffset
fs.Debugf(fh.o, "ReadFileHandle.Read OK") fs.Debugf(fh.o, "ReadFileHandle.Read OK")
if fh.hash != nil {
_, err = fh.hash.Write(respData)
if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Read HashError: %v", err)
return nil, err
}
}
} }
return respData, err return respData, err
} }
// checkHash compares the sums accumulated in fh.hash against the sums
// reported by the object for each hash type in fh.hash.Sums().
//
// The check is skipped (nil is returned) when there is no hasher, when
// Read has not been called, or when the current offset has not reached
// the size of the object.
//
// It returns the error from the object's Hash method if that fails, or
// a "corrupted on transfer" error on the first mismatching sum.
func (fh *ReadFileHandle) checkHash() error {
	// No hasher means there is nothing to compare against
	if fh.hash == nil {
		return nil
	}
	// Only compare if data was read and the offset reached the object size
	if !fh.readCalled || fh.offset < fh.o.Size() {
		return nil
	}

	for kind, got := range fh.hash.Sums() {
		want, err := fh.o.Hash(kind)
		if err != nil {
			return err
		}
		if fs.HashEquals(got, want) {
			continue
		}
		return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", kind, got, want)
	}
	return nil
}
// close the file handle returning EBADF if it has been // close the file handle returning EBADF if it has been
// closed already. // closed already.
// //
@ -156,6 +195,11 @@ func (fh *ReadFileHandle) close() error {
} }
fh.closed = true fh.closed = true
fs.Stats.DoneTransferring(fh.o.Remote(), true) fs.Stats.DoneTransferring(fh.o.Remote(), true)
if err := fh.checkHash(); err != nil {
return err
}
return fh.r.Close() return fh.r.Close()
} }
@ -167,23 +211,11 @@ func (fh *ReadFileHandle) Flush() error {
defer fh.mu.Unlock() defer fh.mu.Unlock()
fs.Debugf(fh.o, "ReadFileHandle.Flush") fs.Debugf(fh.o, "ReadFileHandle.Flush")
// Ignore the Flush as there is nothing we can sensibly do and if err := fh.checkHash(); err != nil {
// it seems quite common for Flush to be called from fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err)
// different threads each of which have read some data. return err
if false {
// If Read hasn't been called then ignore the Flush - Release
// will pick it up
if !fh.readCalled {
fs.Debugf(fh.o, "ReadFileHandle.Flush ignoring flush on unread handle")
return nil
}
err := fh.close()
if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err)
return err
}
} }
fs.Debugf(fh.o, "ReadFileHandle.Flush OK") fs.Debugf(fh.o, "ReadFileHandle.Flush OK")
return nil return nil
} }

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/pkg/errors"
) )
// WriteFileHandle is an open for write handle on a File // WriteFileHandle is an open for write handle on a File
@ -19,13 +20,24 @@ type WriteFileHandle struct {
file *File file *File
writeCalled bool // set the first time Write() is called writeCalled bool // set the first time Write() is called
offset int64 offset int64
hash *fs.MultiHasher
} }
func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) { func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) {
var hash *fs.MultiHasher
if !f.d.fsys.noChecksum {
var err error
hash, err = fs.NewMultiHasherTypes(src.Fs().Hashes())
if err != nil {
fs.Errorf(src.Fs(), "newWriteFileHandle hash error: %v", err)
}
}
fh := &WriteFileHandle{ fh := &WriteFileHandle{
remote: src.Remote(), remote: src.Remote(),
result: make(chan error, 1), result: make(chan error, 1),
file: f, file: f,
hash: hash,
} }
fh.pipeReader, fh.pipeWriter = io.Pipe() fh.pipeReader, fh.pipeWriter = io.Pipe()
r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()).WithBuffer() // account the transfer r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()).WithBuffer() // account the transfer
@ -69,6 +81,13 @@ func (fh *WriteFileHandle) Write(data []byte, offset int64) (written int64, err
return 0, err return 0, err
} }
fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n) fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n)
if fh.hash != nil {
_, err = fh.hash.Write(data[:n])
if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.Write HashError: %v", err)
return written, err
}
}
return written, nil return written, nil
} }
@ -98,6 +117,17 @@ func (fh *WriteFileHandle) close() error {
if err == nil { if err == nil {
err = readCloseErr err = readCloseErr
} }
if err == nil && fh.hash != nil {
for hashType, srcSum := range fh.hash.Sums() {
dstSum, err := fh.o.Hash(hashType)
if err != nil {
return err
}
if !fs.HashEquals(srcSum, dstSum) {
return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
}
}
}
return err return err
} }