diff --git a/b2/upload.go b/b2/upload.go index 174da497f..cdb7dd31f 100644 --- a/b2/upload.go +++ b/b2/upload.go @@ -166,7 +166,7 @@ func (up *largeUpload) transferChunk(part int64, body []byte) error { Method: "POST", Absolute: true, Path: upload.UploadURL, - Body: bytes.NewBuffer(body), + Body: fs.AccountPart(up.o, bytes.NewBuffer(body)), ExtraHeaders: map[string]string{ "Authorization": upload.AuthorizationToken, "X-Bz-Part-Number": fmt.Sprintf("%d", part), @@ -240,6 +240,7 @@ func (up *largeUpload) Upload() error { errs := make(chan error, 1) var wg sync.WaitGroup var err error + fs.AccountByPart(up.o) // Cancel whole file accounting before reading outer: for part := int64(1); part <= up.parts; part++ { // Check any errors diff --git a/fs/accounting.go b/fs/accounting.go index 84a414ace..a7cea2d47 100644 --- a/fs/accounting.go +++ b/fs/accounting.go @@ -257,6 +257,8 @@ type Account struct { avg ewma.MovingAverage // Moving average of last few measurements closed bool // set if the file is closed exit chan struct{} // channel that will be closed when transfer is finished + + wholeFileDisabled bool // disables the whole file when doing parts } // NewAccount makes a Account reader for an object @@ -274,46 +276,56 @@ func NewAccount(in io.ReadCloser, obj Object) *Account { return acc } -func (file *Account) averageLoop() { +// disableWholeFileAccounting turns off the whole file accounting +func (acc *Account) disableWholeFileAccounting() { + acc.mu.Lock() + acc.wholeFileDisabled = true + acc.mu.Unlock() +} + +// accountPart disables the whole file counter and returns an +// io.Reader to wrap a segment of the transfer. +func (acc *Account) accountPart(in io.Reader) io.Reader { + return newAccountStream(acc, in) +} + +func (acc *Account) averageLoop() { tick := time.NewTicker(time.Second) defer tick.Stop() for { select { case now := <-tick.C: - file.statmu.Lock() + acc.statmu.Lock() // Add average of last second. - elapsed := now.Sub(file.lpTime).Seconds() - avg := float64(file.lpBytes) / elapsed - file.avg.Add(avg) - file.lpBytes = 0 - file.lpTime = now + elapsed := now.Sub(acc.lpTime).Seconds() + avg := float64(acc.lpBytes) / elapsed + acc.avg.Add(avg) + acc.lpBytes = 0 + acc.lpTime = now // Unlock stats - file.statmu.Unlock() - case <-file.exit: + acc.statmu.Unlock() + case <-acc.exit: return } } } -// Read bytes from the object - see io.Reader -func (file *Account) Read(p []byte) (n int, err error) { - file.mu.Lock() - defer file.mu.Unlock() - +// read bytes from the io.Reader passed in and account them +func (acc *Account) read(in io.Reader, p []byte) (n int, err error) { // Set start time. - file.statmu.Lock() - if file.start.IsZero() { - file.start = time.Now() + acc.statmu.Lock() + if acc.start.IsZero() { + acc.start = time.Now() } - file.statmu.Unlock() + acc.statmu.Unlock() - n, err = file.in.Read(p) + n, err = in.Read(p) // Update Stats - file.statmu.Lock() - file.lpBytes += n - file.bytes += int64(n) - file.statmu.Unlock() + acc.statmu.Lock() + acc.lpBytes += n + acc.bytes += int64(n) + acc.statmu.Unlock() Stats.Bytes(int64(n)) @@ -324,69 +336,80 @@ func (file *Account) Read(p []byte) (n int, err error) { return } +// Read bytes from the object - see io.Reader +func (acc *Account) Read(p []byte) (n int, err error) { + acc.mu.Lock() + defer acc.mu.Unlock() + if acc.wholeFileDisabled { + // Don't account + return acc.in.Read(p) + } + return acc.read(acc.in, p) +} + // Progress returns bytes read as well as the size. // Size can be <= 0 if the size is unknown. -func (file *Account) Progress() (bytes, size int64) { - if file == nil { +func (acc *Account) Progress() (bytes, size int64) { + if acc == nil { return 0, 0 } - file.statmu.Lock() + acc.statmu.Lock() if bytes > size { size = 0 } - defer file.statmu.Unlock() - return file.bytes, file.size + defer acc.statmu.Unlock() + return acc.bytes, acc.size } // Speed returns the speed of the current file transfer // in bytes per second, as well a an exponentially weighted moving average // If no read has completed yet, 0 is returned for both values. -func (file *Account) Speed() (bps, current float64) { - if file == nil { +func (acc *Account) Speed() (bps, current float64) { + if acc == nil { return 0, 0 } - file.statmu.Lock() - defer file.statmu.Unlock() - if file.bytes == 0 { + acc.statmu.Lock() + defer acc.statmu.Unlock() + if acc.bytes == 0 { return 0, 0 } // Calculate speed from first read. - total := float64(time.Now().Sub(file.start)) / float64(time.Second) - bps = float64(file.bytes) / total - current = file.avg.Value() + total := float64(time.Now().Sub(acc.start)) / float64(time.Second) + bps = float64(acc.bytes) / total + current = acc.avg.Value() return } // ETA returns the ETA of the current operation, // rounded to full seconds. // If the ETA cannot be determined 'ok' returns false. -func (file *Account) ETA() (eta time.Duration, ok bool) { - if file == nil || file.size <= 0 { +func (acc *Account) ETA() (eta time.Duration, ok bool) { + if acc == nil || acc.size <= 0 { return 0, false } - file.statmu.Lock() - defer file.statmu.Unlock() - if file.bytes == 0 { + acc.statmu.Lock() + defer acc.statmu.Unlock() + if acc.bytes == 0 { return 0, false } - left := file.size - file.bytes + left := acc.size - acc.bytes if left <= 0 { return 0, true } - avg := file.avg.Value() + avg := acc.avg.Value() if avg <= 0 { return 0, false } - seconds := float64(left) / file.avg.Value() + seconds := float64(left) / acc.avg.Value() return time.Duration(time.Second * time.Duration(int(seconds))), true } // String produces stats for this file -func (file *Account) String() string { - a, b := file.Progress() - avg, cur := file.Speed() - eta, etaok := file.ETA() +func (acc *Account) String() string { + a, b := acc.Progress() + avg, cur := acc.Speed() + eta, etaok := acc.ETA() etas := "-" if etaok { if eta > 0 { @@ -395,7 +418,7 @@ func (file *Account) String() string { etas = "0s" } } - name := []rune(file.name) + name := []rune(acc.name) if len(name) > 45 { where := len(name) - 42 name = append([]rune{'.', '.', '.'}, name[where:]...) @@ -407,17 +430,63 @@ func (file *Account) String() string { } // Close the object -func (file *Account) Close() error { - file.mu.Lock() - defer file.mu.Unlock() - if file.closed { +func (acc *Account) Close() error { + acc.mu.Lock() + defer acc.mu.Unlock() + if acc.closed { return nil } - file.closed = true - close(file.exit) - Stats.inProgress.clear(file.name) - return file.in.Close() + acc.closed = true + close(acc.exit) + Stats.inProgress.clear(acc.name) + return acc.in.Close() +} + +// accountStream accounts a single io.Reader into a parent *Account +type accountStream struct { + acc *Account + in io.Reader +} + +// newAccountStream makes a new accountStream for an in +func newAccountStream(acc *Account, in io.Reader) *accountStream { + return &accountStream{ + acc: acc, + in: in, + } +} + +// Read bytes from the object - see io.Reader +func (a *accountStream) Read(p []byte) (n int, err error) { + return a.acc.read(a.in, p) +} + +// AccountByPart turns off whole file accounting +// +// Returns the current account or nil if not found +func AccountByPart(obj Object) *Account { + acc := Stats.inProgress.get(obj.Remote()) + if acc == nil { + Debug(obj, "Didn't find object to account part transfer") + } + acc.disableWholeFileAccounting() + return acc +} + +// AccountPart accounts for part of a transfer +// +// It disables the whole file counter and returns an io.Reader to wrap +// a segment of the transfer. +func AccountPart(obj Object, in io.Reader) io.Reader { + acc := AccountByPart(obj) + if acc == nil { + return in + } + return acc.accountPart(in) } // Check it satisfies the interface -var _ io.ReadCloser = &Account{} +var ( + _ io.ReadCloser = &Account{} + _ io.Reader = &accountStream{} +)