333 lines
8.2 KiB
Go
333 lines
8.2 KiB
Go
package memcached
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
|
|
"github.com/couchbase/gomemcached"
|
|
"github.com/couchbase/goutils/logging"
|
|
)
|
|
|
|
// TAP protocol docs: <http://www.couchbase.com/wiki/display/couchbase/TAP+Protocol>
|
|
|
|
// TapOpcode is the tap operation type (found in TapEvent)
|
|
type TapOpcode uint8
|
|
|
|
// Tap opcode values.
|
|
const (
|
|
TapBeginBackfill = TapOpcode(iota)
|
|
TapEndBackfill
|
|
TapMutation
|
|
TapDeletion
|
|
TapCheckpointStart
|
|
TapCheckpointEnd
|
|
tapEndStream
|
|
)
|
|
|
|
const tapMutationExtraLen = 16
|
|
|
|
var tapOpcodeNames map[TapOpcode]string
|
|
|
|
func init() {
|
|
tapOpcodeNames = map[TapOpcode]string{
|
|
TapBeginBackfill: "BeginBackfill",
|
|
TapEndBackfill: "EndBackfill",
|
|
TapMutation: "Mutation",
|
|
TapDeletion: "Deletion",
|
|
TapCheckpointStart: "TapCheckpointStart",
|
|
TapCheckpointEnd: "TapCheckpointEnd",
|
|
tapEndStream: "EndStream",
|
|
}
|
|
}
|
|
|
|
func (opcode TapOpcode) String() string {
|
|
name := tapOpcodeNames[opcode]
|
|
if name == "" {
|
|
name = fmt.Sprintf("#%d", opcode)
|
|
}
|
|
return name
|
|
}
|
|
|
|
// TapEvent is a TAP notification of an operation on the server.
|
|
type TapEvent struct {
|
|
Opcode TapOpcode // Type of event
|
|
VBucket uint16 // VBucket this event applies to
|
|
Flags uint32 // Item flags
|
|
Expiry uint32 // Item expiration time
|
|
Key, Value []byte // Item key/value
|
|
Cas uint64
|
|
}
|
|
|
|
func makeTapEvent(req gomemcached.MCRequest) *TapEvent {
|
|
event := TapEvent{
|
|
VBucket: req.VBucket,
|
|
}
|
|
switch req.Opcode {
|
|
case gomemcached.TAP_MUTATION:
|
|
event.Opcode = TapMutation
|
|
event.Key = req.Key
|
|
event.Value = req.Body
|
|
event.Cas = req.Cas
|
|
case gomemcached.TAP_DELETE:
|
|
event.Opcode = TapDeletion
|
|
event.Key = req.Key
|
|
event.Cas = req.Cas
|
|
case gomemcached.TAP_CHECKPOINT_START:
|
|
event.Opcode = TapCheckpointStart
|
|
case gomemcached.TAP_CHECKPOINT_END:
|
|
event.Opcode = TapCheckpointEnd
|
|
case gomemcached.TAP_OPAQUE:
|
|
if len(req.Extras) < 8+4 {
|
|
return nil
|
|
}
|
|
switch op := int(binary.BigEndian.Uint32(req.Extras[8:])); op {
|
|
case gomemcached.TAP_OPAQUE_INITIAL_VBUCKET_STREAM:
|
|
event.Opcode = TapBeginBackfill
|
|
case gomemcached.TAP_OPAQUE_CLOSE_BACKFILL:
|
|
event.Opcode = TapEndBackfill
|
|
case gomemcached.TAP_OPAQUE_CLOSE_TAP_STREAM:
|
|
event.Opcode = tapEndStream
|
|
case gomemcached.TAP_OPAQUE_ENABLE_AUTO_NACK:
|
|
return nil
|
|
case gomemcached.TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC:
|
|
return nil
|
|
default:
|
|
logging.Infof("TapFeed: Ignoring TAP_OPAQUE/%d", op)
|
|
return nil // unknown opaque event
|
|
}
|
|
case gomemcached.NOOP:
|
|
return nil // ignore
|
|
default:
|
|
logging.Infof("TapFeed: Ignoring %s", req.Opcode)
|
|
return nil // unknown event
|
|
}
|
|
|
|
if len(req.Extras) >= tapMutationExtraLen &&
|
|
(event.Opcode == TapMutation || event.Opcode == TapDeletion) {
|
|
|
|
event.Flags = binary.BigEndian.Uint32(req.Extras[8:])
|
|
event.Expiry = binary.BigEndian.Uint32(req.Extras[12:])
|
|
}
|
|
|
|
return &event
|
|
}
|
|
|
|
func (event TapEvent) String() string {
|
|
switch event.Opcode {
|
|
case TapBeginBackfill, TapEndBackfill, TapCheckpointStart, TapCheckpointEnd:
|
|
return fmt.Sprintf("<TapEvent %s, vbucket=%d>",
|
|
event.Opcode, event.VBucket)
|
|
default:
|
|
return fmt.Sprintf("<TapEvent %s, key=%q (%d bytes) flags=%x, exp=%d>",
|
|
event.Opcode, event.Key, len(event.Value),
|
|
event.Flags, event.Expiry)
|
|
}
|
|
}
|
|
|
|
// TapArguments are parameters for requesting a TAP feed.
|
|
//
|
|
// Call DefaultTapArguments to get a default one.
|
|
type TapArguments struct {
|
|
// Timestamp of oldest item to send.
|
|
//
|
|
// Use TapNoBackfill to suppress all past items.
|
|
Backfill uint64
|
|
// If set, server will disconnect after sending existing items.
|
|
Dump bool
|
|
// The indices of the vbuckets to watch; empty/nil to watch all.
|
|
VBuckets []uint16
|
|
// Transfers ownership of vbuckets during cluster rebalance.
|
|
Takeover bool
|
|
// If true, server will wait for client ACK after every notification.
|
|
SupportAck bool
|
|
// If true, client doesn't want values so server shouldn't send them.
|
|
KeysOnly bool
|
|
// If true, client wants the server to send checkpoint events.
|
|
Checkpoint bool
|
|
// Optional identifier to use for this client, to allow reconnects
|
|
ClientName string
|
|
// Registers this client (by name) till explicitly deregistered.
|
|
RegisteredClient bool
|
|
}
|
|
|
|
// Value for TapArguments.Backfill denoting that no past events at all
|
|
// should be sent.
|
|
const TapNoBackfill = math.MaxUint64
|
|
|
|
// DefaultTapArguments returns a default set of parameter values to
|
|
// pass to StartTapFeed.
|
|
func DefaultTapArguments() TapArguments {
|
|
return TapArguments{
|
|
Backfill: TapNoBackfill,
|
|
}
|
|
}
|
|
|
|
func (args *TapArguments) flags() []byte {
|
|
var flags gomemcached.TapConnectFlag
|
|
if args.Backfill != 0 {
|
|
flags |= gomemcached.BACKFILL
|
|
}
|
|
if args.Dump {
|
|
flags |= gomemcached.DUMP
|
|
}
|
|
if len(args.VBuckets) > 0 {
|
|
flags |= gomemcached.LIST_VBUCKETS
|
|
}
|
|
if args.Takeover {
|
|
flags |= gomemcached.TAKEOVER_VBUCKETS
|
|
}
|
|
if args.SupportAck {
|
|
flags |= gomemcached.SUPPORT_ACK
|
|
}
|
|
if args.KeysOnly {
|
|
flags |= gomemcached.REQUEST_KEYS_ONLY
|
|
}
|
|
if args.Checkpoint {
|
|
flags |= gomemcached.CHECKPOINT
|
|
}
|
|
if args.RegisteredClient {
|
|
flags |= gomemcached.REGISTERED_CLIENT
|
|
}
|
|
encoded := make([]byte, 4)
|
|
binary.BigEndian.PutUint32(encoded, uint32(flags))
|
|
return encoded
|
|
}
|
|
|
|
func must(err error) {
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func (args *TapArguments) bytes() (rv []byte) {
|
|
buf := bytes.NewBuffer([]byte{})
|
|
|
|
if args.Backfill > 0 {
|
|
must(binary.Write(buf, binary.BigEndian, uint64(args.Backfill)))
|
|
}
|
|
|
|
if len(args.VBuckets) > 0 {
|
|
must(binary.Write(buf, binary.BigEndian, uint16(len(args.VBuckets))))
|
|
for i := 0; i < len(args.VBuckets); i++ {
|
|
must(binary.Write(buf, binary.BigEndian, uint16(args.VBuckets[i])))
|
|
}
|
|
}
|
|
return buf.Bytes()
|
|
}
|
|
|
|
// TapFeed represents a stream of events from a server.
|
|
type TapFeed struct {
|
|
C <-chan TapEvent
|
|
Error error
|
|
closer chan bool
|
|
}
|
|
|
|
// StartTapFeed starts a TAP feed on a client connection.
|
|
//
|
|
// The events can be read from the returned channel. The connection
|
|
// can no longer be used for other purposes; it's now reserved for
|
|
// receiving the TAP messages. To stop receiving events, close the
|
|
// client connection.
|
|
func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error) {
|
|
rq := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.TAP_CONNECT,
|
|
Key: []byte(args.ClientName),
|
|
Extras: args.flags(),
|
|
Body: args.bytes()}
|
|
|
|
err := mc.Transmit(rq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ch := make(chan TapEvent)
|
|
feed := &TapFeed{
|
|
C: ch,
|
|
closer: make(chan bool),
|
|
}
|
|
go mc.runFeed(ch, feed)
|
|
return feed, nil
|
|
}
|
|
|
|
// TapRecvHook is called after every incoming tap packet is received.
|
|
var TapRecvHook func(*gomemcached.MCRequest, int, error)
|
|
|
|
// Internal goroutine that reads from the socket and writes events to
|
|
// the channel
|
|
func (mc *Client) runFeed(ch chan TapEvent, feed *TapFeed) {
|
|
defer close(ch)
|
|
var headerBuf [gomemcached.HDR_LEN]byte
|
|
loop:
|
|
for {
|
|
// Read the next request from the server.
|
|
//
|
|
// (Can't call mc.Receive() because it reads a
|
|
// _response_ not a request.)
|
|
var pkt gomemcached.MCRequest
|
|
n, err := pkt.Receive(mc.conn, headerBuf[:])
|
|
if TapRecvHook != nil {
|
|
TapRecvHook(&pkt, n, err)
|
|
}
|
|
|
|
if err != nil {
|
|
if err != io.EOF {
|
|
feed.Error = err
|
|
}
|
|
break loop
|
|
}
|
|
|
|
//logging.Infof("** TapFeed received %#v : %q", pkt, pkt.Body)
|
|
|
|
if pkt.Opcode == gomemcached.TAP_CONNECT {
|
|
// This is not an event from the server; it's
|
|
// an error response to my connect request.
|
|
feed.Error = fmt.Errorf("tap connection failed: %s", pkt.Body)
|
|
break loop
|
|
}
|
|
|
|
event := makeTapEvent(pkt)
|
|
if event != nil {
|
|
if event.Opcode == tapEndStream {
|
|
break loop
|
|
}
|
|
|
|
select {
|
|
case ch <- *event:
|
|
case <-feed.closer:
|
|
break loop
|
|
}
|
|
}
|
|
|
|
if len(pkt.Extras) >= 4 {
|
|
reqFlags := binary.BigEndian.Uint16(pkt.Extras[2:])
|
|
if reqFlags&gomemcached.TAP_ACK != 0 {
|
|
if _, err := mc.sendAck(&pkt); err != nil {
|
|
feed.Error = err
|
|
break loop
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if err := mc.Close(); err != nil {
|
|
logging.Errorf("Error closing memcached client: %v", err)
|
|
}
|
|
}
|
|
|
|
func (mc *Client) sendAck(pkt *gomemcached.MCRequest) (int, error) {
|
|
res := gomemcached.MCResponse{
|
|
Opcode: pkt.Opcode,
|
|
Opaque: pkt.Opaque,
|
|
Status: gomemcached.SUCCESS,
|
|
}
|
|
return res.Transmit(mc.conn)
|
|
}
|
|
|
|
// Close terminates a TapFeed.
|
|
//
|
|
// Call this if you stop using a TapFeed before its channel ends.
|
|
func (feed *TapFeed) Close() {
|
|
close(feed.closer)
|
|
}
|