package couchbase import ( "encoding/json" "fmt" "github.com/couchbase/goutils/logging" "io" "io/ioutil" "math/rand" "net" "net/http" "time" "unsafe" ) // Bucket auto-updater gets the latest version of the bucket config from // the server. If the configuration has changed then updated the local // bucket information. If the bucket has been deleted then notify anyone // who is holding a reference to this bucket const MAX_RETRY_COUNT = 5 const DISCONNECT_PERIOD = 120 * time.Second type NotifyFn func(bucket string, err error) // Use TCP keepalive to detect half close sockets var updaterTransport http.RoundTripper = &http.Transport{ Proxy: http.ProxyFromEnvironment, Dial: (&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, }).Dial, } var updaterHTTPClient = &http.Client{Transport: updaterTransport} func doHTTPRequestForUpdate(req *http.Request) (*http.Response, error) { var err error var res *http.Response for i := 0; i < HTTP_MAX_RETRY; i++ { res, err = updaterHTTPClient.Do(req) if err != nil && isHttpConnError(err) { continue } break } if err != nil { return nil, err } return res, err } func (b *Bucket) RunBucketUpdater(notify NotifyFn) { go func() { err := b.UpdateBucket() if err != nil { if notify != nil { notify(b.GetName(), err) } logging.Errorf(" Bucket Updater exited with err %v", err) } }() } func (b *Bucket) replaceConnPools2(with []*connectionPool, bucketLocked bool) { if !bucketLocked { b.Lock() defer b.Unlock() } old := b.connPools b.connPools = unsafe.Pointer(&with) if old != nil { for _, pool := range *(*[]*connectionPool)(old) { if pool != nil && pool.inUse == false { pool.Close() } } } return } func (b *Bucket) UpdateBucket() error { var failures int var returnErr error var poolServices PoolServices var err error tlsConfig := b.pool.client.tlsConfig if tlsConfig != nil { poolServices, err = b.pool.client.GetPoolServices("default") if err != nil { return err } } for { if failures == MAX_RETRY_COUNT { logging.Errorf(" Maximum failures reached. Exiting loop...") return fmt.Errorf("Max failures reached. Last Error %v", returnErr) } nodes := b.Nodes() if len(nodes) < 1 { return fmt.Errorf("No healthy nodes found") } startNode := rand.Intn(len(nodes)) node := nodes[(startNode)%len(nodes)] streamUrl := fmt.Sprintf("http://%s/pools/default/bucketsStreaming/%s", node.Hostname, b.GetName()) logging.Infof(" Trying with %s", streamUrl) req, err := http.NewRequest("GET", streamUrl, nil) if err != nil { return err } // Lock here to avoid having pool closed under us. b.RLock() err = maybeAddAuth(req, b.pool.client.ah) b.RUnlock() if err != nil { return err } res, err := doHTTPRequestForUpdate(req) if err != nil { return err } if res.StatusCode != 200 { bod, _ := ioutil.ReadAll(io.LimitReader(res.Body, 512)) logging.Errorf("Failed to connect to host, unexpected status code: %v. Body %s", res.StatusCode, bod) res.Body.Close() returnErr = fmt.Errorf("Failed to connect to host. Status %v Body %s", res.StatusCode, bod) failures++ continue } dec := json.NewDecoder(res.Body) tmpb := &Bucket{} for { err := dec.Decode(&tmpb) if err != nil { returnErr = err res.Body.Close() break } // if we got here, reset failure count failures = 0 b.Lock() // mark all the old connection pools for deletion pools := b.getConnPools(true /* already locked */) for _, pool := range pools { if pool != nil { pool.inUse = false } } newcps := make([]*connectionPool, len(tmpb.VBSMJson.ServerList)) for i := range newcps { // get the old connection pool and check if it is still valid pool := b.getConnPoolByHost(tmpb.VBSMJson.ServerList[i], true /* bucket already locked */) if pool != nil && pool.inUse == false { // if the hostname and index is unchanged then reuse this pool newcps[i] = pool pool.inUse = true continue } // else create a new pool hostport := tmpb.VBSMJson.ServerList[i] if tlsConfig != nil { hostport, err = MapKVtoSSL(hostport, &poolServices) if err != nil { b.Unlock() return err } } if b.ah != nil { newcps[i] = newConnectionPool(hostport, b.ah, false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name) } else { newcps[i] = newConnectionPool(hostport, b.authHandler(true /* bucket already locked */), false, PoolSize, PoolOverflow, b.pool.client.tlsConfig, b.Name) } } b.replaceConnPools2(newcps, true /* bucket already locked */) tmpb.ah = b.ah b.vBucketServerMap = unsafe.Pointer(&tmpb.VBSMJson) b.nodeList = unsafe.Pointer(&tmpb.NodesJSON) b.Unlock() logging.Infof("Got new configuration for bucket %s", b.GetName()) } // we are here because of an error failures++ continue } return nil }