package writeback

import (
	"container/heap"
	"context"
	"fmt"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/pkg/errors"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/vfs/vfscommon"
	"github.com/stretchr/testify/assert"
)

func newTestWriteBack(t *testing.T) (wb *WriteBack, cancel func()) {
	ctx, cancel := context.WithCancel(context.Background())
	opt := vfscommon.DefaultOpt
	opt.WriteBack = 100 * time.Millisecond
	wb = New(ctx, &opt)
	return wb, cancel
}

// string for debugging - make a copy and pop the items out in order
func (wb *WriteBack) string(t *testing.T) string {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	ws := wb.items

	// check indexes OK first
	for i := range ws {
		assert.Equal(t, i, ws[i].index, ws[i].name)
	}
	wsCopy := make(writeBackItems, len(ws))
	// deep copy the elements
	for i := range wsCopy {
		item := *ws[i]
		wsCopy[i] = &item
	}
	// print them
	var out []string
	for wsCopy.Len() > 0 {
		out = append(out, heap.Pop(&wsCopy).(*writeBackItem).name)
	}
	return strings.Join(out, ",")
}

func TestWriteBackItems(t *testing.T) {
	// Test the items heap behaves properly
	now := time.Now()
	wbItem1 := writeBackItem{name: "one", expiry: now.Add(1 * time.Second)}
	wbItem2 := writeBackItem{name: "two", expiry: now.Add(2 * time.Second)}
	wbItem3 := writeBackItem{name: "three", expiry: now.Add(4 * time.Second)}

	wb := &WriteBack{
		items: writeBackItems{},
	}

	heap.Init(&wb.items)
	assert.Equal(t, "", wb.string(t))
	heap.Push(&wb.items, &wbItem2)
	assert.Equal(t, "two", wb.string(t))
	heap.Push(&wb.items, &wbItem3)
	assert.Equal(t, "two,three", wb.string(t))
	heap.Push(&wb.items, &wbItem1)
	assert.Equal(t, "one,two,three", wb.string(t))

	wb.items._update(&wbItem1, now.Add(3*time.Second))
	assert.Equal(t, "two,one,three", wb.string(t))

	wb.items._update(&wbItem1, now.Add(5*time.Second))
	assert.Equal(t, "two,three,one", wb.string(t))

	// Set all times the same - should sort in insertion order
	wb.items._update(&wbItem1, now)
	wb.items._update(&wbItem2, now)
	wb.items._update(&wbItem3, now)
	assert.Equal(t, "one,two,three", wb.string(t))
}

func checkOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	assert.True(t, wbItem.onHeap)
	for i := range wb.items {
		if wb.items[i] == wbItem {
			return
		}
	}
	assert.Failf(t, "expecting %q on heap", wbItem.name)
}

func checkNotOnHeap(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	assert.False(t, wbItem.onHeap)
	for i := range wb.items {
		if wb.items[i] == wbItem {
			t.Errorf("not expecting %q on heap", wbItem.name)
		}
	}
}

func checkInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	assert.Equal(t, wbItem, wb.lookup[wbItem.id])
}

func checkNotInLookup(t *testing.T, wb *WriteBack, wbItem *writeBackItem) {
	wb.mu.Lock()
	defer wb.mu.Unlock()
	assert.Nil(t, wb.lookup[wbItem.id])
}

func TestWriteBackItemCRUD(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	// _peekItem empty
	assert.Nil(t, wb._peekItem())

	wbItem1 := wb._newItem(0, "one")
	checkOnHeap(t, wb, wbItem1)
	checkInLookup(t, wb, wbItem1)

	wbItem2 := wb._newItem(0, "two")
	checkOnHeap(t, wb, wbItem2)
	checkInLookup(t, wb, wbItem2)

	wbItem3 := wb._newItem(0, "three")
	checkOnHeap(t, wb, wbItem3)
	checkInLookup(t, wb, wbItem3)

	assert.Equal(t, "one,two,three", wb.string(t))

	// _delItem
	wb._delItem(wbItem2)
	checkOnHeap(t, wb, wbItem2)
	checkNotInLookup(t, wb, wbItem2)

	// _addItem
	wb._addItem(wbItem2)
	checkOnHeap(t, wb, wbItem2)
	checkInLookup(t, wb, wbItem2)

	// _popItem
	assert.True(t, wbItem1.onHeap)
	poppedWbItem := wb._popItem()
	assert.Equal(t, wbItem1, poppedWbItem)
	checkNotOnHeap(t, wb, wbItem1)
	checkInLookup(t, wb, wbItem1)
	assert.Equal(t, "two,three", wb.string(t))

	// _pushItem
	wb._pushItem(wbItem1)
	checkOnHeap(t, wb, wbItem1)
	checkInLookup(t, wb, wbItem1)
	assert.Equal(t, "one,two,three", wb.string(t))
	// push twice
	wb._pushItem(wbItem1)
	assert.Equal(t, "one,two,three", wb.string(t))

	// _peekItem
	assert.Equal(t, wbItem1, wb._peekItem())

	// _removeItem
	assert.Equal(t, "one,two,three", wb.string(t))
	wb._removeItem(wbItem2)
	checkNotOnHeap(t, wb, wbItem2)
	checkInLookup(t, wb, wbItem2)
	assert.Equal(t, "one,three", wb.string(t))
	// remove twice
	wb._removeItem(wbItem2)
	checkNotOnHeap(t, wb, wbItem2)
	checkInLookup(t, wb, wbItem2)
	assert.Equal(t, "one,three", wb.string(t))
}

func assertTimerRunning(t *testing.T, wb *WriteBack, running bool) {
	wb.mu.Lock()
	if running {
		assert.NotEqual(t, time.Time{}, wb.expiry)
		assert.NotNil(t, wb.timer)
	} else {
		assert.Equal(t, time.Time{}, wb.expiry)
		assert.Nil(t, wb.timer)
	}
	wb.mu.Unlock()
}

func TestWriteBackResetTimer(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	// Reset the timer on an empty queue
	wb._resetTimer()

	// Check timer is stopped
	assertTimerRunning(t, wb, false)

	_ = wb._newItem(0, "three")

	// Reset the timer on an queue with stuff
	wb._resetTimer()

	// Check timer is not stopped
	assertTimerRunning(t, wb, true)
}

// A "transfer" for testing
type putItem struct {
	wg        sync.WaitGroup
	mu        sync.Mutex
	t         *testing.T
	started   chan struct{}
	errChan   chan error
	running   bool
	cancelled bool
	called    bool
}

func newPutItem(t *testing.T) *putItem {
	return &putItem{
		t:       t,
		started: make(chan struct{}, 1),
	}
}

// put the object as per PutFn interface
func (pi *putItem) put(ctx context.Context) (err error) {
	pi.wg.Add(1)
	defer pi.wg.Done()

	pi.mu.Lock()
	pi.called = true
	if pi.running {
		assert.Fail(pi.t, "upload already running")
	}
	pi.running = true
	pi.errChan = make(chan error, 1)
	pi.mu.Unlock()

	pi.started <- struct{}{}

	cancelled := false
	select {
	case err = <-pi.errChan:
	case <-ctx.Done():
		err = ctx.Err()
		cancelled = true
	}

	pi.mu.Lock()
	pi.running = false
	pi.cancelled = cancelled
	pi.mu.Unlock()

	return err
}

// finish the "transfer" with the error passed in
func (pi *putItem) finish(err error) {
	pi.mu.Lock()
	if !pi.running {
		assert.Fail(pi.t, "upload not running")
	}
	pi.mu.Unlock()

	pi.errChan <- err
	pi.wg.Wait()
}

func waitUntilNoTransfers(t *testing.T, wb *WriteBack) {
	for i := 0; i < 100; i++ {
		wb.mu.Lock()
		uploads := wb.uploads
		wb.mu.Unlock()
		if uploads == 0 {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Errorf("failed to wait for transfer to finish")
}

// This is the happy path with everything working
func TestWriteBackAddOK(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	pi := newPutItem(t)

	var inID Handle
	wb.SetID(&inID)
	assert.Equal(t, Handle(1), inID)

	id := wb.Add(inID, "one", true, pi.put)
	assert.Equal(t, inID, id)
	wbItem := wb.lookup[id]
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.Equal(t, "one", wb.string(t))

	<-pi.started
	checkNotOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	pi.finish(nil) // transfer successful
	waitUntilNoTransfers(t, wb)
	checkNotOnHeap(t, wb, wbItem)
	checkNotInLookup(t, wb, wbItem)
}

// Now test the upload failing and being retried
func TestWriteBackAddFailRetry(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	pi := newPutItem(t)

	id := wb.Add(0, "one", true, pi.put)
	wbItem := wb.lookup[id]
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.Equal(t, "one", wb.string(t))

	<-pi.started
	checkNotOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	pi.finish(errors.New("transfer failed BOOM"))
	waitUntilNoTransfers(t, wb)
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	// check the retry
	<-pi.started
	checkNotOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	pi.finish(nil) // transfer successful
	waitUntilNoTransfers(t, wb)
	checkNotOnHeap(t, wb, wbItem)
	checkNotInLookup(t, wb, wbItem)
}

// Now test the upload being cancelled by another upload being added
func TestWriteBackAddUpdate(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	pi := newPutItem(t)

	id := wb.Add(0, "one", true, pi.put)
	wbItem := wb.lookup[id]
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.Equal(t, "one", wb.string(t))

	<-pi.started
	checkNotOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	// Now the upload has started add another one

	pi2 := newPutItem(t)
	id2 := wb.Add(id, "one", true, pi2.put)
	assert.Equal(t, id, id2)
	checkOnHeap(t, wb, wbItem) // object awaiting writeback time
	checkInLookup(t, wb, wbItem)

	// check the previous transfer was cancelled
	assert.True(t, pi.cancelled)

	// check the retry
	<-pi2.started
	checkNotOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	pi2.finish(nil) // transfer successful
	waitUntilNoTransfers(t, wb)
	checkNotOnHeap(t, wb, wbItem)
	checkNotInLookup(t, wb, wbItem)
}

// Now test the upload being not cancelled by another upload being added
func TestWriteBackAddUpdateNotModified(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	pi := newPutItem(t)

	id := wb.Add(0, "one", false, pi.put)
	wbItem := wb.lookup[id]
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.Equal(t, "one", wb.string(t))

	<-pi.started
	checkNotOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	// Now the upload has started add another one

	pi2 := newPutItem(t)
	id2 := wb.Add(id, "one", false, pi2.put)
	assert.Equal(t, id, id2)
	checkNotOnHeap(t, wb, wbItem) // object still being transfered
	checkInLookup(t, wb, wbItem)

	// Because modified was false above this should not cancel the
	// transfer
	assert.False(t, pi.cancelled)

	// wait for original transfer to finish
	pi.finish(nil)
	waitUntilNoTransfers(t, wb)
	checkNotOnHeap(t, wb, wbItem)
	checkNotInLookup(t, wb, wbItem)

	assert.False(t, pi2.called)
}

// Now test the upload being not cancelled by another upload being
// added because the upload hasn't started yet
func TestWriteBackAddUpdateNotStarted(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	pi := newPutItem(t)

	id := wb.Add(0, "one", true, pi.put)
	wbItem := wb.lookup[id]
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.Equal(t, "one", wb.string(t))

	// Immediately add another upload before the first has started

	pi2 := newPutItem(t)
	id2 := wb.Add(id, "one", true, pi2.put)
	assert.Equal(t, id, id2)
	checkOnHeap(t, wb, wbItem) // object still awaiting transfer
	checkInLookup(t, wb, wbItem)

	// Wait for the upload to start
	<-pi2.started
	checkNotOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	// Because modified was false above this should not cancel the
	// transfer
	assert.False(t, pi.cancelled)

	// wait for new transfer to finish
	pi2.finish(nil)
	waitUntilNoTransfers(t, wb)
	checkNotOnHeap(t, wb, wbItem)
	checkNotInLookup(t, wb, wbItem)

	assert.False(t, pi.called)
}

func TestWriteBackGetStats(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	pi := newPutItem(t)

	wb.Add(0, "one", true, pi.put)

	inProgress, queued := wb.Stats()
	assert.Equal(t, queued, 1)
	assert.Equal(t, inProgress, 0)

	<-pi.started

	inProgress, queued = wb.Stats()
	assert.Equal(t, queued, 0)
	assert.Equal(t, inProgress, 1)

	pi.finish(nil) // transfer successful
	waitUntilNoTransfers(t, wb)

	inProgress, queued = wb.Stats()
	assert.Equal(t, queued, 0)
	assert.Equal(t, inProgress, 0)

}

// Test queuing more than fs.Config.Transfers
func TestWriteBackMaxQueue(t *testing.T) {
	ctx := context.Background()
	ci := fs.GetConfig(ctx)
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	maxTransfers := ci.Transfers
	toTransfer := maxTransfers + 2

	// put toTransfer things in the queue
	pis := []*putItem{}
	for i := 0; i < toTransfer; i++ {
		pi := newPutItem(t)
		pis = append(pis, pi)
		wb.Add(0, fmt.Sprintf("number%d", 1), true, pi.put)
	}

	inProgress, queued := wb.Stats()
	assert.Equal(t, toTransfer, queued)
	assert.Equal(t, 0, inProgress)

	// now start the first maxTransfers - this should stop the timer
	for i := 0; i < maxTransfers; i++ {
		<-pis[i].started
	}

	// timer should be stopped now
	assertTimerRunning(t, wb, false)

	inProgress, queued = wb.Stats()
	assert.Equal(t, toTransfer-maxTransfers, queued)
	assert.Equal(t, maxTransfers, inProgress)

	// now finish the the first maxTransfers
	for i := 0; i < maxTransfers; i++ {
		pis[i].finish(nil)
	}

	// now start and finish the remaining transfers one at a time
	for i := maxTransfers; i < toTransfer; i++ {
		<-pis[i].started
		pis[i].finish(nil)
	}
	waitUntilNoTransfers(t, wb)

	inProgress, queued = wb.Stats()
	assert.Equal(t, queued, 0)
	assert.Equal(t, inProgress, 0)
}

func TestWriteBackRename(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	// cancel when not in writeback
	wb.Rename(1, "nonExistent")

	// add item
	pi1 := newPutItem(t)
	id := wb.Add(0, "one", true, pi1.put)
	wbItem := wb.lookup[id]
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.Equal(t, wbItem.name, "one")

	// rename when not uploading
	wb.Rename(id, "two")
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.False(t, pi1.cancelled)
	assert.Equal(t, wbItem.name, "two")

	// add item
	pi2 := newPutItem(t)
	id = wb.Add(id, "two", true, pi2.put)
	wbItem = wb.lookup[id]
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.Equal(t, wbItem.name, "two")

	// wait for upload to start
	<-pi2.started
	checkNotOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	// rename when uploading - goes back on heap
	wb.Rename(id, "three")
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.True(t, pi2.cancelled)
	assert.Equal(t, wbItem.name, "three")
}

func TestWriteBackCancelUpload(t *testing.T) {
	wb, cancel := newTestWriteBack(t)
	defer cancel()

	// cancel when not in writeback
	assert.False(t, wb.cancelUpload(1))

	// add item
	pi := newPutItem(t)
	id := wb.Add(0, "one", true, pi.put)
	wbItem := wb.lookup[id]
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	// cancel when not uploading
	assert.False(t, wb.cancelUpload(id))
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	// wait for upload to start
	<-pi.started
	checkNotOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)

	// cancel when uploading
	assert.True(t, wb.cancelUpload(id))
	checkOnHeap(t, wb, wbItem)
	checkInLookup(t, wb, wbItem)
	assert.True(t, pi.cancelled)
}