From 81a933ae38cf5d1dee9babc0e14c16842a82118a Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Mon, 2 Mar 2015 09:05:23 +0000 Subject: [PATCH] drive: Use chunked upload for files - fixes #33 --- drive/drive.go | 89 +++++++++++------ drive/upload.go | 250 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 311 insertions(+), 28 deletions(-) create mode 100644 drive/upload.go diff --git a/drive/drive.go b/drive/drive.go index 12788c8c6..8002600d5 100644 --- a/drive/drive.go +++ b/drive/drive.go @@ -183,24 +183,41 @@ func (f *FsDrive) endCall(err error) bool { fs.Debug(f, "Reducing sleep to %v", f.sleepTime) } } else { - fs.Debug(f, "Error recived: %v", err) - if gerr, ok := err.(*googleapi.Error); ok { - if len(gerr.Errors) > 0 { + fs.Debug(f, "Error recived: %T %#v", err, err) + // Check for net error Timeout() + if x, ok := err.(interface { + Timeout() bool + }); ok && x.Timeout() { + again = true + } + // Check for net error Temporary() + if x, ok := err.(interface { + Temporary() bool + }); ok && x.Temporary() { + again = true + } + switch gerr := err.(type) { + case *googleapi.Error: + if gerr.Code >= 500 && gerr.Code < 600 { + // All 5xx errors should be retried + again = true + } else if len(gerr.Errors) > 0 { reason := gerr.Errors[0].Reason if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" { - f.sleepTime *= 2 - if f.sleepTime > maxSleep { - f.sleepTime = maxSleep - } - if f.sleepTime != oldSleepTime { - fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime) - } again = true } } } } - + if again { + f.sleepTime *= 2 + if f.sleepTime > maxSleep { + f.sleepTime = maxSleep + } + if f.sleepTime != oldSleepTime { + fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime) + } + } return again } @@ -712,16 +729,24 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64 ModifiedDate: modTime.Format(timeFormatOut), } - // Make the API request to upload metadata and file data. var info *drive.File - // Don't retry, return a retry error instead - f.beginCall() - info, err = f.svc.Files.Insert(createInfo).Media(in).Do() - if f.endCall(err) { - return o, fs.RetryErrorf("Upload failed - retry: %s", err) - } - if err != nil { - return o, fmt.Errorf("Upload failed: %s", err) + if false { + // Make the API request to upload metadata and file data. + // Don't retry, return a retry error instead + f.beginCall() + info, err = f.svc.Files.Insert(createInfo).Media(in).Do() + if f.endCall(err) { + return o, fs.RetryErrorf("Upload failed - retry: %s", err) + } + if err != nil { + return o, fmt.Errorf("Upload failed: %s", err) + } + } else { + // Upload the file in chunks + info, err = f.Upload(in, size, createInfo.MimeType, createInfo, remote) + if err != nil { + return o, err + } } o.setMetaData(info) return o, nil @@ -945,14 +970,22 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro // Make the API request to upload metadata and file data. var err error var info *drive.File - // Don't retry, return a retry error instead - o.drive.beginCall() - info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do() - if o.drive.endCall(err) { - return fs.RetryErrorf("Update failed - retry: %s", err) - } - if err != nil { - return fmt.Errorf("Update failed: %s", err) + if false { + // Don't retry, return a retry error instead + o.drive.beginCall() + info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do() + if o.drive.endCall(err) { + return fs.RetryErrorf("Update failed - retry: %s", err) + } + if err != nil { + return fmt.Errorf("Update failed: %s", err) + } + } else { + // Upload the file in chunks + info, err = o.drive.Upload(in, size, fs.MimeType(o), updateInfo, o.remote) + if err != nil { + return err + } } o.setMetaData(info) return nil diff --git a/drive/upload.go b/drive/upload.go new file mode 100644 index 000000000..5e4450120 --- /dev/null +++ b/drive/upload.go @@ -0,0 +1,250 @@ +// Upload for drive +// +// Docs +// Resumable upload: https://developers.google.com/drive/web/manage-uploads#resumable +// Best practices: https://developers.google.com/drive/web/manage-uploads#best-practices +// Files insert: https://developers.google.com/drive/v2/reference/files/insert +// Files update: https://developers.google.com/drive/v2/reference/files/update +// +// This contains code adapted from google.golang.org/api (C) the GO AUTHORS + +package drive + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "regexp" + "strconv" + + "github.com/ncw/rclone/fs" + "google.golang.org/api/drive/v2" + "google.golang.org/api/googleapi" +) + +const ( + // statusResumeIncomplete is the code returned by the Google uploader when the transfer is not yet complete. + statusResumeIncomplete = 308 + + // Number of times to try each chunk + maxTries = 10 + + // chunkSize is the size of the chunks created during a resumable upload and should be a power of two. + // 1<<18 is the minimum size supported by the Google uploader, and there is no maximum. + chunkSize int64 = 1 << 18 +) + +// resumableUpload is used by the generated APIs to provide resumable uploads. +// It is not used by developers directly. +type resumableUpload struct { + f *FsDrive + remote string + // URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable". + URI string + // Media is the object being uploaded. + Media io.Reader + // MediaType defines the media type, e.g. "image/jpeg". + MediaType string + // ContentLength is the full size of the object being uploaded. + ContentLength int64 + // Return value + ret *drive.File +} + +// Upload the io.Reader in of size bytes with contentType and info +func (f *FsDrive) Upload(in io.Reader, size int64, contentType string, info *drive.File, remote string) (*drive.File, error) { + fileId := info.Id + var body io.Reader = nil + body, err := googleapi.WithoutDataWrapper.JSONReader(info) + if err != nil { + return nil, err + } + params := make(url.Values) + params.Set("alt", "json") + params.Set("uploadType", "resumable") + urls := "https://www.googleapis.com/upload/drive/v2/files" + method := "POST" + if fileId != "" { + params.Set("setModifiedDate", "true") + urls += "/{fileId}" + method = "PUT" + } + urls += "?" + params.Encode() + req, _ := http.NewRequest(method, urls, body) + googleapi.Expand(req.URL, map[string]string{ + "fileId": fileId, + }) + req.Header.Set("Content-Type", "application/json; charset=UTF-8") + req.Header.Set("X-Upload-Content-Type", contentType) + req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size)) + req.Header.Set("User-Agent", fs.UserAgent) + var res *http.Response + f.call(&err, func() { + res, err = f.client.Do(req) + if err == nil { + defer googleapi.CloseBody(res) + err = googleapi.CheckResponse(res) + } + }) + if err != nil { + return nil, err + } + loc := res.Header.Get("Location") + rx := &resumableUpload{ + f: f, + remote: remote, + URI: loc, + Media: in, + MediaType: contentType, + ContentLength: size, + } + return rx.Upload() +} + +// Make an http.Request for the range passed in +func (rx *resumableUpload) makeRequest(start int64, body []byte) *http.Request { + reqSize := int64(len(body)) + req, _ := http.NewRequest("POST", rx.URI, bytes.NewBuffer(body)) + req.ContentLength = reqSize + if reqSize != 0 { + req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", start, start+reqSize-1, rx.ContentLength)) + } else { + req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", rx.ContentLength)) + } + req.Header.Set("Content-Type", rx.MediaType) + req.Header.Set("User-Agent", fs.UserAgent) + return req +} + +// rangeRE matches the transfer status response from the server. $1 is +// the last byte index uploaded. +var rangeRE = regexp.MustCompile(`^0\-(\d+)$`) + +// Query drive for the amount transferred so far +// +// If error is nil, then start should be valid +func (rx *resumableUpload) transferStatus() (start int64, err error) { + req := rx.makeRequest(0, nil) + res, err := rx.f.client.Do(req) + if err != nil { + return 0, err + } + defer googleapi.CloseBody(res) + if res.StatusCode == http.StatusCreated || res.StatusCode == http.StatusOK { + return rx.ContentLength, nil + } + if res.StatusCode != statusResumeIncomplete { + err = googleapi.CheckResponse(res) + if err != nil { + return 0, err + } + return 0, fmt.Errorf("unexpected http return code %v", res.StatusCode) + } + Range := res.Header.Get("Range") + if m := rangeRE.FindStringSubmatch(Range); len(m) == 2 { + start, err = strconv.ParseInt(m[1], 10, 64) + if err == nil { + return start, nil + } + } + return 0, fmt.Errorf("unable to parse range %q", Range) +} + +// Transfer a chunk - caller must call googleapi.CloseBody(res) if err == nil || res != nil +func (rx *resumableUpload) transferChunk(start int64, body []byte) (int, error) { + req := rx.makeRequest(start, body) + res, err := rx.f.client.Do(req) + if err != nil { + return 599, err + } + defer googleapi.CloseBody(res) + if res.StatusCode == statusResumeIncomplete { + return res.StatusCode, nil + } + err = googleapi.CheckResponse(res) + if err != nil { + return res.StatusCode, err + } + + // When the entire file upload is complete, the server + // responds with an HTTP 201 Created along with any metadata + // associated with this resource. If this request had been + // updating an existing entity rather than creating a new one, + // the HTTP response code for a completed upload would have + // been 200 OK. + // + // So parse the response out of the body. We aren't expecting + // any other 2xx codes, so we parse it unconditionaly on + // StatusCode + if err = json.NewDecoder(res.Body).Decode(&rx.ret); err != nil { + return 598, err + } + + return res.StatusCode, nil +} + +// Upload uploads the chunks from the input +// It retries each chunk maxTries times (with a pause of uploadPause between attempts). +func (rx *resumableUpload) Upload() (*drive.File, error) { + start := int64(0) + buf := make([]byte, chunkSize) + var StatusCode int + for start < rx.ContentLength { + reqSize := rx.ContentLength - start + if reqSize >= chunkSize { + reqSize = chunkSize + } else { + buf = buf[:reqSize] + } + + // Read the chunk + _, err := io.ReadFull(rx.Media, buf) + if err != nil { + return nil, err + } + + // Transfer the chunk + for try := 1; try <= maxTries; try++ { + fs.Debug(rx.remote, "Sending chunk %d length %d, %d/%d", start, reqSize, try, maxTries) + rx.f.beginCall() + StatusCode, err = rx.transferChunk(start, buf) + rx.f.endCall(err) + if StatusCode == statusResumeIncomplete || StatusCode == http.StatusCreated || StatusCode == http.StatusOK { + goto success + } + fs.Debug(rx.remote, "Retrying chunk %d/%d, code=%d, err=%v", try, maxTries, StatusCode, err) + } + fs.Debug(rx.remote, "Failed to send chunk") + return nil, fs.RetryErrorf("Chunk upload failed - retry: code=%d, err=%v", StatusCode, err) + success: + + start += reqSize + } + // Resume or retry uploads that fail due to connection interruptions or + // any 5xx errors, including: + // + // 500 Internal Server Error + // 502 Bad Gateway + // 503 Service Unavailable + // 504 Gateway Timeout + // + // Use an exponential backoff strategy if any 5xx server error is + // returned when resuming or retrying upload requests. These errors can + // occur if a server is getting overloaded. Exponential backoff can help + // alleviate these kinds of problems during periods of high volume of + // requests or heavy network traffic. Other kinds of requests should not + // be handled by exponential backoff but you can still retry a number of + // them. When retrying these requests, limit the number of times you + // retry them. For example your code could limit to ten retries or less + // before reporting an error. + // + // Handle 404 Not Found errors when doing resumable uploads by starting + // the entire upload over from the beginning. + if rx.ret == nil { + return nil, fs.RetryErrorf("Incomplete upload - retry, last error %d", StatusCode) + } + return rx.ret, nil +}