1
mirror of https://github.com/rclone/rclone synced 2024-12-01 10:31:57 +01:00

mount: fix seek with buffering to use correct interface

Stop pre-cache before seeking which stops lots of excess data transfer
This commit is contained in:
Nick Craig-Wood 2017-02-16 23:57:58 +00:00
parent 6f75290678
commit 928be0f1fd
2 changed files with 34 additions and 15 deletions

View File

@ -46,30 +46,33 @@ var _ fusefs.HandleReader = (*ReadFileHandle)(nil)
// if reopen is true, then we won't attempt to use an io.Seeker interface
//
// Must be called with fh.mu held
func (fh *ReadFileHandle) seek(offset int64, reopen bool) error {
// Can we seek it directly?
func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
fh.r.StopBuffering() // stop the background reading first
oldReader := fh.r.GetReader()
r := oldReader
// Can we seek it directly?
if do, ok := oldReader.(io.Seeker); !reopen && ok {
fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset)
_, err := do.Seek(offset, 0)
_, err = do.Seek(offset, 0)
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read io.Seeker failed: %v", err)
return err
}
} else {
fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d", fh.offset, offset)
// if not re-open with a seek
r, err := fh.o.Open(&fs.SeekOption{Offset: offset})
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err)
return err
}
// close old one
err = oldReader.Close()
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read seek close old failed: %v", err)
}
fh.r.UpdateReader(r)
// re-open with a seek
r, err = fh.o.Open(&fs.SeekOption{Offset: offset})
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err)
return err
}
}
fh.r.UpdateReader(r)
fh.offset = offset
return nil
}

View File

@ -308,6 +308,7 @@ type Account struct {
// shouldn't.
mu sync.Mutex
in io.ReadCloser
origIn io.ReadCloser
size int64
name string
statmu sync.Mutex // Separate mutex for stat values.
@ -318,6 +319,7 @@ type Account struct {
avg ewma.MovingAverage // Moving average of last few measurements
closed bool // set if the file is closed
exit chan struct{} // channel that will be closed when transfer is finished
withBuf bool // is using a buffered in
wholeFileDisabled bool // disables the whole file when doing parts
}
@ -327,6 +329,7 @@ type Account struct {
func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account {
acc := &Account{
in: in,
origIn: in,
size: size,
name: name,
exit: make(chan struct{}),
@ -368,7 +371,10 @@ func bufferReadCloser(in io.ReadCloser, size int64, name string) io.ReadCloser {
//
// If the file is above a certain size it adds an Async reader
func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Account {
return NewAccountSizeName(bufferReadCloser(in, size, name), size, name)
acc := NewAccountSizeName(in, size, name)
acc.in = bufferReadCloser(in, size, name)
acc.withBuf = true
return acc
}
// NewAccountWithBuffer makes a Account reader for an object
@ -382,16 +388,26 @@ func NewAccountWithBuffer(in io.ReadCloser, obj Object) *Account {
func (acc *Account) GetReader() io.ReadCloser {
acc.mu.Lock()
defer acc.mu.Unlock()
return acc.in
return acc.origIn
}
// StopBuffering stops the async buffer doing any more buffering
func (acc *Account) StopBuffering() {
if asyncIn, ok := acc.in.(*asyncReader); ok {
asyncIn.Abandon()
}
}
// UpdateReader updates the underlying io.ReadCloser
func (acc *Account) UpdateReader(in io.ReadCloser) {
acc.mu.Lock()
if asyncIn, ok := acc.in.(*asyncReader); ok {
asyncIn.Abandon()
acc.StopBuffering()
acc.origIn = in
if acc.withBuf {
acc.in = bufferReadCloser(in, acc.size, acc.name)
} else {
acc.in = in
}
acc.in = bufferReadCloser(in, acc.size, acc.name)
acc.mu.Unlock()
}