1
mirror of https://github.com/rclone/rclone synced 2024-12-21 11:45:56 +01:00

torrent: init

This commit is contained in:
Joel Kåberg 2024-12-10 14:07:42 +00:00
parent a78bc093de
commit a77cfaf59b
3 changed files with 1029 additions and 0 deletions

View File

@ -57,6 +57,7 @@ import (
_ "github.com/rclone/rclone/backend/storj"
_ "github.com/rclone/rclone/backend/sugarsync"
_ "github.com/rclone/rclone/backend/swift"
_ "github.com/rclone/rclone/backend/torrent"
_ "github.com/rclone/rclone/backend/ulozto"
_ "github.com/rclone/rclone/backend/union"
_ "github.com/rclone/rclone/backend/uptobox"

384
backend/torrent/object.go Normal file
View File

@ -0,0 +1,384 @@
package torrent
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/types"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/hash"
)
const (
defaultReadAhead int64 = 4 << 20
largeFileReadAhead int64 = 16 << 20
criticalWindow = 3
prefetchWindow = 10
priorityNow = 255
priorityHigh = 192
priorityNormal = 128
priorityLow = 64
)
type Object struct {
fs *Fs
virtualPath string
torrentPath string
size int64
modTime time.Time
sourcePath string
}
func (o *Object) Fs() fs.Info { return o.fs }
func (o *Object) Remote() string { return o.virtualPath }
func (o *Object) ModTime(context.Context) time.Time { return o.modTime }
func (o *Object) Size() int64 { return o.size }
func (o *Object) Storable() bool { return false }
func (o *Object) String() string { return o.virtualPath }
func (o *Object) Hash(context.Context, hash.Type) (string, error) { return "", hash.ErrUnsupported }
func (o *Object) SetModTime(context.Context, time.Time) error { return fs.ErrorPermissionDenied }
func (o *Object) Remove(context.Context) error { return fs.ErrorPermissionDenied }
func (o *Object) Update(context.Context, io.Reader, fs.ObjectInfo, ...fs.OpenOption) error {
return fs.ErrorPermissionDenied
}
type pieceWindow struct {
start int64
end int64
priority int
readCount int64
lastRead time.Time
}
type torrentReader struct {
ctx context.Context
cancel context.CancelFunc
object *Object
file *torrent.File
reader torrent.Reader
startTime time.Time
closed atomic.Bool
mu sync.Mutex
offset int64
readAhead int64
pieceLength int64
windows []pieceWindow
windowsMu sync.Mutex
}
func (r *torrentReader) updatePiecePriorities(currentPiece int64) {
r.windowsMu.Lock()
defer r.windowsMu.Unlock()
numPieces := r.file.Torrent().NumPieces()
critical := currentPiece
criticalEnd := min(critical+int64(criticalWindow), int64(numPieces))
prefetch := criticalEnd
prefetchEnd := min(prefetch+int64(prefetchWindow), int64(numPieces))
for i := int64(0); i < int64(numPieces); i++ {
piece := r.file.Torrent().Piece(int(i))
if !piece.State().Complete {
piece.SetPriority(types.PiecePriorityNone)
}
}
for i := critical; i < criticalEnd; i++ {
piece := r.file.Torrent().Piece(int(i))
if !piece.State().Complete {
piece.SetPriority(types.PiecePriorityNow)
fs.Debugf(r.object, "Set critical priority for piece %d", i)
}
}
for i := prefetch; i < prefetchEnd; i++ {
piece := r.file.Torrent().Piece(int(i))
if !piece.State().Complete {
piece.SetPriority(types.PiecePriorityNormal)
fs.Debugf(r.object, "Set prefetch priority for piece %d", i)
}
}
// Update windows
r.windows = []pieceWindow{
{
start: critical,
end: criticalEnd,
priority: int(types.PiecePriorityNow),
lastRead: time.Now(),
},
{
start: prefetch,
end: prefetchEnd,
priority: int(types.PiecePriorityNormal),
lastRead: time.Now(),
},
}
}
func (r *torrentReader) logProgress() {
stats := r.file.Torrent().Stats()
bytesCompleted := r.file.BytesCompleted()
progress := float64(bytesCompleted) / float64(r.file.Length()) * 100
fs.Debugf(r.object, "Progress: %.1f%%, Active peers: %d, Total peers: %d, Current offset: %d/%d",
progress,
stats.ActivePeers,
stats.TotalPeers,
atomic.LoadInt64(&r.offset),
r.file.Length())
if r.windows != nil {
r.windowsMu.Lock()
for _, window := range r.windows {
completed := 0
for i := window.start; i < window.end; i++ {
if r.file.Torrent().Piece(int(i)).State().Complete {
completed++
}
}
fs.Debugf(r.object, "Window [%d-%d]: %d/%d pieces complete, priority %d",
window.start, window.end, completed, window.end-window.start, window.priority)
}
r.windowsMu.Unlock()
}
}
func (r *torrentReader) Read(p []byte) (n int, err error) {
if r.closed.Load() {
return 0, io.ErrClosedPipe
}
r.mu.Lock()
reader := r.reader
r.mu.Unlock()
if reader == nil {
return 0, io.ErrClosedPipe
}
currentPos := atomic.LoadInt64(&r.offset)
currentPiece := currentPos / r.pieceLength
needsUpdate := false
r.windowsMu.Lock()
for i := range r.windows {
if currentPiece >= r.windows[i].end {
needsUpdate = true
break
}
}
r.windowsMu.Unlock()
if needsUpdate {
r.updatePiecePriorities(currentPiece)
}
// Perform the read
readCtx, cancel := context.WithTimeout(r.ctx, 30*time.Second)
defer cancel()
readDone := make(chan struct {
n int
err error
}, 1)
go func() {
n, err := reader.Read(p)
readDone <- struct {
n int
err error
}{n, err}
}()
select {
case result := <-readDone:
if result.err == nil {
atomic.AddInt64(&r.offset, int64(result.n))
// Log progress periodically
if time.Now().Unix()%5 == 0 {
r.logProgress()
}
}
return result.n, result.err
case <-readCtx.Done():
return 0, readCtx.Err()
}
}
func (r *torrentReader) Seek(offset int64, whence int) (int64, error) {
if r.closed.Load() {
return 0, io.ErrClosedPipe
}
r.mu.Lock()
defer r.mu.Unlock()
if r.reader == nil {
return 0, io.ErrClosedPipe
}
var abs int64
switch whence {
case io.SeekStart:
abs = offset
case io.SeekCurrent:
abs = atomic.LoadInt64(&r.offset) + offset
case io.SeekEnd:
abs = r.file.Length() + offset
default:
return 0, fmt.Errorf("invalid whence: %d", whence)
}
if abs < 0 {
return 0, fmt.Errorf("negative seek position: %d", abs)
}
if abs > r.file.Length() {
return 0, fmt.Errorf("seek beyond end: %d > %d", abs, r.file.Length())
}
newPiece := abs / r.pieceLength
r.updatePiecePriorities(newPiece)
pos, err := r.reader.Seek(abs, io.SeekStart)
if err == nil {
atomic.StoreInt64(&r.offset, pos)
fs.Debugf(r.object, "Seeked to offset %d (piece %d)", pos, newPiece)
}
return pos, err
}
func (r *torrentReader) RangeSeek(ctx context.Context, offset int64, whence int, length int64) (int64, error) {
newReader := r.file.NewReader()
pos, err := newReader.Seek(offset, whence)
if err != nil {
newReader.Close()
return 0, err
}
r.mu.Lock()
if oldReader := r.reader; oldReader != nil {
oldReader.Close()
}
r.reader = newReader
atomic.StoreInt64(&r.offset, pos)
r.mu.Unlock()
newPiece := pos / r.pieceLength
r.updatePiecePriorities(newPiece)
return pos, nil
}
func (r *torrentReader) Close() error {
if !r.closed.CompareAndSwap(false, true) {
return nil
}
r.cancel()
r.mu.Lock()
reader := r.reader
r.reader = nil
r.mu.Unlock()
fs.Debugf(r.object, "Closed reader after %v", time.Since(r.startTime))
if reader != nil {
return reader.Close()
}
return nil
}
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
fs.Debugf(o, "Opening file: %q", o.virtualPath)
t, err := o.fs.getTorrent(o.sourcePath)
if err != nil {
return nil, fmt.Errorf("failed to load torrent: %w", err)
}
var targetFile *torrent.File
for _, file := range t.Files() {
if o.torrentPath == file.DisplayPath() {
targetFile = file
break
}
}
if targetFile == nil {
return nil, fmt.Errorf("file not found in torrent: %s", o.virtualPath)
}
ctx, cancel := context.WithCancel(ctx)
tReader := targetFile.NewReader()
readAhead := defaultReadAhead
if targetFile.Length() > 1<<30 { // > 1GB
readAhead = largeFileReadAhead
}
tReader.SetReadahead(readAhead)
tr := &torrentReader{
ctx: ctx,
cancel: cancel,
object: o,
file: targetFile,
reader: tReader,
startTime: time.Now(),
readAhead: readAhead,
pieceLength: int64(targetFile.Torrent().Info().PieceLength),
}
// Initialize first piece window
tr.updatePiecePriorities(0)
// Handle initial seek
for _, option := range options {
switch opt := option.(type) {
case *fs.SeekOption:
_, err = tr.Seek(opt.Offset, io.SeekStart)
if err != nil {
tr.Close()
return nil, fmt.Errorf("initial seek failed: %w", err)
}
}
}
fs.Debugf(o, "Opened with read-ahead size: %d bytes", readAhead)
return tr, nil
}
// Helper functions
func min(a, b int64) int64 {
if a < b {
return a
}
return b
}
func max(a, b int) int {
if a > b {
return a
}
return b
}
// Interface checks
var (
_ io.Reader = (*torrentReader)(nil)
_ io.Closer = (*torrentReader)(nil)
_ io.Seeker = (*torrentReader)(nil)
_ fs.RangeSeeker = (*torrentReader)(nil)
)

644
backend/torrent/torrent.go Normal file
View File

@ -0,0 +1,644 @@
package torrent
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/hash"
"golang.org/x/time/rate"
)
const (
defaultCleanupTimeout = 0
defaultHandshakeTimeout = 30 * time.Second
maxConnectionsPerTorrent = 50
defaultPendingPeers = 25
)
var (
// Registry info for this backend
fsInfo = &fs.RegInfo{
Name: "torrent",
Description: "Read-only torrent backend for accessing torrent contents",
NewFs: NewFs,
Options: []fs.Option{{
Name: "root_directory",
Help: "Local directory containing torrent files.",
Required: true,
}, {
Name: "max_download_speed",
Help: "Maximum download speed (kBytes/s).",
Default: 0,
}, {
Name: "max_upload_speed",
Help: "Maximum upload speed (kBytes/s).",
Default: 0,
}, {
Name: "cache_dir",
Help: "Directory to store downloaded torrent data.",
Default: "",
Advanced: true,
}, {
Name: "cleanup_timeout",
Help: "Remove inactive torrents after X minutes (0 to disable).",
Default: defaultCleanupTimeout,
Advanced: true,
}},
}
)
func init() {
fs.Register(fsInfo)
}
type Options struct {
RootDirectory string `config:"root_directory"`
MaxDownloadSpeed int `config:"max_download_speed"`
MaxUploadSpeed int `config:"max_upload_speed"`
CacheDir string `config:"cache_dir"`
CleanupTimeout int `config:"cleanup_timeout"`
}
// Fs implements a read-only torrent filesystem
type Fs struct {
name string
root string
opt Options
features *fs.Features
client *torrent.Client
baseFs fs.Fs
// Track active torrents with concurrent map
activeTorrents sync.Map // map[string]*torrentInfo
}
// torrentInfo tracks metadata for active torrents
type torrentInfo struct {
torrent *torrent.Torrent
lastAccess time.Time
mu sync.RWMutex // Protects lastAccess
}
// Directory represents a virtual directory in the torrent filesystem
type Directory struct {
fs *Fs
remote string
modTime time.Time
size int64
items int64
}
// Common directory interface implementations
func (d *Directory) String() string { return d.remote }
func (d *Directory) Remote() string { return d.remote }
func (d *Directory) ModTime(context.Context) time.Time { return d.modTime }
func (d *Directory) Size() int64 { return d.size }
func (d *Directory) Items() int64 { return d.items }
func (d *Directory) ID() string { return "torrentdir:" + d.remote }
func (d *Directory) SetID(string) {}
func (d *Directory) Fs() fs.Info { return d.fs }
// Standard Fs interface implementations
func (f *Fs) Name() string { return f.name }
func (f *Fs) Root() string { return f.root }
func (f *Fs) String() string { return fmt.Sprintf("torrent root '%s'", f.root) }
func (f *Fs) Features() *fs.Features { return f.features }
func (f *Fs) Precision() time.Duration { return time.Second }
func (f *Fs) Hashes() hash.Set { return hash.Set(hash.None) }
// Read-only operation errors
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return nil, fs.ErrorPermissionDenied
}
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
return nil, fs.ErrorPermissionDenied
}
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
return nil, fs.ErrorPermissionDenied
}
// Pass-through operations to base filesystem
func (f *Fs) Mkdir(ctx context.Context, dir string) error { return f.baseFs.Mkdir(ctx, dir) }
func (f *Fs) Rmdir(ctx context.Context, dir string) error { return f.baseFs.Rmdir(ctx, dir) }
// DirMove handles directory movement in the base filesystem
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
srcFs, ok := src.(*Fs)
if !ok {
fs.Debugf(srcRemote, "Can't move directory - not same remote type")
return fs.ErrorCantDirMove
}
return srcFs.baseFs.Features().DirMove(ctx, srcFs.baseFs, srcRemote, dstRemote)
}
// getTorrent loads or retrieves a torrent and updates its access time
func (f *Fs) getTorrent(path string) (*torrent.Torrent, error) {
// Try to get existing torrent
if v, ok := f.activeTorrents.Load(path); ok {
info := v.(*torrentInfo)
info.mu.Lock()
info.lastAccess = time.Now()
info.mu.Unlock()
return info.torrent, nil
}
fs.Debugf(nil, "Loading new torrent: %s", path)
// Load new torrent
t, err := f.client.AddTorrentFromFile(path)
if err != nil {
return nil, fmt.Errorf("failed to load torrent: %w", err)
}
// Add standard public trackers
t.AddTrackers([][]string{
{"udp://tracker.opentrackr.org:1337/announce"},
{"udp://tracker.openbittorrent.com:6969/announce"},
{"udp://exodus.desync.com:6969/announce"},
{"udp://tracker.torrent.eu.org:451/announce"},
})
// Wait for metadata with timeout
select {
case <-t.GotInfo():
fs.Debugf(nil, "Got torrent metadata for: %s", t.Name())
case <-time.After(defaultHandshakeTimeout):
t.Drop()
return nil, fmt.Errorf("timeout waiting for torrent metadata")
}
// Log torrent details
fs.Debugf(nil, "Torrent loaded: %s (pieces: %d, length: %d, trackers: %d)",
t.Name(), t.Info().NumPieces(), t.Length(), len(t.Metainfo().AnnounceList))
// Start downloading
t.DownloadAll()
// Store in active torrents map
info := &torrentInfo{
torrent: t,
lastAccess: time.Now(),
}
f.activeTorrents.Store(path, info)
// Start cleanup if enabled
if f.opt.CleanupTimeout > 0 {
go f.cleanupTorrent(path)
}
// Start peer stats monitoring
go f.monitorPeerStats(t, path)
return t, nil
}
// monitorPeerStats periodically logs peer statistics
func (f *Fs) monitorPeerStats(t *torrent.Torrent, path string) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if _, exists := f.activeTorrents.Load(path); !exists {
return
}
stats := t.Stats()
fs.Debugf(nil, "Peer stats for %s - Active: %d, Total: %d, Pending: %d",
t.Name(), stats.ActivePeers, stats.TotalPeers, stats.PendingPeers)
}
}
// cleanupTorrent monitors torrent activity and removes inactive ones
func (f *Fs) cleanupTorrent(path string) {
if f.opt.CleanupTimeout <= 0 {
return
}
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
timeout := time.Duration(f.opt.CleanupTimeout) * time.Minute
for range ticker.C {
v, ok := f.activeTorrents.Load(path)
if !ok {
return
}
info := v.(*torrentInfo)
info.mu.RLock()
inactive := time.Since(info.lastAccess) > timeout
info.mu.RUnlock()
if inactive {
if v, ok := f.activeTorrents.LoadAndDelete(path); ok {
info := v.(*torrentInfo)
info.torrent.Drop()
fs.Debugf(nil, "Dropped inactive torrent: %s", path)
}
return
}
}
}
// List implements directory listing with virtual torrent directories
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
fs.Debugf(dir, "Listing directory")
// Get base directory contents
baseEntries, err := f.baseFs.List(ctx, dir)
if err != nil {
// Check if it's a virtual torrent directory
if torrentPath, isVirtual := f.findTorrentForPath(dir); isVirtual {
fs.Debugf(dir, "Found torrent file: %s", torrentPath)
return f.listTorrentContents(ctx, torrentPath, dir)
}
return nil, err
}
// Track seen names to avoid duplicates
seen := make(map[string]bool)
entries = make(fs.DirEntries, 0, len(baseEntries))
// Add regular non-torrent entries
for _, entry := range baseEntries {
name := entry.Remote()
if !isTorrentFile(name) {
entries = append(entries, entry)
seen[name] = true
}
}
// Add virtual torrent directories
for _, entry := range baseEntries {
if o, ok := entry.(fs.Object); ok && isTorrentFile(o.Remote()) {
virtualName := strings.TrimSuffix(o.Remote(), filepath.Ext(o.Remote()))
if !seen[virtualName] {
if info, modTime, err := f.getTorrentInfo(o.Remote()); err == nil {
size, items := f.getTorrentSize(info)
entries = append(entries, &Directory{
fs: f,
remote: virtualName,
modTime: modTime,
size: size,
items: items,
})
seen[virtualName] = true
}
}
}
}
fs.Debugf(dir, "Listed %d entries", len(entries))
return entries, nil
}
// findTorrentForPath locates the .torrent file for a given path
func (f *Fs) findTorrentForPath(path string) (string, bool) {
if path == "" {
return "", false
}
current := path
for {
torrentPath := filepath.Join(f.opt.RootDirectory, current+".torrent")
if _, err := os.Stat(torrentPath); err == nil {
fs.Debugf(path, "Found torrent at: %s", torrentPath)
return torrentPath, true
}
parent := filepath.Dir(current)
if parent == "." || parent == current {
break
}
current = parent
}
return "", false
}
// getTorrentInfo reads and parses a torrent file's metadata
func (f *Fs) getTorrentInfo(path string) (*metainfo.Info, time.Time, error) {
absPath := path
if !filepath.IsAbs(path) {
absPath = filepath.Join(f.opt.RootDirectory, path)
}
mi, err := metainfo.LoadFromFile(absPath)
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to read torrent info: %w", err)
}
info, err := mi.UnmarshalInfo()
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to unmarshal torrent info: %w", err)
}
stat, err := os.Stat(absPath)
if err != nil {
return nil, time.Time{}, fmt.Errorf("failed to stat torrent file: %w", err)
}
return &info, stat.ModTime(), nil
}
// listTorrentContents returns the contents of a torrent as directory entries
func (f *Fs) listTorrentContents(ctx context.Context, torrentPath, virtualPath string) (fs.DirEntries, error) {
// Get torrent info
info, modTime, err := f.getTorrentInfo(torrentPath)
if err != nil {
return nil, fmt.Errorf("failed to read torrent info: %w", err)
}
// Calculate paths
torrentName := strings.TrimSuffix(filepath.Base(torrentPath), ".torrent")
relTorrentDir, err := filepath.Rel(f.opt.RootDirectory, filepath.Dir(torrentPath))
if err != nil {
return nil, fmt.Errorf("failed to get relative torrent dir: %w", err)
}
virtualTorrentDir := filepath.Join(relTorrentDir, torrentName)
// Get relative path within torrent
var internalPath string
switch {
case virtualPath == virtualTorrentDir:
internalPath = ""
case strings.HasPrefix(virtualPath, virtualTorrentDir+string(filepath.Separator)):
internalPath = strings.TrimPrefix(virtualPath, virtualTorrentDir+string(filepath.Separator))
default:
return nil, fmt.Errorf("path %q is not within torrent directory %q", virtualPath, virtualTorrentDir)
}
entries := make(fs.DirEntries, 0)
seen := make(map[string]bool)
// Handle single file torrents
if len(info.Files) == 0 {
if internalPath == "" {
entries = append(entries, &Object{
fs: f,
virtualPath: filepath.Join(virtualPath, info.Name),
torrentPath: info.Name,
size: info.Length,
modTime: modTime,
sourcePath: torrentPath,
})
}
return entries, nil
}
// Handle multi-file torrents efficiently
prefix := ""
if internalPath != "" {
prefix = internalPath + string(filepath.Separator)
}
// Use map to track directory sizes
dirSizes := make(map[string]int64)
dirItems := make(map[string]int64)
// Process all files in a single pass
for _, file := range info.Files {
filePath := filepath.Join(file.Path...)
// Skip files not in current directory
if !strings.HasPrefix(filePath, prefix) {
continue
}
// Get path relative to current directory
relPath := strings.TrimPrefix(filePath, prefix)
if relPath == "" {
continue
}
// Split into components
components := strings.Split(relPath, string(filepath.Separator))
firstComponent := components[0]
if len(components) == 1 {
// File in current directory
entries = append(entries, &Object{
fs: f,
virtualPath: filepath.Join(virtualPath, firstComponent),
torrentPath: filePath,
size: file.Length,
modTime: modTime,
sourcePath: torrentPath,
})
} else if !seen[firstComponent] {
// Directory - accumulate sizes
currentPath := firstComponent
for i := 0; i < len(components)-1; i++ {
dirSizes[currentPath] += file.Length
dirItems[currentPath]++
if i < len(components)-2 {
currentPath = filepath.Join(currentPath, components[i+1])
}
}
// Add directory entry if not already seen
entries = append(entries, &Directory{
fs: f,
remote: filepath.Join(virtualPath, firstComponent),
modTime: modTime,
size: dirSizes[firstComponent],
items: dirItems[firstComponent],
})
seen[firstComponent] = true
}
}
fs.Debugf(virtualPath, "Listed %d entries", len(entries))
return entries, nil
}
// getTorrentSize returns total size and item count for a torrent
func (f *Fs) getTorrentSize(info *metainfo.Info) (size, items int64) {
if len(info.Files) == 0 {
return info.Length, 1
}
for _, file := range info.Files {
size += file.Length
items++
}
return
}
// isTorrentFile checks if a file is a torrent based on extension
func isTorrentFile(path string) bool {
return strings.ToLower(filepath.Ext(path)) == ".torrent"
}
// NewObject finds an Object at the given remote path
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
// Try regular filesystem first
obj, err := f.baseFs.NewObject(ctx, remote)
if err == nil && !isTorrentFile(remote) {
return obj, nil
}
// Look for the file in a torrent
if torrentPath, isVirtual := f.findTorrentForPath(remote); isVirtual {
info, modTime, err := f.getTorrentInfo(torrentPath)
if err != nil {
return nil, err
}
// Calculate relative path
torrentRoot := strings.TrimSuffix(filepath.Base(torrentPath), ".torrent")
relPath, err := filepath.Rel(torrentRoot, remote)
if err != nil {
return nil, fs.ErrorObjectNotFound
}
// Find file in torrent
switch {
case len(info.Files) == 0:
// Single file torrent
if relPath == info.Name {
return &Object{
fs: f,
virtualPath: remote,
torrentPath: info.Name,
size: info.Length,
modTime: modTime,
sourcePath: torrentPath,
}, nil
}
default:
// Multi-file torrent - look for exact match
for _, file := range info.Files {
if relPath == filepath.Join(file.Path...) {
return &Object{
fs: f,
virtualPath: remote,
torrentPath: filepath.Join(file.Path...),
size: file.Length,
modTime: modTime,
sourcePath: torrentPath,
}, nil
}
}
}
}
return nil, fs.ErrorObjectNotFound
}
// getCacheDir determines the directory for storing downloaded data
func getCacheDir(opt Options) string {
if opt.CacheDir != "" {
return opt.CacheDir
}
return filepath.Join(os.TempDir(), "rclone-torrent-cache")
}
// Shutdown cleanly shuts down the filesystem
func (f *Fs) Shutdown(ctx context.Context) error {
// Drop all active torrents
f.activeTorrents.Range(func(key, value interface{}) bool {
if info, ok := value.(*torrentInfo); ok {
info.mu.Lock()
if info.torrent != nil {
info.torrent.Drop()
}
info.mu.Unlock()
}
return true
})
// Close torrent client
if f.client != nil {
f.client.Close()
}
// Shutdown base filesystem if supported
if shutdowner, ok := f.baseFs.(fs.Shutdowner); ok {
return shutdowner.Shutdown(ctx)
}
return nil
}
// NewFs creates a new Fs instance
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
// Parse config
opt := new(Options)
err := configstruct.Set(m, opt)
if err != nil {
return nil, err
}
// Configure torrent client
cfg := torrent.NewDefaultClientConfig()
cfg.DataDir = getCacheDir(*opt)
// Set bandwidth limits
if opt.MaxDownloadSpeed > 0 {
cfg.DownloadRateLimiter = rate.NewLimiter(rate.Limit(opt.MaxDownloadSpeed*1024), 256*1024)
}
if opt.MaxUploadSpeed > 0 {
cfg.UploadRateLimiter = rate.NewLimiter(rate.Limit(opt.MaxUploadSpeed*1024), 256*1024)
}
// Configure for read-only operation
cfg.NoUpload = true
cfg.Seed = false
// Network settings
cfg.HandshakesTimeout = defaultHandshakeTimeout
cfg.HalfOpenConnsPerTorrent = defaultPendingPeers
cfg.EstablishedConnsPerTorrent = maxConnectionsPerTorrent
cfg.DisableUTP = false
cfg.DisableTCP = false
cfg.NoDHT = false
cfg.DisableIPv6 = false
// Create torrent client
client, err := torrent.NewClient(cfg)
if err != nil {
return nil, fmt.Errorf("failed to create torrent client: %w", err)
}
// Create base filesystem
baseFs, err := fs.NewFs(ctx, opt.RootDirectory)
if err != nil {
client.Close()
return nil, fmt.Errorf("failed to create base filesystem: %w", err)
}
// Set up features
features := &fs.Features{
CaseInsensitive: false,
DuplicateFiles: false,
ReadMimeType: false,
WriteMimeType: false,
CanHaveEmptyDirectories: true,
BucketBased: false,
BucketBasedRootOK: false,
SetTier: false,
GetTier: false,
}
return &Fs{
name: name,
root: root,
opt: *opt,
client: client,
features: features,
baseFs: baseFs,
}, nil
}