/* Package couchbase provides a smart client for go. Usage: client, err := couchbase.Connect("http://myserver:8091/") handleError(err) pool, err := client.GetPool("default") handleError(err) bucket, err := pool.GetBucket("MyAwesomeBucket") handleError(err) ... or a shortcut for the bucket directly bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default") in any case, you can specify authentication credentials using standard URL userinfo syntax: b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/", "default", "bucket") */ package couchbase import ( "encoding/binary" "encoding/json" "errors" "fmt" "io" "runtime" "strconv" "strings" "sync" "sync/atomic" "time" "unsafe" "github.com/couchbase/gomemcached" "github.com/couchbase/gomemcached/client" // package name is 'memcached' "github.com/couchbase/goutils/logging" ) // Mutation Token type MutationToken struct { VBid uint16 // vbucket id Guard uint64 // vbuuid Value uint64 // sequence number } // Maximum number of times to retry a chunk of a bulk get on error. var MaxBulkRetries = 5000 var backOffDuration time.Duration = 100 * time.Millisecond var MaxBackOffRetries = 25 // exponentail backOff result in over 30sec (25*13*0.1s) // If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call // takes longer than that. var SlowServerCallWarningThreshold time.Duration func slowLog(startTime time.Time, format string, args ...interface{}) { if elapsed := time.Now().Sub(startTime); elapsed > SlowServerCallWarningThreshold { pc, _, _, _ := runtime.Caller(2) caller := runtime.FuncForPC(pc).Name() logging.Infof("go-couchbase: "+format+" in "+caller+" took "+elapsed.String(), args...) } } // Return true if error is KEY_ENOENT. Required by cbq-engine func IsKeyEExistsError(err error) bool { res, ok := err.(*gomemcached.MCResponse) if ok && res.Status == gomemcached.KEY_EEXISTS { return true } return false } // Return true if error is KEY_ENOENT. Required by cbq-engine func IsKeyNoEntError(err error) bool { res, ok := err.(*gomemcached.MCResponse) if ok && res.Status == gomemcached.KEY_ENOENT { return true } return false } // Return true if error suggests a bucket refresh is required. Required by cbq-engine func IsRefreshRequired(err error) bool { res, ok := err.(*gomemcached.MCResponse) if ok && (res.Status == gomemcached.NO_BUCKET || res.Status == gomemcached.NOT_MY_VBUCKET) { return true } return false } // Return true if a collection is not known. Required by cbq-engine func IsUnknownCollection(err error) bool { res, ok := err.(*gomemcached.MCResponse) if ok && (res.Status == gomemcached.UNKNOWN_COLLECTION) { return true } return false } // ClientOpCallback is called for each invocation of Do. var ClientOpCallback func(opname, k string, start time.Time, err error) // Do executes a function on a memcached connection to the node owning key "k" // // Note that this automatically handles transient errors by replaying // your function on a "not-my-vbucket" error, so don't assume // your command will only be executed only once. func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error) { return b.Do2(k, f, true) } func (b *Bucket) Do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool) (err error) { var lastError error if SlowServerCallWarningThreshold > 0 { defer slowLog(time.Now(), "call to Do(%q)", k) } vb := b.VBHash(k) maxTries := len(b.Nodes()) * 2 for i := 0; i < maxTries; i++ { conn, pool, err := b.getConnectionToVBucket(vb) if err != nil { if (err == errNoPool || isConnError(err)) && backOff(i, maxTries, backOffDuration, true) { b.Refresh() continue } return err } if deadline && DefaultTimeout > 0 { conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout)) } else { conn.SetDeadline(noDeadline) } lastError = f(conn, uint16(vb)) retry := false discard := isOutOfBoundsError(lastError) || IsReadTimeOutError(lastError) // MB-30967 / MB-31001 implement back off for transient errors if resp, ok := lastError.(*gomemcached.MCResponse); ok { switch resp.Status { case gomemcached.NOT_MY_VBUCKET: b.Refresh() // MB-28842: in case of NMVB, check if the node is still part of the map // and ditch the connection if it isn't. discard = b.checkVBmap(pool.Node()) retry = true case gomemcached.NOT_SUPPORTED: discard = true retry = true case gomemcached.ENOMEM: fallthrough case gomemcached.TMPFAIL, gomemcached.EBUSY: retry = backOff(i, maxTries, backOffDuration, true) } } else if lastError != nil && isConnError(lastError) && backOff(i, maxTries, backOffDuration, true) { retry = true } if discard { pool.Discard(conn) } else { pool.Return(conn) } if !retry { return lastError } } return fmt.Errorf("unable to complete action after %v attemps: ", maxTries, lastError) } type GatheredStats struct { Server string Stats map[string]string Err error } func getStatsParallel(sn string, b *Bucket, offset int, which string, ch chan<- GatheredStats) { pool := b.getConnPool(offset) var gatheredStats GatheredStats conn, err := pool.Get() defer func() { pool.Return(conn) ch <- gatheredStats }() if err != nil { gatheredStats = GatheredStats{Server: sn, Err: err} } else { conn.SetDeadline(getDeadline(time.Time{}, DefaultTimeout)) sm, err := conn.StatsMap(which) gatheredStats = GatheredStats{Server: sn, Stats: sm, Err: err} } } func getStatsParallelFunc(fn func(key, val []byte), sn string, b *Bucket, offset int, which string, ch chan<- GatheredStats) { pool := b.getConnPool(offset) conn, err := pool.Get() if err == nil { conn.SetDeadline(getDeadline(time.Time{}, DefaultTimeout)) err = conn.StatsFunc(which, fn) pool.Return(conn) } ch <- GatheredStats{Server: sn, Err: err} } // GetStats gets a set of stats from all servers. // // Returns a map of server ID -> map of stat key to map value. func (b *Bucket) GetStats(which string) map[string]map[string]string { rv := map[string]map[string]string{} for server, gs := range b.GatherStats(which) { if len(gs.Stats) > 0 { rv[server] = gs.Stats } } return rv } // GatherStats returns a map of server ID -> GatheredStats from all servers. func (b *Bucket) GatherStats(which string) map[string]GatheredStats { vsm := b.VBServerMap() if vsm.ServerList == nil { return nil } // Go grab all the things at once. ch := make(chan GatheredStats, len(vsm.ServerList)) for i, sn := range vsm.ServerList { go getStatsParallel(sn, b, i, which, ch) } // Gather the results rv := map[string]GatheredStats{} for range vsm.ServerList { gs := <-ch rv[gs.Server] = gs } return rv } // GatherStats returns a map of server ID -> GatheredStats from all servers. func (b *Bucket) GatherStatsFunc(which string, fn func(key, val []byte)) map[string]error { var errMap map[string]error vsm := b.VBServerMap() if vsm.ServerList == nil { return errMap } // Go grab all the things at once. ch := make(chan GatheredStats, len(vsm.ServerList)) for i, sn := range vsm.ServerList { go getStatsParallelFunc(fn, sn, b, i, which, ch) } // Gather the results for range vsm.ServerList { gs := <-ch if gs.Err != nil { if errMap == nil { errMap = make(map[string]error) errMap[gs.Server] = gs.Err } } } return errMap } type BucketStats int const ( StatCount = BucketStats(iota) StatSize ) var bucketStatString = []string{ "curr_items", "ep_value_size", } var collectionStatString = []string{ "items", "disk_size", } // Get selected bucket or collection stats func (b *Bucket) GetIntStats(refresh bool, which []BucketStats, context ...*memcached.ClientContext) ([]int64, error) { if refresh { b.Refresh() } var vals []int64 = make([]int64, len(which)) if len(vals) == 0 { return vals, nil } var outErr error if len(context) > 0 { collKey := fmt.Sprintf("collections-byid 0x%x", context[0].CollId) errs := b.GatherStatsFunc(collKey, func(key, val []byte) { for i, f := range which { lk := len(key) ls := len(collectionStatString[f]) if lk >= ls && string(key[lk-ls:]) == collectionStatString[f] { v, err := strconv.ParseInt(string(val), 10, 64) if err == nil { atomic.AddInt64(&vals[i], v) } else if outErr == nil { outErr = err } } } }) // have to use a range to access any one element of a map for _, err := range errs { return nil, err } } else { errs := b.GatherStatsFunc("", func(key, val []byte) { for i, f := range which { if string(key) == bucketStatString[f] { v, err := strconv.ParseInt(string(val), 10, 64) if err == nil { atomic.AddInt64(&vals[i], v) } else if outErr == nil { outErr = err } } } }) // have to use a range to access any one element of a map for _, err := range errs { return nil, err } } return vals, outErr } // Get bucket count through the bucket stats func (b *Bucket) GetCount(refresh bool, context ...*memcached.ClientContext) (count int64, err error) { if refresh { b.Refresh() } var cnt int64 if len(context) > 0 { key := fmt.Sprintf("collections-byid 0x%x", context[0].CollId) resKey := "" for _, gs := range b.GatherStats(key) { if len(gs.Stats) > 0 { // the key encodes the scope and collection id // we don't have the scope id, so we have to find it... if resKey == "" { for k, _ := range gs.Stats { resKey = strings.TrimRightFunc(k, func(r rune) bool { return r != ':' }) + "items" break } } cnt, err = strconv.ParseInt(gs.Stats[resKey], 10, 64) if err != nil { return 0, err } count += cnt } else if gs.Err != nil { return 0, gs.Err } } } else { for _, gs := range b.GatherStats("") { if len(gs.Stats) > 0 { cnt, err = strconv.ParseInt(gs.Stats["curr_items"], 10, 64) if err != nil { return 0, err } count += cnt } else if gs.Err != nil { return 0, gs.Err } } } return count, nil } // Get bucket document size through the bucket stats func (b *Bucket) GetSize(refresh bool, context ...*memcached.ClientContext) (size int64, err error) { if refresh { b.Refresh() } var sz int64 if len(context) > 0 { key := fmt.Sprintf("collections-byid 0x%x", context[0].CollId) resKey := "" for _, gs := range b.GatherStats(key) { if len(gs.Stats) > 0 { // the key encodes the scope and collection id // we don't have the scope id, so we have to find it... if resKey == "" { for k, _ := range gs.Stats { resKey = strings.TrimRightFunc(k, func(r rune) bool { return r != ':' }) + "disk_size" break } } sz, err = strconv.ParseInt(gs.Stats[resKey], 10, 64) if err != nil { return 0, err } size += sz } else if gs.Err != nil { return 0, gs.Err } } } else { for _, gs := range b.GatherStats("") { if len(gs.Stats) > 0 { sz, err = strconv.ParseInt(gs.Stats["ep_value_size"], 10, 64) if err != nil { return 0, err } size += sz } else if gs.Err != nil { return 0, gs.Err } } } return size, nil } func isAuthError(err error) bool { estr := err.Error() return strings.Contains(estr, "Auth failure") } func IsReadTimeOutError(err error) bool { if err == nil { return false } estr := err.Error() return strings.Contains(estr, "read tcp") || strings.Contains(estr, "i/o timeout") } func isTimeoutError(err error) bool { estr := err.Error() return strings.Contains(estr, "i/o timeout") || strings.Contains(estr, "connection timed out") || strings.Contains(estr, "no route to host") } // Errors that are not considered fatal for our fetch loop func isConnError(err error) bool { if err == io.EOF { return true } estr := err.Error() return strings.Contains(estr, "broken pipe") || strings.Contains(estr, "connection reset") || strings.Contains(estr, "connection refused") || strings.Contains(estr, "connection pool is closed") } func isOutOfBoundsError(err error) bool { return err != nil && strings.Contains(err.Error(), "Out of Bounds error") } func getDeadline(reqDeadline time.Time, duration time.Duration) time.Time { if reqDeadline.IsZero() { if duration > 0 { return time.Unix(time.Now().Unix(), 0).Add(duration) } else { return noDeadline } } return reqDeadline } func backOff(attempt, maxAttempts int, duration time.Duration, exponential bool) bool { if attempt < maxAttempts { // 0th attempt return immediately if attempt > 0 { if exponential { duration = time.Duration(attempt) * duration } time.Sleep(duration) } return true } return false } func (b *Bucket) doBulkGet(vb uint16, keys []string, reqDeadline time.Time, ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string, eStatus *errorStatus, context ...*memcached.ClientContext) { if SlowServerCallWarningThreshold > 0 { defer slowLog(time.Now(), "call to doBulkGet(%d, %d keys)", vb, len(keys)) } rv := _STRING_MCRESPONSE_POOL.Get() attempts := 0 backOffAttempts := 0 done := false bname := b.Name for ; attempts < MaxBulkRetries && !done && !eStatus.errStatus; attempts++ { if len(b.VBServerMap().VBucketMap) < int(vb) { //fatal err := fmt.Errorf("vbmap smaller than requested for %v", bname) logging.Errorf("go-couchbase: %v vb %d vbmap len %d", err.Error(), vb, len(b.VBServerMap().VBucketMap)) ech <- err return } masterID := b.VBServerMap().VBucketMap[vb][0] if masterID < 0 { // fatal err := fmt.Errorf("No master node available for %v vb %d", bname, vb) logging.Errorf("%v", err.Error()) ech <- err return } // This stack frame exists to ensure we can clean up // connection at a reasonable time. err := func() error { pool := b.getConnPool(masterID) conn, err := pool.Get() if err != nil { if isAuthError(err) || isTimeoutError(err) { logging.Errorf("Fatal Error %v : %v", bname, err) ech <- err return err } else if isConnError(err) { if !backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { logging.Errorf("Connection Error %v : %v", bname, err) ech <- err return err } b.Refresh() backOffAttempts++ } else if err == errNoPool { if !backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { logging.Errorf("Connection Error %v : %v", bname, err) ech <- err return err } err = b.Refresh() if err != nil { ech <- err return err } backOffAttempts++ // retry, and make no noise return nil } logging.Infof("Pool Get returned %v: %v", bname, err) // retry return nil } conn.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) err = conn.GetBulk(vb, keys, rv, subPaths, context...) discard := false defer func() { if discard { pool.Discard(conn) } else { pool.Return(conn) } }() switch err.(type) { case *gomemcached.MCResponse: notSMaxTries := len(b.Nodes()) * 2 st := err.(*gomemcached.MCResponse).Status if st == gomemcached.NOT_MY_VBUCKET || (st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries) { b.Refresh() discard = b.checkVBmap(pool.Node()) return nil // retry } else if st == gomemcached.EBUSY || st == gomemcached.LOCKED { if (attempts % (MaxBulkRetries / 100)) == 0 { logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)", err.Error(), bname, vb, keys) } return nil // retry } else if (st == gomemcached.ENOMEM || st == gomemcached.TMPFAIL) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { // MB-30967 / MB-31001 use backoff for TMPFAIL too backOffAttempts++ logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)", err.Error(), bname, vb, keys) return nil // retry } ech <- err return err case error: if isOutOfBoundsError(err) || IsReadTimeOutError(err) { // We got an out of bounds error or a read timeout error; retry the operation discard = true return nil } else if isConnError(err) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) { backOffAttempts++ logging.Errorf("Connection Error: %s. Refreshing bucket %v (vbid:%v,keys:<ud>%v</ud>)", err.Error(), bname, vb, keys) discard = true b.Refresh() return nil // retry } ech <- err ch <- rv return err } done = true return nil }() if err != nil { return } } if attempts >= MaxBulkRetries { err := fmt.Errorf("bulkget exceeded MaxBulkRetries for %v(vbid:%d,keys:<ud>%v</ud>)", bname, vb, keys) logging.Errorf("%v", err.Error()) ech <- err } ch <- rv } type errorStatus struct { errStatus bool } type vbBulkGet struct { b *Bucket ch chan<- map[string]*gomemcached.MCResponse ech chan<- error k uint16 keys []string reqDeadline time.Time wg *sync.WaitGroup subPaths []string groupError *errorStatus context []*memcached.ClientContext } const _NUM_CHANNELS = 5 var _NUM_CHANNEL_WORKERS = (runtime.NumCPU() + 1) / 2 var DefaultDialTimeout = time.Duration(0) var DefaultTimeout = time.Duration(0) var noDeadline = time.Time{} // Buffer 4k requests per worker var _VB_BULK_GET_CHANNELS []chan *vbBulkGet func InitBulkGet() { DefaultDialTimeout = 20 * time.Second DefaultTimeout = 120 * time.Second memcached.SetDefaultDialTimeout(DefaultDialTimeout) _VB_BULK_GET_CHANNELS = make([]chan *vbBulkGet, _NUM_CHANNELS) for i := 0; i < _NUM_CHANNELS; i++ { channel := make(chan *vbBulkGet, 16*1024*_NUM_CHANNEL_WORKERS) _VB_BULK_GET_CHANNELS[i] = channel for j := 0; j < _NUM_CHANNEL_WORKERS; j++ { go vbBulkGetWorker(channel) } } } func vbBulkGetWorker(ch chan *vbBulkGet) { defer func() { // Workers cannot panic and die recover() go vbBulkGetWorker(ch) }() for vbg := range ch { vbDoBulkGet(vbg) } } func vbDoBulkGet(vbg *vbBulkGet) { defer vbg.wg.Done() defer func() { // Workers cannot panic and die recover() }() vbg.b.doBulkGet(vbg.k, vbg.keys, vbg.reqDeadline, vbg.ch, vbg.ech, vbg.subPaths, vbg.groupError, vbg.context...) } var _ERR_CHAN_FULL = fmt.Errorf("Data request queue full, aborting query.") func (b *Bucket) processBulkGet(kdm map[uint16][]string, reqDeadline time.Time, ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string, eStatus *errorStatus, context ...*memcached.ClientContext) { defer close(ch) defer close(ech) wg := &sync.WaitGroup{} for k, keys := range kdm { // GetBulk() group has error donot Queue items for this group if eStatus.errStatus { break } vbg := &vbBulkGet{ b: b, ch: ch, ech: ech, k: k, keys: keys, reqDeadline: reqDeadline, wg: wg, subPaths: subPaths, groupError: eStatus, context: context, } wg.Add(1) // Random int // Right shift to avoid 8-byte alignment, and take low bits c := (uintptr(unsafe.Pointer(vbg)) >> 4) % _NUM_CHANNELS select { case _VB_BULK_GET_CHANNELS[c] <- vbg: // No-op default: // Buffer full, abandon the bulk get ech <- _ERR_CHAN_FULL wg.Add(-1) } } // Wait for my vb bulk gets wg.Wait() } type multiError []error func (m multiError) Error() string { if len(m) == 0 { panic("Error of none") } return fmt.Sprintf("{%v errors, starting with %v}", len(m), m[0].Error()) } // Convert a stream of errors from ech into a multiError (or nil) and // send down eout. // // At least one send is guaranteed on eout, but two is possible, so // buffer the out channel appropriately. func errorCollector(ech <-chan error, eout chan<- error, eStatus *errorStatus) { defer func() { eout <- nil }() var errs multiError for e := range ech { if !eStatus.errStatus && !IsKeyNoEntError(e) { eStatus.errStatus = true } errs = append(errs, e) } if len(errs) > 0 { eout <- errs } } // Fetches multiple keys concurrently, with []byte values // // This is a wrapper around GetBulk which converts all values returned // by GetBulk from raw memcached responses into []byte slices. // Returns one document for duplicate keys func (b *Bucket) GetBulkRaw(keys []string, context ...*memcached.ClientContext) (map[string][]byte, error) { resp, eout := b.getBulk(keys, noDeadline, nil, context...) rv := make(map[string][]byte, len(keys)) for k, av := range resp { rv[k] = av.Body } b.ReleaseGetBulkPools(resp) return rv, eout } // GetBulk fetches multiple keys concurrently. // // Unlike more convenient GETs, the entire response is returned in the // map array for each key. Keys that were not found will not be included in // the map. func (b *Bucket) GetBulk(keys []string, reqDeadline time.Time, subPaths []string, context ...*memcached.ClientContext) (map[string]*gomemcached.MCResponse, error) { return b.getBulk(keys, reqDeadline, subPaths, context...) } func (b *Bucket) ReleaseGetBulkPools(rv map[string]*gomemcached.MCResponse) { _STRING_MCRESPONSE_POOL.Put(rv) } func (b *Bucket) getBulk(keys []string, reqDeadline time.Time, subPaths []string, context ...*memcached.ClientContext) (map[string]*gomemcached.MCResponse, error) { kdm := _VB_STRING_POOL.Get() defer _VB_STRING_POOL.Put(kdm) for _, k := range keys { if k != "" { vb := uint16(b.VBHash(k)) a, ok1 := kdm[vb] if !ok1 { a = _STRING_POOL.Get() } kdm[vb] = append(a, k) } } eout := make(chan error, 2) groupErrorStatus := &errorStatus{} // processBulkGet will own both of these channels and // guarantee they're closed before it returns. ch := make(chan map[string]*gomemcached.MCResponse) ech := make(chan error) go errorCollector(ech, eout, groupErrorStatus) go b.processBulkGet(kdm, reqDeadline, ch, ech, subPaths, groupErrorStatus, context...) var rv map[string]*gomemcached.MCResponse for m := range ch { if rv == nil { rv = m continue } for k, v := range m { rv[k] = v } _STRING_MCRESPONSE_POOL.Put(m) } return rv, <-eout } // WriteOptions is the set of option flags availble for the Write // method. They are ORed together to specify the desired request. type WriteOptions int const ( // Raw specifies that the value is raw []byte or nil; don't // JSON-encode it. Raw = WriteOptions(1 << iota) // AddOnly indicates an item should only be written if it // doesn't exist, otherwise ErrKeyExists is returned. AddOnly // Persist causes the operation to block until the server // confirms the item is persisted. Persist // Indexable causes the operation to block until it's availble via the index. Indexable // Append indicates the given value should be appended to the // existing value for the given key. Append ) var optNames = []struct { opt WriteOptions name string }{ {Raw, "raw"}, {AddOnly, "addonly"}, {Persist, "persist"}, {Indexable, "indexable"}, {Append, "append"}, } // String representation of WriteOptions func (w WriteOptions) String() string { f := []string{} for _, on := range optNames { if w&on.opt != 0 { f = append(f, on.name) w &= ^on.opt } } if len(f) == 0 || w != 0 { f = append(f, fmt.Sprintf("0x%x", int(w))) } return strings.Join(f, "|") } // Error returned from Write with AddOnly flag, when key already exists in the bucket. var ErrKeyExists = errors.New("key exists") // General-purpose value setter. // // The Set, Add and Delete methods are just wrappers around this. The // interpretation of `v` depends on whether the `Raw` option is // given. If it is, v must be a byte array or nil. (A nil value causes // a delete.) If `Raw` is not given, `v` will be marshaled as JSON // before being written. It must be JSON-marshalable and it must not // be nil. func (b *Bucket) Write(k string, flags, exp int, v interface{}, opt WriteOptions, context ...*memcached.ClientContext) (err error) { _, err = b.WriteWithCAS(k, flags, exp, v, opt, context...) return err } func (b *Bucket) WriteWithCAS(k string, flags, exp int, v interface{}, opt WriteOptions, context ...*memcached.ClientContext) (cas uint64, err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err) }(time.Now()) } var data []byte if opt&Raw == 0 { data, err = json.Marshal(v) if err != nil { return cas, err } } else if v != nil { data = v.([]byte) } var res *gomemcached.MCResponse err = b.Do(k, func(mc *memcached.Client, vb uint16) error { if opt&AddOnly != 0 { res, err = memcached.UnwrapMemcachedError( mc.Add(vb, k, flags, exp, data, context...)) if err == nil && res.Status != gomemcached.SUCCESS { if res.Status == gomemcached.KEY_EEXISTS { err = ErrKeyExists } else { err = res } } } else if opt&Append != 0 { res, err = mc.Append(vb, k, data, context...) } else if data == nil { res, err = mc.Del(vb, k, context...) } else { res, err = mc.Set(vb, k, flags, exp, data, context...) } if err == nil { cas = res.Cas } return err }) if err == nil && (opt&(Persist|Indexable) != 0) { err = b.WaitForPersistence(k, cas, data == nil) } return cas, err } func (b *Bucket) WriteWithMT(k string, flags, exp int, v interface{}, opt WriteOptions, context ...*memcached.ClientContext) (mt *MutationToken, err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback(fmt.Sprintf("WriteWithMT(%v)", opt), k, t, err) }(time.Now()) } var data []byte if opt&Raw == 0 { data, err = json.Marshal(v) if err != nil { return nil, err } } else if v != nil { data = v.([]byte) } var res *gomemcached.MCResponse err = b.Do(k, func(mc *memcached.Client, vb uint16) error { if opt&AddOnly != 0 { res, err = memcached.UnwrapMemcachedError( mc.Add(vb, k, flags, exp, data, context...)) if err == nil && res.Status != gomemcached.SUCCESS { if res.Status == gomemcached.KEY_EEXISTS { err = ErrKeyExists } else { err = res } } } else if opt&Append != 0 { res, err = mc.Append(vb, k, data, context...) } else if data == nil { res, err = mc.Del(vb, k, context...) } else { res, err = mc.Set(vb, k, flags, exp, data, context...) } if len(res.Extras) >= 16 { vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8])) seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16])) mt = &MutationToken{VBid: vb, Guard: vbuuid, Value: seqNo} } return err }) if err == nil && (opt&(Persist|Indexable) != 0) { err = b.WaitForPersistence(k, res.Cas, data == nil) } return mt, err } // Set a value in this bucket with Cas and return the new Cas value func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, error) { return b.WriteCas(k, 0, exp, cas, v, 0, context...) } // Set a value in this bucket with Cas without json encoding it func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, error) { return b.WriteCas(k, 0, exp, cas, v, Raw, context...) } func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{}, opt WriteOptions, context ...*memcached.ClientContext) (newCas uint64, err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err) }(time.Now()) } var data []byte if opt&Raw == 0 { data, err = json.Marshal(v) if err != nil { return 0, err } } else if v != nil { data = v.([]byte) } var res *gomemcached.MCResponse err = b.Do(k, func(mc *memcached.Client, vb uint16) error { res, err = mc.SetCas(vb, k, flags, exp, cas, data, context...) return err }) if err == nil && (opt&(Persist|Indexable) != 0) { err = b.WaitForPersistence(k, res.Cas, data == nil) } return res.Cas, err } // Extended CAS operation. These functions will return the mutation token, i.e vbuuid & guard func (b *Bucket) CasWithMeta(k string, flags int, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, *MutationToken, error) { return b.WriteCasWithMT(k, flags, exp, cas, v, 0, context...) } func (b *Bucket) CasWithMetaRaw(k string, flags int, exp int, cas uint64, v interface{}, context ...*memcached.ClientContext) (uint64, *MutationToken, error) { return b.WriteCasWithMT(k, flags, exp, cas, v, Raw, context...) } func (b *Bucket) WriteCasWithMT(k string, flags, exp int, cas uint64, v interface{}, opt WriteOptions, context ...*memcached.ClientContext) (newCas uint64, mt *MutationToken, err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err) }(time.Now()) } var data []byte if opt&Raw == 0 { data, err = json.Marshal(v) if err != nil { return 0, nil, err } } else if v != nil { data = v.([]byte) } var res *gomemcached.MCResponse err = b.Do(k, func(mc *memcached.Client, vb uint16) error { res, err = mc.SetCas(vb, k, flags, exp, cas, data, context...) return err }) if err != nil { return 0, nil, err } // check for extras if len(res.Extras) >= 16 { vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8])) seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16])) vb := b.VBHash(k) mt = &MutationToken{VBid: uint16(vb), Guard: vbuuid, Value: seqNo} } if err == nil && (opt&(Persist|Indexable) != 0) { err = b.WaitForPersistence(k, res.Cas, data == nil) } return res.Cas, mt, err } // Set a value in this bucket. // The value will be serialized into a JSON document. func (b *Bucket) Set(k string, exp int, v interface{}, context ...*memcached.ClientContext) error { return b.Write(k, 0, exp, v, 0, context...) } // Set a value in this bucket. func (b *Bucket) SetWithCAS(k string, exp int, v interface{}, context ...*memcached.ClientContext) (uint64, error) { return b.WriteWithCAS(k, 0, exp, v, 0, context...) } // Set a value in this bucket with with flags func (b *Bucket) SetWithMeta(k string, flags int, exp int, v interface{}, context ...*memcached.ClientContext) (*MutationToken, error) { return b.WriteWithMT(k, flags, exp, v, 0, context...) } // SetRaw sets a value in this bucket without JSON encoding it. func (b *Bucket) SetRaw(k string, exp int, v []byte, context ...*memcached.ClientContext) error { return b.Write(k, 0, exp, v, Raw, context...) } // Add adds a value to this bucket; like Set except that nothing // happens if the key exists. The value will be serialized into a // JSON document. func (b *Bucket) Add(k string, exp int, v interface{}, context ...*memcached.ClientContext) (added bool, err error) { err = b.Write(k, 0, exp, v, AddOnly, context...) if err == ErrKeyExists { return false, nil } return (err == nil), err } // Add adds a value to this bucket; like Set except that nothing // happens if the key exists. Return the CAS value. func (b *Bucket) AddWithCAS(k string, exp int, v interface{}, context ...*memcached.ClientContext) (bool, uint64, error) { cas, err := b.WriteWithCAS(k, 0, exp, v, AddOnly, context...) if err == ErrKeyExists { return false, 0, nil } return (err == nil), cas, err } // AddRaw adds a value to this bucket; like SetRaw except that nothing // happens if the key exists. The value will be stored as raw bytes. func (b *Bucket) AddRaw(k string, exp int, v []byte, context ...*memcached.ClientContext) (added bool, err error) { err = b.Write(k, 0, exp, v, AddOnly|Raw, context...) if err == ErrKeyExists { return false, nil } return (err == nil), err } // Add adds a value to this bucket; like Set except that nothing // happens if the key exists. The value will be serialized into a // JSON document. func (b *Bucket) AddWithMT(k string, exp int, v interface{}, context ...*memcached.ClientContext) (added bool, mt *MutationToken, err error) { mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly, context...) if err == ErrKeyExists { return false, mt, nil } return (err == nil), mt, err } // AddRaw adds a value to this bucket; like SetRaw except that nothing // happens if the key exists. The value will be stored as raw bytes. func (b *Bucket) AddRawWithMT(k string, exp int, v []byte, context ...*memcached.ClientContext) (added bool, mt *MutationToken, err error) { mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly|Raw, context...) if err == ErrKeyExists { return false, mt, nil } return (err == nil), mt, err } // Append appends raw data to an existing item. func (b *Bucket) Append(k string, data []byte, context ...*memcached.ClientContext) error { return b.Write(k, 0, 0, data, Append|Raw, context...) } // Returns collectionUid, manifestUid, error. func (b *Bucket) GetCollectionCID(scope string, collection string, reqDeadline time.Time) (uint32, uint32, error) { var err error var response *gomemcached.MCResponse if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("GetCollectionCID", scope+"."+collection, t, err) }(time.Now()) } var key = "DUMMY" // Contact any server. var manifestUid uint32 var collUid uint32 err = b.Do2(key, func(mc *memcached.Client, vb uint16) error { var err1 error mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) _, err1 = mc.SelectBucket(b.Name) if err1 != nil { return err1 } response, err1 = mc.CollectionsGetCID(scope, collection) if err1 != nil { return err1 } manifestUid = binary.BigEndian.Uint32(response.Extras[4:8]) collUid = binary.BigEndian.Uint32(response.Extras[8:12]) return nil }, false) return collUid, manifestUid, err } // Get a value straight from Memcached func (b *Bucket) GetsMC(key string, reqDeadline time.Time, context ...*memcached.ClientContext) (*gomemcached.MCResponse, error) { var err error var response *gomemcached.MCResponse if key == "" { return nil, nil } if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("GetsMC", key, t, err) }(time.Now()) } err = b.Do2(key, func(mc *memcached.Client, vb uint16) error { var err1 error mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) response, err1 = mc.Get(vb, key, context...) if err1 != nil { return err1 } return nil }, false) return response, err } // Get a value through the subdoc API func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, subPaths []string, context ...*memcached.ClientContext) (*gomemcached.MCResponse, error) { var err error var response *gomemcached.MCResponse if key == "" { return nil, nil } if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("GetsSubDoc", key, t, err) }(time.Now()) } err = b.Do2(key, func(mc *memcached.Client, vb uint16) error { var err1 error mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout)) response, err1 = mc.GetSubdoc(vb, key, subPaths, context...) if err1 != nil { return err1 } return nil }, false) return response, err } // GetsRaw gets a raw value from this bucket including its CAS // counter and flags. func (b *Bucket) GetsRaw(k string, context ...*memcached.ClientContext) (data []byte, flags int, cas uint64, err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now()) } err = b.Do(k, func(mc *memcached.Client, vb uint16) error { res, err := mc.Get(vb, k, context...) if err != nil { return err } cas = res.Cas if len(res.Extras) >= 4 { flags = int(binary.BigEndian.Uint32(res.Extras)) } data = res.Body return nil }) return } // Gets gets a value from this bucket, including its CAS counter. The // value is expected to be a JSON stream and will be deserialized into // rv. func (b *Bucket) Gets(k string, rv interface{}, caso *uint64, context ...*memcached.ClientContext) error { data, _, cas, err := b.GetsRaw(k, context...) if err != nil { return err } if caso != nil { *caso = cas } return json.Unmarshal(data, rv) } // Get a value from this bucket. // The value is expected to be a JSON stream and will be deserialized // into rv. func (b *Bucket) Get(k string, rv interface{}, context ...*memcached.ClientContext) error { return b.Gets(k, rv, nil, context...) } // GetRaw gets a raw value from this bucket. No marshaling is performed. func (b *Bucket) GetRaw(k string, context ...*memcached.ClientContext) ([]byte, error) { d, _, _, err := b.GetsRaw(k, context...) return d, err } // GetAndTouchRaw gets a raw value from this bucket including its CAS // counter and flags, and updates the expiry on the doc. func (b *Bucket) GetAndTouchRaw(k string, exp int, context ...*memcached.ClientContext) (data []byte, cas uint64, err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now()) } err = b.Do(k, func(mc *memcached.Client, vb uint16) error { res, err := mc.GetAndTouch(vb, k, exp, context...) if err != nil { return err } cas = res.Cas data = res.Body return nil }) return data, cas, err } // GetMeta returns the meta values for a key func (b *Bucket) GetMeta(k string, flags *int, expiry *int, cas *uint64, seqNo *uint64, context ...*memcached.ClientContext) (err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("GetsMeta", k, t, err) }(time.Now()) } err = b.Do(k, func(mc *memcached.Client, vb uint16) error { res, err := mc.GetMeta(vb, k, context...) if err != nil { return err } *cas = res.Cas if len(res.Extras) >= 8 { *flags = int(binary.BigEndian.Uint32(res.Extras[4:])) } if len(res.Extras) >= 12 { *expiry = int(binary.BigEndian.Uint32(res.Extras[8:])) } if len(res.Extras) >= 20 { *seqNo = uint64(binary.BigEndian.Uint64(res.Extras[12:])) } return nil }) return err } // Delete a key from this bucket. func (b *Bucket) Delete(k string, context ...*memcached.ClientContext) error { return b.Write(k, 0, 0, nil, Raw, context...) } // Incr increments the value at a given key by amt and defaults to def if no value present. func (b *Bucket) Incr(k string, amt, def uint64, exp int, context ...*memcached.ClientContext) (val uint64, err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("Incr", k, t, err) }(time.Now()) } var rv uint64 err = b.Do(k, func(mc *memcached.Client, vb uint16) error { res, err := mc.Incr(vb, k, amt, def, exp, context...) if err != nil { return err } rv = res return nil }) return rv, err } // Decr decrements the value at a given key by amt and defaults to def if no value present func (b *Bucket) Decr(k string, amt, def uint64, exp int, context ...*memcached.ClientContext) (val uint64, err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("Decr", k, t, err) }(time.Now()) } var rv uint64 err = b.Do(k, func(mc *memcached.Client, vb uint16) error { res, err := mc.Decr(vb, k, amt, def, exp, context...) if err != nil { return err } rv = res return nil }) return rv, err } // Wrapper around memcached.CASNext() func (b *Bucket) casNext(k string, exp int, state *memcached.CASState) bool { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("casNext", k, t, state.Err) }(time.Now()) } keepGoing := false state.Err = b.Do(k, func(mc *memcached.Client, vb uint16) error { keepGoing = mc.CASNext(vb, k, exp, state) return state.Err }) return keepGoing && state.Err == nil } // An UpdateFunc is a callback function to update a document type UpdateFunc func(current []byte) (updated []byte, err error) // Return this as the error from an UpdateFunc to cancel the Update // operation. const UpdateCancel = memcached.CASQuit // Update performs a Safe update of a document, avoiding conflicts by // using CAS. // // The callback function will be invoked with the current raw document // contents (or nil if the document doesn't exist); it should return // the updated raw contents (or nil to delete.) If it decides not to // change anything it can return UpdateCancel as the error. // // If another writer modifies the document between the get and the // set, the callback will be invoked again with the newer value. func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error { _, err := b.update(k, exp, callback) return err } // internal version of Update that returns a CAS value func (b *Bucket) update(k string, exp int, callback UpdateFunc) (newCas uint64, err error) { var state memcached.CASState for b.casNext(k, exp, &state) { var err error if state.Value, err = callback(state.Value); err != nil { return 0, err } } return state.Cas, state.Err } // A WriteUpdateFunc is a callback function to update a document type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error) // WriteUpdate performs a Safe update of a document, avoiding // conflicts by using CAS. WriteUpdate is like Update, except that // the callback can return a set of WriteOptions, of which Persist and // Indexable are recognized: these cause the call to wait until the // document update has been persisted to disk and/or become available // to index. func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error { var writeOpts WriteOptions var deletion bool // Wrap the callback in an UpdateFunc we can pass to Update: updateCallback := func(current []byte) (updated []byte, err error) { update, opt, err := callback(current) writeOpts = opt deletion = (update == nil) return update, err } cas, err := b.update(k, exp, updateCallback) if err != nil { return err } // If callback asked, wait for persistence or indexability: if writeOpts&(Persist|Indexable) != 0 { err = b.WaitForPersistence(k, cas, deletion) } return err } // Observe observes the current state of a document. func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error) { if ClientOpCallback != nil { defer func(t time.Time) { ClientOpCallback("Observe", k, t, err) }(time.Now()) } err = b.Do(k, func(mc *memcached.Client, vb uint16) error { result, err = mc.Observe(vb, k) return err }) return } // Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) // if the value has been overwritten by another before being persisted. var ErrOverwritten = errors.New("overwritten") // Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used) // if the value hasn't been persisted by the timeout interval var ErrTimeout = errors.New("timeout") // WaitForPersistence waits for an item to be considered durable. // // Besides transport errors, ErrOverwritten may be returned if the // item is overwritten before it reaches durability. ErrTimeout may // occur if the item isn't found durable in a reasonable amount of // time. func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error { timeout := 10 * time.Second sleepDelay := 5 * time.Millisecond start := time.Now() for { time.Sleep(sleepDelay) sleepDelay += sleepDelay / 2 // multiply delay by 1.5 every time result, err := b.Observe(k) if err != nil { return err } if persisted, overwritten := result.CheckPersistence(cas, deletion); overwritten { return ErrOverwritten } else if persisted { return nil } if result.PersistenceTime > 0 { timeout = 2 * result.PersistenceTime } if time.Since(start) >= timeout-sleepDelay { return ErrTimeout } } } var _STRING_MCRESPONSE_POOL = gomemcached.NewStringMCResponsePool(16) type stringPool struct { pool *sync.Pool size int } func newStringPool(size int) *stringPool { rv := &stringPool{ pool: &sync.Pool{ New: func() interface{} { return make([]string, 0, size) }, }, size: size, } return rv } func (this *stringPool) Get() []string { return this.pool.Get().([]string) } func (this *stringPool) Put(s []string) { if s == nil || cap(s) < this.size || cap(s) > 2*this.size { return } this.pool.Put(s[0:0]) } var _STRING_POOL = newStringPool(16) type vbStringPool struct { pool *sync.Pool strPool *stringPool } func newVBStringPool(size int, sp *stringPool) *vbStringPool { rv := &vbStringPool{ pool: &sync.Pool{ New: func() interface{} { return make(map[uint16][]string, size) }, }, strPool: sp, } return rv } func (this *vbStringPool) Get() map[uint16][]string { return this.pool.Get().(map[uint16][]string) } func (this *vbStringPool) Put(s map[uint16][]string) { if s == nil { return } for k, v := range s { delete(s, k) this.strPool.Put(v) } this.pool.Put(s) } var _VB_STRING_POOL = newVBStringPool(16, _STRING_POOL)