package zkhelper

import (
	"errors"
	"fmt"
	"path"
	"strings"
	"sync"
	"time"

	etcderr "github.com/coreos/etcd/error"
	"github.com/coreos/go-etcd/etcd"
	zk "github.com/ngaut/go-zookeeper/zk"
	"github.com/ngaut/log"
	"github.com/ngaut/pools"
)

var (
	// singleInstanceLock is held for the whole of NewEtcdConn so that
	// etcdInstance is created at most once.
	singleInstanceLock sync.Mutex
	// etcdInstance is the shared connection handed out by NewEtcdConn; it is
	// nil until the first successful call.
	etcdInstance       *etcdImpl
)

// PooledEtcdClient wraps an *etcd.Client so it can be stored in the
// pools.ResourcePool that NewEtcdConn builds.
type PooledEtcdClient struct {
	c *etcd.Client
}

// Close is a no-op; the wrapped etcd client is not released here.
// Presumably it exists to satisfy the pools.Resource interface — confirm.
func (c *PooledEtcdClient) Close() {

}

// Seq2Str renders a sequence number as its base-10 string form.
func (e *etcdImpl) Seq2Str(seq int64) string {
	return fmt.Sprint(seq)
}

// etcdImpl is the etcd-backed value that NewEtcdConn returns as a Conn.
type etcdImpl struct {
	// Mutex guards indexMap (see setIndex / getIndex).
	sync.Mutex
	// cluster is the address string passed to NewEtcdConn, stored verbatim.
	cluster  string
	// pool hands out *PooledEtcdClient values for individual requests.
	pool     *pools.ResourcePool
	// indexMap remembers, per watched key, the index stored by a finished
	// watch goroutine; the next watch on that key starts no earlier than it.
	indexMap map[string]uint64
}

// convertToZkError translates the etcd error codes that have an obvious
// ZooKeeper counterpart into the matching zk sentinel error. Any other error
// (including nil and non-etcd errors) is returned unchanged.
//
// TODO: convert other errors.
func convertToZkError(err error) error {
	etcdErr, isEtcdErr := err.(*etcd.EtcdError)
	if !isEtcdErr {
		return err
	}

	switch etcdErr.ErrorCode {
	case etcderr.EcodeKeyNotFound:
		return zk.ErrNoNode
	case etcderr.EcodeNodeExist:
		return zk.ErrNodeExists
	case etcderr.EcodeDirNotEmpty:
		return zk.ErrNotEmpty
	case etcderr.EcodeNotFile, etcderr.EcodeNotDir:
		// No zk equivalent is mapped for these codes; the etcd error is
		// passed through as-is.
		// NOTE(review): Go cases do not fall through — if these were meant
		// to map to zk.ErrNodeExists, that needs an explicit return.
	}

	return err
}

// convertToZkEvent builds the zk.Event that corresponds to the outcome of an
// etcd watch on watchPath.
//
// A non-nil err yields a StateDisconnected event carrying the converted
// error; resp is not touched in that case. Otherwise the event is
// StateConnected with Path set to the changed node's key. A key longer than
// watchPath is reported as EventNodeChildrenChanged; for any other key the
// etcd action is mapped to the matching zk event type. An unrecognised action
// leaves Type at its zero value.
func convertToZkEvent(watchPath string, resp *etcd.Response, err error) zk.Event {
	if err != nil {
		return zk.Event{
			Err:   convertToZkError(err),
			State: zk.StateDisconnected,
		}
	}

	ev := zk.Event{
		State: zk.StateConnected,
		Path:  resp.Node.Key,
	}

	// Only the key length is compared, not the prefix: anything longer than
	// the watched path is treated as a child change.
	if len(ev.Path) > len(watchPath) {
		ev.Type = zk.EventNodeChildrenChanged
		return ev
	}

	switch resp.Action {
	case "set", "update":
		ev.Type = zk.EventNodeDataChanged
	case "create":
		ev.Type = zk.EventNodeCreated
	case "delete":
		ev.Type = zk.EventNodeDeleted
	case "expire":
		ev.Type = zk.EventNotWatching
	}

	return ev
}

// NewEtcdConn returns the process-wide etcd-backed Conn, creating it on the
// first call.
//
// zkAddr is a comma-separated list of etcd endpoints. Surrounding whitespace
// is trimmed from each entry, and entries without an "http://" or "https://"
// scheme get "http://" prepended. The connection is a singleton: once it
// exists, later calls return it and ignore their zkAddr argument.
//
// The returned error is always nil; it is kept for interface compatibility.
func NewEtcdConn(zkAddr string) (Conn, error) {
	singleInstanceLock.Lock()
	defer singleInstanceLock.Unlock()
	if etcdInstance != nil {
		return etcdInstance, nil
	}

	// The endpoint slice is rebuilt for every pooled client so that clients
	// never share (and possibly mutate) the same backing array.
	factory := func() (pools.Resource, error) {
		cluster := strings.Split(zkAddr, ",")
		for i, addr := range cluster {
			addr = strings.TrimSpace(addr)
			if !strings.HasPrefix(addr, "http://") && !strings.HasPrefix(addr, "https://") {
				addr = "http://" + addr
			}
			cluster[i] = addr
		}
		newClient := etcd.NewClient(cluster)
		newClient.SetConsistency(etcd.STRONG_CONSISTENCY)
		return &PooledEtcdClient{c: newClient}, nil
	}

	etcdInstance = &etcdImpl{
		cluster:  zkAddr,
		pool:     pools.NewResourcePool(factory, 10, 10, 0),
		indexMap: make(map[string]uint64),
	}

	log.Infof("new etcd %s", zkAddr)

	return etcdInstance, nil
}

// Get returns the value stored at key.
//
// stat is always nil. If the etcd client returns no response, the
// accompanying error is converted with convertToZkError and returned; an
// error that arrives together with a non-nil response is ignored.
func (e *etcdImpl) Get(key string) (data []byte, stat zk.Stat, err error) {
	res, err := e.pool.Get()
	if err != nil {
		return nil, nil, err
	}
	defer e.pool.Put(res)

	client := res.(*PooledEtcdClient).c
	reply, err := client.Get(key, true, false)
	if reply == nil {
		return nil, nil, convertToZkError(err)
	}

	data = []byte(reply.Node.Value)
	return data, nil, nil
}

// setIndex records index as the remembered watch index for key, holding the
// embedded mutex while indexMap is written.
func (e *etcdImpl) setIndex(key string, index uint64) {
	e.Lock()
	defer e.Unlock()

	e.indexMap[key] = index
}

// getIndex returns the watch index remembered for key, or 0 when none has
// been recorded. The embedded mutex is held while indexMap is read.
func (e *etcdImpl) getIndex(key string) uint64 {
	e.Lock()
	defer e.Unlock()

	return e.indexMap[key]
}

// watch reads key and starts a one-shot background watch on it.
//
// It returns the response of the initial Get, a nil stat, and a buffered
// channel on which the background goroutine delivers at most one zk.Event
// before exiting. The channel is never closed. children is forwarded to the
// etcd client's Watch call (presumably its "recursive" flag — confirm against
// go-etcd).
//
// If the pool cannot supply a client, or the initial Get yields no response,
// no goroutine is started and the (converted) error is returned.
func (e *etcdImpl) watch(key string, children bool) (resp *etcd.Response, stat zk.Stat, watch <-chan zk.Event, err error) {
	conn, err := e.pool.Get()
	if err != nil {
		return nil, nil, nil, err
	}

	defer e.pool.Put(conn)
	c := conn.(*PooledEtcdClient).c
	// Start from the index remembered by a previous watch on this key, then
	// raise it to the newest ModifiedIndex seen on the node or on any entry
	// of resp.Node.Nodes.
	index := e.getIndex(key)
	resp, err = c.Get(key, true, true)
	if resp == nil {
		return nil, nil, nil, convertToZkError(err)
	}

	if index < resp.Node.ModifiedIndex {
		index = resp.Node.ModifiedIndex
	}

	for _, n := range resp.Node.Nodes {
		if n.ModifiedIndex > index {
			index = n.ModifiedIndex
		}
	}

	log.Info("try watch", key)
	ch := make(chan zk.Event, 100)
	// Value at the time of the initial read; used below to recognise watch
	// results that did not change the value of key itself.
	originVal := resp.Node.Value

	go func() {
		// Whatever path the goroutine exits through, persist the index it
		// reached so the next watch on key continues from there.
		defer func() {
			e.setIndex(key, index)
		}()

		for {
			// A client is borrowed only for the duration of one Watch call
			// and returned before the result is processed.
			conn, err := e.pool.Get()
			if err != nil {
				// NOTE(review): no event is sent on this path, so a receiver
				// on ch is never notified.
				log.Error(err)
				return
			}

			c := conn.(*PooledEtcdClient).c

			resp, err := c.Watch(key, index, children, nil, nil)
			e.pool.Put(conn)

			if err != nil {
				if ec, ok := err.(*etcd.EtcdError); ok {
					// The requested index is no longer available: step
					// forward one index at a time and retry.
					if ec.ErrorCode == etcderr.EcodeEventIndexCleared {
						index++
						continue
					}
				}

				// Any other failure is reported as a disconnected event and
				// ends the watch.
				log.Warning("watch", err)
				ch <- convertToZkEvent(key, resp, err)
				return
			}

			// An event on key itself that carries the same value as the
			// initial read is treated as a TTL refresh and skipped.
			// NOTE(review): an event on key with an empty value would also
			// match when originVal is empty — confirm deletes/expiries of
			// empty-valued nodes are not swallowed here.
			if key == resp.Node.Key && originVal == string(resp.Node.Value) { //keep alive event
				index++
				continue
			}

			ch <- convertToZkEvent(key, resp, err)
			// Advance past the delivered event; the deferred setIndex stores
			// the result.
			if index <= resp.Node.ModifiedIndex {
				index = resp.Node.ModifiedIndex + 1
			} else {
				index++
			}
			return
		}
	}()

	return resp, nil, ch, nil
}

// GetW returns the value stored at key together with a channel that receives
// an event from a non-recursive watch on it. On failure data is nil and the
// stat, channel and error produced by watch are returned as they are.
func (e *etcdImpl) GetW(key string) (data []byte, stat zk.Stat, watch <-chan zk.Event, err error) {
	reply, stat, watch, err := e.watch(key, false)
	if err != nil {
		return nil, stat, watch, err
	}

	data = []byte(reply.Node.Value)
	return data, stat, watch, nil
}

// Children lists the base names of the nodes directly under key, in the
// order the etcd client returns them.
//
// stat is always nil. children is nil when the node has no entries. If the
// etcd client returns no response, the accompanying error is converted with
// convertToZkError and returned.
func (e *etcdImpl) Children(key string) (children []string, stat zk.Stat, err error) {
	res, err := e.pool.Get()
	if err != nil {
		return nil, nil, err
	}
	defer e.pool.Put(res)

	client := res.(*PooledEtcdClient).c
	reply, err := client.Get(key, true, false)
	if reply == nil {
		return nil, nil, convertToZkError(err)
	}

	for _, child := range reply.Node.Nodes {
		children = append(children, path.Base(child.Key))
	}

	return children, stat, err
}

// ChildrenW lists the base names of the nodes directly under key and returns
// a channel that receives an event from a recursive watch on key. On failure
// the error from watch is passed through convertToZkError and no channel is
// returned.
func (e *etcdImpl) ChildrenW(key string) (children []string, stat zk.Stat, watch <-chan zk.Event, err error) {
	reply, stat, watch, err := e.watch(key, true)
	if err != nil {
		return nil, stat, nil, convertToZkError(err)
	}

	for _, child := range reply.Node.Nodes {
		name := path.Base(child.Key)
		children = append(children, name)
	}

	return children, stat, watch, nil
}

// Exists reports whether key is present in etcd.
//
// A "key not found" answer from etcd is a normal negative result
// (false, nil, nil). Any other failure is returned after conversion with
// convertToZkError. stat is always nil.
func (e *etcdImpl) Exists(key string) (exist bool, stat zk.Stat, err error) {
	res, err := e.pool.Get()
	if err != nil {
		return false, nil, err
	}
	defer e.pool.Put(res)

	client := res.(*PooledEtcdClient).c
	if _, err = client.Get(key, true, false); err == nil {
		return true, nil, nil
	}

	etcdErr, isEtcdErr := err.(*etcd.EtcdError)
	if isEtcdErr && etcdErr.ErrorCode == etcderr.EcodeKeyNotFound {
		return false, nil, nil
	}

	return false, nil, convertToZkError(err)
}

// ExistsW reports that key exists and returns a channel that receives an
// event from a non-recursive watch on it. stat is always nil.
//
// NOTE(review): unlike Exists, a missing key is not reported as
// (false, nil): watch fails on its initial read, so the converted error
// (zk.ErrNoNode for "key not found") is returned and no watch is installed.
func (e *etcdImpl) ExistsW(key string) (exist bool, stat zk.Stat, watch <-chan zk.Event, err error) {
	_, _, events, watchErr := e.watch(key, false)
	if watchErr != nil {
		return false, nil, nil, convertToZkError(watchErr)
	}

	return true, nil, events, nil
}

// MAX_TTL is the TTL that Create gives to non-ephemeral nodes: the number of
// seconds in 365 days (presumably etcd interprets TTLs in seconds — confirm).
const MAX_TTL = 365 * 24 * 60 * 60

// doKeepAlive refreshes the TTL of the file node at key once.
//
// It reads the node and rewrites it with the same value and the given ttl
// through a compare-and-swap on the value and ModifiedIndex it just read. A
// failed comparison (the node changed in between) is not an error: nil is
// returned and the next refresh simply tries again.
//
// An error is returned when no client can be obtained from the pool, when
// the node cannot be read, when key is a directory, or when the swap fails
// for any reason other than the comparison.
func (e *etcdImpl) doKeepAlive(key string, ttl uint64) error {
	conn, err := e.pool.Get()
	if err != nil {
		return err
	}

	defer e.pool.Put(conn)
	c := conn.(*PooledEtcdClient).c

	resp, err := c.Get(key, false, false)
	if err != nil {
		log.Error(err)
		return err
	}

	if resp.Node.Dir {
		return fmt.Errorf("can not set ttl to directory %s", key)
	}

	_, err = c.CompareAndSwap(key, resp.Node.Value, ttl, resp.Node.Value, resp.Node.ModifiedIndex)
	if err == nil {
		return nil
	}

	// Someone else modified the node between the read and the swap; treat the
	// refresh as done rather than as a failure.
	if ec, ok := err.(*etcd.EtcdError); ok && ec.ErrorCode == etcderr.EcodeTestFailed {
		return nil
	}

	return err
}

//todo:add test for keepAlive
// keepAlive starts a background goroutine that calls doKeepAlive for key
// about once per second. The goroutine stops, after logging the error, the
// first time doKeepAlive fails; there is no other way to stop it.
func (e *etcdImpl) keepAlive(key string, ttl uint64) {
	refreshLoop := func() {
		for {
			time.Sleep(1 * time.Second)
			if err := e.doKeepAlive(key, ttl); err != nil {
				log.Error(err)
				return
			}
		}
	}

	go refreshLoop()
}

// Create adds a node for wholekey and returns the key etcd reports for it.
//
// flags is a bitmask of zk.FlagSequence and zk.FlagEphemeral:
//   - sequence: the node is created with CreateInOrder under
//     path.Dir(wholekey), so the last element of wholekey is discarded and
//     aclv is not consulted;
//   - ephemeral: the node gets a TTL of 5 and a background keepAlive is
//     started for it; otherwise the TTL is MAX_TTL.
//
// A non-sequence node whose aclv contains an entry with
// Perms == PERM_DIRECTORY is created as a directory and value is ignored.
// The directory is created once, however many entries carry that permission.
//
// Errors from the pool are returned unchanged; errors from etcd are converted
// with convertToZkError.
func (e *etcdImpl) Create(wholekey string, value []byte, flags int32, aclv []zk.ACL) (keyCreated string, err error) {
	seq := (flags & zk.FlagSequence) != 0
	tmp := (flags & zk.FlagEphemeral) != 0
	ttl := uint64(MAX_TTL)
	if tmp {
		ttl = 5
	}

	var resp *etcd.Response

	conn, err := e.pool.Get()
	if err != nil {
		return "", err
	}

	defer e.pool.Put(conn)
	c := conn.(*PooledEtcdClient).c

	log.Info("create", wholekey)

	// Directories are only considered for non-sequence nodes. Stop at the
	// first matching ACL so the directory is never created twice.
	isDir := false
	if !seq {
		for _, v := range aclv {
			if v.Perms == PERM_DIRECTORY {
				isDir = true
				break
			}
		}
	}

	if isDir {
		log.Info("etcdImpl:create directory", wholekey)
		resp, err = c.CreateDir(wholekey, ttl)
		if err != nil {
			return "", convertToZkError(err)
		}
		if tmp {
			// NOTE(review): doKeepAlive rejects directories, so this refresher
			// exits on its first attempt and the directory keeps its short TTL.
			e.keepAlive(wholekey, ttl)
		}
		return resp.Node.Key, nil
	}

	fn := c.Create
	if seq {
		// Sequence nodes are created in order under the parent directory;
		// etcd chooses the final key.
		wholekey = path.Dir(wholekey)
		fn = c.CreateInOrder
	}

	resp, err = fn(wholekey, string(value), ttl)
	if err != nil {
		return "", convertToZkError(err)
	}

	if tmp {
		e.keepAlive(resp.Node.Key, ttl)
	}

	return resp.Node.Key, nil
}

// Set overwrites the value at key while keeping the node's current TTL.
//
// version 0 is rejected with an "invalid version" error; any other version
// is accepted and then ignored (the write is unconditional). The node is
// read first to obtain its TTL, so a missing key fails with the converted
// read error. stat is always nil.
func (e *etcdImpl) Set(key string, value []byte, version int32) (stat zk.Stat, err error) {
	if version == 0 {
		return nil, errors.New("invalid version")
	}

	res, err := e.pool.Get()
	if err != nil {
		return nil, err
	}
	defer e.pool.Put(res)

	client := res.(*PooledEtcdClient).c
	current, err := client.Get(key, true, false)
	if current == nil {
		return nil, convertToZkError(err)
	}

	ttl := uint64(current.Node.TTL)
	_, err = client.Set(key, string(value), ttl)
	return nil, convertToZkError(err)
}

// Delete removes the node at key. The node is read first to find out whether
// it is a directory, which is removed with DeleteDir; anything else is
// removed with a non-recursive Delete. etcd errors are converted with
// convertToZkError.
//
// TODO: version is currently ignored.
func (e *etcdImpl) Delete(key string, version int32) (err error) {
	res, err := e.pool.Get()
	if err != nil {
		return err
	}
	defer e.pool.Put(res)

	client := res.(*PooledEtcdClient).c
	current, err := client.Get(key, true, false)
	if current == nil {
		return convertToZkError(err)
	}

	if current.Node.Dir {
		_, err = client.DeleteDir(key)
		return convertToZkError(err)
	}

	_, err = client.Delete(key, false)
	return convertToZkError(err)
}

// GetACL is a stub: it performs no lookup and always returns nil ACLs, a nil
// stat and a nil error.
func (e *etcdImpl) GetACL(key string) ([]zk.ACL, zk.Stat, error) {
	return nil, nil, nil
}

// SetACL is a stub: all arguments are ignored and it always returns a nil
// stat and a nil error.
func (e *etcdImpl) SetACL(key string, aclv []zk.ACL, version int32) (zk.Stat, error) {
	return nil, nil
}

// Close is currently a no-op: the client pool and any running watch or
// keep-alive goroutines are left untouched.
func (e *etcdImpl) Close() {
	//how to implement this
}