package zkhelper import ( "errors" "fmt" "path" "strings" "sync" "time" etcderr "github.com/coreos/etcd/error" "github.com/coreos/go-etcd/etcd" zk "github.com/ngaut/go-zookeeper/zk" "github.com/ngaut/log" "github.com/ngaut/pools" ) var ( singleInstanceLock sync.Mutex etcdInstance *etcdImpl ) type PooledEtcdClient struct { c *etcd.Client } func (c *PooledEtcdClient) Close() { } func (e *etcdImpl) Seq2Str(seq int64) string { return fmt.Sprintf("%d", seq) } type etcdImpl struct { sync.Mutex cluster string pool *pools.ResourcePool indexMap map[string]uint64 } func convertToZkError(err error) error { //todo: convert other errors if ec, ok := err.(*etcd.EtcdError); ok { switch ec.ErrorCode { case etcderr.EcodeKeyNotFound: return zk.ErrNoNode case etcderr.EcodeNotFile: case etcderr.EcodeNotDir: case etcderr.EcodeNodeExist: return zk.ErrNodeExists case etcderr.EcodeDirNotEmpty: return zk.ErrNotEmpty } } return err } func convertToZkEvent(watchPath string, resp *etcd.Response, err error) zk.Event { //log.Infof("convert event from path:%s, %+v, %+v", watchPath, resp, resp.Node.Key) var e zk.Event if err != nil { e.Err = convertToZkError(err) e.State = zk.StateDisconnected return e } e.State = zk.StateConnected e.Path = resp.Node.Key if len(resp.Node.Key) > len(watchPath) { e.Type = zk.EventNodeChildrenChanged return e } switch resp.Action { case "set": e.Type = zk.EventNodeDataChanged case "delete": e.Type = zk.EventNodeDeleted case "update": e.Type = zk.EventNodeDataChanged case "create": e.Type = zk.EventNodeCreated case "expire": e.Type = zk.EventNotWatching } return e } func NewEtcdConn(zkAddr string) (Conn, error) { singleInstanceLock.Lock() defer singleInstanceLock.Unlock() if etcdInstance != nil { return etcdInstance, nil } p := pools.NewResourcePool(func() (pools.Resource, error) { cluster := strings.Split(zkAddr, ",") for i, addr := range cluster { if !strings.HasPrefix(addr, "http://") { cluster[i] = "http://" + addr } } newClient := etcd.NewClient(cluster) newClient.SetConsistency(etcd.STRONG_CONSISTENCY) return &PooledEtcdClient{c: newClient}, nil }, 10, 10, 0) etcdInstance = &etcdImpl{ cluster: zkAddr, pool: p, indexMap: make(map[string]uint64), } log.Infof("new etcd %s", zkAddr) if etcdInstance == nil { return nil, errors.New("unknown error") } return etcdInstance, nil } func (e *etcdImpl) Get(key string) (data []byte, stat zk.Stat, err error) { conn, err := e.pool.Get() if err != nil { return nil, nil, err } defer e.pool.Put(conn) c := conn.(*PooledEtcdClient).c resp, err := c.Get(key, true, false) if resp == nil { return nil, nil, convertToZkError(err) } return []byte(resp.Node.Value), nil, nil } func (e *etcdImpl) setIndex(key string, index uint64) { e.Lock() defer e.Unlock() e.indexMap[key] = index } func (e *etcdImpl) getIndex(key string) uint64 { e.Lock() defer e.Unlock() index := e.indexMap[key] return index } func (e *etcdImpl) watch(key string, children bool) (resp *etcd.Response, stat zk.Stat, watch <-chan zk.Event, err error) { conn, err := e.pool.Get() if err != nil { return nil, nil, nil, err } defer e.pool.Put(conn) c := conn.(*PooledEtcdClient).c index := e.getIndex(key) resp, err = c.Get(key, true, true) if resp == nil { return nil, nil, nil, convertToZkError(err) } if index < resp.Node.ModifiedIndex { index = resp.Node.ModifiedIndex } for _, n := range resp.Node.Nodes { if n.ModifiedIndex > index { index = n.ModifiedIndex } } log.Info("try watch", key) ch := make(chan zk.Event, 100) originVal := resp.Node.Value go func() { defer func() { e.setIndex(key, index) }() for { conn, err := e.pool.Get() if err != nil { log.Error(err) return } c := conn.(*PooledEtcdClient).c resp, err := c.Watch(key, index, children, nil, nil) e.pool.Put(conn) if err != nil { if ec, ok := err.(*etcd.EtcdError); ok { if ec.ErrorCode == etcderr.EcodeEventIndexCleared { index++ continue } } log.Warning("watch", err) ch <- convertToZkEvent(key, resp, err) return } if key == resp.Node.Key && originVal == string(resp.Node.Value) { //keep alive event index++ continue } ch <- convertToZkEvent(key, resp, err) //update index if index <= resp.Node.ModifiedIndex { index = resp.Node.ModifiedIndex + 1 } else { index++ } return } }() return resp, nil, ch, nil } func (e *etcdImpl) GetW(key string) (data []byte, stat zk.Stat, watch <-chan zk.Event, err error) { resp, stat, watch, err := e.watch(key, false) if err != nil { return } return []byte(resp.Node.Value), stat, watch, nil } func (e *etcdImpl) Children(key string) (children []string, stat zk.Stat, err error) { conn, err := e.pool.Get() if err != nil { return nil, nil, err } defer e.pool.Put(conn) c := conn.(*PooledEtcdClient).c resp, err := c.Get(key, true, false) if resp == nil { return nil, nil, convertToZkError(err) } for _, c := range resp.Node.Nodes { children = append(children, path.Base(c.Key)) } return } func (e *etcdImpl) ChildrenW(key string) (children []string, stat zk.Stat, watch <-chan zk.Event, err error) { resp, stat, watch, err := e.watch(key, true) if err != nil { return nil, stat, nil, convertToZkError(err) } for _, c := range resp.Node.Nodes { children = append(children, path.Base(c.Key)) } return children, stat, watch, nil } func (e *etcdImpl) Exists(key string) (exist bool, stat zk.Stat, err error) { conn, err := e.pool.Get() if err != nil { return false, nil, err } defer e.pool.Put(conn) c := conn.(*PooledEtcdClient).c _, err = c.Get(key, true, false) if err == nil { return true, nil, nil } if ec, ok := err.(*etcd.EtcdError); ok { if ec.ErrorCode == etcderr.EcodeKeyNotFound { return false, nil, nil } } return false, nil, convertToZkError(err) } func (e *etcdImpl) ExistsW(key string) (exist bool, stat zk.Stat, watch <-chan zk.Event, err error) { _, stat, watch, err = e.watch(key, false) if err != nil { return false, nil, nil, convertToZkError(err) } return true, nil, watch, nil } const MAX_TTL = 365 * 24 * 60 * 60 func (e *etcdImpl) doKeepAlive(key string, ttl uint64) error { conn, err := e.pool.Get() if err != nil { return err } defer e.pool.Put(conn) c := conn.(*PooledEtcdClient).c resp, err := c.Get(key, false, false) if err != nil { log.Error(err) return err } if resp.Node.Dir { return fmt.Errorf("can not set ttl to directory", key) } //log.Info("keep alive ", key) resp, err = c.CompareAndSwap(key, resp.Node.Value, ttl, resp.Node.Value, resp.Node.ModifiedIndex) if err == nil { return nil } if ec, ok := err.(*etcd.EtcdError); ok && ec.ErrorCode == etcderr.EcodeTestFailed { return nil } return err } //todo:add test for keepAlive func (e *etcdImpl) keepAlive(key string, ttl uint64) { go func() { for { time.Sleep(1 * time.Second) err := e.doKeepAlive(key, ttl) if err != nil { log.Error(err) return } } }() } func (e *etcdImpl) Create(wholekey string, value []byte, flags int32, aclv []zk.ACL) (keyCreated string, err error) { seq := (flags & zk.FlagSequence) != 0 tmp := (flags & zk.FlagEphemeral) != 0 ttl := uint64(MAX_TTL) if tmp { ttl = 5 } var resp *etcd.Response conn, err := e.pool.Get() if err != nil { return "", err } defer e.pool.Put(conn) c := conn.(*PooledEtcdClient).c fn := c.Create log.Info("create", wholekey) if seq { wholekey = path.Dir(wholekey) fn = c.CreateInOrder } else { for _, v := range aclv { if v.Perms == PERM_DIRECTORY { log.Info("etcdImpl:create directory", wholekey) fn = nil resp, err = c.CreateDir(wholekey, uint64(ttl)) if err != nil { return "", convertToZkError(err) } } } } if fn == nil { if tmp { e.keepAlive(wholekey, ttl) } return resp.Node.Key, nil } resp, err = fn(wholekey, string(value), uint64(ttl)) if err != nil { return "", convertToZkError(err) } if tmp { e.keepAlive(resp.Node.Key, ttl) } return resp.Node.Key, nil } func (e *etcdImpl) Set(key string, value []byte, version int32) (stat zk.Stat, err error) { if version == 0 { return nil, errors.New("invalid version") } conn, err := e.pool.Get() if err != nil { return nil, err } defer e.pool.Put(conn) c := conn.(*PooledEtcdClient).c resp, err := c.Get(key, true, false) if resp == nil { return nil, convertToZkError(err) } _, err = c.Set(key, string(value), uint64(resp.Node.TTL)) return nil, convertToZkError(err) } func (e *etcdImpl) Delete(key string, version int32) (err error) { //todo: handle version conn, err := e.pool.Get() if err != nil { return err } defer e.pool.Put(conn) c := conn.(*PooledEtcdClient).c resp, err := c.Get(key, true, false) if resp == nil { return convertToZkError(err) } if resp.Node.Dir { _, err = c.DeleteDir(key) } else { _, err = c.Delete(key, false) } return convertToZkError(err) } func (e *etcdImpl) GetACL(key string) ([]zk.ACL, zk.Stat, error) { return nil, nil, nil } func (e *etcdImpl) SetACL(key string, aclv []zk.ACL, version int32) (zk.Stat, error) { return nil, nil } func (e *etcdImpl) Close() { //how to implement this }