package couchbase

import (
	"github.com/couchbase/gomemcached/client"
	"github.com/couchbase/goutils/logging"
	"sync"
	"time"
)

const initialRetryInterval = 1 * time.Second
const maximumRetryInterval = 30 * time.Second

// A TapFeed streams mutation events from a bucket.
//
// Events from the bucket can be read from the channel 'C'.  Remember
// to call Close() on it when you're done, unless its channel has
// closed itself already.
type TapFeed struct {
	C <-chan memcached.TapEvent

	bucket    *Bucket
	args      *memcached.TapArguments
	nodeFeeds []*memcached.TapFeed    // The TAP feeds of the individual nodes
	output    chan memcached.TapEvent // Same as C but writeably-typed
	wg        sync.WaitGroup
	quit      chan bool
}

// StartTapFeed creates and starts a new Tap feed
func (b *Bucket) StartTapFeed(args *memcached.TapArguments) (*TapFeed, error) {
	if args == nil {
		defaultArgs := memcached.DefaultTapArguments()
		args = &defaultArgs
	}

	feed := &TapFeed{
		bucket: b,
		args:   args,
		output: make(chan memcached.TapEvent, 10),
		quit:   make(chan bool),
	}

	go feed.run()

	feed.C = feed.output
	return feed, nil
}

// Goroutine that runs the feed
func (feed *TapFeed) run() {
	retryInterval := initialRetryInterval
	bucketOK := true
	for {
		// Connect to the TAP feed of each server node:
		if bucketOK {
			killSwitch, err := feed.connectToNodes()
			if err == nil {
				// Run until one of the sub-feeds fails:
				select {
				case <-killSwitch:
				case <-feed.quit:
					return
				}
				feed.closeNodeFeeds()
				retryInterval = initialRetryInterval
			}
		}

		// On error, try to refresh the bucket in case the list of nodes changed:
		logging.Infof("go-couchbase: TAP connection lost; reconnecting to bucket %q in %v",
			feed.bucket.Name, retryInterval)
		err := feed.bucket.Refresh()
		bucketOK = err == nil

		select {
		case <-time.After(retryInterval):
		case <-feed.quit:
			return
		}
		if retryInterval *= 2; retryInterval > maximumRetryInterval {
			retryInterval = maximumRetryInterval
		}
	}
}

func (feed *TapFeed) connectToNodes() (killSwitch chan bool, err error) {
	killSwitch = make(chan bool)
	for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) {
		var singleFeed *memcached.TapFeed
		singleFeed, err = serverConn.StartTapFeed(feed.args)
		if err != nil {
			logging.Errorf("go-couchbase: Error connecting to tap feed of %s: %v", serverConn.host, err)
			feed.closeNodeFeeds()
			return
		}
		feed.nodeFeeds = append(feed.nodeFeeds, singleFeed)
		go feed.forwardTapEvents(singleFeed, killSwitch, serverConn.host)
		feed.wg.Add(1)
	}
	return
}

// Goroutine that forwards Tap events from a single node's feed to the aggregate feed.
func (feed *TapFeed) forwardTapEvents(singleFeed *memcached.TapFeed, killSwitch chan bool, host string) {
	defer feed.wg.Done()
	for {
		select {
		case event, ok := <-singleFeed.C:
			if !ok {
				if singleFeed.Error != nil {
					logging.Errorf("go-couchbase: Tap feed from %s failed: %v", host, singleFeed.Error)
				}
				killSwitch <- true
				return
			}
			feed.output <- event
		case <-feed.quit:
			return
		}
	}
}

func (feed *TapFeed) closeNodeFeeds() {
	for _, f := range feed.nodeFeeds {
		f.Close()
	}
	feed.nodeFeeds = nil
}

// Close a Tap feed.
func (feed *TapFeed) Close() error {
	select {
	case <-feed.quit:
		return nil
	default:
	}

	feed.closeNodeFeeds()
	close(feed.quit)
	feed.wg.Wait()
	close(feed.output)
	return nil
}