diff --git a/backend/webdav/chunking.go b/backend/webdav/chunking.go
new file mode 100644
index 000000000..36daab450
--- /dev/null
+++ b/backend/webdav/chunking.go
@@ -0,0 +1,215 @@
+package webdav
+
+/*
+	chunked update for Nextcloud
+	see https://docs.nextcloud.com/server/20/developer_manual/client_apis/WebDAV/chunking.html
+*/
+
+import (
+	"context"
+	"crypto/md5"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"path"
+	"strings"
+
+	"github.com/rclone/rclone/fs"
+	"github.com/rclone/rclone/lib/readers"
+	"github.com/rclone/rclone/lib/rest"
+)
+
+func (f *Fs) shouldRetryChunkMerge(ctx context.Context, resp *http.Response, err error) (bool, error) {
+	// Not found. Can be returned by NextCloud when merging chunks of an upload.
+	if resp != nil && resp.StatusCode == 404 {
+		return true, err
+	}
+
+	// 423 LOCKED
+	if resp != nil && resp.StatusCode == 423 {
+		return false, fmt.Errorf("merging the uploaded chunks failed with 423 LOCKED. This usually happens when the chunk merge is still in progress on NextCloud, but it may also indicate a failed transfer: %w", err)
+	}
+
+	return f.shouldRetry(ctx, resp, err)
+}
+
+// set the chunk size for testing
+func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
+	old, f.opt.ChunkSize = f.opt.ChunkSize, cs
+	return
+}
+
+func (f *Fs) getChunksUploadURL() string {
+	return strings.Replace(f.endpointURL, "/dav/files/", "/dav/uploads/", 1)
+}
+
+func (o *Object) getChunksUploadDir() (string, error) {
+	hasher := md5.New()
+	_, err := hasher.Write([]byte(o.filePath()))
+	if err != nil {
+		return "", fmt.Errorf("chunked upload couldn't hash URL: %w", err)
+	}
+	uploadDir := "rclone-chunked-upload-" + hex.EncodeToString(hasher.Sum(nil))
+	return uploadDir, nil
+}
+
+func (f *Fs) verifyChunkConfig() error {
+	if f.opt.ChunkSize != 0 && !validateNextCloudChunkedURL.MatchString(f.endpointURL) {
+		return errors.New("chunked upload with nextcloud must use /dav/files/USER endpoint not /webdav")
+	}
+
+	return nil
+}
+
+func (o *Object) shouldUseChunkedUpload(src fs.ObjectInfo) bool {
+	return o.fs.canChunk && o.fs.opt.ChunkSize > 0 && src.Size() > int64(o.fs.opt.ChunkSize)
+}
+
+func (o *Object) updateChunked(ctx context.Context, in0 io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
+	var uploadDir string
+
+	// see https://docs.nextcloud.com/server/24/developer_manual/client_apis/WebDAV/chunking.html#starting-a-chunked-upload
+	uploadDir, err = o.createChunksUploadDirectory(ctx)
+	if err != nil {
+		return err
+	}
+
+	partObj := &Object{
+		fs: o.fs,
+	}
+
+	// see https://docs.nextcloud.com/server/24/developer_manual/client_apis/WebDAV/chunking.html#uploading-chunks
+	err = o.uploadChunks(ctx, in0, src.Size(), partObj, uploadDir, options)
+	if err != nil {
+		return err
+	}
+
+	// see https://docs.nextcloud.com/server/24/developer_manual/client_apis/WebDAV/chunking.html#assembling-the-chunks
+	err = o.mergeChunks(ctx, uploadDir, options, src)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (o *Object) uploadChunks(ctx context.Context, in0 io.Reader, size int64, partObj *Object, uploadDir string, options []fs.OpenOption) error {
+	chunkSize := int64(partObj.fs.opt.ChunkSize)
+
+	// TODO: upload chunks in parallel for faster transfer speeds
+	for offset := int64(0); offset < size; offset += chunkSize {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
+		contentLength := chunkSize
+
+		// Last chunk may be smaller
+		if size-offset < contentLength {
+			contentLength = size - offset
+		}
+
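+		// Name each chunk by its inclusive byte range, zero-padded to a
+		// fixed width so the chunk names sort in offset order when the
+		// server assembles them.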
+		endOffset := offset + contentLength - 1
+
+		partObj.remote = fmt.Sprintf("%s/%015d-%015d", uploadDir, offset, endOffset)
+		// Enable low-level HTTP 2 retries.
+		// 2022-04-28 15:59:06 ERROR : stuff/video.avi: Failed to copy: uploading chunk failed: Put "https://censored.com/remote.php/dav/uploads/Admin/rclone-chunked-upload-censored/000006113198080-000006123683840": http2: Transport: cannot retry err [http2: Transport received Server's graceful shutdown GOAWAY] after Request.Body was written; define Request.GetBody to avoid this error
+
+		buf := make([]byte, chunkSize)
+		in := readers.NewRepeatableLimitReaderBuffer(in0, buf, chunkSize)
+
+		getBody := func() (io.ReadCloser, error) {
+			// RepeatableReader{} plays well with accounting so rewinding doesn't make the progress buggy
+			if _, err := in.Seek(0, io.SeekStart); err != nil {
+				return nil, err
+			}
+
+			return io.NopCloser(in), nil
+		}
+
+		err := partObj.updateSimple(ctx, in, getBody, partObj.remote, contentLength, "application/x-www-form-urlencoded", nil, o.fs.chunksUploadURL, options...)
+		if err != nil {
+			return fmt.Errorf("uploading chunk failed: %w", err)
+		}
+	}
+	return nil
+}
+
+func (o *Object) createChunksUploadDirectory(ctx context.Context) (string, error) {
+	uploadDir, err := o.getChunksUploadDir()
+	if err != nil {
+		return uploadDir, err
+	}
+
+	err = o.purgeUploadedChunks(ctx, uploadDir)
+	if err != nil {
+		return "", fmt.Errorf("chunked upload couldn't purge upload directory: %w", err)
+	}
+
+	opts := rest.Opts{
+		Method:     "MKCOL",
+		Path:       uploadDir + "/",
+		NoResponse: true,
+		RootURL:    o.fs.chunksUploadURL,
+	}
+	err = o.fs.pacer.CallNoRetry(func() (bool, error) {
+		resp, err := o.fs.srv.Call(ctx, &opts)
+		return o.fs.shouldRetry(ctx, resp, err)
+	})
+	if err != nil {
+		return "", fmt.Errorf("making upload directory failed: %w", err)
+	}
+	return uploadDir, err
+}
+
+func (o *Object) mergeChunks(ctx context.Context, uploadDir string, options []fs.OpenOption, src fs.ObjectInfo) error {
+	var resp *http.Response
+
+	// see https://docs.nextcloud.com/server/24/developer_manual/client_apis/WebDAV/chunking.html?highlight=chunk#assembling-the-chunks
+	opts := rest.Opts{
+		Method:     "MOVE",
+		Path:       path.Join(uploadDir, ".file"),
+		NoResponse: true,
+		Options:    options,
+		RootURL:    o.fs.chunksUploadURL,
+	}
+	destinationURL, err := rest.URLJoin(o.fs.endpoint, o.filePath())
+	if err != nil {
+		return fmt.Errorf("finalize chunked upload couldn't join URL: %w", err)
+	}
+	opts.ExtraHeaders = o.extraHeaders(ctx, src)
+	opts.ExtraHeaders["Destination"] = destinationURL.String()
+	err = o.fs.pacer.Call(func() (bool, error) {
+		resp, err = o.fs.srv.Call(ctx, &opts)
+		return o.fs.shouldRetryChunkMerge(ctx, resp, err)
+	})
+	if err != nil {
+		return fmt.Errorf("finalize chunked upload failed, destinationURL: \"%s\": %w", destinationURL, err)
+	}
+	return err
+}
+
+func (o *Object) purgeUploadedChunks(ctx context.Context, uploadDir string) error {
+	// clean the upload directory if it exists (this means that a previous try didn't clean up properly).
+	opts := rest.Opts{
+		Method:     "DELETE",
+		Path:       uploadDir + "/",
+		NoResponse: true,
+		RootURL:    o.fs.chunksUploadURL,
+	}
+
+	err := o.fs.pacer.Call(func() (bool, error) {
+		resp, err := o.fs.srv.CallXML(ctx, &opts, nil, nil)
+
+		// directory doesn't exist, no need to purge
+		if resp != nil && resp.StatusCode == http.StatusNotFound {
+			return false, nil
+		}
+
+		return o.fs.shouldRetry(ctx, resp, err)
+	})
+
+	return err
+}
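For context, the three WebDAV calls this patch drives are MKCOL (create the transfer directory), PUT (one request per chunk, named by byte range) and MOVE (assemble the chunks at the destination). Below is a minimal standalone sketch of that sequence against a Nextcloud server; the host, user, credentials and file names are placeholder assumptions and error handling is elided, so it illustrates the protocol rather than rclone's internals.

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// dav issues a single WebDAV request with basic auth and discards the body.
func dav(method, url string, body []byte, hdr map[string]string) error {
	req, err := http.NewRequest(method, url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.SetBasicAuth("USER", "PASSWORD") // placeholder credentials
	for k, v := range hdr {
		req.Header.Set(k, v)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	return resp.Body.Close()
}

func main() {
	// Placeholder endpoints, mirroring getChunksUploadURL's rewrite of
	// /dav/files/ to /dav/uploads/.
	const uploads = "https://cloud.example.com/remote.php/dav/uploads/USER/rclone-chunked-upload-0123"
	const dest = "https://cloud.example.com/remote.php/dav/files/USER/target.bin"

	// 1. Start the upload: create the transfer directory.
	_ = dav("MKCOL", uploads+"/", nil, nil)

	// 2. Upload each chunk, named by its zero-padded inclusive byte range
	//    (the same %015d-%015d scheme uploadChunks uses).
	chunk := []byte("example chunk payload")
	_ = dav("PUT", fmt.Sprintf("%s/%015d-%015d", uploads, 0, len(chunk)-1), chunk, nil)

	// 3. Assemble: MOVE the virtual .file entry to the final destination.
	_ = dav("MOVE", uploads+"/.file", nil, map[string]string{"Destination": dest})
}

In the patch itself these calls go through rclone's rest client and pacer, so they pick up retries, rate limiting and the configured extra headers.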