mirror of
https://github.com/rclone/rclone
synced 2024-11-18 18:46:07 +01:00
pacer: Factor TokenDispenser into pacer from box remote
This commit is contained in:
parent
a56d51c594
commit
5d911e9450
41
box/box.go
41
box/box.go
@ -85,14 +85,14 @@ func init() {
|
|||||||
|
|
||||||
// Fs represents a remote box
|
// Fs represents a remote box
|
||||||
type Fs struct {
|
type Fs struct {
|
||||||
name string // name of this remote
|
name string // name of this remote
|
||||||
root string // the path we are working on
|
root string // the path we are working on
|
||||||
features *fs.Features // optional features
|
features *fs.Features // optional features
|
||||||
srv *rest.Client // the connection to the one drive server
|
srv *rest.Client // the connection to the one drive server
|
||||||
dirCache *dircache.DirCache // Map of directory path to directory id
|
dirCache *dircache.DirCache // Map of directory path to directory id
|
||||||
pacer *pacer.Pacer // pacer for API calls
|
pacer *pacer.Pacer // pacer for API calls
|
||||||
tokenRenewer *oauthutil.Renew // renew the token on expiry
|
tokenRenewer *oauthutil.Renew // renew the token on expiry
|
||||||
uploadTokens chan struct{} // control concurrency of multipart uploads
|
uploadToken *pacer.TokenDispenser // control concurrency
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object describes a box object
|
// Object describes a box object
|
||||||
@ -238,19 +238,15 @@ func NewFs(name, root string) (fs.Fs, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
f := &Fs{
|
f := &Fs{
|
||||||
name: name,
|
name: name,
|
||||||
root: root,
|
root: root,
|
||||||
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
|
srv: rest.NewClient(oAuthClient).SetRoot(rootURL),
|
||||||
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
pacer: pacer.New().SetMinSleep(minSleep).SetMaxSleep(maxSleep).SetDecayConstant(decayConstant),
|
||||||
uploadTokens: make(chan struct{}, fs.Config.Transfers),
|
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
|
||||||
}
|
}
|
||||||
f.features = (&fs.Features{CaseInsensitive: true}).Fill(f)
|
f.features = (&fs.Features{CaseInsensitive: true}).Fill(f)
|
||||||
f.srv.SetErrorHandler(errorHandler)
|
f.srv.SetErrorHandler(errorHandler)
|
||||||
|
|
||||||
// Fill up the upload tokens
|
|
||||||
for i := 0; i < fs.Config.Transfers; i++ {
|
|
||||||
f.uploadTokens <- struct{}{}
|
|
||||||
}
|
|
||||||
// Renew the token in the background
|
// Renew the token in the background
|
||||||
f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
|
f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
|
||||||
_, err := f.readMetaDataForPath("")
|
_, err := f.readMetaDataForPath("")
|
||||||
@ -296,17 +292,6 @@ func (f *Fs) rootSlash() string {
|
|||||||
return f.root + "/"
|
return f.root + "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
// getUploadToken gets a token from the upload pool.
|
|
||||||
func (f *Fs) getUploadToken() {
|
|
||||||
<-f.uploadTokens
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// putUploadToken returns a token to the pool
|
|
||||||
func (f *Fs) putUploadToken() {
|
|
||||||
f.uploadTokens <- struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return an Object from a path
|
// Return an Object from a path
|
||||||
//
|
//
|
||||||
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
||||||
|
@ -214,10 +214,10 @@ outer:
|
|||||||
|
|
||||||
// Transfer the chunk
|
// Transfer the chunk
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
o.fs.uploadToken.Get()
|
||||||
go func(part int, position int64) {
|
go func(part int, position int64) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
o.fs.getUploadToken()
|
defer o.fs.uploadToken.Put()
|
||||||
defer o.fs.putUploadToken()
|
|
||||||
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, session.TotalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
|
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, session.TotalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
|
||||||
partResponse, err := o.uploadPart(session.ID, position, size, buf)
|
partResponse, err := o.uploadPart(session.ID, position, size, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
31
pacer/tokens.go
Normal file
31
pacer/tokens.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
// Tokens for controlling concurrency
|
||||||
|
|
||||||
|
package pacer
|
||||||
|
|
||||||
|
// TokenDispenser is for controlling concurrency
type TokenDispenser struct {
	tokens chan struct{} // buffered channel; each buffered value is one available token
}
|
||||||
|
|
||||||
|
// NewTokenDispenser makes a pool of n tokens
|
||||||
|
func NewTokenDispenser(n int) *TokenDispenser {
|
||||||
|
td := &TokenDispenser{
|
||||||
|
tokens: make(chan struct{}, n),
|
||||||
|
}
|
||||||
|
// Fill up the upload tokens
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
td.tokens <- struct{}{}
|
||||||
|
}
|
||||||
|
return td
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get gets a token from the pool - don't forget to return it with Put
|
||||||
|
func (td *TokenDispenser) Get() {
|
||||||
|
<-td.tokens
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put returns a token
|
||||||
|
func (td *TokenDispenser) Put() {
|
||||||
|
td.tokens <- struct{}{}
|
||||||
|
}
|
16
pacer/tokens_test.go
Normal file
16
pacer/tokens_test.go
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
package pacer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTokenDispenser(t *testing.T) {
|
||||||
|
td := NewTokenDispenser(5)
|
||||||
|
assert.Equal(t, 5, len(td.tokens))
|
||||||
|
td.Get()
|
||||||
|
assert.Equal(t, 4, len(td.tokens))
|
||||||
|
td.Put()
|
||||||
|
assert.Equal(t, 5, len(td.tokens))
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user