package nodb

import (
	"fmt"
	"sync"

	"gitea.com/lunny/nodb/store"
)
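
// ibucket is the minimal key/value interface the DB layer needs from the
// underlying store: point reads and writes, write batches, and forward and
// reverse range iteration.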
type ibucket interface {
	Get(key []byte) ([]byte, error)

	Put(key []byte, value []byte) error
	Delete(key []byte) error

	NewIterator() *store.Iterator

	NewWriteBatch() store.WriteBatch

	RangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
	RevRangeIterator(min []byte, max []byte, rangeType uint8) *store.RangeLimitIterator
	RangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator
	RevRangeLimitIterator(min []byte, max []byte, rangeType uint8, offset int, count int) *store.RangeLimitIterator
}
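
// DB is one of the numbered databases managed by a Nodb instance. It holds
// a reference to the shared store, a write batch per data type, and a status
// flag that controls whether writes are auto-committed.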
type DB struct {
	l *Nodb

	sdb *store.DB

	bucket ibucket

	index uint8

	kvBatch   *batch
	listBatch *batch
	hashBatch *batch
	zsetBatch *batch
	binBatch  *batch
	setBatch  *batch

	status uint8
}
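
// newDB creates the DB for the given index, wiring it to the Nodb instance's
// underlying store and allocating a fresh write batch for each data type.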
func (l *Nodb) newDB(index uint8) *DB {
	d := new(DB)

	d.l = l

	d.sdb = l.ldb

	d.bucket = d.sdb

	d.status = DBAutoCommit
	d.index = index

	d.kvBatch = d.newBatch()
	d.listBatch = d.newBatch()
	d.hashBatch = d.newBatch()
	d.zsetBatch = d.newBatch()
	d.binBatch = d.newBatch()
	d.setBatch = d.newBatch()

	return d
}
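
// newBatch wraps a new write batch from the bucket together with a
// dbBatchLocker, so batch commits are serialized against the instance-wide
// write lock.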
func (db *DB) newBatch() *batch {
	return db.l.newBatch(db.bucket.NewWriteBatch(), &dbBatchLocker{l: &sync.Mutex{}, wrLock: &db.l.wLock}, nil)
}
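
// Index returns the numeric index of this database.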
func (db *DB) Index() int {
	return int(db.index)
}
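
// IsAutoCommit reports whether writes to this DB are committed automatically,
// i.e. the DB status is DBAutoCommit.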
func (db *DB) IsAutoCommit() bool {
	return db.status == DBAutoCommit
}
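
// FlushAll removes every key of every data type (KV, list, hash, zset, bit
// and set) from this database and returns the number of keys dropped.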
func (db *DB) FlushAll() (drop int64, err error) {
	all := [...](func() (int64, error)){
		db.flush,
		db.lFlush,
		db.hFlush,
		db.zFlush,
		db.bFlush,
		db.sFlush}

	for _, flush := range all {
		n, e := flush()
		if e != nil {
			err = e
			return
		}
		drop += n
	}

	return
}
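
// newEliminator builds the elimination context used to retire expired keys,
// registering the write batch and delete function for each data type.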
func (db *DB) newEliminator() *elimination {
	eliminator := newEliminator(db)

	eliminator.regRetireContext(KVType, db.kvBatch, db.delete)
	eliminator.regRetireContext(ListType, db.listBatch, db.lDelete)
	eliminator.regRetireContext(HashType, db.hashBatch, db.hDelete)
	eliminator.regRetireContext(ZSetType, db.zsetBatch, db.zDelete)
	eliminator.regRetireContext(BitType, db.binBatch, db.bDelete)
	eliminator.regRetireContext(SetType, db.setBatch, db.sDelete)

	return eliminator
}
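
// flushRegion deletes every key in the range [minKey, maxKey) from the
// bucket, committing the batch every 1024 deletions so it does not grow
// unbounded. It returns the number of keys dropped.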
func (db *DB) flushRegion(t *batch, minKey []byte, maxKey []byte) (drop int64, err error) {
	it := db.bucket.RangeIterator(minKey, maxKey, store.RangeROpen)
	// Close the iterator on every return path, including commit failures.
	defer it.Close()

	for ; it.Valid(); it.Next() {
		t.Delete(it.RawKey())
		drop++
		// Commit every 1024 deletions to keep the batch small.
		if drop&1023 == 0 {
			if err = t.Commit(); err != nil {
				return
			}
		}
	}

	return
}
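
// flushType drops all keys of a single data type: it scans the type's meta
// keys in batches of 1024, deletes each key and its expiration entry, and
// commits after every batch. Unknown data types are rejected with an error.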
func (db *DB) flushType(t *batch, dataType byte) (drop int64, err error) {
	var deleteFunc func(t *batch, key []byte) int64
	var metaDataType byte
	switch dataType {
	case KVType:
		deleteFunc = db.delete
		metaDataType = KVType
	case ListType:
		deleteFunc = db.lDelete
		metaDataType = LMetaType
	case HashType:
		deleteFunc = db.hDelete
		metaDataType = HSizeType
	case ZSetType:
		deleteFunc = db.zDelete
		metaDataType = ZSizeType
	case BitType:
		deleteFunc = db.bDelete
		metaDataType = BitMetaType
	case SetType:
		deleteFunc = db.sDelete
		metaDataType = SSizeType
	default:
		return 0, fmt.Errorf("invalid data type: %s", TypeName[dataType])
	}

	var keys [][]byte
	keys, err = db.scan(metaDataType, nil, 1024, false, "")
	// Keep scanning while the scan succeeds and returns keys; a scan error
	// stops the loop and is returned to the caller via the named return.
	for len(keys) != 0 && err == nil {
		for _, key := range keys {
			deleteFunc(t, key)
			db.rmExpire(t, dataType, key)
		}

		if err = t.Commit(); err != nil {
			return
		}
		drop += int64(len(keys))

		keys, err = db.scan(metaDataType, nil, 1024, false, "")
	}
	return
}