package nodb import ( "encoding/binary" "errors" "sort" "time" "github.com/lunny/nodb/store" ) const ( OPand uint8 = iota + 1 OPor OPxor OPnot ) type BitPair struct { Pos int32 Val uint8 } type segBitInfo struct { Seq uint32 Off uint32 Val uint8 } type segBitInfoArray []segBitInfo const ( // byte segByteWidth uint32 = 9 segByteSize uint32 = 1 << segByteWidth // bit segBitWidth uint32 = segByteWidth + 3 segBitSize uint32 = segByteSize << 3 maxByteSize uint32 = 8 << 20 maxSegCount uint32 = maxByteSize / segByteSize minSeq uint32 = 0 maxSeq uint32 = uint32((maxByteSize << 3) - 1) ) var bitsInByte = [256]int32{0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8} var fillBits = [...]uint8{1, 3, 7, 15, 31, 63, 127, 255} var emptySegment []byte = make([]byte, segByteSize, segByteSize) var fillSegment []byte = func() []byte { data := make([]byte, segByteSize, segByteSize) for i := uint32(0); i < segByteSize; i++ { data[i] = 0xff } return data }() var errBinKey = errors.New("invalid bin key") var errOffset = errors.New("invalid offset") var errDuplicatePos = errors.New("duplicate bit pos") func getBit(sz []byte, offset uint32) uint8 { index := offset >> 3 if index >= uint32(len(sz)) { return 0 // error("overflow") } offset -= index << 3 return sz[index] >> offset & 1 } func setBit(sz []byte, offset uint32, val uint8) bool { if val != 1 && val != 0 { return false // error("invalid val") } index := offset >> 3 if index >= uint32(len(sz)) { return false // error("overflow") } offset -= index << 3 if sz[index]>>offset&1 != val { sz[index] ^= (1 << offset) } return true } func (datas segBitInfoArray) Len() int { return len(datas) } func (datas segBitInfoArray) Less(i, j int) bool { res := (datas)[i].Seq < (datas)[j].Seq if !res && (datas)[i].Seq == (datas)[j].Seq { res = (datas)[i].Off < (datas)[j].Off } return res } func (datas segBitInfoArray) Swap(i, j int) { datas[i], datas[j] = datas[j], datas[i] } func (db *DB) bEncodeMetaKey(key []byte) []byte { mk := make([]byte, len(key)+2) mk[0] = db.index mk[1] = BitMetaType copy(mk[2:], key) return mk } func (db *DB) bDecodeMetaKey(bkey []byte) ([]byte, error) { if len(bkey) < 2 || bkey[0] != db.index || bkey[1] != BitMetaType { return nil, errBinKey } return bkey[2:], nil } func (db *DB) bEncodeBinKey(key []byte, seq uint32) []byte { bk := make([]byte, len(key)+8) pos := 0 bk[pos] = db.index pos++ bk[pos] = BitType pos++ binary.BigEndian.PutUint16(bk[pos:], uint16(len(key))) pos += 2 copy(bk[pos:], key) pos += len(key) binary.BigEndian.PutUint32(bk[pos:], seq) return bk } func (db *DB) bDecodeBinKey(bkey []byte) (key []byte, seq uint32, err error) { if len(bkey) < 8 || bkey[0] != db.index { err = errBinKey return } keyLen := binary.BigEndian.Uint16(bkey[2:4]) if int(keyLen+8) != len(bkey) { err = errBinKey return } key = bkey[4 : 4+keyLen] seq = uint32(binary.BigEndian.Uint32(bkey[4+keyLen:])) return } func (db *DB) bCapByteSize(seq uint32, off uint32) uint32 { var offByteSize uint32 = (off >> 3) + 1 if offByteSize > segByteSize { offByteSize = segByteSize } return seq<= 0 { offset += int32((uint32(tailSeq)<> segBitWidth off &= (segBitSize - 1) return } func (db *DB) bGetMeta(key []byte) (tailSeq int32, tailOff int32, err error) { var v []byte mk := db.bEncodeMetaKey(key) v, err = db.bucket.Get(mk) if err != nil { return } if v != nil { tailSeq = int32(binary.LittleEndian.Uint32(v[0:4])) tailOff = int32(binary.LittleEndian.Uint32(v[4:8])) } else { tailSeq = -1 tailOff = -1 } return } func (db *DB) bSetMeta(t *batch, key []byte, tailSeq uint32, tailOff uint32) { ek := db.bEncodeMetaKey(key) buf := make([]byte, 8) binary.LittleEndian.PutUint32(buf[0:4], tailSeq) binary.LittleEndian.PutUint32(buf[4:8], tailOff) t.Put(ek, buf) return } func (db *DB) bUpdateMeta(t *batch, key []byte, seq uint32, off uint32) (tailSeq uint32, tailOff uint32, err error) { var tseq, toff int32 var update bool = false if tseq, toff, err = db.bGetMeta(key); err != nil { return } else if tseq < 0 { update = true } else { tailSeq = uint32(MaxInt32(tseq, 0)) tailOff = uint32(MaxInt32(toff, 0)) update = (seq > tailSeq || (seq == tailSeq && off > tailOff)) } if update { db.bSetMeta(t, key, seq, off) tailSeq = seq tailOff = off } return } func (db *DB) bDelete(t *batch, key []byte) (drop int64) { mk := db.bEncodeMetaKey(key) t.Delete(mk) minKey := db.bEncodeBinKey(key, minSeq) maxKey := db.bEncodeBinKey(key, maxSeq) it := db.bucket.RangeIterator(minKey, maxKey, store.RangeClose) for ; it.Valid(); it.Next() { t.Delete(it.RawKey()) drop++ } it.Close() return drop } func (db *DB) bGetSegment(key []byte, seq uint32) ([]byte, []byte, error) { bk := db.bEncodeBinKey(key, seq) segment, err := db.bucket.Get(bk) if err != nil { return bk, nil, err } return bk, segment, nil } func (db *DB) bAllocateSegment(key []byte, seq uint32) ([]byte, []byte, error) { bk, segment, err := db.bGetSegment(key, seq) if err == nil && segment == nil { segment = make([]byte, segByteSize, segByteSize) } return bk, segment, err } func (db *DB) bIterator(key []byte) *store.RangeLimitIterator { sk := db.bEncodeBinKey(key, minSeq) ek := db.bEncodeBinKey(key, maxSeq) return db.bucket.RangeIterator(sk, ek, store.RangeClose) } func (db *DB) bSegAnd(a []byte, b []byte, res *[]byte) { if a == nil || b == nil { *res = nil return } data := *res if data == nil { data = make([]byte, segByteSize, segByteSize) *res = data } for i := uint32(0); i < segByteSize; i++ { data[i] = a[i] & b[i] } return } func (db *DB) bSegOr(a []byte, b []byte, res *[]byte) { if a == nil || b == nil { if a == nil && b == nil { *res = nil } else if a == nil { *res = b } else { *res = a } return } data := *res if data == nil { data = make([]byte, segByteSize, segByteSize) *res = data } for i := uint32(0); i < segByteSize; i++ { data[i] = a[i] | b[i] } return } func (db *DB) bSegXor(a []byte, b []byte, res *[]byte) { if a == nil && b == nil { *res = fillSegment return } if a == nil { a = emptySegment } if b == nil { b = emptySegment } data := *res if data == nil { data = make([]byte, segByteSize, segByteSize) *res = data } for i := uint32(0); i < segByteSize; i++ { data[i] = a[i] ^ b[i] } return } func (db *DB) bExpireAt(key []byte, when int64) (int64, error) { t := db.binBatch t.Lock() defer t.Unlock() if seq, _, err := db.bGetMeta(key); err != nil || seq < 0 { return 0, err } else { db.expireAt(t, BitType, key, when) if err := t.Commit(); err != nil { return 0, err } } return 1, nil } func (db *DB) bCountByte(val byte, soff uint32, eoff uint32) int32 { if soff > eoff { soff, eoff = eoff, soff } mask := uint8(0) if soff > 0 { mask |= fillBits[soff-1] } if eoff < 7 { mask |= (fillBits[7] ^ fillBits[eoff]) } mask = fillBits[7] ^ mask return bitsInByte[val&mask] } func (db *DB) bCountSeg(key []byte, seq uint32, soff uint32, eoff uint32) (cnt int32, err error) { if soff >= segBitSize || soff < 0 || eoff >= segBitSize || eoff < 0 { return } var segment []byte if _, segment, err = db.bGetSegment(key, seq); err != nil { return } if segment == nil { return } if soff > eoff { soff, eoff = eoff, soff } headIdx := int(soff >> 3) endIdx := int(eoff >> 3) sByteOff := soff - ((soff >> 3) << 3) eByteOff := eoff - ((eoff >> 3) << 3) if headIdx == endIdx { cnt = db.bCountByte(segment[headIdx], sByteOff, eByteOff) } else { cnt = db.bCountByte(segment[headIdx], sByteOff, 7) + db.bCountByte(segment[endIdx], 0, eByteOff) } // sum up following bytes for idx, end := headIdx+1, endIdx-1; idx <= end; idx += 1 { cnt += bitsInByte[segment[idx]] if idx == end { break } } return } func (db *DB) BGet(key []byte) (data []byte, err error) { if err = checkKeySize(key); err != nil { return } var ts, to int32 if ts, to, err = db.bGetMeta(key); err != nil || ts < 0 { return } var tailSeq, tailOff = uint32(ts), uint32(to) var capByteSize uint32 = db.bCapByteSize(tailSeq, tailOff) data = make([]byte, capByteSize, capByteSize) minKey := db.bEncodeBinKey(key, minSeq) maxKey := db.bEncodeBinKey(key, tailSeq) it := db.bucket.RangeIterator(minKey, maxKey, store.RangeClose) var seq, s, e uint32 for ; it.Valid(); it.Next() { if _, seq, err = db.bDecodeBinKey(it.RawKey()); err != nil { data = nil break } s = seq << segByteWidth e = MinUInt32(s+segByteSize, capByteSize) copy(data[s:e], it.RawValue()) } it.Close() return } func (db *DB) BDelete(key []byte) (drop int64, err error) { if err = checkKeySize(key); err != nil { return } t := db.binBatch t.Lock() defer t.Unlock() drop = db.bDelete(t, key) db.rmExpire(t, BitType, key) err = t.Commit() return } func (db *DB) BSetBit(key []byte, offset int32, val uint8) (ori uint8, err error) { if err = checkKeySize(key); err != nil { return } // todo : check offset var seq, off uint32 if seq, off, err = db.bParseOffset(key, offset); err != nil { return 0, err } var bk, segment []byte if bk, segment, err = db.bAllocateSegment(key, seq); err != nil { return 0, err } if segment != nil { ori = getBit(segment, off) if setBit(segment, off, val) { t := db.binBatch t.Lock() defer t.Unlock() t.Put(bk, segment) if _, _, e := db.bUpdateMeta(t, key, seq, off); e != nil { err = e return } err = t.Commit() } } return } func (db *DB) BMSetBit(key []byte, args ...BitPair) (place int64, err error) { if err = checkKeySize(key); err != nil { return } // (ps : so as to aviod wasting memory copy while calling db.Get() and batch.Put(), // here we sequence the params by pos, so that we can merge the execution of // diff pos setting which targets on the same segment respectively. ) // #1 : sequence request data var argCnt = len(args) var bitInfos segBitInfoArray = make(segBitInfoArray, argCnt) var seq, off uint32 for i, info := range args { if seq, off, err = db.bParseOffset(key, info.Pos); err != nil { return } bitInfos[i].Seq = seq bitInfos[i].Off = off bitInfos[i].Val = info.Val } sort.Sort(bitInfos) for i := 1; i < argCnt; i++ { if bitInfos[i].Seq == bitInfos[i-1].Seq && bitInfos[i].Off == bitInfos[i-1].Off { return 0, errDuplicatePos } } // #2 : execute bit set in order t := db.binBatch t.Lock() defer t.Unlock() var curBinKey, curSeg []byte var curSeq, maxSeq, maxOff uint32 for _, info := range bitInfos { if curSeg != nil && info.Seq != curSeq { t.Put(curBinKey, curSeg) curSeg = nil } if curSeg == nil { curSeq = info.Seq if curBinKey, curSeg, err = db.bAllocateSegment(key, info.Seq); err != nil { return } if curSeg == nil { continue } } if setBit(curSeg, info.Off, info.Val) { maxSeq = info.Seq maxOff = info.Off place++ } } if curSeg != nil { t.Put(curBinKey, curSeg) } // finally, update meta if place > 0 { if _, _, err = db.bUpdateMeta(t, key, maxSeq, maxOff); err != nil { return } err = t.Commit() } return } func (db *DB) BGetBit(key []byte, offset int32) (uint8, error) { if seq, off, err := db.bParseOffset(key, offset); err != nil { return 0, err } else { _, segment, err := db.bGetSegment(key, seq) if err != nil { return 0, err } if segment == nil { return 0, nil } else { return getBit(segment, off), nil } } } // func (db *DB) BGetRange(key []byte, start int32, end int32) ([]byte, error) { // section := make([]byte) // return // } func (db *DB) BCount(key []byte, start int32, end int32) (cnt int32, err error) { var sseq, soff uint32 if sseq, soff, err = db.bParseOffset(key, start); err != nil { return } var eseq, eoff uint32 if eseq, eoff, err = db.bParseOffset(key, end); err != nil { return } if sseq > eseq || (sseq == eseq && soff > eoff) { sseq, eseq = eseq, sseq soff, eoff = eoff, soff } var segCnt int32 if eseq == sseq { if segCnt, err = db.bCountSeg(key, sseq, soff, eoff); err != nil { return 0, err } cnt = segCnt } else { if segCnt, err = db.bCountSeg(key, sseq, soff, segBitSize-1); err != nil { return 0, err } else { cnt += segCnt } if segCnt, err = db.bCountSeg(key, eseq, 0, eoff); err != nil { return 0, err } else { cnt += segCnt } } // middle segs var segment []byte skey := db.bEncodeBinKey(key, sseq) ekey := db.bEncodeBinKey(key, eseq) it := db.bucket.RangeIterator(skey, ekey, store.RangeOpen) for ; it.Valid(); it.Next() { segment = it.RawValue() for _, bt := range segment { cnt += bitsInByte[bt] } } it.Close() return } func (db *DB) BTail(key []byte) (int32, error) { // effective length of data, the highest bit-pos set in history tailSeq, tailOff, err := db.bGetMeta(key) if err != nil { return 0, err } tail := int32(-1) if tailSeq >= 0 { tail = int32(uint32(tailSeq)< maxDstSeq || (seq == maxDstSeq && off > maxDstOff) { maxDstSeq = seq maxDstOff = off } } if (op == OPnot && validKeyNum != 1) || (op != OPnot && validKeyNum < 2) { return // with not enough existing source key } var srcIdx int for srcIdx = 0; srcIdx < keyNum; srcIdx++ { if srckeys[srcIdx] != nil { break } } // init - data var segments = make([][]byte, maxDstSeq+1) if op == OPnot { // ps : // ( ~num == num ^ 0x11111111 ) // we init the result segments with all bit set, // then we can calculate through the way of 'xor'. // ahead segments bin format : 1111 ... 1111 for i := uint32(0); i < maxDstSeq; i++ { segments[i] = fillSegment } // last segment bin format : 1111..1100..0000 var tailSeg = make([]byte, segByteSize, segByteSize) var fillByte = fillBits[7] var tailSegLen = db.bCapByteSize(uint32(0), maxDstOff) for i := uint32(0); i < tailSegLen-1; i++ { tailSeg[i] = fillByte } tailSeg[tailSegLen-1] = fillBits[maxDstOff-(tailSegLen-1)<<3] segments[maxDstSeq] = tailSeg } else { // ps : init segments by data corresponding to the 1st valid source key it := db.bIterator(srckeys[srcIdx]) for ; it.Valid(); it.Next() { if _, seq, err = db.bDecodeBinKey(it.RawKey()); err != nil { // to do ... it.Close() return } segments[seq] = it.Value() } it.Close() srcIdx++ } // operation with following keys var res []byte for i := srcIdx; i < keyNum; i++ { if srckeys[i] == nil { continue } it := db.bIterator(srckeys[i]) for idx, end := uint32(0), false; !end; it.Next() { end = !it.Valid() if !end { if _, seq, err = db.bDecodeBinKey(it.RawKey()); err != nil { // to do ... it.Close() return } } else { seq = maxDstSeq + 1 } // todo : // operation 'and' can be optimize here : // if seq > max_segments_idx, this loop can be break, // which can avoid cost from Key() and bDecodeBinKey() for ; idx < seq; idx++ { res = nil exeOp(segments[idx], nil, &res) segments[idx] = res } if !end { res = it.Value() exeOp(segments[seq], res, &res) segments[seq] = res idx++ } } it.Close() } // clear the old data in case db.bDelete(t, dstkey) db.rmExpire(t, BitType, dstkey) // set data db.bSetMeta(t, dstkey, maxDstSeq, maxDstOff) var bk []byte for seq, segt := range segments { if segt != nil { bk = db.bEncodeBinKey(dstkey, uint32(seq)) t.Put(bk, segt) } } err = t.Commit() if err == nil { // blen = int32(db.bCapByteSize(maxDstOff, maxDstOff)) blen = int32(maxDstSeq<