mirror of
https://github.com/rclone/rclone
synced 2025-01-27 10:28:38 +01:00
Implement box storage remote - #97
This commit is contained in:
parent
b5bf819256
commit
d279161cee
192
box/api/types.go
Normal file
192
box/api/types.go
Normal file
@ -0,0 +1,192 @@
|
|||||||
|
// Package api has type definitions for box
|
||||||
|
//
|
||||||
|
// Converted from the API docs with help from https://mholt.github.io/json-to-go/
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// 2017-05-03T07:26:10-07:00
|
||||||
|
timeFormat = `"` + time.RFC3339 + `"`
|
||||||
|
)
|
||||||
|
|
||||||
|
// Time represents represents date and time information for the
|
||||||
|
// box API, by using RFC3339
|
||||||
|
type Time time.Time
|
||||||
|
|
||||||
|
// MarshalJSON turns a Time into JSON (in UTC)
|
||||||
|
func (t *Time) MarshalJSON() (out []byte, err error) {
|
||||||
|
timeString := (*time.Time)(t).Format(timeFormat)
|
||||||
|
return []byte(timeString), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON turns JSON into a Time
|
||||||
|
func (t *Time) UnmarshalJSON(data []byte) error {
|
||||||
|
newT, err := time.Parse(timeFormat, string(data))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
*t = Time(newT)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error is returned from box when things go wrong
|
||||||
|
type Error struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
Status int `json:"status"`
|
||||||
|
Code string `json:"code"`
|
||||||
|
ContextInfo json.RawMessage
|
||||||
|
HelpURL string `json:"help_url"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
RequestID string `json:"request_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error returns a string for the error and statistifes the error interface
|
||||||
|
func (e *Error) Error() string {
|
||||||
|
out := fmt.Sprintf("Error %q (%d)", e.Code, e.Status)
|
||||||
|
if e.Message != "" {
|
||||||
|
out += ": " + e.Message
|
||||||
|
}
|
||||||
|
if e.ContextInfo != nil {
|
||||||
|
out += fmt.Sprintf(" (%+v)", e.ContextInfo)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check Error statisfies the error interface
|
||||||
|
var _ error = (*Error)(nil)
|
||||||
|
|
||||||
|
// ItemFields are the fields needed for FileInfo
|
||||||
|
var ItemFields = "type,id,sequence_id,etag,sha1,name,size,created_at,modified_at,content_created_at,content_modified_at,item_status"
|
||||||
|
|
||||||
|
// Types of things in Item
|
||||||
|
const (
|
||||||
|
ItemTypeFolder = "folder"
|
||||||
|
ItemTypeFile = "file"
|
||||||
|
ItemStatusActive = "active"
|
||||||
|
ItemStatusTrashed = "trashed"
|
||||||
|
ItemStatusDeleted = "deleted"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Item describes a folder or a file as returned by Get Folder Items and others
|
||||||
|
type Item struct {
|
||||||
|
Type string `json:"type"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
SequenceID string `json:"sequence_id"`
|
||||||
|
Etag string `json:"etag"`
|
||||||
|
SHA1 string `json:"sha1"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
Size int64 `json:"size"`
|
||||||
|
CreatedAt Time `json:"created_at"`
|
||||||
|
ModifiedAt Time `json:"modified_at"`
|
||||||
|
ContentCreatedAt Time `json:"content_created_at"`
|
||||||
|
ContentModifiedAt Time `json:"content_modified_at"`
|
||||||
|
ItemStatus string `json:"item_status"` // active, trashed if the file has been moved to the trash, and deleted if the file has been permanently deleted
|
||||||
|
}
|
||||||
|
|
||||||
|
// ModTime returns the modification time of the item
|
||||||
|
func (i *Item) ModTime() (t time.Time) {
|
||||||
|
t = time.Time(i.ContentModifiedAt)
|
||||||
|
if t.IsZero() {
|
||||||
|
t = time.Time(i.ModifiedAt)
|
||||||
|
}
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
// FolderItems is returned from the GetFolderItems call
|
||||||
|
type FolderItems struct {
|
||||||
|
TotalCount int `json:"total_count"`
|
||||||
|
Entries []Item `json:"entries"`
|
||||||
|
Offset int `json:"offset"`
|
||||||
|
Limit int `json:"limit"`
|
||||||
|
Order []struct {
|
||||||
|
By string `json:"by"`
|
||||||
|
Direction string `json:"direction"`
|
||||||
|
} `json:"order"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parent defined the ID of the parent directory
|
||||||
|
type Parent struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateFolder is the request for Create Folder
|
||||||
|
type CreateFolder struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Parent Parent `json:"parent"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UploadFile is the request for Upload File
|
||||||
|
type UploadFile struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Parent Parent `json:"parent"`
|
||||||
|
ContentCreatedAt Time `json:"content_created_at"`
|
||||||
|
ContentModifiedAt Time `json:"content_modified_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateFileModTime is used in Update File Info
|
||||||
|
type UpdateFileModTime struct {
|
||||||
|
ContentModifiedAt Time `json:"content_modified_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateFileMove is the request for Upload File to change name and parent
|
||||||
|
type UpdateFileMove struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Parent Parent `json:"parent"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// CopyFile is the request for Copy File
|
||||||
|
type CopyFile struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Parent Parent `json:"parent"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UploadSessionRequest is uses in Create Upload Session
|
||||||
|
type UploadSessionRequest struct {
|
||||||
|
FolderID string `json:"folder_id,omitempty"` // don't pass for update
|
||||||
|
FileSize int64 `json:"file_size"`
|
||||||
|
FileName string `json:"file_name,omitempty"` // optional for update
|
||||||
|
}
|
||||||
|
|
||||||
|
// UploadSessionResponse is returned from Create Upload Session
|
||||||
|
type UploadSessionResponse struct {
|
||||||
|
TotalParts int `json:"total_parts"`
|
||||||
|
PartSize int64 `json:"part_size"`
|
||||||
|
SessionEndpoints struct {
|
||||||
|
ListParts string `json:"list_parts"`
|
||||||
|
Commit string `json:"commit"`
|
||||||
|
UploadPart string `json:"upload_part"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Abort string `json:"abort"`
|
||||||
|
} `json:"session_endpoints"`
|
||||||
|
SessionExpiresAt Time `json:"session_expires_at"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
NumPartsProcessed int `json:"num_parts_processed"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Part defines the return from upload part call which are passed to commit upload also
|
||||||
|
type Part struct {
|
||||||
|
PartID string `json:"part_id"`
|
||||||
|
Offset int `json:"offset"`
|
||||||
|
Size int `json:"size"`
|
||||||
|
Sha1 string `json:"sha1"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// UploadPartResponse is returned from the upload part call
|
||||||
|
type UploadPartResponse struct {
|
||||||
|
Part Part `json:"part"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// CommitUpload is used in the Commit Upload call
|
||||||
|
type CommitUpload struct {
|
||||||
|
Parts []Part `json:"parts"`
|
||||||
|
Attributes struct {
|
||||||
|
ContentCreatedAt Time `json:"content_created_at"`
|
||||||
|
ContentModifiedAt Time `json:"content_modified_at"`
|
||||||
|
} `json:"attributes"`
|
||||||
|
}
|
1073
box/box.go
Normal file
1073
box/box.go
Normal file
File diff suppressed because it is too large
Load Diff
72
box/box_test.go
Normal file
72
box/box_test.go
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
// Test Box filesystem interface
|
||||||
|
//
|
||||||
|
// Automatically generated - DO NOT EDIT
|
||||||
|
// Regenerate with: make gen_tests
|
||||||
|
package box_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/box"
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
|
"github.com/ncw/rclone/fstest/fstests"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSetup(t *testing.T) {
|
||||||
|
fstests.NilObject = fs.Object((*box.Object)(nil))
|
||||||
|
fstests.RemoteName = "TestBox:"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generic tests for the Fs
|
||||||
|
func TestInit(t *testing.T) { fstests.TestInit(t) }
|
||||||
|
func TestFsString(t *testing.T) { fstests.TestFsString(t) }
|
||||||
|
func TestFsName(t *testing.T) { fstests.TestFsName(t) }
|
||||||
|
func TestFsRoot(t *testing.T) { fstests.TestFsRoot(t) }
|
||||||
|
func TestFsRmdirEmpty(t *testing.T) { fstests.TestFsRmdirEmpty(t) }
|
||||||
|
func TestFsRmdirNotFound(t *testing.T) { fstests.TestFsRmdirNotFound(t) }
|
||||||
|
func TestFsMkdir(t *testing.T) { fstests.TestFsMkdir(t) }
|
||||||
|
func TestFsMkdirRmdirSubdir(t *testing.T) { fstests.TestFsMkdirRmdirSubdir(t) }
|
||||||
|
func TestFsListEmpty(t *testing.T) { fstests.TestFsListEmpty(t) }
|
||||||
|
func TestFsListDirEmpty(t *testing.T) { fstests.TestFsListDirEmpty(t) }
|
||||||
|
func TestFsListRDirEmpty(t *testing.T) { fstests.TestFsListRDirEmpty(t) }
|
||||||
|
func TestFsNewObjectNotFound(t *testing.T) { fstests.TestFsNewObjectNotFound(t) }
|
||||||
|
func TestFsPutFile1(t *testing.T) { fstests.TestFsPutFile1(t) }
|
||||||
|
func TestFsPutError(t *testing.T) { fstests.TestFsPutError(t) }
|
||||||
|
func TestFsPutFile2(t *testing.T) { fstests.TestFsPutFile2(t) }
|
||||||
|
func TestFsUpdateFile1(t *testing.T) { fstests.TestFsUpdateFile1(t) }
|
||||||
|
func TestFsListDirFile2(t *testing.T) { fstests.TestFsListDirFile2(t) }
|
||||||
|
func TestFsListRDirFile2(t *testing.T) { fstests.TestFsListRDirFile2(t) }
|
||||||
|
func TestFsListDirRoot(t *testing.T) { fstests.TestFsListDirRoot(t) }
|
||||||
|
func TestFsListRDirRoot(t *testing.T) { fstests.TestFsListRDirRoot(t) }
|
||||||
|
func TestFsListSubdir(t *testing.T) { fstests.TestFsListSubdir(t) }
|
||||||
|
func TestFsListRSubdir(t *testing.T) { fstests.TestFsListRSubdir(t) }
|
||||||
|
func TestFsListLevel2(t *testing.T) { fstests.TestFsListLevel2(t) }
|
||||||
|
func TestFsListRLevel2(t *testing.T) { fstests.TestFsListRLevel2(t) }
|
||||||
|
func TestFsListFile1(t *testing.T) { fstests.TestFsListFile1(t) }
|
||||||
|
func TestFsNewObject(t *testing.T) { fstests.TestFsNewObject(t) }
|
||||||
|
func TestFsListFile1and2(t *testing.T) { fstests.TestFsListFile1and2(t) }
|
||||||
|
func TestFsNewObjectDir(t *testing.T) { fstests.TestFsNewObjectDir(t) }
|
||||||
|
func TestFsCopy(t *testing.T) { fstests.TestFsCopy(t) }
|
||||||
|
func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
|
||||||
|
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
|
||||||
|
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
|
||||||
|
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
|
||||||
|
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
|
||||||
|
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
|
||||||
|
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
|
||||||
|
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }
|
||||||
|
func TestObjectHashes(t *testing.T) { fstests.TestObjectHashes(t) }
|
||||||
|
func TestObjectModTime(t *testing.T) { fstests.TestObjectModTime(t) }
|
||||||
|
func TestObjectMimeType(t *testing.T) { fstests.TestObjectMimeType(t) }
|
||||||
|
func TestObjectSetModTime(t *testing.T) { fstests.TestObjectSetModTime(t) }
|
||||||
|
func TestObjectSize(t *testing.T) { fstests.TestObjectSize(t) }
|
||||||
|
func TestObjectOpen(t *testing.T) { fstests.TestObjectOpen(t) }
|
||||||
|
func TestObjectOpenSeek(t *testing.T) { fstests.TestObjectOpenSeek(t) }
|
||||||
|
func TestObjectPartialRead(t *testing.T) { fstests.TestObjectPartialRead(t) }
|
||||||
|
func TestObjectUpdate(t *testing.T) { fstests.TestObjectUpdate(t) }
|
||||||
|
func TestObjectStorable(t *testing.T) { fstests.TestObjectStorable(t) }
|
||||||
|
func TestFsIsFile(t *testing.T) { fstests.TestFsIsFile(t) }
|
||||||
|
func TestFsIsFileNotFound(t *testing.T) { fstests.TestFsIsFileNotFound(t) }
|
||||||
|
func TestObjectRemove(t *testing.T) { fstests.TestObjectRemove(t) }
|
||||||
|
func TestObjectPurge(t *testing.T) { fstests.TestObjectPurge(t) }
|
||||||
|
func TestFinalise(t *testing.T) { fstests.TestFinalise(t) }
|
259
box/upload.go
Normal file
259
box/upload.go
Normal file
@ -0,0 +1,259 @@
|
|||||||
|
// multpart upload for box
|
||||||
|
|
||||||
|
package box
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha1"
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/box/api"
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
|
"github.com/ncw/rclone/rest"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// createUploadSession creates an upload session for the object
|
||||||
|
func (o *Object) createUploadSession(leaf, directoryID string, size int64) (response *api.UploadSessionResponse, err error) {
|
||||||
|
opts := rest.Opts{
|
||||||
|
Method: "POST",
|
||||||
|
Path: "/files/upload_sessions",
|
||||||
|
RootURL: uploadURL,
|
||||||
|
}
|
||||||
|
request := api.UploadSessionRequest{
|
||||||
|
FileSize: size,
|
||||||
|
}
|
||||||
|
// If object has an ID then it is existing so create a new version
|
||||||
|
if o.id != "" {
|
||||||
|
opts.Path = "/files/" + o.id + "/upload_sessions"
|
||||||
|
} else {
|
||||||
|
opts.Path = "/files/upload_sessions"
|
||||||
|
request.FolderID = directoryID
|
||||||
|
request.FileName = replaceReservedChars(leaf)
|
||||||
|
}
|
||||||
|
var resp *http.Response
|
||||||
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
|
resp, err = o.fs.srv.CallJSON(&opts, &request, &response)
|
||||||
|
return shouldRetry(resp, err)
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// sha1Digest produces a digest using sha1 as per RFC3230
|
||||||
|
func sha1Digest(digest []byte) string {
|
||||||
|
return "sha=" + base64.StdEncoding.EncodeToString(digest)
|
||||||
|
}
|
||||||
|
|
||||||
|
// uploadPart uploads a part in an upload session
|
||||||
|
func (o *Object) uploadPart(SessionID string, offset, totalSize int64, chunk []byte) (response *api.UploadPartResponse, err error) {
|
||||||
|
chunkSize := int64(len(chunk))
|
||||||
|
in := bytes.NewReader(chunk)
|
||||||
|
sha1sum := sha1.Sum(chunk)
|
||||||
|
opts := rest.Opts{
|
||||||
|
Method: "PUT",
|
||||||
|
Path: "/files/upload_sessions/" + SessionID,
|
||||||
|
RootURL: uploadURL,
|
||||||
|
ContentType: "application/octet-stream",
|
||||||
|
ContentLength: &chunkSize,
|
||||||
|
ContentRange: fmt.Sprintf("bytes %d-%d/%d", offset, offset+chunkSize-1, totalSize),
|
||||||
|
ExtraHeaders: map[string]string{
|
||||||
|
"Digest": sha1Digest(sha1sum[:]),
|
||||||
|
},
|
||||||
|
Body: in,
|
||||||
|
}
|
||||||
|
var resp *http.Response
|
||||||
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
|
_, _ = in.Seek(0, 0)
|
||||||
|
resp, err = o.fs.srv.CallJSON(&opts, nil, &response)
|
||||||
|
return shouldRetry(resp, err)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// commitUpload finishes an upload session
|
||||||
|
func (o *Object) commitUpload(SessionID string, parts []api.Part, modTime time.Time, sha1sum []byte) (result *api.FolderItems, err error) {
|
||||||
|
opts := rest.Opts{
|
||||||
|
Method: "POST",
|
||||||
|
Path: "/files/upload_sessions/" + SessionID + "/commit",
|
||||||
|
RootURL: uploadURL,
|
||||||
|
ExtraHeaders: map[string]string{
|
||||||
|
"Digest": sha1Digest(sha1sum),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
request := api.CommitUpload{
|
||||||
|
Parts: parts,
|
||||||
|
}
|
||||||
|
request.Attributes.ContentModifiedAt = api.Time(modTime)
|
||||||
|
request.Attributes.ContentCreatedAt = api.Time(modTime)
|
||||||
|
var body []byte
|
||||||
|
var resp *http.Response
|
||||||
|
const maxTries = 10
|
||||||
|
var tries int
|
||||||
|
outer:
|
||||||
|
for tries = 0; tries < maxTries; tries++ {
|
||||||
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
|
resp, err = o.fs.srv.CallJSON(&opts, &request, nil)
|
||||||
|
if err != nil {
|
||||||
|
return shouldRetry(resp, err)
|
||||||
|
}
|
||||||
|
body, err = rest.ReadBody(resp)
|
||||||
|
return shouldRetry(resp, err)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
delay := 1
|
||||||
|
switch resp.StatusCode {
|
||||||
|
case http.StatusOK, http.StatusCreated:
|
||||||
|
break outer
|
||||||
|
case http.StatusAccepted:
|
||||||
|
delayString := resp.Header.Get("Retry-After")
|
||||||
|
if delayString != "" {
|
||||||
|
delay, err = strconv.Atoi(delayString)
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf(o, "Couldn't decode Retry-After header %q: %v", delayString, err)
|
||||||
|
delay = 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return nil, errors.Errorf("unknown HTTP status return %q (%d)", resp.Status, resp.StatusCode)
|
||||||
|
}
|
||||||
|
fs.Debugf(o, "commit multipart upload failed %d/%d - trying again in %d seconds", tries+1, maxTries, delay)
|
||||||
|
time.Sleep(time.Duration(delay) * time.Second)
|
||||||
|
}
|
||||||
|
if tries >= maxTries {
|
||||||
|
return nil, errors.New("too many tries to commit multipart upload")
|
||||||
|
}
|
||||||
|
err = json.Unmarshal(body, &result)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "couldn't decode commit response: %q", body)
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// abortUpload cancels an upload session
|
||||||
|
func (o *Object) abortUpload(SessionID string) (err error) {
|
||||||
|
opts := rest.Opts{
|
||||||
|
Method: "DELETE",
|
||||||
|
Path: "/files/upload_sessions/" + SessionID,
|
||||||
|
RootURL: uploadURL,
|
||||||
|
NoResponse: true,
|
||||||
|
}
|
||||||
|
var resp *http.Response
|
||||||
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
|
resp, err = o.fs.srv.Call(&opts)
|
||||||
|
return shouldRetry(resp, err)
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// uploadMultipart uploads a file using multipart upload
|
||||||
|
func (o *Object) uploadMultipart(in io.Reader, leaf, directoryID string, size int64, modTime time.Time) (err error) {
|
||||||
|
// Create upload session
|
||||||
|
session, err := o.createUploadSession(leaf, directoryID, size)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "multipart upload create session failed")
|
||||||
|
}
|
||||||
|
chunkSize := session.PartSize
|
||||||
|
fs.Debugf(o, "Multipart upload session started for %d parts of size %v", session.TotalParts, fs.SizeSuffix(chunkSize))
|
||||||
|
|
||||||
|
// Cancel the session if something went wrong
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
fs.Debugf(o, "Cancelling multipart upload: %v", err)
|
||||||
|
cancelErr := o.abortUpload(session.ID)
|
||||||
|
if cancelErr != nil {
|
||||||
|
fs.Logf(o, "Failed to cancel multipart upload: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Upload the chunks
|
||||||
|
remaining := size
|
||||||
|
position := int64(0)
|
||||||
|
parts := make([]api.Part, session.TotalParts)
|
||||||
|
hash := sha1.New()
|
||||||
|
errs := make(chan error, 1)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
outer:
|
||||||
|
for part := 0; part < session.TotalParts; part++ {
|
||||||
|
// Check any errors
|
||||||
|
select {
|
||||||
|
case err = <-errs:
|
||||||
|
break outer
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
reqSize := remaining
|
||||||
|
if reqSize >= int64(chunkSize) {
|
||||||
|
reqSize = int64(chunkSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make a block of memory
|
||||||
|
buf := make([]byte, reqSize)
|
||||||
|
|
||||||
|
// Read the chunk
|
||||||
|
_, err = io.ReadFull(in, buf)
|
||||||
|
if err != nil {
|
||||||
|
err = errors.Wrap(err, "multipart upload failed to read source")
|
||||||
|
break outer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the global hash (must be done sequentially)
|
||||||
|
_, _ = hash.Write(buf)
|
||||||
|
|
||||||
|
// Transfer the chunk
|
||||||
|
wg.Add(1)
|
||||||
|
go func(part int, position int64) {
|
||||||
|
defer wg.Done()
|
||||||
|
o.fs.getUploadToken()
|
||||||
|
defer o.fs.putUploadToken()
|
||||||
|
fs.Debugf(o, "Uploading part %d/%d offset %v/%v part size %v", part+1, session.TotalParts, fs.SizeSuffix(position), fs.SizeSuffix(size), fs.SizeSuffix(chunkSize))
|
||||||
|
partResponse, err := o.uploadPart(session.ID, position, size, buf)
|
||||||
|
if err != nil {
|
||||||
|
err = errors.Wrap(err, "multipart upload failed to upload part")
|
||||||
|
select {
|
||||||
|
case errs <- err:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
parts[part] = partResponse.Part
|
||||||
|
}(part, position)
|
||||||
|
|
||||||
|
// ready for next block
|
||||||
|
remaining -= chunkSize
|
||||||
|
position += chunkSize
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
if err == nil {
|
||||||
|
select {
|
||||||
|
case err = <-errs:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finalise the upload session
|
||||||
|
result, err := o.commitUpload(session.ID, parts, modTime, hash.Sum(nil))
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "multipart upload failed to finalize")
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.TotalCount != 1 || len(result.Entries) != 1 {
|
||||||
|
return errors.Errorf("multipart upload failed %v - not sure why", o)
|
||||||
|
}
|
||||||
|
return o.setMetaData(&result.Entries[0])
|
||||||
|
}
|
@ -4,6 +4,7 @@ import (
|
|||||||
// Active file systems
|
// Active file systems
|
||||||
_ "github.com/ncw/rclone/amazonclouddrive"
|
_ "github.com/ncw/rclone/amazonclouddrive"
|
||||||
_ "github.com/ncw/rclone/b2"
|
_ "github.com/ncw/rclone/b2"
|
||||||
|
_ "github.com/ncw/rclone/box"
|
||||||
_ "github.com/ncw/rclone/crypt"
|
_ "github.com/ncw/rclone/crypt"
|
||||||
_ "github.com/ncw/rclone/drive"
|
_ "github.com/ncw/rclone/drive"
|
||||||
_ "github.com/ncw/rclone/dropbox"
|
_ "github.com/ncw/rclone/dropbox"
|
||||||
|
@ -143,5 +143,6 @@ func main() {
|
|||||||
generateTestProgram(t, fns, "Crypt", "3")
|
generateTestProgram(t, fns, "Crypt", "3")
|
||||||
generateTestProgram(t, fns, "Sftp", "")
|
generateTestProgram(t, fns, "Sftp", "")
|
||||||
generateTestProgram(t, fns, "FTP", "")
|
generateTestProgram(t, fns, "FTP", "")
|
||||||
|
generateTestProgram(t, fns, "Box", "")
|
||||||
log.Printf("Done")
|
log.Printf("Done")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user