chunker: implement resumer interface

Fixes #5154
This commit is contained in:
Maxwell Calman 2020-08-31 10:25:10 -04:00 committed by Ivan Andreev
parent e65e046c21
commit 604dc4d1f9
2 changed files with 174 additions and 6 deletions

View File

@ -6,6 +6,8 @@ import (
"context"
"crypto/md5"
"crypto/sha1"
"encoding"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
@ -13,6 +15,7 @@ import (
gohash "hash"
"io"
"io/ioutil"
"log"
"math/rand"
"path"
"regexp"
@ -379,6 +382,8 @@ type Fs struct {
features *fs.Features // optional features
dirSort bool // reserved for future, ignored
useNoRename bool // can be set with the transactions option
hashState string // set in resume(), used to restore hash state
resumeXactID string // set in resume(), allows reuse of xactID upon resume
}
// configure sets up chunker for given name format, meta format and hash type.
@ -1152,7 +1157,41 @@ func (f *Fs) put(
// Prepare to upload
c := f.newChunkingReader(src)
wrapIn := c.wrapStream(ctx, in, src)
// Prepare for resume if resumable
var resumeOpt *fs.OptionResume
// partialHashState will be used in wrapStream to restore hash state
var partialHashState []byte
for _, option := range options {
switch option.(type) {
case *fs.OptionResume:
resumeOpt = option.(*fs.OptionResume)
if resumeOpt.Pos != 0 {
numChunksOnRemote := resumeOpt.Pos / int64(f.opt.ChunkSize)
// Checks for existing chunks on the remote
for i := 0; i < int(numChunksOnRemote); i++ {
existingChunkName := f.makeChunkName(remote, i, "", f.resumeXactID)
existingChunk, err := f.base.NewObject(ctx, existingChunkName)
// If NewObject returns an error the chunk likely doesn't exist on the remote and we cannot resume
if err != nil {
resumeOpt.Pos = 0
c.chunks = nil
break
}
c.chunks = append(c.chunks, existingChunk)
}
fs.Debugf(f, "Resuming at chunk number: %d", numChunksOnRemote)
partialHashState, _ = base64.StdEncoding.DecodeString(f.hashState)
// Discard bytes that already exist on remote
written, err := io.CopyN(ioutil.Discard, in, resumeOpt.Pos)
if err != nil {
return nil, err
}
c.accountBytes(written)
c.sizeLeft = c.sizeTotal - c.readCount
}
}
}
wrapIn := c.wrapStream(ctx, in, src, partialHashState)
var metaObject fs.Object
defer func() {
@ -1162,13 +1201,22 @@ func (f *Fs) put(
}()
baseRemote := remote
xactID, errXact := f.newXactID(ctx, baseRemote)
if errXact != nil {
return nil, errXact
var xactID string
if resumeOpt != nil && resumeOpt.Pos != 0 {
xactID = f.resumeXactID
} else {
xactID, err = f.newXactID(ctx, baseRemote)
if err != nil {
return nil, err
}
}
// Transfer chunks data
for c.chunkNo = 0; !c.done; c.chunkNo++ {
// skip to chunk we can resume from if resumeOpt is set
if c.chunkNo == 0 && resumeOpt != nil && resumeOpt.Pos != 0 {
c.chunkNo = int(resumeOpt.Pos) / int(f.opt.ChunkSize)
}
if c.chunkNo > maxSafeChunkNumber {
return nil, ErrChunkOverflow
}
@ -1230,6 +1278,41 @@ func (f *Fs) put(
c.chunkLimit = c.chunkSize
c.chunks = append(c.chunks, chunk)
// If an OptionResume was passed then we should call SetID so a resume can be attempted in the event of a failure
// ID keeps track of the first chunk that should be uploaded if a resume is attempted
if resumeOpt != nil {
// Publish hash state to control chunk
marshaler, ok := c.hasher.(encoding.BinaryMarshaler)
if !ok {
return nil, fmt.Errorf("The hash type does not implement encoding.BinaryMarshaler")
}
state, err := marshaler.MarshalBinary()
if err != nil {
return nil, err
}
hashType := f.opt.HashType
data, err := marshalPartialHashJSON(ctx, hashType, base64.StdEncoding.EncodeToString(state))
if err != nil {
return nil, err
}
controlChunkName := f.makeChunkName(remote, -1, "phash", xactID)
controlInfo := f.wrapInfo(src, controlChunkName, int64(len(data)))
controlChunk, err := basePut(ctx, bytes.NewReader(data), controlInfo)
defer func() {
_ = controlChunk.Remove(ctx)
}()
if err != nil {
return nil, err
}
positionStr := strconv.Itoa(c.chunkNo + 1) // stores the number of chunks uploaded
chunkSizeStr := strconv.FormatInt(c.chunkSize, 10)
startFromStr := strconv.FormatInt(int64(f.opt.StartFrom), 10)
err = resumeOpt.SetID(ctx, chunkSizeStr+","+startFromStr+","+positionStr+","+xactID, f.opt.HashType, base64.StdEncoding.EncodeToString(state))
if err != nil {
return nil, err
}
}
}
// Validate uploaded size
@ -1356,7 +1439,7 @@ func (f *Fs) newChunkingReader(src fs.ObjectInfo) *chunkingReader {
return c
}
func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo) io.Reader {
func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, partialHashState []byte) io.Reader {
baseIn, wrapBack := accounting.UnWrap(in)
switch {
@ -1391,6 +1474,15 @@ func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.Ob
}
if c.hasher != nil {
// Restores hash state during a resume
if partialHashState != nil {
unmarshaler, ok := c.hasher.(encoding.BinaryUnmarshaler)
if ok {
if err := unmarshaler.UnmarshalBinary(partialHashState); err != nil {
log.Fatal("unable to unmarshal hash:", err)
}
}
}
baseIn = io.TeeReader(baseIn, c.hasher)
}
c.baseReader = baseIn
@ -2510,6 +2602,34 @@ func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte)
return info, true, nil
}
// partialHashJSON is the JSON format for partial hash control chunks.
// It is encoded by marshalPartialHashJSON and decoded by
// unmarshalPartialHashJSON.
type partialHashJSON struct {
// HashType is serialized under the "htype" key.
HashType string `json:"htype"`
// PartialHash is serialized under the "phash" key.
PartialHash string `json:"phash"`
}
// marshalPartialHashJSON encodes the name of the hash type in use together
// with the partial hash state as a JSON document. The result is meant to be
// stored in a control chunk and used for resume functionality.
//
// The ctx argument is accepted but is not used by the encoding itself.
// The returned error is whatever json.Marshal reports.
func marshalPartialHashJSON(ctx context.Context, hashType, partialHash string) ([]byte, error) {
	return json.Marshal(&partialHashJSON{
		HashType:    hashType,
		PartialHash: partialHash,
	})
}
// unmarshalPartialHashJSON decodes the contents of a partial hash control
// chunk and returns the hash type name and the partial hash state it holds.
//
// The ctx argument is accepted but is not used by the decoding itself.
// When json.Unmarshal fails its error is returned together with whatever
// field values were decoded before the failure.
func unmarshalPartialHashJSON(ctx context.Context, data []byte) (hashType, partialHashState string, err error) {
	parsed := new(partialHashJSON)
	err = json.Unmarshal(data, parsed)
	hashType = parsed.HashType
	partialHashState = parsed.PartialHash
	return hashType, partialHashState, err
}
// silentlyRemove calls Remove on the object o and discards the error it
// returns, so a failed removal is never reported to the caller.
func silentlyRemove(ctx context.Context, o fs.Object) {
_ = o.Remove(ctx) // ignore error
}
@ -2544,9 +2664,58 @@ func (f *Fs) CanQuickRename() bool {
return f.base.Features().Move != nil
}
// Resume checks whether the (remote, ID) pair is valid and returns
// the point the file should be resumed from or an error.
//
// ID must hold at least four comma-separated fields, in this order:
// chunk size, start-from number, number of chunks already uploaded and
// transaction ID. hashName and hashState are compared with the values
// stored in the partial hash control chunk found on the base remote.
//
// Results:
//   - (pos, nil) with pos = uploaded chunks * chunk size when the control
//     chunk matches hashName/hashState and its hash type equals the
//     configured one. In this case hashState and the transaction ID are
//     remembered on f for the following upload.
//   - (0, nil) when the control chunk does not match or the hash type
//     differs from the configured one: the transfer starts from scratch.
//   - (0, err) when ID is malformed, the chunk size or start-from number
//     differ from the current configuration, or the control chunk cannot
//     be found, opened, read or parsed.
func (f *Fs) Resume(ctx context.Context, remote, ID, hashName, hashState string) (Pos int64, err error) {
	idSlice := strings.Split(ID, ",")
	// Guard the indexing below: a truncated ID must not panic.
	if len(idSlice) < 4 {
		return 0, errors.New("malformed resume ID: expected chunk size, start from, chunk number and transaction ID")
	}
	// Check every parse error individually so that an early failure
	// is not masked by a later successful parse.
	cachedChunkSize, err := strconv.ParseInt(idSlice[0], 10, 64)
	if err != nil {
		return 0, err
	}
	cachedStartFrom, err := strconv.ParseInt(idSlice[1], 10, 64)
	if err != nil {
		return 0, err
	}
	cachedChunkNo, err := strconv.ParseInt(idSlice[2], 10, 64)
	if err != nil {
		return 0, err
	}
	if cachedChunkNo < 0 {
		return 0, errors.New("malformed resume ID: negative chunk number")
	}
	cachedXactID := idSlice[3]

	if cachedChunkSize != int64(f.opt.ChunkSize) {
		return 0, errors.New("ChunkSize doesn't match for file we are trying to resume")
	}
	if f.opt.StartFrom != int(cachedStartFrom) {
		return 0, errors.New("StartFrom doesn't match for file we are trying to resume")
	}

	// Check partial hash control chunk
	controlChunkName := f.makeChunkName(remote, -1, "phash", cachedXactID)
	hashControlChunk, err := f.base.NewObject(ctx, controlChunkName)
	if err != nil {
		return 0, err
	}
	reader, err := hashControlChunk.Open(ctx)
	if err != nil {
		// Do not touch reader after a failed Open.
		return 0, err
	}
	data, err := ioutil.ReadAll(reader)
	_ = reader.Close() // ensure file handle is freed on windows
	if err != nil {
		return 0, err
	}
	remoteHashType, remoteHashState, err := unmarshalPartialHashJSON(ctx, data)
	if err != nil {
		return 0, err
	}

	if remoteHashType != hashName || remoteHashState != hashState {
		// No valid control chunks found, rewind from start
		return 0, nil
	}
	if f.opt.HashType != remoteHashType {
		fs.Debugf(f, "Resume skipped, mismatch hash types. prev: %s, curr: %s", remoteHashType, f.opt.HashType)
		return 0, nil
	}

	// Remember the state needed by the upload that follows a successful resume.
	f.hashState = hashState
	f.resumeXactID = cachedXactID
	return cachedChunkNo * cachedChunkSize, nil
}
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)
_ fs.Resumer = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)

View File

@ -43,7 +43,6 @@ func TestIntegration(t *testing.T) {
"DirCacheFlush",
"UserInfo",
"Disconnect",
"Resume",
},
}
if *fstest.RemoteName == "" {