mirror of
https://github.com/rclone/rclone
synced 2024-12-22 13:03:02 +01:00
seafile: send large upload by chunks
This commit is contained in:
parent
e111ffba9e
commit
7a6f882750
@ -27,6 +27,7 @@ type AccountInfo struct {
|
||||
// ServerInfo contains server information
|
||||
type ServerInfo struct {
|
||||
Version string `json:"version"`
|
||||
Features []string `json:"features"`
|
||||
}
|
||||
|
||||
// DefaultLibrary when none specified
|
||||
@ -152,3 +153,13 @@ type BatchSourceDestRequest struct {
|
||||
DstLibraryID string `json:"dst_repo_id"`
|
||||
DstParentDir string `json:"dst_parent_dir"`
|
||||
}
|
||||
|
||||
// FileUploadedBytes contains the JSON response to the "file-uploaded-bytes" API call
|
||||
type FileUploadedBytes struct {
|
||||
FileUploadedBytes int64 `json:"uploadedBytes"`
|
||||
}
|
||||
|
||||
// ChunkUpload contains the result of uploading one part of a file
|
||||
type ChunkUpload struct {
|
||||
Success bool `json:"success"`
|
||||
}
|
||||
|
71
backend/seafile/content_range.go
Normal file
71
backend/seafile/content_range.go
Normal file
@ -0,0 +1,71 @@
|
||||
package seafile
|
||||
|
||||
import "fmt"
|
||||
|
||||
type contentRanger interface {
|
||||
getChunkSize() int64
|
||||
getContentRangeHeader() string
|
||||
}
|
||||
|
||||
type streamedContentRange struct {
|
||||
size int64
|
||||
}
|
||||
|
||||
func newStreamedContentRange(size int64) *streamedContentRange {
|
||||
return &streamedContentRange{
|
||||
size: size,
|
||||
}
|
||||
}
|
||||
func (r *streamedContentRange) getChunkSize() int64 { return r.size }
|
||||
func (r *streamedContentRange) getContentRangeHeader() string { return "" }
|
||||
|
||||
type chunkedContentRange struct {
|
||||
start int64
|
||||
chunkSize int64
|
||||
size int64
|
||||
}
|
||||
|
||||
// newChunkedContentRange does not support streaming (unknown size)
|
||||
func newChunkedContentRange(chunkSize, size int64) *chunkedContentRange {
|
||||
if size <= 0 {
|
||||
panic("content range cannot operate on streaming")
|
||||
}
|
||||
if chunkSize <= 0 {
|
||||
panic("content range cannot operate without a chunk size")
|
||||
}
|
||||
return &chunkedContentRange{
|
||||
start: 0,
|
||||
chunkSize: chunkSize,
|
||||
size: size,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *chunkedContentRange) getEnd() int64 {
|
||||
end := r.chunkSize + r.start
|
||||
if end > r.size {
|
||||
end = r.size
|
||||
}
|
||||
return end
|
||||
}
|
||||
|
||||
func (r *chunkedContentRange) getChunkSize() int64 {
|
||||
return r.getEnd() - r.start
|
||||
}
|
||||
|
||||
// next moves the range to the next frame
|
||||
// it panics if it was the last chunk
|
||||
func (r *chunkedContentRange) next() {
|
||||
r.start += r.chunkSize
|
||||
if r.start >= r.size {
|
||||
panic("no more chunk of data")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *chunkedContentRange) isLastChunk() bool {
|
||||
return r.getEnd() == r.size
|
||||
}
|
||||
|
||||
func (r *chunkedContentRange) getContentRangeHeader() string {
|
||||
end := r.getEnd()
|
||||
return fmt.Sprintf("bytes %d-%d/%d", r.start, end-1, r.size)
|
||||
}
|
86
backend/seafile/content_range_test.go
Normal file
86
backend/seafile/content_range_test.go
Normal file
@ -0,0 +1,86 @@
|
||||
package seafile
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestContentRangeHeader(t *testing.T) {
|
||||
fixtures := []struct {
|
||||
start, chunkSize, size int64
|
||||
expect string
|
||||
}{
|
||||
{0, 1, 10, "bytes 0-0/10"}, // from byte 0 (inclusive) to byte 0 (inclusive) == 1 byte
|
||||
{0, 10, 10, "bytes 0-9/10"},
|
||||
{0, 20, 10, "bytes 0-9/10"},
|
||||
{1, 1, 10, "bytes 1-1/10"},
|
||||
{1, 10, 10, "bytes 1-9/10"},
|
||||
{1, 10, 10, "bytes 1-9/10"},
|
||||
{9, 1, 10, "bytes 9-9/10"},
|
||||
{9, 2, 10, "bytes 9-9/10"},
|
||||
{9, 5, 10, "bytes 9-9/10"},
|
||||
}
|
||||
|
||||
for _, fixture := range fixtures {
|
||||
t.Run(fmt.Sprintf("%+v", fixture), func(t *testing.T) {
|
||||
r := &chunkedContentRange{start: fixture.start, chunkSize: fixture.chunkSize, size: fixture.size}
|
||||
assert.Equal(t, fixture.expect, r.getContentRangeHeader())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestChunkSize(t *testing.T) {
|
||||
fixtures := []struct {
|
||||
start, chunkSize, size int64
|
||||
expected int64
|
||||
isLastChunk bool
|
||||
}{
|
||||
{0, 10, 10, 10, true}, // chunk size same as size
|
||||
{0, 20, 10, 10, true}, // chuck size bigger than size
|
||||
{0, 10, 20, 10, false}, // chuck size smaller than size
|
||||
{1, 10, 10, 9, true}, // chunk size same as size
|
||||
{1, 20, 10, 9, true}, // chuck size bigger than size
|
||||
{1, 10, 20, 10, false}, // chuck size smaller than size
|
||||
{15, 10, 20, 5, true}, // smaller remaining
|
||||
}
|
||||
|
||||
for _, fixture := range fixtures {
|
||||
t.Run(fmt.Sprintf("%d/%d/%d", fixture.start, fixture.chunkSize, fixture.size), func(t *testing.T) {
|
||||
r := &chunkedContentRange{start: fixture.start, chunkSize: fixture.chunkSize, size: fixture.size}
|
||||
assert.Equal(t, fixture.expected, r.getChunkSize())
|
||||
assert.Equal(t, fixture.isLastChunk, r.isLastChunk())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRanges(t *testing.T) {
|
||||
fixtures := []struct {
|
||||
size int64
|
||||
chunkSize int64
|
||||
expectedChunks int
|
||||
}{
|
||||
{10, 1, 10},
|
||||
{20, 2, 10},
|
||||
{10, 10, 1},
|
||||
{10, 3, 4},
|
||||
}
|
||||
for _, fixture := range fixtures {
|
||||
t.Run(fmt.Sprintf("%d/%d", fixture.size, fixture.chunkSize), func(t *testing.T) {
|
||||
r := newChunkedContentRange(fixture.chunkSize, fixture.size)
|
||||
// first chunk is counted before the loop
|
||||
count := 1
|
||||
size := r.getChunkSize()
|
||||
|
||||
for !r.isLastChunk() {
|
||||
r.next()
|
||||
count++
|
||||
size += r.getChunkSize()
|
||||
}
|
||||
assert.Panics(t, func() { r.next() })
|
||||
assert.Equal(t, fixture.expectedChunks, count)
|
||||
assert.Equal(t, fixture.size, size)
|
||||
})
|
||||
}
|
||||
}
|
@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/chunksize"
|
||||
"github.com/rclone/rclone/fs/hash"
|
||||
)
|
||||
|
||||
@ -89,19 +90,24 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo
|
||||
// But for unknown-sized objects (indicated by src.Size() == -1), Upload should either
|
||||
// return an error or update the object properly (rather than e.g. calling panic).
|
||||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||
// The upload sometimes return a temporary 500 error
|
||||
// We cannot use the pacer to retry uploading the file as the upload link is single use only
|
||||
for retry := 0; retry <= 3; retry++ {
|
||||
size := src.Size()
|
||||
if size <= int64(o.fs.opt.UploadCutoff) || o.fs.noChunkUpload {
|
||||
// upload whole file in 1 request
|
||||
return o.upload(ctx, in, src)
|
||||
}
|
||||
// upload in parts
|
||||
chunkSize := chunksize.Calculator(o, size, maxParts, o.fs.opt.ChunkSize)
|
||||
return o.uploadLargeFile(ctx, in, src, chunkSize)
|
||||
}
|
||||
|
||||
// upload whole file in 1 request
|
||||
func (o *Object) upload(ctx context.Context, in io.Reader, src fs.ObjectInfo) error {
|
||||
uploadLink, err := o.fs.getUploadLink(ctx, o.libraryID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
uploaded, err := o.fs.upload(ctx, in, uploadLink, o.pathInLibrary)
|
||||
if err == ErrorInternalDuringUpload {
|
||||
// This is a temporary error, try again with a new upload link
|
||||
continue
|
||||
}
|
||||
uploaded, err := o.fs.upload(ctx, in, uploadLink, o.pathInLibrary, src.Size())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -110,8 +116,37 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||
o.id = uploaded.ID
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Object) uploadLargeFile(ctx context.Context, in io.Reader, src fs.ObjectInfo, chunkSize fs.SizeSuffix) error {
|
||||
uploadLink, err := o.fs.getUploadLink(ctx, o.libraryID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return ErrorInternalDuringUpload
|
||||
size := src.Size()
|
||||
contentRange := newChunkedContentRange(int64(chunkSize), size)
|
||||
for {
|
||||
fs.Debugf(nil, "uploading chunk %s", contentRange.getContentRangeHeader())
|
||||
err = o.fs.uploadChunk(ctx, in, uploadLink, o.pathInLibrary, contentRange)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
contentRange.next()
|
||||
// the last part is a slightly different API call
|
||||
if contentRange.isLastChunk() {
|
||||
break
|
||||
}
|
||||
}
|
||||
fs.Debugf(nil, "uploading last chunk %s", contentRange.getContentRangeHeader())
|
||||
uploaded, err := o.fs.uploadLastChunk(ctx, in, uploadLink, o.pathInLibrary, contentRange)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Set the properties from the upload back to the object
|
||||
o.size = uploaded.Size
|
||||
o.id = uploaded.ID
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove this object
|
||||
|
@ -12,7 +12,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
minSleep = 100 * time.Millisecond
|
||||
defaultMinSleep = 100 * time.Millisecond
|
||||
maxSleep = 10 * time.Second
|
||||
decayConstant = 2 // bigger for slower decay, exponential
|
||||
)
|
||||
@ -28,7 +28,7 @@ func init() {
|
||||
}
|
||||
|
||||
// getPacer returns the unique pacer for that remote URL
|
||||
func getPacer(ctx context.Context, remote string) *fs.Pacer {
|
||||
func getPacer(ctx context.Context, remote string, minPacer int) *fs.Pacer {
|
||||
pacerMutex.Lock()
|
||||
defer pacerMutex.Unlock()
|
||||
|
||||
@ -37,6 +37,10 @@ func getPacer(ctx context.Context, remote string) *fs.Pacer {
|
||||
return existing
|
||||
}
|
||||
|
||||
minSleep := time.Duration(minPacer) * time.Millisecond
|
||||
if minSleep == 0 {
|
||||
minSleep = defaultMinSleep
|
||||
}
|
||||
pacers[remote] = fs.NewPacer(
|
||||
ctx,
|
||||
pacer.NewDefault(
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"github.com/rclone/rclone/lib/cache"
|
||||
"github.com/rclone/rclone/lib/encoder"
|
||||
"github.com/rclone/rclone/lib/pacer"
|
||||
"github.com/rclone/rclone/lib/pool"
|
||||
"github.com/rclone/rclone/lib/random"
|
||||
"github.com/rclone/rclone/lib/rest"
|
||||
)
|
||||
@ -43,6 +44,12 @@ const (
|
||||
configLibraryKey = "library_key"
|
||||
configCreateLibrary = "create_library"
|
||||
configAuthToken = "auth_token"
|
||||
minChunkSize = 1 * fs.Mebi
|
||||
defaultChunkSize = 127 * fs.Mebi
|
||||
maxChunkSize = 511 * fs.Mebi
|
||||
defaultUploadCutoff = 255 * fs.Mebi
|
||||
maxParts = 100000
|
||||
minPacer = 100
|
||||
)
|
||||
|
||||
// This is global to all instances of fs
|
||||
@ -59,7 +66,8 @@ func init() {
|
||||
Description: "seafile",
|
||||
NewFs: NewFs,
|
||||
Config: Config,
|
||||
Options: []fs.Option{{
|
||||
Options: []fs.Option{
|
||||
{
|
||||
Name: configURL,
|
||||
Help: "URL of seafile host to connect to.",
|
||||
Required: true,
|
||||
@ -112,7 +120,38 @@ func init() {
|
||||
encoder.EncodeBackSlash |
|
||||
encoder.EncodeDoubleQuote |
|
||||
encoder.EncodeInvalidUtf8),
|
||||
}},
|
||||
}, {
|
||||
Name: "upload_cutoff",
|
||||
Help: `Cutoff for switching to chunked upload.
|
||||
|
||||
Files above this size will be uploaded in chunks of "--seafile-chunk-size".`,
|
||||
Default: defaultUploadCutoff,
|
||||
Advanced: true,
|
||||
}, {
|
||||
Name: "chunk_size",
|
||||
Help: `Upload chunk size.
|
||||
|
||||
When uploading large files, chunk the file into this size.
|
||||
|
||||
Must fit in memory. These chunks are buffered in memory and there
|
||||
might a maximum of "--transfers" chunks in progress at once.
|
||||
|
||||
1 MB is the minimum size.`,
|
||||
Default: defaultChunkSize,
|
||||
Advanced: true,
|
||||
}, {
|
||||
Name: "min_pacer",
|
||||
Help: `Minimum time between requests (in milliseconds).
|
||||
|
||||
Seafile API is rate limited. The default value is to wait at least 100ms between requests.
|
||||
|
||||
You can try to tweak the default value but you might trigger the rate limit which will make the request rate slower.
|
||||
|
||||
The minimum value is 1ms (0 will default to 100ms)`,
|
||||
Default: minPacer,
|
||||
Advanced: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
@ -127,6 +166,9 @@ type Options struct {
|
||||
LibraryKey string `config:"library_key"`
|
||||
CreateLibrary bool `config:"create_library"`
|
||||
Enc encoder.MultiEncoder `config:"encoding"`
|
||||
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
|
||||
ChunkSize fs.SizeSuffix `config:"chunk_size"`
|
||||
MinPacer int `config:"min_pacer"`
|
||||
}
|
||||
|
||||
// Fs represents a remote seafile
|
||||
@ -136,6 +178,7 @@ type Fs struct {
|
||||
libraryName string // current library
|
||||
encrypted bool // Is this an encrypted library
|
||||
rootDirectory string // directory part of root (if any)
|
||||
ci *fs.ConfigInfo // global options
|
||||
opt Options // parsed options
|
||||
libraries *cache.Cache // Keep a cache of libraries
|
||||
librariesMutex sync.Mutex // Mutex to protect getLibraryID
|
||||
@ -148,7 +191,9 @@ type Fs struct {
|
||||
createDirMutex sync.Mutex // Protect creation of directories
|
||||
useOldDirectoryAPI bool // Use the old API v2 if seafile < 7
|
||||
moveDirNotAvailable bool // Version < 7.0 don't have an API to move a directory
|
||||
noChunkUpload bool // Version < 7.0 don't have support for chunk upload
|
||||
renew *Renew // Renew an encrypted library token
|
||||
pool *pool.Pool // memory pool
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
@ -195,17 +240,26 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ci := fs.GetConfig(ctx)
|
||||
|
||||
f := &Fs{
|
||||
name: name,
|
||||
root: root,
|
||||
libraryName: libraryName,
|
||||
rootDirectory: rootDirectory,
|
||||
libraries: cache.New(),
|
||||
ci: ci,
|
||||
opt: *opt,
|
||||
endpoint: u,
|
||||
endpointURL: u.String(),
|
||||
srv: rest.NewClient(fshttp.NewClient(ctx)).SetRoot(u.String()),
|
||||
pacer: getPacer(ctx, opt.URL),
|
||||
pacer: getPacer(ctx, opt.URL, opt.MinPacer),
|
||||
pool: pool.New(
|
||||
time.Minute,
|
||||
int(opt.ChunkSize),
|
||||
ci.Transfers,
|
||||
ci.UseMmap,
|
||||
),
|
||||
}
|
||||
f.features = (&fs.Features{
|
||||
CanHaveEmptyDirectories: true,
|
||||
@ -216,7 +270,14 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fs.Debugf(nil, "Seafile server version %s", serverInfo.Version)
|
||||
edition := "Community"
|
||||
for _, feature := range serverInfo.Features {
|
||||
if feature == "seafile-pro" {
|
||||
edition = "Professional"
|
||||
break
|
||||
}
|
||||
}
|
||||
fs.Debugf(nil, "Seafile server version %s %s Edition", serverInfo.Version, edition)
|
||||
|
||||
// We don't support lower than seafile v6.0 (version 6.0 is already more than 3 years old)
|
||||
serverVersion := semver.New(serverInfo.Version)
|
||||
@ -227,8 +288,10 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
||||
// Seafile 6 does not support recursive listing
|
||||
f.useOldDirectoryAPI = true
|
||||
f.features.ListR = nil
|
||||
// It also does no support moving directories
|
||||
// It also does not support moving directories
|
||||
f.moveDirNotAvailable = true
|
||||
// no chunk upload either
|
||||
f.noChunkUpload = true
|
||||
}
|
||||
|
||||
// Take the authentication token from the configuration first
|
||||
@ -1343,6 +1406,57 @@ func (f *Fs) newObject(ctx context.Context, remote string, size int64, modTime t
|
||||
return object
|
||||
}
|
||||
|
||||
func checkUploadChunkSize(cs fs.SizeSuffix) error {
|
||||
if cs < minChunkSize {
|
||||
return fmt.Errorf("%s is less than %s", cs, minChunkSize)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
|
||||
err = checkUploadChunkSize(cs)
|
||||
if err == nil {
|
||||
old, f.opt.ChunkSize = f.opt.ChunkSize, cs
|
||||
// this method is only called before starting an upload
|
||||
// so it should be safe to adjust the memory pool to the new chunk size
|
||||
f.pool.Flush()
|
||||
f.pool = pool.New(
|
||||
time.Minute,
|
||||
int(f.opt.ChunkSize),
|
||||
f.ci.Transfers,
|
||||
f.ci.UseMmap,
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func checkUploadCutoff(opt *Options, cs fs.SizeSuffix) error {
|
||||
if cs < opt.ChunkSize {
|
||||
return fmt.Errorf("%v is less than chunk size %v", cs, opt.ChunkSize)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
|
||||
err = checkUploadCutoff(&f.opt, cs)
|
||||
if err == nil {
|
||||
old, f.opt.UploadCutoff = f.opt.UploadCutoff, cs
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (f *Fs) getBuf(size int) []byte {
|
||||
buf := f.pool.Get()
|
||||
if size > 0 && len(buf) > size {
|
||||
buf = buf[:size]
|
||||
}
|
||||
return buf
|
||||
}
|
||||
|
||||
func (f *Fs) putBuf(buf []byte) {
|
||||
f.pool.Put(buf)
|
||||
}
|
||||
|
||||
// Check the interfaces are satisfied
|
||||
var (
|
||||
_ fs.Fs = &Fs{}
|
||||
|
307
backend/seafile/seafile_mock_server_test.go
Normal file
307
backend/seafile/seafile_mock_server_test.go
Normal file
@ -0,0 +1,307 @@
|
||||
package seafile
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"mime"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/config/configmap"
|
||||
"github.com/rclone/rclone/fs/object"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
smallContent = []byte("01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
||||
)
|
||||
|
||||
func getBasicHandler(t *testing.T, libraryID, libraryName string) *http.ServeMux {
|
||||
t.Helper()
|
||||
|
||||
handler := http.NewServeMux()
|
||||
handler.HandleFunc("/api2/server-info/", func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(`{"version":"9.0.10"}`))
|
||||
})
|
||||
handler.HandleFunc("/api2/auth-token/", func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(`{"token":"test_token"}`))
|
||||
})
|
||||
handler.HandleFunc("/api2/repos/", func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(fmt.Sprintf(`[{"encrypted":false,"id":"%s","size":10,"name":"%s"}]`, libraryID, libraryName)))
|
||||
})
|
||||
handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
t.Logf("unhandled call to %q", r.URL.String())
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
_, _ = w.Write([]byte("Not found: " + r.URL.String()))
|
||||
})
|
||||
return handler
|
||||
}
|
||||
|
||||
func TestNewFsWithMockServer(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
handler := getBasicHandler(t, "library_id", "My Library")
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
options := configmap.Simple{
|
||||
"url": server.URL,
|
||||
"library": "My Library",
|
||||
}
|
||||
fs, err := NewFs(context.Background(), "TestSeafile", "", options)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, fs)
|
||||
}
|
||||
|
||||
func TestUploadWholeFileWithErrorNoRetry(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const filename = "new file.txt"
|
||||
|
||||
handler := getBasicHandler(t, "library_id", "My Library")
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
// call to retrieve an upload slot
|
||||
handler.HandleFunc("/api2/repos/library_id/upload-link/", func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(fmt.Sprintf(`"%s/upload-api/temp_upload"`, server.URL)))
|
||||
})
|
||||
// upload will fail
|
||||
handler.HandleFunc("/upload-api/temp_upload", func(w http.ResponseWriter, r *http.Request) {
|
||||
defer func() { _ = r.Body.Close() }()
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
})
|
||||
|
||||
options := configmap.Simple{
|
||||
"url": server.URL,
|
||||
"library": "My Library",
|
||||
"upload_cutoff": defaultUploadCutoff.String(),
|
||||
"chunk_size": defaultChunkSize.String(),
|
||||
}
|
||||
fs, err := NewFs(context.Background(), "TestSeafile", "", options)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, fs)
|
||||
|
||||
src := object.NewStaticObjectInfo(filename, time.Now(), int64(len(smallContent)), true, nil, nil)
|
||||
// call should fail
|
||||
in := bytes.NewReader(smallContent)
|
||||
_, err = fs.Put(context.Background(), in, src)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestUploadWholeFile(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const filename = "new file.txt"
|
||||
const parallelUploadCount = 3
|
||||
|
||||
handler := getBasicHandler(t, "library_id", "My Library")
|
||||
server := httptest.NewServer(handler)
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
// call to retrieve an upload slot
|
||||
handler.HandleFunc("/api2/repos/library_id/upload-link/", func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(fmt.Sprintf(`"%s/upload-api/temp_upload"`, server.URL)))
|
||||
})
|
||||
handler.HandleFunc("/upload-api/temp_upload", func(w http.ResponseWriter, r *http.Request) {
|
||||
defer func() { _ = r.Body.Close() }()
|
||||
|
||||
mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "multipart/form-data", mediaType)
|
||||
mr := multipart.NewReader(r.Body, params["boundary"])
|
||||
for {
|
||||
p, err := mr.NextPart()
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
if p.FileName() == filename {
|
||||
body, err := io.ReadAll(p)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, smallContent, body)
|
||||
|
||||
// sends response now
|
||||
_, _ = w.Write([]byte(fmt.Sprintf(`[{"name":"%s","size":%d}]`, filename, len(body))))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
for i := 0; i < parallelUploadCount; i++ {
|
||||
t.Run(fmt.Sprintf("parallel upload file %d", i), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
uploadFileContent(t, filename, server.URL, smallContent)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUploadFileByChunksWithRetryOnError(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const filename = "new file.txt"
|
||||
const parallelUploadCount = 3
|
||||
var chunkSize fs.SizeSuffix = 1048576
|
||||
var currentUploadID atomic.Int32
|
||||
chunkCount := make([]atomic.Int32, parallelUploadCount)
|
||||
bytesReceived := make([]atomic.Int32, parallelUploadCount)
|
||||
hashes := make([]hash.Hash, parallelUploadCount)
|
||||
for i := 0; i < parallelUploadCount; i++ {
|
||||
hashes[i] = sha256.New()
|
||||
}
|
||||
|
||||
handler := getBasicHandler(t, "library_id", "My Library")
|
||||
server := httptest.NewServer(handler)
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
// call to retrieve an upload slot
|
||||
handler.HandleFunc("/api2/repos/library_id/upload-link/", func(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = w.Write([]byte(fmt.Sprintf(`"%s/upload-api/temp_upload/%d"`, server.URL, currentUploadID.Load())))
|
||||
currentUploadID.Add(1)
|
||||
})
|
||||
|
||||
// call to upload chunks
|
||||
handler.HandleFunc("/upload-api/temp_upload/", func(w http.ResponseWriter, r *http.Request) {
|
||||
defer func() { _ = r.Body.Close() }()
|
||||
|
||||
// quick hack to get the file ID from the URL
|
||||
rawFileID := strings.TrimPrefix(r.URL.String(), "/upload-api/temp_upload/")
|
||||
rawFileID = strings.TrimSuffix(rawFileID, "?ret-json=1")
|
||||
fileID, err := strconv.Atoi(rawFileID)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentChunk := chunkCount[fileID].Add(1)
|
||||
if currentChunk == 2 {
|
||||
// simulate an error on the second chunk
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
partLen := 0
|
||||
var totalBytesReceived int32
|
||||
// read all the data
|
||||
mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "multipart/form-data", mediaType)
|
||||
mr := multipart.NewReader(r.Body, params["boundary"])
|
||||
for {
|
||||
p, err := mr.NextPart()
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
if p.FileName() == filename {
|
||||
body, err := io.ReadAll(p)
|
||||
assert.NoError(t, err)
|
||||
partLen = len(body)
|
||||
totalBytesReceived = bytesReceived[fileID].Add(int32(partLen))
|
||||
hashes[fileID].Write(body)
|
||||
break
|
||||
}
|
||||
}
|
||||
t.Logf("file %d: received chunk %d = %d bytes", fileID, currentChunk, totalBytesReceived)
|
||||
|
||||
// check the content-range header
|
||||
contentRange := r.Header.Get("Content-Range")
|
||||
t.Logf("uploaded %s", contentRange)
|
||||
pattern := regexp.MustCompile(`bytes (\d+)-(\d+)\/(\d+)`)
|
||||
match := pattern.FindStringSubmatch(contentRange)
|
||||
if len(match) == 4 {
|
||||
start, err := strconv.Atoi(match[1])
|
||||
assert.NoError(t, err)
|
||||
end, err := strconv.Atoi(match[2])
|
||||
assert.NoError(t, err)
|
||||
size, err := strconv.Atoi(match[3])
|
||||
assert.NoError(t, err)
|
||||
|
||||
// make sure the chunk size is right
|
||||
assert.Equal(t, end-start+1, partLen)
|
||||
|
||||
if end+1 == size {
|
||||
// this was the last chunk
|
||||
_, _ = w.Write([]byte(fmt.Sprintf(`[{"name":"%s","id":"new_file_id","size":%d}]`, filename, totalBytesReceived)))
|
||||
t.Logf("file %d: uploaded hash = %x", fileID, hashes[fileID].Sum(nil))
|
||||
return
|
||||
}
|
||||
if end+1 > size {
|
||||
t.Fatalf("end %d is bigger than size %d", end, size)
|
||||
}
|
||||
}
|
||||
// keep going to the next chunk
|
||||
_, _ = w.Write([]byte(`{"success":true}`))
|
||||
})
|
||||
|
||||
for i := 0; i < parallelUploadCount; i++ {
|
||||
fileID := i // can remove this for go >= 1.22
|
||||
t.Run(fmt.Sprintf("parallel upload file %d", fileID), func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
dataHash := uploadBigFile(t, filename, server.URL, chunkSize)
|
||||
t.Logf("file %d: uploaded hash = %x", fileID, dataHash)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func uploadBigFile(t *testing.T, name, endpoint string, chunkSize fs.SizeSuffix) []byte {
|
||||
options := configmap.Simple{
|
||||
"url": endpoint,
|
||||
"library": "My Library",
|
||||
"upload_cutoff": chunkSize.String(),
|
||||
"chunk_size": chunkSize.String(),
|
||||
}
|
||||
fs, err := NewFs(context.Background(), "TestSeafile", "", options)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, fs)
|
||||
|
||||
// should allow for at least 2 chunks
|
||||
buffer := &bytes.Buffer{}
|
||||
iterations := int(chunkSize) * 2 / len(smallContent)
|
||||
for i := 0; i <= iterations; i++ {
|
||||
buffer.Write(smallContent)
|
||||
}
|
||||
|
||||
// calculate the sha256 hash while uploading
|
||||
sha256Hash := sha256.New()
|
||||
reader := io.TeeReader(buffer, sha256Hash)
|
||||
|
||||
size := int64(buffer.Len())
|
||||
src := object.NewStaticObjectInfo(name, time.Now(), size, true, nil, nil)
|
||||
|
||||
object, err := fs.Put(context.Background(), reader, src)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, object)
|
||||
assert.Equal(t, size, object.Size())
|
||||
|
||||
return sha256Hash.Sum(nil)
|
||||
}
|
||||
|
||||
func uploadFileContent(t *testing.T, name, endpoint string, content []byte) {
|
||||
options := configmap.Simple{
|
||||
"url": endpoint,
|
||||
"library": "My Library",
|
||||
"upload_cutoff": defaultUploadCutoff.String(),
|
||||
"chunk_size": defaultChunkSize.String(),
|
||||
}
|
||||
fs, err := NewFs(context.Background(), "TestSeafile", "", options)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, fs)
|
||||
|
||||
src := object.NewStaticObjectInfo(name, time.Now(), int64(len(content)), true, nil, nil)
|
||||
in := bytes.NewReader(content)
|
||||
object, err := fs.Put(context.Background(), in, src)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, object)
|
||||
assert.Equal(t, int64(len(content)), object.Size())
|
||||
}
|
162
backend/seafile/seafile_reverseproxy_test.go
Normal file
162
backend/seafile/seafile_reverseproxy_test.go
Normal file
@ -0,0 +1,162 @@
|
||||
//go:build go1.20
|
||||
|
||||
package seafile
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/config/configmap"
|
||||
"github.com/rclone/rclone/fs/object"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestNewFsWithProxiedServer(t *testing.T) {
|
||||
// creates a reverse proxy to a local instance of seafile
|
||||
host := "localhost:8088"
|
||||
target, _ := url.Parse("http://" + host)
|
||||
handler := &httputil.ReverseProxy{
|
||||
Rewrite: func(pr *httputil.ProxyRequest) {
|
||||
pr.SetURL(target)
|
||||
pr.Out.Host = host
|
||||
t.Logf("calling %s on %s", pr.Out.Method, pr.Out.URL.String())
|
||||
},
|
||||
ModifyResponse: func(r *http.Response) error {
|
||||
t.Logf("%s response: %s", r.Request.URL.String(), r.Status)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
options := configmap.Simple{
|
||||
"url": server.URL,
|
||||
"library": "My Library",
|
||||
"user": "seafile@rclone.org",
|
||||
"pass": "GYdWLJQb55COZYnO9Zl0GcKc_SYDr0EMVcl6rnZVFxV8zoLPBjJ7NQ",
|
||||
"create_library": "true",
|
||||
}
|
||||
fs, err := NewFs(context.Background(), "TestSeafile", "", options)
|
||||
if err != nil && strings.Contains(err.Error(), "502 Bad Gateway") {
|
||||
t.Skip("cannot contact local seafile instance")
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, fs)
|
||||
}
|
||||
|
||||
// this test is using a reverse proxy to simulate one broken chunk during an upload
|
||||
// a local instance of seafile needs to be started from the script "fstest/testserver/init.d/TestSeafile"
|
||||
func TestFailedChunkUploadWithProxiedServer(t *testing.T) {
|
||||
minimumChunks := 3
|
||||
var chunkSize fs.SizeSuffix = 1048576
|
||||
|
||||
// should allow for at least minimumChunks
|
||||
writer := &bytes.Buffer{}
|
||||
iterations := int(chunkSize) * minimumChunks / len(smallContent)
|
||||
for i := 0; i <= iterations; i++ {
|
||||
writer.Write(smallContent)
|
||||
}
|
||||
data := writer.Bytes()
|
||||
|
||||
// each test will fail one chunk from 0 to 3
|
||||
for failedChunk := 0; failedChunk < minimumChunks+1; failedChunk++ {
|
||||
t.Run(strconv.Itoa(failedChunk), func(t *testing.T) {
|
||||
chunkCount := 0
|
||||
var proxyURL []byte
|
||||
|
||||
// creates a reverse proxy to a local instance of seafile
|
||||
host := "127.0.0.1:8088"
|
||||
target, _ := url.Parse("http://" + host)
|
||||
handler := &httputil.ReverseProxy{
|
||||
Rewrite: func(pr *httputil.ProxyRequest) {
|
||||
pr.SetURL(target)
|
||||
pr.Out.Host = host
|
||||
pr.Out.Header.Del("Accept-Encoding") // we don't want to decompress and recompress the response
|
||||
if strings.Contains(pr.Out.URL.String(), "/upload-api/") {
|
||||
t.Logf("uploading chunk %s (%d)", pr.Out.Header.Get("Content-Range"), chunkCount)
|
||||
if chunkCount == failedChunk {
|
||||
t.Logf("this chunk should fail (%d)", chunkCount)
|
||||
// the length of the data won't match with the Content-Length header
|
||||
pr.Out.Body = io.NopCloser(io.LimitReader(pr.In.Body, 100))
|
||||
}
|
||||
chunkCount++
|
||||
}
|
||||
},
|
||||
ModifyResponse: func(r *http.Response) error {
|
||||
b, _ := io.ReadAll(r.Body)
|
||||
_ = r.Body.Close()
|
||||
|
||||
// replace the URLs with the reverse proxy
|
||||
b = bytes.ReplaceAll(b, []byte("http://"+host), proxyURL)
|
||||
buf := bytes.NewBuffer(b)
|
||||
r.Body = io.NopCloser(buf)
|
||||
r.Header.Set("Content-Length", strconv.Itoa(buf.Len()))
|
||||
return nil
|
||||
},
|
||||
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
|
||||
if strings.Contains(err.Error(), "transport connection broken") {
|
||||
// we need to send a 500 error like the seafile server would do in case of a transmission error
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
t.Log(err)
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
},
|
||||
}
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
proxyURL = []byte(server.URL)
|
||||
|
||||
options := configmap.Simple{
|
||||
"url": server.URL,
|
||||
"library": "My Library",
|
||||
"user": "seafile@rclone.org",
|
||||
"pass": "GYdWLJQb55COZYnO9Zl0GcKc_SYDr0EMVcl6rnZVFxV8zoLPBjJ7NQ",
|
||||
"create_library": "true",
|
||||
"upload_cutoff": chunkSize.String(),
|
||||
"chunk_size": chunkSize.String(),
|
||||
}
|
||||
fs, err := NewFs(context.Background(), "TestSeafile", "", options)
|
||||
if err != nil && strings.Contains(err.Error(), "502 Bad Gateway") {
|
||||
t.Skip("cannot contact local seafile instance")
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, fs)
|
||||
|
||||
buffer := bytes.NewBuffer(data)
|
||||
|
||||
size := int64(buffer.Len())
|
||||
filename := fmt.Sprintf("new file %d.txt", failedChunk)
|
||||
src := object.NewStaticObjectInfo(filename, time.Now(), size, true, nil, nil)
|
||||
|
||||
object, err := fs.Put(context.Background(), buffer, src)
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, object)
|
||||
assert.Equal(t, size, object.Size())
|
||||
|
||||
// read the file back for comparison
|
||||
object, err = fs.NewObject(context.Background(), filename)
|
||||
assert.NoError(t, err)
|
||||
reader, err := object.Open(context.Background())
|
||||
assert.NoError(t, err)
|
||||
read, err := io.ReadAll(reader)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, data, read)
|
||||
|
||||
// clean up
|
||||
err = object.Remove(context.Background())
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
}
|
@ -1,10 +1,10 @@
|
||||
// Test Seafile filesystem interface
|
||||
package seafile_test
|
||||
package seafile
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/rclone/rclone/backend/seafile"
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fstest/fstests"
|
||||
)
|
||||
|
||||
@ -12,6 +12,24 @@ import (
|
||||
func TestIntegration(t *testing.T) {
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: "TestSeafile:",
|
||||
NilObject: (*seafile.Object)(nil),
|
||||
NilObject: (*Object)(nil),
|
||||
ChunkedUpload: fstests.ChunkedUploadConfig{
|
||||
MinChunkSize: minChunkSize,
|
||||
MaxChunkSize: maxChunkSize,
|
||||
NeedMultipleChunks: true,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
|
||||
return f.setUploadChunkSize(cs)
|
||||
}
|
||||
|
||||
func (f *Fs) SetUploadCutoff(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
|
||||
return f.setUploadCutoff(cs)
|
||||
}
|
||||
|
||||
var (
|
||||
_ fstests.SetUploadChunkSizer = (*Fs)(nil)
|
||||
_ fstests.SetUploadCutoffer = (*Fs)(nil)
|
||||
)
|
||||
|
@ -31,14 +31,28 @@ var (
|
||||
// ==================== Seafile API ====================
|
||||
|
||||
func (f *Fs) getAuthorizationToken(ctx context.Context) (string, error) {
|
||||
return getAuthorizationToken(ctx, f.srv, f.opt.User, f.opt.Password, "")
|
||||
opts, request := prepareAuthorizationRequest(f.opt.User, f.opt.Password, "")
|
||||
result := api.AuthenticationResult{}
|
||||
|
||||
err := f.pacer.Call(func() (bool, error) {
|
||||
resp, err := f.srv.CallJSON(ctx, &opts, &request, &result)
|
||||
return f.shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
// This is only going to be http errors here
|
||||
return "", fmt.Errorf("failed to authenticate: %w", err)
|
||||
}
|
||||
if result.Errors != nil && len(result.Errors) > 0 {
|
||||
return "", errors.New(strings.Join(result.Errors, ", "))
|
||||
}
|
||||
if result.Token == "" {
|
||||
// No error in "non_field_errors" field but still empty token
|
||||
return "", errors.New("failed to authenticate")
|
||||
}
|
||||
return result.Token, nil
|
||||
}
|
||||
|
||||
// getAuthorizationToken can be called outside of an fs (during configuration of the remote to get the authentication token)
|
||||
// it's doing a single call (no pacer involved)
|
||||
func getAuthorizationToken(ctx context.Context, srv *rest.Client, user, password, oneTimeCode string) (string, error) {
|
||||
// API Documentation
|
||||
// https://download.seafile.com/published/web-api/home.md#user-content-Quick%20Start
|
||||
func prepareAuthorizationRequest(user, password, oneTimeCode string) (rest.Opts, api.AuthenticationRequest) {
|
||||
opts := rest.Opts{
|
||||
Method: "POST",
|
||||
Path: "api2/auth-token/",
|
||||
@ -55,6 +69,15 @@ func getAuthorizationToken(ctx context.Context, srv *rest.Client, user, password
|
||||
Username: user,
|
||||
Password: password,
|
||||
}
|
||||
return opts, request
|
||||
}
|
||||
|
||||
// getAuthorizationToken is called outside of an fs (during configuration of the remote to get the authentication token)
|
||||
// it's doing a single call (no pacer involved)
|
||||
func getAuthorizationToken(ctx context.Context, srv *rest.Client, user, password, oneTimeCode string) (string, error) {
|
||||
// API Documentation
|
||||
// https://download.seafile.com/published/web-api/home.md#user-content-Quick%20Start
|
||||
opts, request := prepareAuthorizationRequest(user, password, oneTimeCode)
|
||||
result := api.AuthenticationResult{}
|
||||
|
||||
_, err := srv.CallJSON(ctx, &opts, &request, &result)
|
||||
@ -480,6 +503,9 @@ func (f *Fs) deleteDir(ctx context.Context, libraryID, filePath string) error {
|
||||
if resp.StatusCode == 401 || resp.StatusCode == 403 {
|
||||
return fs.ErrorPermissionDenied
|
||||
}
|
||||
if resp.StatusCode == 404 {
|
||||
return fs.ErrorDirNotFound
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("failed to delete directory: %w", err)
|
||||
}
|
||||
@ -678,27 +704,76 @@ func (f *Fs) getUploadLink(ctx context.Context, libraryID string) (string, error
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (f *Fs) upload(ctx context.Context, in io.Reader, uploadLink, filePath string) (*api.FileDetail, error) {
|
||||
// getFileUploadedSize returns the size already uploaded on the server
|
||||
//
|
||||
//nolint:unused
|
||||
func (f *Fs) getFileUploadedSize(ctx context.Context, libraryID, filePath string) (int64, error) {
|
||||
// API Documentation
|
||||
// https://download.seafile.com/published/web-api/v2.1/file-upload.md
|
||||
if libraryID == "" {
|
||||
return 0, errors.New("cannot get file uploaded size without a library")
|
||||
}
|
||||
fs.Debugf(nil, "filePath=%q", filePath)
|
||||
fileDir, filename := path.Split(filePath)
|
||||
fileDir = "/" + strings.TrimSuffix(fileDir, "/")
|
||||
if fileDir == "" {
|
||||
fileDir = "/"
|
||||
}
|
||||
opts := rest.Opts{
|
||||
Method: "GET",
|
||||
Path: APIv21 + libraryID + "/file-uploaded-bytes/",
|
||||
Parameters: url.Values{
|
||||
"parent_dir": {f.opt.Enc.FromStandardPath(fileDir)},
|
||||
"file_name": {f.opt.Enc.FromStandardPath(filename)},
|
||||
},
|
||||
}
|
||||
|
||||
result := api.FileUploadedBytes{}
|
||||
var resp *http.Response
|
||||
var err error
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
||||
return f.shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
if resp.StatusCode == 401 || resp.StatusCode == 403 {
|
||||
return 0, fs.ErrorPermissionDenied
|
||||
}
|
||||
}
|
||||
return 0, fmt.Errorf("failed to get file uploaded size for parent_dir=%q and file_name=%q: %w", fileDir, filename, err)
|
||||
}
|
||||
return result.FileUploadedBytes, nil
|
||||
}
|
||||
|
||||
func (f *Fs) prepareFileUpload(ctx context.Context, in io.Reader, uploadLink, filePath string, contentRange contentRanger) (*rest.Opts, error) {
|
||||
fileDir, filename := path.Split(filePath)
|
||||
safeFilename := f.opt.Enc.FromStandardName(filename)
|
||||
parameters := url.Values{
|
||||
"parent_dir": {"/"},
|
||||
"relative_path": {f.opt.Enc.FromStandardPath(fileDir)},
|
||||
"need_idx_progress": {"true"},
|
||||
"replace": {"1"},
|
||||
}
|
||||
formReader, contentType, _, err := rest.MultipartUpload(ctx, in, parameters, "file", f.opt.Enc.FromStandardName(filename))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to make multipart upload: %w", err)
|
||||
|
||||
contentRangeHeader := contentRange.getContentRangeHeader()
|
||||
opts := &rest.Opts{
|
||||
Method: http.MethodPost,
|
||||
Body: in,
|
||||
ContentRange: contentRangeHeader,
|
||||
Parameters: url.Values{"ret-json": {"1"}}, // It needs to be on the url, not in the body parameters
|
||||
MultipartParams: parameters,
|
||||
MultipartContentName: "file",
|
||||
MultipartFileName: safeFilename,
|
||||
}
|
||||
if contentRangeHeader != "" {
|
||||
// When using resumable upload, the name of the file is no longer retrieved from the "file" field of the form.
|
||||
// It's instead retrieved from the header.
|
||||
opts.ExtraHeaders = map[string]string{
|
||||
"Content-Disposition": "attachment; filename=\"" + safeFilename + "\"",
|
||||
}
|
||||
}
|
||||
|
||||
opts := rest.Opts{
|
||||
Method: "POST",
|
||||
Body: formReader,
|
||||
ContentType: contentType,
|
||||
Parameters: url.Values{"ret-json": {"1"}}, // It needs to be on the url, not in the body parameters
|
||||
}
|
||||
parsedURL, err := url.Parse(uploadLink)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse upload url: %w", err)
|
||||
@ -708,11 +783,29 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, uploadLink, filePath stri
|
||||
} else {
|
||||
opts.Path = uploadLink
|
||||
}
|
||||
|
||||
chunkSize := contentRange.getChunkSize()
|
||||
if chunkSize > 0 {
|
||||
// seafile might not make use of the Content-Length header but a proxy (or reverse proxy) in the middle might
|
||||
opts.ContentLength = &chunkSize
|
||||
}
|
||||
return opts, nil
|
||||
}
|
||||
|
||||
func (f *Fs) upload(ctx context.Context, in io.Reader, uploadLink, filePath string, size int64) (*api.FileDetail, error) {
|
||||
// API Documentation
|
||||
// https://download.seafile.com/published/web-api/v2.1/file-upload.md
|
||||
contentRange := newStreamedContentRange(size)
|
||||
opts, err := f.prepareFileUpload(ctx, in, uploadLink, filePath, contentRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]api.FileDetail, 1)
|
||||
var resp *http.Response
|
||||
// If an error occurs during the call, do not attempt to retry: The upload link is single use only
|
||||
// We do not attempt to retry if an error occurs during the call, as we don't know the state of the reader
|
||||
err = f.pacer.CallNoRetry(func() (bool, error) {
|
||||
resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
|
||||
resp, err = f.srv.CallJSON(ctx, opts, nil, &result)
|
||||
return f.shouldRetryUpload(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
@ -721,7 +814,7 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, uploadLink, filePath stri
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
if resp.StatusCode == 500 {
|
||||
// This is a temporary error - we will get a new upload link before retrying
|
||||
// This is quite common on heavy load
|
||||
return nil, ErrorInternalDuringUpload
|
||||
}
|
||||
}
|
||||
@ -732,6 +825,97 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, uploadLink, filePath stri
|
||||
result[0].Name = f.opt.Enc.ToStandardName(result[0].Name)
|
||||
return &result[0], nil
|
||||
}
|
||||
// no file results sent back
|
||||
return nil, ErrorInternalDuringUpload
|
||||
}
|
||||
|
||||
func (f *Fs) uploadChunk(ctx context.Context, in io.Reader, uploadLink, filePath string, contentRange contentRanger) error {
|
||||
// API Documentation
|
||||
// https://download.seafile.com/published/web-api/v2.1/file-upload.md
|
||||
chunkSize := int(contentRange.getChunkSize())
|
||||
buffer := f.getBuf(chunkSize)
|
||||
defer f.putBuf(buffer)
|
||||
|
||||
read, err := io.ReadFull(in, buffer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error reading from source: %w", err)
|
||||
}
|
||||
if chunkSize > 0 && read != chunkSize {
|
||||
return fmt.Errorf("expected to read %d from source, but got %d", chunkSize, read)
|
||||
}
|
||||
|
||||
result := api.ChunkUpload{}
|
||||
var resp *http.Response
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
// recreate a reader on the temporary buffer
|
||||
in = bytes.NewReader(buffer)
|
||||
opts, err := f.prepareFileUpload(ctx, in, uploadLink, filePath, contentRange)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
resp, err = f.srv.CallJSON(ctx, opts, nil, &result)
|
||||
return f.shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
if resp.StatusCode == 401 || resp.StatusCode == 403 {
|
||||
return fs.ErrorPermissionDenied
|
||||
}
|
||||
if resp.StatusCode == 500 {
|
||||
return fmt.Errorf("chunk upload %s: %w", contentRange.getContentRangeHeader(), ErrorInternalDuringUpload)
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("failed to upload chunk %s: %w", contentRange.getContentRangeHeader(), err)
|
||||
}
|
||||
if !result.Success {
|
||||
return errors.New("upload failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *Fs) uploadLastChunk(ctx context.Context, in io.Reader, uploadLink, filePath string, contentRange contentRanger) (*api.FileDetail, error) {
|
||||
// API Documentation
|
||||
// https://download.seafile.com/published/web-api/v2.1/file-upload.md
|
||||
chunkSize := int(contentRange.getChunkSize())
|
||||
buffer := f.getBuf(chunkSize)
|
||||
defer f.putBuf(buffer)
|
||||
|
||||
read, err := io.ReadFull(in, buffer)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading from source: %w", err)
|
||||
}
|
||||
if chunkSize > 0 && read != chunkSize {
|
||||
return nil, fmt.Errorf("expected to read %d from source, but got %d", chunkSize, read)
|
||||
}
|
||||
|
||||
result := make([]api.FileDetail, 1)
|
||||
var resp *http.Response
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
// recreate a reader on the buffer
|
||||
in = bytes.NewReader(buffer)
|
||||
opts, err := f.prepareFileUpload(ctx, in, uploadLink, filePath, contentRange)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
resp, err = f.srv.CallJSON(ctx, opts, nil, &result)
|
||||
return f.shouldRetry(ctx, resp, err)
|
||||
})
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
if resp.StatusCode == 401 || resp.StatusCode == 403 {
|
||||
return nil, fs.ErrorPermissionDenied
|
||||
}
|
||||
if resp.StatusCode == 500 {
|
||||
return nil, fmt.Errorf("last chunk: %w", ErrorInternalDuringUpload)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("failed to upload last chunk: %w", err)
|
||||
}
|
||||
if len(result) > 0 {
|
||||
result[0].Parent = f.opt.Enc.ToStandardPath(result[0].Parent)
|
||||
result[0].Name = f.opt.Enc.ToStandardName(result[0].Name)
|
||||
return &result[0], nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
@ -400,5 +400,53 @@ Properties:
|
||||
- Type: string
|
||||
- Required: false
|
||||
|
||||
#### --seafile-upload-cutoff
|
||||
|
||||
Cutoff for switching to chunked upload.
|
||||
|
||||
Files above this size will be uploaded in chunks of "--seafile-chunk-size".
|
||||
|
||||
Properties:
|
||||
|
||||
- Config: upload_cutoff
|
||||
- Env Var: RCLONE_SEAFILE_UPLOAD_CUTOFF
|
||||
- Type: SizeSuffix
|
||||
- Default: 255Mi
|
||||
|
||||
#### --seafile-chunk-size
|
||||
|
||||
Upload chunk size.
|
||||
|
||||
When uploading large files, chunk the file into this size.
|
||||
|
||||
Must fit in memory. These chunks are buffered in memory and there
|
||||
might a maximum of "--transfers" chunks in progress at once.
|
||||
|
||||
1 MB is the minimum size.
|
||||
|
||||
Properties:
|
||||
|
||||
- Config: chunk_size
|
||||
- Env Var: RCLONE_SEAFILE_CHUNK_SIZE
|
||||
- Type: SizeSuffix
|
||||
- Default: 127Mi
|
||||
|
||||
#### --seafile-min-pacer
|
||||
|
||||
Minimum time between requests (in milliseconds).
|
||||
|
||||
Seafile API is rate limited. The default value is to wait at least 100ms between requests.
|
||||
|
||||
You can try to tweak the default value but you might trigger the rate limit which will make the request rate slower.
|
||||
|
||||
The minimum value is 1ms (0 will default to 100ms)
|
||||
|
||||
Properties:
|
||||
|
||||
- Config: min_pacer
|
||||
- Env Var: RCLONE_SEAFILE_MIN_PACER
|
||||
- Type: int
|
||||
- Default: 100
|
||||
|
||||
{{< rem autogenerated options stop >}}
|
||||
|
||||
|
@ -418,11 +418,6 @@ backends:
|
||||
fastlist: false
|
||||
ignore:
|
||||
- TestApplyTransforms
|
||||
- backend: "seafile"
|
||||
remote: "TestSeafileV6:"
|
||||
fastlist: false
|
||||
ignore:
|
||||
- TestIntegration/FsMkdir/FsPutFiles/FsDirMove
|
||||
- backend: "seafile"
|
||||
remote: "TestSeafile:"
|
||||
fastlist: true
|
||||
|
@ -3,14 +3,14 @@
|
||||
set -e
|
||||
|
||||
# environment variables passed on docker-compose
|
||||
export NAME=seafile7
|
||||
export NAME=seafile
|
||||
export MYSQL_ROOT_PASSWORD=pixenij4zacoguq0kopamid6
|
||||
export SEAFILE_ADMIN_EMAIL=seafile@rclone.org
|
||||
export SEAFILE_ADMIN_PASSWORD=pixenij4zacoguq0kopamid6
|
||||
export SEAFILE_IP=127.0.0.1
|
||||
export SEAFILE_PORT=8087
|
||||
export SEAFILE_TEST_DATA=${SEAFILE_TEST_DATA:-/tmp/seafile-test-data}
|
||||
export SEAFILE_VERSION=latest
|
||||
export SEAFILE_VERSION=${SEAFILE_VERSION:-latest}
|
||||
|
||||
# make sure the data directory exists
|
||||
mkdir -p ${SEAFILE_TEST_DATA}/${NAME}
|
||||
@ -45,12 +45,14 @@ start() {
|
||||
# create default library
|
||||
curl -X POST -H "Authorization: Token ${TOKEN}" "http://${SEAFILE_IP}:${SEAFILE_PORT}/api2/default-repo/"
|
||||
|
||||
echo
|
||||
echo _connect=${SEAFILE_IP}:${SEAFILE_PORT}
|
||||
echo type=seafile
|
||||
echo url=http://${SEAFILE_IP}:${SEAFILE_PORT}/
|
||||
echo user=${SEAFILE_ADMIN_EMAIL}
|
||||
echo pass=$(rclone obscure ${SEAFILE_ADMIN_PASSWORD})
|
||||
echo library=My Library
|
||||
echo min_pacer=100
|
||||
}
|
||||
|
||||
stop() {
|
||||
|
@ -7,14 +7,14 @@ TEST_LIBRARY=Encrypted
|
||||
TEST_LIBRARY_PASSWORD=SecretKey
|
||||
|
||||
# environment variables passed on docker-compose
|
||||
export NAME=seafile7encrypted
|
||||
export NAME=seafile-encrypted
|
||||
export MYSQL_ROOT_PASSWORD=pixenij4zacoguq0kopamid6
|
||||
export SEAFILE_ADMIN_EMAIL=seafile@rclone.org
|
||||
export SEAFILE_ADMIN_PASSWORD=pixenij4zacoguq0kopamid6
|
||||
export SEAFILE_IP=127.0.0.1
|
||||
export SEAFILE_PORT=8088
|
||||
export SEAFILE_TEST_DATA=${SEAFILE_TEST_DATA:-/tmp/seafile-test-data}
|
||||
export SEAFILE_VERSION=latest
|
||||
export SEAFILE_VERSION=${SEAFILE_VERSION:-latest}
|
||||
|
||||
# make sure the data directory exists
|
||||
mkdir -p ${SEAFILE_TEST_DATA}/${NAME}
|
||||
@ -25,8 +25,20 @@ COMPOSE_DIR=$(dirname "$0")/seafile
|
||||
start() {
|
||||
docker-compose --project-directory ${COMPOSE_DIR} --project-name ${NAME} --file ${COMPOSE_DIR}/docker-compose.yml up -d
|
||||
|
||||
# it takes some time for the database to be created
|
||||
sleep 60
|
||||
# wait for Seafile server to start
|
||||
seafile_endpoint="http://${SEAFILE_IP}:${SEAFILE_PORT}/"
|
||||
wait_seconds=1
|
||||
echo -n "Waiting for Seafile server to start"
|
||||
for iterations in `seq 1 60`;
|
||||
do
|
||||
http_code=$(curl -s -o /dev/null -L -w '%{http_code}' "$seafile_endpoint" || true;)
|
||||
if [ "$http_code" -eq 200 ]; then
|
||||
echo
|
||||
break
|
||||
fi
|
||||
echo -n "."
|
||||
sleep $wait_seconds
|
||||
done
|
||||
|
||||
# authentication token answer should be like: {"token":"dbf58423f1632b5b679a13b0929f1d0751d9250c"}
|
||||
TOKEN=`curl --silent \
|
||||
@ -37,6 +49,7 @@ start() {
|
||||
# create encrypted library
|
||||
curl -X POST -d "name=${TEST_LIBRARY}&passwd=${TEST_LIBRARY_PASSWORD}" -H "Authorization: Token ${TOKEN}" "http://${SEAFILE_IP}:${SEAFILE_PORT}/api2/repos/"
|
||||
|
||||
echo
|
||||
echo _connect=${SEAFILE_IP}:${SEAFILE_PORT}
|
||||
echo type=seafile
|
||||
echo url=http://${SEAFILE_IP}:${SEAFILE_PORT}/
|
||||
@ -44,6 +57,7 @@ start() {
|
||||
echo pass=$(rclone obscure ${SEAFILE_ADMIN_PASSWORD})
|
||||
echo library=${TEST_LIBRARY}
|
||||
echo library_key=$(rclone obscure ${TEST_LIBRARY_PASSWORD})
|
||||
echo min_pacer=100
|
||||
}
|
||||
|
||||
stop() {
|
||||
|
@ -1,48 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
set -e
|
||||
|
||||
# local variables
|
||||
NAME=seafile6
|
||||
SEAFILE_IP=127.0.0.1
|
||||
SEAFILE_PORT=8086
|
||||
SEAFILE_ADMIN_EMAIL=seafile@rclone.org
|
||||
SEAFILE_ADMIN_PASSWORD=qebiwob7wafixif8sojiboj4
|
||||
SEAFILE_TEST_DATA=${SEAFILE_TEST_DATA:-/tmp/seafile-test-data}
|
||||
SEAFILE_VERSION=latest
|
||||
|
||||
. $(dirname "$0")/docker.bash
|
||||
|
||||
start() {
|
||||
# make sure the data directory exists
|
||||
mkdir -p ${SEAFILE_TEST_DATA}/${NAME}
|
||||
|
||||
docker run --rm -d --name $NAME \
|
||||
-e SEAFILE_SERVER_HOSTNAME=${SEAFILE_IP}:${SEAFILE_PORT} \
|
||||
-e SEAFILE_ADMIN_EMAIL=${SEAFILE_ADMIN_EMAIL} \
|
||||
-e SEAFILE_ADMIN_PASSWORD=${SEAFILE_ADMIN_PASSWORD} \
|
||||
-v ${SEAFILE_TEST_DATA}/${NAME}:/shared \
|
||||
-p ${SEAFILE_IP}:${SEAFILE_PORT}:80 \
|
||||
seafileltd/seafile:${SEAFILE_VERSION}
|
||||
|
||||
# it takes some time for the database to be created
|
||||
sleep 60
|
||||
|
||||
# authentication token answer should be like: {"token":"dbf58423f1632b5b679a13b0929f1d0751d9250c"}
|
||||
TOKEN=`curl --silent \
|
||||
--data-urlencode username=${SEAFILE_ADMIN_EMAIL} -d password=${SEAFILE_ADMIN_PASSWORD} \
|
||||
http://${SEAFILE_IP}:${SEAFILE_PORT}/api2/auth-token/ \
|
||||
| sed 's/^{"token":"\(.*\)"}$/\1/'`
|
||||
|
||||
# create default library
|
||||
curl -X POST -H "Authorization: Token ${TOKEN}" "http://${SEAFILE_IP}:${SEAFILE_PORT}/api2/default-repo/"
|
||||
|
||||
echo _connect=${SEAFILE_IP}:${SEAFILE_PORT}
|
||||
echo type=seafile
|
||||
echo url=http://${SEAFILE_IP}:${SEAFILE_PORT}/
|
||||
echo user=${SEAFILE_ADMIN_EMAIL}
|
||||
echo pass=$(rclone obscure ${SEAFILE_ADMIN_PASSWORD})
|
||||
echo library=My Library
|
||||
}
|
||||
|
||||
. $(dirname "$0")/run.bash
|
Loading…
Reference in New Issue
Block a user