From ce5024bf3317dbbe2072abda15036e2d8507d3fd Mon Sep 17 00:00:00 2001 From: wiserain Date: Tue, 11 Jun 2024 02:08:07 +0900 Subject: [PATCH] pikpak: improve upload reliability and resolve potential file conflicts This attempts to resolve upload conflicts by implementing cancel/cleanup on failed uploads * fix panic error on defer cancel upload * increase pacer min sleep from 10 to 100 ms * stop using uploadByForm() * introduce force sleep before and after async tasks * use pacer's retry scheme instead of manual implementation Fixes #7787 --- backend/pikpak/api/types.go | 2 + backend/pikpak/helper.go | 46 +++++++++++++++++++++-- backend/pikpak/pikpak.go | 75 +++++++++++++++++++++---------------- 3 files changed, 87 insertions(+), 36 deletions(-) diff --git a/backend/pikpak/api/types.go b/backend/pikpak/api/types.go index 8b2270e2a..6046ee48b 100644 --- a/backend/pikpak/api/types.go +++ b/backend/pikpak/api/types.go @@ -251,10 +251,12 @@ type Media struct { // FileParams includes parameters for instant open type FileParams struct { + DeviceID string `json:"device_id,omitempty"` Duration int64 `json:"duration,omitempty,string"` // in seconds Height int `json:"height,omitempty,string"` Platform string `json:"platform,omitempty"` // "Upload" PlatformIcon string `json:"platform_icon,omitempty"` + TaskID string `json:"task_id"` URL string `json:"url,omitempty"` Width int `json:"width,omitempty,string"` } diff --git a/backend/pikpak/helper.go b/backend/pikpak/helper.go index 36ae1a03e..6351ef1b4 100644 --- a/backend/pikpak/helper.go +++ b/backend/pikpak/helper.go @@ -9,7 +9,9 @@ import ( "fmt" "io" "net/http" + "net/url" "os" + "strconv" "github.com/rclone/rclone/backend/pikpak/api" "github.com/rclone/rclone/lib/rest" @@ -141,9 +143,8 @@ func (f *Fs) getFile(ctx context.Context, ID string) (info *api.File, err error) var resp *http.Response err = f.pacer.Call(func() (bool, error) { resp, err = f.rst.CallJSON(ctx, &opts, nil, &info) - if err == nil && info.Phase != api.PhaseTypeComplete { - // could be pending right after file is created/uploaded. - return true, errors.New("not PHASE_TYPE_COMPLETE") + if err == nil && !info.Links.ApplicationOctetStream.Valid() { + return true, errors.New("no link") } return f.shouldRetry(ctx, resp, err) }) @@ -167,6 +168,45 @@ func (f *Fs) patchFile(ctx context.Context, ID string, req *api.File) (info *api return } +// getTask gets api.Task from API for the ID passed +func (f *Fs) getTask(ctx context.Context, ID string, checkPhase bool) (info *api.Task, err error) { + opts := rest.Opts{ + Method: "GET", + Path: "/drive/v1/tasks/" + ID, + } + var resp *http.Response + err = f.pacer.Call(func() (bool, error) { + resp, err = f.rst.CallJSON(ctx, &opts, nil, &info) + if checkPhase { + if err == nil && info.Phase != api.PhaseTypeComplete { + // could be pending right after file is created/uploaded. + return true, errors.New(info.Phase) + } + } + return f.shouldRetry(ctx, resp, err) + }) + return +} + +// deleteTask remove a task having the specified ID +func (f *Fs) deleteTask(ctx context.Context, ID string, deleteFiles bool) (err error) { + params := url.Values{} + params.Set("delete_files", strconv.FormatBool(deleteFiles)) + params.Set("task_ids", ID) + opts := rest.Opts{ + Method: "DELETE", + Path: "/drive/v1/tasks", + Parameters: params, + NoResponse: true, + } + var resp *http.Response + err = f.pacer.Call(func() (bool, error) { + resp, err = f.rst.CallJSON(ctx, &opts, nil, nil) + return f.shouldRetry(ctx, resp, err) + }) + return +} + // getAbout gets drive#quota information from server func (f *Fs) getAbout(ctx context.Context) (info *api.About, err error) { opts := rest.Opts{ diff --git a/backend/pikpak/pikpak.go b/backend/pikpak/pikpak.go index 008180356..9b6a527cf 100644 --- a/backend/pikpak/pikpak.go +++ b/backend/pikpak/pikpak.go @@ -53,6 +53,7 @@ import ( "github.com/rclone/rclone/fs/config/obscure" "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/hash" + "github.com/rclone/rclone/lib/atexit" "github.com/rclone/rclone/lib/dircache" "github.com/rclone/rclone/lib/encoder" "github.com/rclone/rclone/lib/oauthutil" @@ -66,8 +67,9 @@ import ( const ( rcloneClientID = "YNxT9w7GMdWvEOKa" rcloneEncryptedClientSecret = "aqrmB6M1YJ1DWCBxVxFSjFo7wzWEky494YMmkqgAl1do1WKOe2E" - minSleep = 10 * time.Millisecond + minSleep = 100 * time.Millisecond maxSleep = 2 * time.Second + waitTime = 500 * time.Millisecond decayConstant = 2 // bigger for slower decay, exponential rootURL = "https://api-drive.mypikpak.com" ) @@ -1175,12 +1177,13 @@ func (f *Fs) uploadByResumable(ctx context.Context, in io.Reader, resumable *api return } -func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str string, size int64, options ...fs.OpenOption) (*api.File, error) { +func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str string, size int64, options ...fs.OpenOption) (info *api.File, err error) { // determine upload type uploadType := api.UploadTypeResumable - if size >= 0 && size < int64(5*fs.Mebi) { - uploadType = api.UploadTypeForm - } + // if size >= 0 && size < int64(5*fs.Mebi) { + // uploadType = api.UploadTypeForm + // } + // stop using uploadByForm() cause it is not as reliable as uploadByResumable() for a large number of small files // request upload ticket to API req := api.RequestNewFile{ @@ -1195,29 +1198,46 @@ func (f *Fs) upload(ctx context.Context, in io.Reader, leaf, dirID, sha1Str stri if uploadType == api.UploadTypeResumable { req.Resumable = map[string]string{"provider": "PROVIDER_ALIYUN"} } - newfile, err := f.requestNewFile(ctx, &req) + new, err := f.requestNewFile(ctx, &req) if err != nil { return nil, fmt.Errorf("failed to create a new file: %w", err) } - if newfile.File == nil { - return nil, fmt.Errorf("invalid response: %+v", newfile) - } else if newfile.File.Phase == api.PhaseTypeComplete { + if new.File == nil { + return nil, fmt.Errorf("invalid response: %+v", new) + } else if new.File.Phase == api.PhaseTypeComplete { // early return; in case of zero-byte objects - return newfile.File, nil + return new.File, nil } - if uploadType == api.UploadTypeForm && newfile.Form != nil { - err = f.uploadByForm(ctx, in, req.Name, size, newfile.Form, options...) - } else if uploadType == api.UploadTypeResumable && newfile.Resumable != nil { - err = f.uploadByResumable(ctx, in, newfile.Resumable) + defer atexit.OnError(&err, func() { + fs.Debugf(leaf, "canceling upload: %v", err) + if cancelErr := f.deleteObjects(ctx, []string{new.File.ID}, false); cancelErr != nil { + fs.Logf(leaf, "failed to cancel upload: %v", cancelErr) + } + if cancelErr := f.deleteTask(ctx, new.Task.ID, false); cancelErr != nil { + fs.Logf(leaf, "failed to cancel upload: %v", cancelErr) + } + fs.Debugf(leaf, "waiting %v for the cancellation to be effective", waitTime) + time.Sleep(waitTime) + })() + + if uploadType == api.UploadTypeForm && new.Form != nil { + err = f.uploadByForm(ctx, in, req.Name, size, new.Form, options...) + } else if uploadType == api.UploadTypeResumable && new.Resumable != nil { + err = f.uploadByResumable(ctx, in, new.Resumable) } else { - return nil, fmt.Errorf("unable to proceed upload: %+v", newfile) + err = fmt.Errorf("no method available for uploading: %+v", new) } if err != nil { return nil, fmt.Errorf("failed to upload: %w", err) } - return newfile.File, nil + fs.Debugf(leaf, "sleeping for %v before checking upload status", waitTime) + time.Sleep(waitTime) + if _, err = f.getTask(ctx, new.Task.ID, true); err != nil { + return nil, fmt.Errorf("unable to complete the upload: %w", err) + } + return new.File, nil } // Put the object @@ -1470,22 +1490,11 @@ func (o *Object) setMetaDataWithLink(ctx context.Context) error { return nil } - // fetch download link with retry scheme - // 1 initial attempt and 2 retries are reasonable based on empirical analysis - retries := 2 - for i := 1; i <= retries+1; i++ { - info, err := o.fs.getFile(ctx, o.id) - if err != nil { - return fmt.Errorf("can't fetch download link: %w", err) - } - if err = o.setMetaData(info); err == nil && o.link.Valid() { - return nil - } - if i <= retries { - time.Sleep(time.Duration(200*i) * time.Millisecond) - } + info, err := o.fs.getFile(ctx, o.id) + if err != nil { + return err } - return errors.New("can't download - no link to download") + return o.setMetaData(info) } // readMetaData gets the metadata if it hasn't already been fetched @@ -1619,14 +1628,14 @@ func (o *Object) open(ctx context.Context, url string, options ...fs.OpenOption) // Open an object for read func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) { if o.id == "" { - return nil, errors.New("can't download - no id") + return nil, errors.New("can't download: no id") } if o.size == 0 { // zero-byte objects may have no download link return io.NopCloser(bytes.NewBuffer([]byte(nil))), nil } if err = o.setMetaDataWithLink(ctx); err != nil { - return nil, err + return nil, fmt.Errorf("can't download: %w", err) } return o.open(ctx, o.link.URL, options...) }