1
mirror of https://github.com/rclone/rclone synced 2024-10-07 15:17:14 +02:00

ftp: add --ftp-concurrency to limit maximum number of connections

Fixes #2166
This commit is contained in:
Nick Craig-Wood 2019-02-13 21:53:43 +00:00
parent 2b05bd9a08
commit 3bfde5f52a

View File

@ -15,6 +15,7 @@ import (
"github.com/ncw/rclone/fs/config/configstruct" "github.com/ncw/rclone/fs/config/configstruct"
"github.com/ncw/rclone/fs/config/obscure" "github.com/ncw/rclone/fs/config/obscure"
"github.com/ncw/rclone/fs/hash" "github.com/ncw/rclone/fs/hash"
"github.com/ncw/rclone/lib/pacer"
"github.com/ncw/rclone/lib/readers" "github.com/ncw/rclone/lib/readers"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -45,6 +46,11 @@ func init() {
Help: "FTP password", Help: "FTP password",
IsPassword: true, IsPassword: true,
Required: true, Required: true,
}, {
Name: "concurrency",
Help: "Maximum number of FTP simultaneous connections, 0 for unlimited",
Default: 0,
Advanced: true,
}, },
}, },
}) })
@ -52,10 +58,11 @@ func init() {
// Options defines the configuration for this backend // Options defines the configuration for this backend
type Options struct { type Options struct {
Host string `config:"host"` Host string `config:"host"`
User string `config:"user"` User string `config:"user"`
Pass string `config:"pass"` Pass string `config:"pass"`
Port string `config:"port"` Port string `config:"port"`
Concurrency int `config:"concurrency"`
} }
// Fs represents a remote FTP server // Fs represents a remote FTP server
@ -70,6 +77,7 @@ type Fs struct {
dialAddr string dialAddr string
poolMu sync.Mutex poolMu sync.Mutex
pool []*ftp.ServerConn pool []*ftp.ServerConn
tokens *pacer.TokenDispenser
} }
// Object describes an FTP file // Object describes an FTP file
@ -128,6 +136,9 @@ func (f *Fs) ftpConnection() (*ftp.ServerConn, error) {
// Get an FTP connection from the pool, or open a new one // Get an FTP connection from the pool, or open a new one
func (f *Fs) getFtpConnection() (c *ftp.ServerConn, err error) { func (f *Fs) getFtpConnection() (c *ftp.ServerConn, err error) {
if f.opt.Concurrency > 0 {
f.tokens.Get()
}
f.poolMu.Lock() f.poolMu.Lock()
if len(f.pool) > 0 { if len(f.pool) > 0 {
c = f.pool[0] c = f.pool[0]
@ -147,6 +158,9 @@ func (f *Fs) getFtpConnection() (c *ftp.ServerConn, err error) {
// if err is not nil then it checks the connection is alive using a // if err is not nil then it checks the connection is alive using a
// NOOP request // NOOP request
func (f *Fs) putFtpConnection(pc **ftp.ServerConn, err error) { func (f *Fs) putFtpConnection(pc **ftp.ServerConn, err error) {
if f.opt.Concurrency > 0 {
defer f.tokens.Put()
}
c := *pc c := *pc
*pc = nil *pc = nil
if err != nil { if err != nil {
@ -198,6 +212,7 @@ func NewFs(name, root string, m configmap.Mapper) (ff fs.Fs, err error) {
user: user, user: user,
pass: pass, pass: pass,
dialAddr: dialAddr, dialAddr: dialAddr,
tokens: pacer.NewTokenDispenser(opt.Concurrency),
} }
f.features = (&fs.Features{ f.features = (&fs.Features{
CanHaveEmptyDirectories: true, CanHaveEmptyDirectories: true,