package couchbase import ( "log" "sync" "time" "fmt" "github.com/couchbase/gomemcached" "github.com/couchbase/gomemcached/client" "github.com/couchbase/goutils/logging" ) // A UprFeed streams mutation events from a bucket. // // Events from the bucket can be read from the channel 'C'. Remember // to call Close() on it when you're done, unless its channel has // closed itself already. type UprFeed struct { C <-chan *memcached.UprEvent bucket *Bucket nodeFeeds map[string]*FeedInfo // The UPR feeds of the individual nodes output chan *memcached.UprEvent // Same as C but writeably-typed outputClosed bool quit chan bool name string // name of this UPR feed sequence uint32 // sequence number for this feed connected bool killSwitch chan bool closing bool wg sync.WaitGroup dcp_buffer_size uint32 data_chan_size int } // UprFeed from a single connection type FeedInfo struct { uprFeed *memcached.UprFeed // UPR feed handle host string // hostname connected bool // connected quit chan bool // quit channel } type FailoverLog map[uint16]memcached.FailoverLog // GetFailoverLogs, get the failover logs for a set of vbucket ids func (b *Bucket) GetFailoverLogs(vBuckets []uint16) (FailoverLog, error) { // map vbids to their corresponding hosts vbHostList := make(map[string][]uint16) vbm := b.VBServerMap() if len(vbm.VBucketMap) < len(vBuckets) { return nil, fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", vbm.VBucketMap, vBuckets) } for _, vb := range vBuckets { masterID := vbm.VBucketMap[vb][0] master := b.getMasterNode(masterID) if master == "" { return nil, fmt.Errorf("No master found for vb %d", vb) } vbList := vbHostList[master] if vbList == nil { vbList = make([]uint16, 0) } vbList = append(vbList, vb) vbHostList[master] = vbList } failoverLogMap := make(FailoverLog) for _, serverConn := range b.getConnPools(false /* not already locked */) { vbList := vbHostList[serverConn.host] if vbList == nil { continue } mc, err := serverConn.Get() if err != nil { logging.Infof("No Free connections for vblist %v", vbList) return nil, fmt.Errorf("No Free connections for host %s", serverConn.host) } // close the connection so that it doesn't get reused for upr data // connection defer mc.Close() failoverlogs, err := mc.UprGetFailoverLog(vbList) if err != nil { return nil, fmt.Errorf("Error getting failover log %s host %s", err.Error(), serverConn.host) } for vb, log := range failoverlogs { failoverLogMap[vb] = *log } } return failoverLogMap, nil } func (b *Bucket) StartUprFeed(name string, sequence uint32) (*UprFeed, error) { return b.StartUprFeedWithConfig(name, sequence, 10, DEFAULT_WINDOW_SIZE) } // StartUprFeed creates and starts a new Upr feed // No data will be sent on the channel unless vbuckets streams are requested func (b *Bucket) StartUprFeedWithConfig(name string, sequence uint32, data_chan_size int, dcp_buffer_size uint32) (*UprFeed, error) { feed := &UprFeed{ bucket: b, output: make(chan *memcached.UprEvent, data_chan_size), quit: make(chan bool), nodeFeeds: make(map[string]*FeedInfo, 0), name: name, sequence: sequence, killSwitch: make(chan bool), dcp_buffer_size: dcp_buffer_size, data_chan_size: data_chan_size, } err := feed.connectToNodes() if err != nil { return nil, fmt.Errorf("Cannot connect to bucket %s", err.Error()) } feed.connected = true go feed.run() feed.C = feed.output return feed, nil } // UprRequestStream starts a stream for a vb on a feed func (feed *UprFeed) UprRequestStream(vb uint16, opaque uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error { defer func() { if r := recover(); r != nil { log.Panicf("Panic in UprRequestStream. Feed %v Bucket %v", feed, feed.bucket) } }() vbm := feed.bucket.VBServerMap() if len(vbm.VBucketMap) < int(vb) { return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", vb, vbm.VBucketMap) } if int(vb) >= len(vbm.VBucketMap) { return fmt.Errorf("Invalid vbucket id %d", vb) } masterID := vbm.VBucketMap[vb][0] master := feed.bucket.getMasterNode(masterID) if master == "" { return fmt.Errorf("Master node not found for vbucket %d", vb) } singleFeed := feed.nodeFeeds[master] if singleFeed == nil { return fmt.Errorf("UprFeed for this host not found") } if err := singleFeed.uprFeed.UprRequestStream(vb, opaque, flags, vuuid, startSequence, endSequence, snapStart, snapEnd); err != nil { return err } return nil } // UprCloseStream ends a vbucket stream. func (feed *UprFeed) UprCloseStream(vb, opaqueMSB uint16) error { defer func() { if r := recover(); r != nil { log.Panicf("Panic in UprCloseStream. Feed %v Bucket %v ", feed, feed.bucket) } }() vbm := feed.bucket.VBServerMap() if len(vbm.VBucketMap) < int(vb) { return fmt.Errorf("vbmap smaller than vbucket list: %v vs. %v", vb, vbm.VBucketMap) } if int(vb) >= len(vbm.VBucketMap) { return fmt.Errorf("Invalid vbucket id %d", vb) } masterID := vbm.VBucketMap[vb][0] master := feed.bucket.getMasterNode(masterID) if master == "" { return fmt.Errorf("Master node not found for vbucket %d", vb) } singleFeed := feed.nodeFeeds[master] if singleFeed == nil { return fmt.Errorf("UprFeed for this host not found") } if err := singleFeed.uprFeed.CloseStream(vb, opaqueMSB); err != nil { return err } return nil } // Goroutine that runs the feed func (feed *UprFeed) run() { retryInterval := initialRetryInterval bucketOK := true for { // Connect to the UPR feed of each server node: if bucketOK { // Run until one of the sub-feeds fails: select { case <-feed.killSwitch: case <-feed.quit: return } //feed.closeNodeFeeds() retryInterval = initialRetryInterval } if feed.closing == true { // we have been asked to shut down return } // On error, try to refresh the bucket in case the list of nodes changed: logging.Infof("go-couchbase: UPR connection lost; reconnecting to bucket %q in %v", feed.bucket.Name, retryInterval) if err := feed.bucket.Refresh(); err != nil { // if we fail to refresh the bucket, exit the feed // MB-14917 logging.Infof("Unable to refresh bucket %s ", err.Error()) close(feed.output) feed.outputClosed = true feed.closeNodeFeeds() return } // this will only connect to nodes that are not connected or changed // user will have to reconnect the stream err := feed.connectToNodes() if err != nil { logging.Infof("Unable to connect to nodes..exit ") close(feed.output) feed.outputClosed = true feed.closeNodeFeeds() return } bucketOK = err == nil select { case <-time.After(retryInterval): case <-feed.quit: return } if retryInterval *= 2; retryInterval > maximumRetryInterval { retryInterval = maximumRetryInterval } } } func (feed *UprFeed) connectToNodes() (err error) { nodeCount := 0 for _, serverConn := range feed.bucket.getConnPools(false /* not already locked */) { // this maybe a reconnection, so check if the connection to the node // already exists. Connect only if the node is not found in the list // or connected == false nodeFeed := feed.nodeFeeds[serverConn.host] if nodeFeed != nil && nodeFeed.connected == true { continue } var singleFeed *memcached.UprFeed var name string if feed.name == "" { name = "DefaultUprClient" } else { name = feed.name } singleFeed, err = serverConn.StartUprFeed(name, feed.sequence, feed.dcp_buffer_size, feed.data_chan_size) if err != nil { logging.Errorf("go-couchbase: Error connecting to upr feed of %s: %v", serverConn.host, err) feed.closeNodeFeeds() return } // add the node to the connection map feedInfo := &FeedInfo{ uprFeed: singleFeed, connected: true, host: serverConn.host, quit: make(chan bool), } feed.nodeFeeds[serverConn.host] = feedInfo go feed.forwardUprEvents(feedInfo, feed.killSwitch, serverConn.host) feed.wg.Add(1) nodeCount++ } if nodeCount == 0 { return fmt.Errorf("No connection to bucket") } return nil } // Goroutine that forwards Upr events from a single node's feed to the aggregate feed. func (feed *UprFeed) forwardUprEvents(nodeFeed *FeedInfo, killSwitch chan bool, host string) { singleFeed := nodeFeed.uprFeed defer func() { feed.wg.Done() if r := recover(); r != nil { //if feed is not closing, re-throw the panic if feed.outputClosed != true && feed.closing != true { panic(r) } else { logging.Errorf("Panic is recovered. Since feed is closed, exit gracefully") } } }() for { select { case <-nodeFeed.quit: nodeFeed.connected = false return case event, ok := <-singleFeed.C: if !ok { if singleFeed.Error != nil { logging.Errorf("go-couchbase: Upr feed from %s failed: %v", host, singleFeed.Error) } killSwitch <- true return } if feed.outputClosed == true { // someone closed the node feed logging.Infof("Node need closed, returning from forwardUprEvent") return } feed.output <- event if event.Status == gomemcached.NOT_MY_VBUCKET { logging.Infof(" Got a not my vbucket error !! ") if err := feed.bucket.Refresh(); err != nil { logging.Errorf("Unable to refresh bucket %s ", err.Error()) feed.closeNodeFeeds() return } // this will only connect to nodes that are not connected or changed // user will have to reconnect the stream if err := feed.connectToNodes(); err != nil { logging.Errorf("Unable to connect to nodes %s", err.Error()) return } } } } } func (feed *UprFeed) closeNodeFeeds() { for _, f := range feed.nodeFeeds { logging.Infof(" Sending close to forwardUprEvent ") close(f.quit) f.uprFeed.Close() } feed.nodeFeeds = nil } // Close a Upr feed. func (feed *UprFeed) Close() error { select { case <-feed.quit: return nil default: } feed.closing = true feed.closeNodeFeeds() close(feed.quit) feed.wg.Wait() if feed.outputClosed == false { feed.outputClosed = true close(feed.output) } return nil }