mirror of
https://github.com/rclone/rclone
synced 2025-01-20 01:07:29 +01:00
drive: Use chunked upload for files - fixes #33
This commit is contained in:
parent
ecb3c7bcef
commit
81a933ae38
@ -183,24 +183,41 @@ func (f *FsDrive) endCall(err error) bool {
|
|||||||
fs.Debug(f, "Reducing sleep to %v", f.sleepTime)
|
fs.Debug(f, "Reducing sleep to %v", f.sleepTime)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fs.Debug(f, "Error recived: %v", err)
|
fs.Debug(f, "Error recived: %T %#v", err, err)
|
||||||
if gerr, ok := err.(*googleapi.Error); ok {
|
// Check for net error Timeout()
|
||||||
if len(gerr.Errors) > 0 {
|
if x, ok := err.(interface {
|
||||||
|
Timeout() bool
|
||||||
|
}); ok && x.Timeout() {
|
||||||
|
again = true
|
||||||
|
}
|
||||||
|
// Check for net error Temporary()
|
||||||
|
if x, ok := err.(interface {
|
||||||
|
Temporary() bool
|
||||||
|
}); ok && x.Temporary() {
|
||||||
|
again = true
|
||||||
|
}
|
||||||
|
switch gerr := err.(type) {
|
||||||
|
case *googleapi.Error:
|
||||||
|
if gerr.Code >= 500 && gerr.Code < 600 {
|
||||||
|
// All 5xx errors should be retried
|
||||||
|
again = true
|
||||||
|
} else if len(gerr.Errors) > 0 {
|
||||||
reason := gerr.Errors[0].Reason
|
reason := gerr.Errors[0].Reason
|
||||||
if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" {
|
if reason == "rateLimitExceeded" || reason == "userRateLimitExceeded" {
|
||||||
f.sleepTime *= 2
|
|
||||||
if f.sleepTime > maxSleep {
|
|
||||||
f.sleepTime = maxSleep
|
|
||||||
}
|
|
||||||
if f.sleepTime != oldSleepTime {
|
|
||||||
fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
|
|
||||||
}
|
|
||||||
again = true
|
again = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if again {
|
||||||
|
f.sleepTime *= 2
|
||||||
|
if f.sleepTime > maxSleep {
|
||||||
|
f.sleepTime = maxSleep
|
||||||
|
}
|
||||||
|
if f.sleepTime != oldSleepTime {
|
||||||
|
fs.Debug(f, "Rate limited, increasing sleep to %v", f.sleepTime)
|
||||||
|
}
|
||||||
|
}
|
||||||
return again
|
return again
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -712,16 +729,24 @@ func (f *FsDrive) Put(in io.Reader, remote string, modTime time.Time, size int64
|
|||||||
ModifiedDate: modTime.Format(timeFormatOut),
|
ModifiedDate: modTime.Format(timeFormatOut),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make the API request to upload metadata and file data.
|
|
||||||
var info *drive.File
|
var info *drive.File
|
||||||
// Don't retry, return a retry error instead
|
if false {
|
||||||
f.beginCall()
|
// Make the API request to upload metadata and file data.
|
||||||
info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
|
// Don't retry, return a retry error instead
|
||||||
if f.endCall(err) {
|
f.beginCall()
|
||||||
return o, fs.RetryErrorf("Upload failed - retry: %s", err)
|
info, err = f.svc.Files.Insert(createInfo).Media(in).Do()
|
||||||
}
|
if f.endCall(err) {
|
||||||
if err != nil {
|
return o, fs.RetryErrorf("Upload failed - retry: %s", err)
|
||||||
return o, fmt.Errorf("Upload failed: %s", err)
|
}
|
||||||
|
if err != nil {
|
||||||
|
return o, fmt.Errorf("Upload failed: %s", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Upload the file in chunks
|
||||||
|
info, err = f.Upload(in, size, createInfo.MimeType, createInfo, remote)
|
||||||
|
if err != nil {
|
||||||
|
return o, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
o.setMetaData(info)
|
o.setMetaData(info)
|
||||||
return o, nil
|
return o, nil
|
||||||
@ -945,14 +970,22 @@ func (o *FsObjectDrive) Update(in io.Reader, modTime time.Time, size int64) erro
|
|||||||
// Make the API request to upload metadata and file data.
|
// Make the API request to upload metadata and file data.
|
||||||
var err error
|
var err error
|
||||||
var info *drive.File
|
var info *drive.File
|
||||||
// Don't retry, return a retry error instead
|
if false {
|
||||||
o.drive.beginCall()
|
// Don't retry, return a retry error instead
|
||||||
info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
|
o.drive.beginCall()
|
||||||
if o.drive.endCall(err) {
|
info, err = o.drive.svc.Files.Update(updateInfo.Id, updateInfo).SetModifiedDate(true).Media(in).Do()
|
||||||
return fs.RetryErrorf("Update failed - retry: %s", err)
|
if o.drive.endCall(err) {
|
||||||
}
|
return fs.RetryErrorf("Update failed - retry: %s", err)
|
||||||
if err != nil {
|
}
|
||||||
return fmt.Errorf("Update failed: %s", err)
|
if err != nil {
|
||||||
|
return fmt.Errorf("Update failed: %s", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Upload the file in chunks
|
||||||
|
info, err = o.drive.Upload(in, size, fs.MimeType(o), updateInfo, o.remote)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
o.setMetaData(info)
|
o.setMetaData(info)
|
||||||
return nil
|
return nil
|
||||||
|
250
drive/upload.go
Normal file
250
drive/upload.go
Normal file
@ -0,0 +1,250 @@
|
|||||||
|
// Upload for drive
|
||||||
|
//
|
||||||
|
// Docs
|
||||||
|
// Resumable upload: https://developers.google.com/drive/web/manage-uploads#resumable
|
||||||
|
// Best practices: https://developers.google.com/drive/web/manage-uploads#best-practices
|
||||||
|
// Files insert: https://developers.google.com/drive/v2/reference/files/insert
|
||||||
|
// Files update: https://developers.google.com/drive/v2/reference/files/update
|
||||||
|
//
|
||||||
|
// This contains code adapted from google.golang.org/api (C) the GO AUTHORS
|
||||||
|
|
||||||
|
package drive
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
|
"google.golang.org/api/drive/v2"
|
||||||
|
"google.golang.org/api/googleapi"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// statusResumeIncomplete is the code returned by the Google uploader when the transfer is not yet complete.
|
||||||
|
statusResumeIncomplete = 308
|
||||||
|
|
||||||
|
// Number of times to try each chunk
|
||||||
|
maxTries = 10
|
||||||
|
|
||||||
|
// chunkSize is the size of the chunks created during a resumable upload and should be a power of two.
|
||||||
|
// 1<<18 is the minimum size supported by the Google uploader, and there is no maximum.
|
||||||
|
chunkSize int64 = 1 << 18
|
||||||
|
)
|
||||||
|
|
||||||
|
// resumableUpload is used by the generated APIs to provide resumable uploads.
|
||||||
|
// It is not used by developers directly.
|
||||||
|
type resumableUpload struct {
|
||||||
|
f *FsDrive
|
||||||
|
remote string
|
||||||
|
// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
|
||||||
|
URI string
|
||||||
|
// Media is the object being uploaded.
|
||||||
|
Media io.Reader
|
||||||
|
// MediaType defines the media type, e.g. "image/jpeg".
|
||||||
|
MediaType string
|
||||||
|
// ContentLength is the full size of the object being uploaded.
|
||||||
|
ContentLength int64
|
||||||
|
// Return value
|
||||||
|
ret *drive.File
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upload the io.Reader in of size bytes with contentType and info
|
||||||
|
func (f *FsDrive) Upload(in io.Reader, size int64, contentType string, info *drive.File, remote string) (*drive.File, error) {
|
||||||
|
fileId := info.Id
|
||||||
|
var body io.Reader = nil
|
||||||
|
body, err := googleapi.WithoutDataWrapper.JSONReader(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
params := make(url.Values)
|
||||||
|
params.Set("alt", "json")
|
||||||
|
params.Set("uploadType", "resumable")
|
||||||
|
urls := "https://www.googleapis.com/upload/drive/v2/files"
|
||||||
|
method := "POST"
|
||||||
|
if fileId != "" {
|
||||||
|
params.Set("setModifiedDate", "true")
|
||||||
|
urls += "/{fileId}"
|
||||||
|
method = "PUT"
|
||||||
|
}
|
||||||
|
urls += "?" + params.Encode()
|
||||||
|
req, _ := http.NewRequest(method, urls, body)
|
||||||
|
googleapi.Expand(req.URL, map[string]string{
|
||||||
|
"fileId": fileId,
|
||||||
|
})
|
||||||
|
req.Header.Set("Content-Type", "application/json; charset=UTF-8")
|
||||||
|
req.Header.Set("X-Upload-Content-Type", contentType)
|
||||||
|
req.Header.Set("X-Upload-Content-Length", fmt.Sprintf("%v", size))
|
||||||
|
req.Header.Set("User-Agent", fs.UserAgent)
|
||||||
|
var res *http.Response
|
||||||
|
f.call(&err, func() {
|
||||||
|
res, err = f.client.Do(req)
|
||||||
|
if err == nil {
|
||||||
|
defer googleapi.CloseBody(res)
|
||||||
|
err = googleapi.CheckResponse(res)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
loc := res.Header.Get("Location")
|
||||||
|
rx := &resumableUpload{
|
||||||
|
f: f,
|
||||||
|
remote: remote,
|
||||||
|
URI: loc,
|
||||||
|
Media: in,
|
||||||
|
MediaType: contentType,
|
||||||
|
ContentLength: size,
|
||||||
|
}
|
||||||
|
return rx.Upload()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make an http.Request for the range passed in
|
||||||
|
func (rx *resumableUpload) makeRequest(start int64, body []byte) *http.Request {
|
||||||
|
reqSize := int64(len(body))
|
||||||
|
req, _ := http.NewRequest("POST", rx.URI, bytes.NewBuffer(body))
|
||||||
|
req.ContentLength = reqSize
|
||||||
|
if reqSize != 0 {
|
||||||
|
req.Header.Set("Content-Range", fmt.Sprintf("bytes %v-%v/%v", start, start+reqSize-1, rx.ContentLength))
|
||||||
|
} else {
|
||||||
|
req.Header.Set("Content-Range", fmt.Sprintf("bytes */%v", rx.ContentLength))
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", rx.MediaType)
|
||||||
|
req.Header.Set("User-Agent", fs.UserAgent)
|
||||||
|
return req
|
||||||
|
}
|
||||||
|
|
||||||
|
// rangeRE matches the transfer status response from the server. $1 is
|
||||||
|
// the last byte index uploaded.
|
||||||
|
var rangeRE = regexp.MustCompile(`^0\-(\d+)$`)
|
||||||
|
|
||||||
|
// Query drive for the amount transferred so far
|
||||||
|
//
|
||||||
|
// If error is nil, then start should be valid
|
||||||
|
func (rx *resumableUpload) transferStatus() (start int64, err error) {
|
||||||
|
req := rx.makeRequest(0, nil)
|
||||||
|
res, err := rx.f.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
defer googleapi.CloseBody(res)
|
||||||
|
if res.StatusCode == http.StatusCreated || res.StatusCode == http.StatusOK {
|
||||||
|
return rx.ContentLength, nil
|
||||||
|
}
|
||||||
|
if res.StatusCode != statusResumeIncomplete {
|
||||||
|
err = googleapi.CheckResponse(res)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("unexpected http return code %v", res.StatusCode)
|
||||||
|
}
|
||||||
|
Range := res.Header.Get("Range")
|
||||||
|
if m := rangeRE.FindStringSubmatch(Range); len(m) == 2 {
|
||||||
|
start, err = strconv.ParseInt(m[1], 10, 64)
|
||||||
|
if err == nil {
|
||||||
|
return start, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0, fmt.Errorf("unable to parse range %q", Range)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transfer a chunk - caller must call googleapi.CloseBody(res) if err == nil || res != nil
|
||||||
|
func (rx *resumableUpload) transferChunk(start int64, body []byte) (int, error) {
|
||||||
|
req := rx.makeRequest(start, body)
|
||||||
|
res, err := rx.f.client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return 599, err
|
||||||
|
}
|
||||||
|
defer googleapi.CloseBody(res)
|
||||||
|
if res.StatusCode == statusResumeIncomplete {
|
||||||
|
return res.StatusCode, nil
|
||||||
|
}
|
||||||
|
err = googleapi.CheckResponse(res)
|
||||||
|
if err != nil {
|
||||||
|
return res.StatusCode, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// When the entire file upload is complete, the server
|
||||||
|
// responds with an HTTP 201 Created along with any metadata
|
||||||
|
// associated with this resource. If this request had been
|
||||||
|
// updating an existing entity rather than creating a new one,
|
||||||
|
// the HTTP response code for a completed upload would have
|
||||||
|
// been 200 OK.
|
||||||
|
//
|
||||||
|
// So parse the response out of the body. We aren't expecting
|
||||||
|
// any other 2xx codes, so we parse it unconditionaly on
|
||||||
|
// StatusCode
|
||||||
|
if err = json.NewDecoder(res.Body).Decode(&rx.ret); err != nil {
|
||||||
|
return 598, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.StatusCode, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upload uploads the chunks from the input
|
||||||
|
// It retries each chunk maxTries times (with a pause of uploadPause between attempts).
|
||||||
|
func (rx *resumableUpload) Upload() (*drive.File, error) {
|
||||||
|
start := int64(0)
|
||||||
|
buf := make([]byte, chunkSize)
|
||||||
|
var StatusCode int
|
||||||
|
for start < rx.ContentLength {
|
||||||
|
reqSize := rx.ContentLength - start
|
||||||
|
if reqSize >= chunkSize {
|
||||||
|
reqSize = chunkSize
|
||||||
|
} else {
|
||||||
|
buf = buf[:reqSize]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the chunk
|
||||||
|
_, err := io.ReadFull(rx.Media, buf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transfer the chunk
|
||||||
|
for try := 1; try <= maxTries; try++ {
|
||||||
|
fs.Debug(rx.remote, "Sending chunk %d length %d, %d/%d", start, reqSize, try, maxTries)
|
||||||
|
rx.f.beginCall()
|
||||||
|
StatusCode, err = rx.transferChunk(start, buf)
|
||||||
|
rx.f.endCall(err)
|
||||||
|
if StatusCode == statusResumeIncomplete || StatusCode == http.StatusCreated || StatusCode == http.StatusOK {
|
||||||
|
goto success
|
||||||
|
}
|
||||||
|
fs.Debug(rx.remote, "Retrying chunk %d/%d, code=%d, err=%v", try, maxTries, StatusCode, err)
|
||||||
|
}
|
||||||
|
fs.Debug(rx.remote, "Failed to send chunk")
|
||||||
|
return nil, fs.RetryErrorf("Chunk upload failed - retry: code=%d, err=%v", StatusCode, err)
|
||||||
|
success:
|
||||||
|
|
||||||
|
start += reqSize
|
||||||
|
}
|
||||||
|
// Resume or retry uploads that fail due to connection interruptions or
|
||||||
|
// any 5xx errors, including:
|
||||||
|
//
|
||||||
|
// 500 Internal Server Error
|
||||||
|
// 502 Bad Gateway
|
||||||
|
// 503 Service Unavailable
|
||||||
|
// 504 Gateway Timeout
|
||||||
|
//
|
||||||
|
// Use an exponential backoff strategy if any 5xx server error is
|
||||||
|
// returned when resuming or retrying upload requests. These errors can
|
||||||
|
// occur if a server is getting overloaded. Exponential backoff can help
|
||||||
|
// alleviate these kinds of problems during periods of high volume of
|
||||||
|
// requests or heavy network traffic. Other kinds of requests should not
|
||||||
|
// be handled by exponential backoff but you can still retry a number of
|
||||||
|
// them. When retrying these requests, limit the number of times you
|
||||||
|
// retry them. For example your code could limit to ten retries or less
|
||||||
|
// before reporting an error.
|
||||||
|
//
|
||||||
|
// Handle 404 Not Found errors when doing resumable uploads by starting
|
||||||
|
// the entire upload over from the beginning.
|
||||||
|
if rx.ret == nil {
|
||||||
|
return nil, fs.RetryErrorf("Incomplete upload - retry, last error %d", StatusCode)
|
||||||
|
}
|
||||||
|
return rx.ret, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user