gorush/vendor/github.com/dgraph-io/badger/db.go

1363 lines
35 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"bytes"
"encoding/binary"
"expvar"
"log"
"math"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/dgraph-io/badger/options"
"golang.org/x/net/trace"
"github.com/dgraph-io/badger/skl"
"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
"github.com/pkg/errors"
)
// Internal keys reserved by badger. All of them share badgerPrefix.
var (
	badgerPrefix = []byte("!badger!") // Prefix for internal keys used by badger.
	// head is stored (with a commit timestamp) by flushMemtable and looked up
	// at math.MaxUint64 by Open and RunValueLogGC to find the value log
	// position to replay / GC from.
	head   = []byte("!badger!head") // For storing value offset for replay.
	txnKey = []byte("!badger!txn")  // For indicating end of entries in txn.
)
// closers holds one y.Closer per background goroutine started by Open, so
// Close can stop each of them in order.
type closers struct {
	updateSize *y.Closer // db.updateSize (periodic size metrics).
	compactors *y.Closer // Level compactors; nil when opened ReadOnly.
	memtable   *y.Closer // db.flushMemtable; nil when opened ReadOnly.
	writes     *y.Closer // db.doWrites (the write pipeline).
	valueGC    *y.Closer // db.vlog.waitOnGC.
}
// DB provides the various functions required to interact with Badger.
// DB is thread-safe.
type DB struct {
	sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.
	dirLockGuard *directoryLockGuard
	// nil if Dir and ValueDir are the same
	valueDirGuard *directoryLockGuard
	closers       closers
	elog          trace.EventLog
	// mt is replaced by ensureRoomForWrite when full and set to nil by Close.
	mt *skl.Skiplist // Our latest (actively written) in-memory table
	// imm is ordered oldest first: ensureRoomForWrite/Close append, and
	// flushMemtable removes imm[0] once it has been written to level 0.
	imm      []*skl.Skiplist // Add here only AFTER pushing to flushChan.
	opt      Options
	manifest *manifestFile
	lc       *levelsController
	vlog     valueLog
	vptr     valuePointer // less than or equal to a pointer to the last vlog value put into mt
	// writeCh is drained by doWrites; Open recreates it after replay because
	// the replay doWrites closes it on shutdown.
	writeCh   chan *request
	flushChan chan flushTask // For flushing memtables.
	orc       *oracle
}
const (
	// kvWriteChCapacity is the buffer size of db.writeCh. doWrites also uses
	// 3*kvWriteChCapacity as the batch size at which it blocks waiting for the
	// in-flight write to finish.
	kvWriteChCapacity = 1000
)
// replayFunction returns the callback that Open passes to db.vlog.Replay to
// rebuild the memtable from value log entries.
//
// Entries carrying bitTxn are buffered until the matching bitFinTxn entry
// arrives, and only then written to the memtable. A transaction whose commit
// marker is never seen is therefore not applied. Entries without bitTxn
// (value log rewrites) are written immediately and must not appear inside an
// open transaction. The callback also raises out.orc.curRead to the largest
// key timestamp it sees.
//
// NOTE(review): the returned closure keeps state (txn, lastCommit, first)
// between calls and is not safe for concurrent use; it assumes Replay calls
// it sequentially — confirm.
func replayFunction(out *DB) func(Entry, valuePointer) error {
	type txnEntry struct {
		nk []byte
		v  y.ValueStruct
	}

	var txn []txnEntry    // Buffered entries of the transaction being replayed.
	var lastCommit uint64 // Timestamp of that transaction; 0 when none is open.
	// toLSM writes one entry to the memtable, polling every 10ms until
	// ensureRoomForWrite reports there is room.
	toLSM := func(nk []byte, vs y.ValueStruct) {
		for err := out.ensureRoomForWrite(); err != nil; err = out.ensureRoomForWrite() {
			out.elog.Printf("Replay: Making room for writes")
			time.Sleep(10 * time.Millisecond)
		}
		out.mt.Put(nk, vs)
	}

	first := true // Only used to log the first replayed key.
	return func(e Entry, vp valuePointer) error { // Function for replaying.
		if first {
			out.elog.Printf("First key=%s\n", e.Key)
		}
		first = false

		// Track the highest timestamp seen so Open can derive nextCommit.
		if out.orc.curRead < y.ParseTs(e.Key) {
			out.orc.curRead = y.ParseTs(e.Key)
		}

		// Copy the key (and inline value): e's buffers are not retained here.
		nk := make([]byte, len(e.Key))
		copy(nk, e.Key)
		var nv []byte
		meta := e.meta
		if out.shouldWriteValueToLSM(e) {
			nv = make([]byte, len(e.Value))
			copy(nv, e.Value)
		} else {
			// Large value: store an encoded pointer into the value log instead.
			nv = make([]byte, vptrSize)
			vp.Encode(nv)
			meta = meta | bitValuePointer
		}

		v := y.ValueStruct{
			Value:    nv,
			Meta:     meta,
			UserMeta: e.UserMeta,
		}

		if e.meta&bitFinTxn > 0 {
			// Commit marker: its value is the commit timestamp in decimal and
			// must match the timestamp of the buffered entries.
			txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
			if err != nil {
				return errors.Wrapf(err, "Unable to parse txn fin: %q", e.Value)
			}
			y.AssertTrue(lastCommit == txnTs)
			y.AssertTrue(len(txn) > 0)
			// Got the end of txn. Now we can store them.
			for _, t := range txn {
				toLSM(t.nk, t.v)
			}
			txn = txn[:0]
			lastCommit = 0

		} else if e.meta&bitTxn == 0 {
			// This entry is from a rewrite.
			toLSM(nk, v)

			// We shouldn't get this entry in the middle of a transaction.
			y.AssertTrue(lastCommit == 0)
			y.AssertTrue(len(txn) == 0)

		} else {
			// Part of a transaction: buffer until its bitFinTxn marker.
			txnTs := y.ParseTs(nk)
			if lastCommit == 0 {
				lastCommit = txnTs
			}
			y.AssertTrue(lastCommit == txnTs)
			te := txnEntry{nk: nk, v: v}
			txn = append(txn, te)
		}
		return nil
	}
}
// Open returns a new DB object.
//
// Open creates opt.Dir and opt.ValueDir if they are missing. When opt.ReadOnly
// is set, a missing directory is an error instead. Open then acquires the
// directory locks, opens the manifest, the levels controller and the value
// log, and replays the value log from the head pointer stored in the LSM tree.
// Finally it starts the background goroutines.
//
// Until Open succeeds, deferred cleanups release the directory locks and close
// the manifest file acquired here.
func Open(opt Options) (db *DB, err error) {
	opt.maxBatchSize = (15 * opt.MaxTableSize) / 100
	opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)

	if opt.ReadOnly {
		// Can't truncate if the DB is read only.
		opt.Truncate = false
	}

	for _, path := range []string{opt.Dir, opt.ValueDir} {
		dirExists, err := exists(path)
		if err != nil {
			return nil, y.Wrapf(err, "Invalid Dir: %q", path)
		}
		if !dirExists {
			if opt.ReadOnly {
				// err is nil here. Wrapping it (y.Wrapf, like pkg/errors.Wrapf,
				// returns nil for a nil error) would make Open return (nil, nil),
				// so build a fresh error instead.
				return nil, errors.Errorf("Cannot find Dir for read-only open: %q", path)
			}
			// Try to create the directory
			err = os.Mkdir(path, 0700)
			if err != nil {
				return nil, y.Wrapf(err, "Error Creating Dir: %q", path)
			}
		}
	}
	absDir, err := filepath.Abs(opt.Dir)
	if err != nil {
		return nil, err
	}
	absValueDir, err := filepath.Abs(opt.ValueDir)
	if err != nil {
		return nil, err
	}
	var dirLockGuard, valueDirLockGuard *directoryLockGuard
	dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
	if err != nil {
		return nil, err
	}
	// Released on any early return; set to nil on success to keep the lock.
	defer func() {
		if dirLockGuard != nil {
			_ = dirLockGuard.release()
		}
	}()
	if absValueDir != absDir {
		valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
		if err != nil {
			return nil, err
		}
	}
	defer func() {
		if valueDirLockGuard != nil {
			_ = valueDirLockGuard.release()
		}
	}()
	if !(opt.ValueLogFileSize <= 2<<30 && opt.ValueLogFileSize >= 1<<20) {
		return nil, ErrValueLogSize
	}
	if !(opt.ValueLogLoadingMode == options.FileIO ||
		opt.ValueLogLoadingMode == options.MemoryMap) {
		return nil, ErrInvalidLoadingMode
	}
	manifestFile, manifest, err := openOrCreateManifestFile(opt.Dir, opt.ReadOnly)
	if err != nil {
		return nil, err
	}
	defer func() {
		if manifestFile != nil {
			_ = manifestFile.close()
		}
	}()

	orc := &oracle{
		isManaged:  opt.managedTxns,
		nextCommit: 1,
		commits:    make(map[uint64]uint64),
	}

	db = &DB{
		imm:           make([]*skl.Skiplist, 0, opt.NumMemtables),
		flushChan:     make(chan flushTask, opt.NumMemtables),
		writeCh:       make(chan *request, kvWriteChCapacity),
		opt:           opt,
		manifest:      manifestFile,
		elog:          trace.NewEventLog("Badger", "DB"),
		dirLockGuard:  dirLockGuard,
		valueDirGuard: valueDirLockGuard,
		orc:           orc,
	}

	// Calculate initial size.
	db.calculateSize()
	db.closers.updateSize = y.NewCloser(1)
	go db.updateSize(db.closers.updateSize)
	db.mt = skl.NewSkiplist(arenaSize(opt))

	// newLevelsController potentially loads files in directory.
	if db.lc, err = newLevelsController(db, &manifest); err != nil {
		return nil, err
	}

	if !opt.ReadOnly {
		db.closers.compactors = y.NewCloser(1)
		db.lc.startCompact(db.closers.compactors)

		db.closers.memtable = y.NewCloser(1)
		go db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
	}

	if err = db.vlog.Open(db, opt); err != nil {
		return nil, err
	}

	headKey := y.KeyWithTs(head, math.MaxUint64)
	// Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
	vs, err := db.get(headKey)
	if err != nil {
		return nil, errors.Wrap(err, "Retrieving head")
	}
	db.orc.curRead = vs.Version
	var vptr valuePointer
	if len(vs.Value) > 0 {
		vptr.Decode(vs.Value)
	}

	// TODO: Figure this out. This would update the read timestamp, and set nextCommitTs.
	// A temporary doWrites goroutine services writes issued while replaying.
	replayCloser := y.NewCloser(1)
	go db.doWrites(replayCloser)

	if err = db.vlog.Replay(vptr, replayFunction(db)); err != nil {
		return db, err
	}

	replayCloser.SignalAndWait() // Wait for replay to be applied first.
	// Now that we have the curRead, we can update the nextCommit.
	db.orc.nextCommit = db.orc.curRead + 1

	// Mmap writable log
	lf := db.vlog.filesMap[db.vlog.maxFid]
	if err = lf.mmap(2 * db.vlog.opt.ValueLogFileSize); err != nil {
		return db, errors.Wrapf(err, "Unable to mmap RDWR log file")
	}

	// The replay doWrites closed writeCh on shutdown, so a fresh one is needed.
	db.writeCh = make(chan *request, kvWriteChCapacity)
	db.closers.writes = y.NewCloser(1)
	go db.doWrites(db.closers.writes)

	db.closers.valueGC = y.NewCloser(1)
	go db.vlog.waitOnGC(db.closers.valueGC)

	// Success: ownership of the locks and manifest passes to db, so disarm
	// the deferred cleanups.
	valueDirLockGuard = nil
	dirLockGuard = nil
	manifestFile = nil
	return db, nil
}
// Close closes a DB. It's crucial to call it to ensure all the pending updates
// make their way to disk. Calling DB.Close() multiple times is not safe and would
// cause panic.
//
// Shutdown proceeds in this order:
//  1. stop value log GC;
//  2. stop the write goroutine;
//  3. close the value log;
//  4. hand the current memtable to the flusher and wait for it to exit;
//  5. stop compactors;
//  6. run one final L0 compaction;
//  7. close the levels controller;
//  8. stop the size-metrics goroutine;
//  9. release the directory locks;
//  10. close the manifest and fsync both directories.
//
// Every step runs even if an earlier one failed. The returned error is the
// first non-nil one, wrapped with "DB.Close": errors.Wrap returns nil for a
// nil error, so each `if ...; err == nil` only records a real failure.
// Failures of the final L0 compaction are only traced, not returned.
func (db *DB) Close() (err error) {
	db.elog.Printf("Closing database")
	// Stop value GC first.
	db.closers.valueGC.SignalAndWait()

	// Stop writes next.
	db.closers.writes.SignalAndWait()

	// Now close the value log.
	if vlogErr := db.vlog.Close(); err == nil {
		err = errors.Wrap(vlogErr, "DB.Close")
	}

	// Make sure that block writer is done pushing stuff into memtable!
	// Otherwise, you will have a race condition: we are trying to flush memtables
	// and remove them completely, while the block / memtable writer is still
	// trying to push stuff into the memtable. This will also resolve the value
	// offset problem: as we push into memtable, we update value offsets there.
	if !db.mt.Empty() {
		db.elog.Printf("Flushing memtable")
		for {
			// Try to queue mt for flushing without blocking while holding the
			// lock. On success mt moves to imm and db.mt becomes nil.
			pushedFlushTask := func() bool {
				db.Lock()
				defer db.Unlock()
				y.AssertTrue(db.mt != nil)
				select {
				case db.flushChan <- flushTask{db.mt, db.vptr}:
					db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
					db.mt = nil                    // Will segfault if we try writing!
					db.elog.Printf("pushed to flush chan\n")
					return true
				default:
					// If we fail to push, we need to unlock and wait for a short while.
					// The flushing operation needs to update s.imm. Otherwise, we have a deadlock.
					// TODO: Think about how to do this more cleanly, maybe without any locks.
				}
				return false
			}()
			if pushedFlushTask {
				break
			}
			time.Sleep(10 * time.Millisecond)
		}
	}
	// NOTE(review): when opened ReadOnly no flusher goroutine runs (closers.memtable
	// is nil), so this send only succeeds if flushChan's buffer has room — confirm.
	db.flushChan <- flushTask{nil, valuePointer{}} // Tell flusher to quit.

	if db.closers.memtable != nil {
		db.closers.memtable.Wait()
		db.elog.Printf("Memtable flushed")
	}
	if db.closers.compactors != nil {
		db.closers.compactors.SignalAndWait()
		db.elog.Printf("Compaction finished")
	}

	// Force Compact L0
	// We don't need to care about cstatus since no parallel compaction is running.
	// NOTE(review): this runs even when opt.ReadOnly is set — confirm intended.
	cd := compactDef{
		elog:      trace.New("Badger", "Compact"),
		thisLevel: db.lc.levels[0],
		nextLevel: db.lc.levels[1],
	}
	cd.elog.SetMaxEvents(100)
	defer cd.elog.Finish()
	if db.lc.fillTablesL0(&cd) {
		if err := db.lc.runCompactDef(0, cd); err != nil {
			cd.elog.LazyPrintf("\tLOG Compact FAILED with error: %+v: %+v", err, cd)
		}
	} else {
		cd.elog.LazyPrintf("fillTables failed for level zero. No compaction required")
	}

	if lcErr := db.lc.close(); err == nil {
		err = errors.Wrap(lcErr, "DB.Close")
	}
	db.elog.Printf("Waiting for closer")
	db.closers.updateSize.SignalAndWait()

	db.elog.Finish()

	if db.dirLockGuard != nil {
		if guardErr := db.dirLockGuard.release(); err == nil {
			err = errors.Wrap(guardErr, "DB.Close")
		}
	}
	if db.valueDirGuard != nil {
		if guardErr := db.valueDirGuard.release(); err == nil {
			err = errors.Wrap(guardErr, "DB.Close")
		}
	}
	if manifestErr := db.manifest.close(); err == nil {
		err = errors.Wrap(manifestErr, "DB.Close")
	}

	// Fsync directories to ensure that lock file, and any other removed files whose directory
	// we haven't specifically fsynced, are guaranteed to have their directory entry removal
	// persisted to disk.
	if syncErr := syncDir(db.opt.Dir); err == nil {
		err = errors.Wrap(syncErr, "DB.Close")
	}
	if syncErr := syncDir(db.opt.ValueDir); err == nil {
		err = errors.Wrap(syncErr, "DB.Close")
	}

	return err
}
const (
	// lockFile is the file name passed to acquireDirectoryLock for both Dir
	// and ValueDir.
	lockFile = "LOCK"
)
// syncDir fsyncs the directory dir.
//
// Creating or deleting a file only becomes crash-safe once the directory
// entry itself is synced (see fsync(2), or
// https://github.com/coreos/etcd/issues/6368 for an example). A sync failure
// takes precedence over a close failure in the returned error.
func syncDir(dir string) error {
	d, openErr := openDir(dir)
	if openErr != nil {
		return errors.Wrapf(openErr, "While opening directory: %s.", dir)
	}
	syncErr := d.Sync()
	closeErr := d.Close()
	if syncErr == nil {
		// Wrapf yields nil when closeErr is nil.
		return errors.Wrapf(closeErr, "While closing directory: %s.", dir)
	}
	return errors.Wrapf(syncErr, "While syncing directory: %s.", dir)
}
// getMemTables returns the mutable memtable followed by the immutable
// memtables from newest to oldest, taking an extra reference on each one.
// The returned function drops those references. Call it once the tables are
// no longer needed.
func (db *DB) getMemTables() ([]*skl.Skiplist, func()) {
	db.RLock()
	defer db.RUnlock()

	immCount := len(db.imm)
	out := make([]*skl.Skiplist, 0, immCount+1)
	out = append(out, db.mt)
	// db.imm is oldest first; walk it backwards so newer tables come first.
	for j := immCount - 1; j >= 0; j-- {
		out = append(out, db.imm[j])
	}
	for _, sl := range out {
		sl.IncrRef()
	}

	release := func() {
		for _, sl := range out {
			sl.DecrRef()
		}
	}
	return out, release
}
// get looks key up in every memtable (newest first), then in the levels
// controller. The returned value includes the meta byte.
//
// The exact version encoded in key is returned as soon as a memtable has it.
// Otherwise the highest version found in the memtables is passed to
// db.lc.get, which makes the final decision. With managed transactions, or
// after purging, the newest version need not live in the newest table, so
// all memtables are scanned.
func (db *DB) get(key []byte) (y.ValueStruct, error) {
	tables, decr := db.getMemTables() // Lock should be released.
	defer decr()

	y.NumGets.Add(1)
	wantVersion := y.ParseTs(key)
	var best y.ValueStruct
	for _, tbl := range tables {
		vs := tbl.Get(key)
		y.NumMemtableGets.Add(1)
		found := vs.Meta != 0 || vs.Value != nil
		if !found {
			continue
		}
		if vs.Version == wantVersion {
			return vs, nil
		}
		if vs.Version > best.Version {
			best = vs
		}
	}
	return db.lc.get(key, best)
}
// updateOffset advances db.vptr to the last non-zero pointer in ptrs. It does
// nothing if every pointer is zero, and asserts that db.vptr never moves
// backwards.
func (db *DB) updateOffset(ptrs []valuePointer) {
	var latest valuePointer
	for idx := len(ptrs) - 1; idx >= 0 && latest.IsZero(); idx-- {
		latest = ptrs[idx]
	}
	if latest.IsZero() {
		return
	}

	db.Lock()
	defer db.Unlock()
	y.AssertTrue(!latest.Less(db.vptr))
	db.vptr = latest
}
// requestPool supplies the *request values that sendToWriteCh hands to the
// write pipeline.
var requestPool = sync.Pool{
	New: func() interface{} {
		return &request{}
	},
}
// shouldWriteValueToLSM reports whether e's value is short enough (strictly
// below opt.ValueThreshold) to be stored inline in the LSM tree rather than
// behind a value-log pointer.
func (db *DB) shouldWriteValueToLSM(e Entry) bool {
	threshold := db.opt.ValueThreshold
	return threshold > len(e.Value)
}
// writeToLSM inserts every entry of b into the active memtable, except
// transaction-finish markers (bitFinTxn). Small values are stored inline.
// Larger ones are replaced by the encoded value-log pointer b.Ptrs[i] and
// flagged with bitValuePointer. b.Ptrs must be parallel to b.Entries.
func (db *DB) writeToLSM(b *request) error {
	if len(b.Ptrs) != len(b.Entries) {
		return errors.Errorf("Ptrs and Entries don't match: %+v", b)
	}

	for i, e := range b.Entries {
		if e.meta&bitFinTxn != 0 {
			continue
		}
		vs := y.ValueStruct{
			Value:     e.Value,
			Meta:      e.meta,
			UserMeta:  e.UserMeta,
			ExpiresAt: e.ExpiresAt,
		}
		// Deletions / tombstones fall in the inline case.
		if !db.shouldWriteValueToLSM(*e) {
			var ptrBuf [vptrSize]byte
			vs.Value = b.Ptrs[i].Encode(ptrBuf[:])
			vs.Meta |= bitValuePointer
		}
		db.mt.Put(e.Key, vs)
	}
	return nil
}
// writeRequests persists reqs to the value log, then applies their entries to
// the memtable and advances db.vptr. It is called serially by only one
// goroutine (doWrites).
//
// Every request gets its Err set and its Wg released exactly once: with the
// value log error if that write fails, otherwise with the writeToLSM error or
// nil.
func (db *DB) writeRequests(reqs []*request) error {
	if len(reqs) == 0 {
		return nil
	}

	done := func(err error) {
		for _, r := range reqs {
			r.Err = err
			r.Wg.Done()
		}
	}

	db.elog.Printf("writeRequests called. Writing to value log")
	if err := db.vlog.write(reqs); err != nil {
		done(err)
		return err
	}

	db.elog.Printf("Writing to memtable")
	var count int
	for _, b := range reqs {
		if len(b.Entries) == 0 {
			continue
		}
		count += len(b.Entries)

		// Poll until the memtable has room. Any error from ensureRoomForWrite is
		// retried, so the loop only exits once it returns nil. (The original
		// post-loop error check was unreachable: the loop's err shadowed it.)
		var attempts uint64
		for err := db.ensureRoomForWrite(); err != nil; err = db.ensureRoomForWrite() {
			attempts++
			if attempts%100 == 0 {
				db.elog.Printf("Making room for writes")
			}
			// We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
			// When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
			// you will get a deadlock.
			time.Sleep(10 * time.Millisecond)
		}

		if err := db.writeToLSM(b); err != nil {
			done(err)
			return errors.Wrap(err, "writeRequests")
		}
		db.updateOffset(b.Ptrs)
	}
	done(nil)
	db.elog.Printf("%d entries written", count)
	return nil
}
// sendToWriteCh wraps entries in a pooled request and queues it on
// db.writeCh for doWrites. It rejects the batch with ErrTxnTooBig when the
// entry count or estimated size reaches opt.maxBatchCount / opt.maxBatchSize.
// The caller waits on the returned request for the write result.
func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
	count := int64(len(entries))
	var size int64
	for _, e := range entries {
		size += int64(e.estimateSize(db.opt.ValueThreshold))
	}
	tooBig := count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize
	if tooBig {
		return nil, ErrTxnTooBig
	}

	// Each txn is sent as a single request because its entries must be stored
	// in one contiguous section of the value log. Txns must not interleave with
	// other txns or rewrites.
	req := requestPool.Get().(*request)
	req.Entries = entries
	req.Wg = sync.WaitGroup{}
	req.Wg.Add(1)
	db.writeCh <- req // Handled in doWrites.
	y.NumPuts.Add(count)

	return req, nil
}
// doWrites is the goroutine that drains db.writeCh and batches requests.
//
// Each batch is handed to writeRequests on a new goroutine. pendingCh
// (capacity 1) keeps at most one writeRequests call in flight: a slot is taken
// before a batch is dispatched and released when writeRequests returns. While
// a write is in flight, new requests keep accumulating in reqs. Once reqs
// reaches 3*kvWriteChCapacity, doWrites blocks until the slot frees up.
//
// When lc is signalled, doWrites closes db.writeCh and drains what is left
// into the current batch. It then writes that batch synchronously and
// returns. Sending on db.writeCh after that point would panic, which is why
// Open creates a new channel after replay.
func (db *DB) doWrites(lc *y.Closer) {
	defer lc.Done()
	pendingCh := make(chan struct{}, 1)

	// writeRequests writes a batch, then frees the pendingCh slot.
	writeRequests := func(reqs []*request) {
		if err := db.writeRequests(reqs); err != nil {
			log.Printf("ERROR in Badger::writeRequests: %v", err)
		}
		<-pendingCh
	}

	// This variable tracks the number of pending writes.
	reqLen := new(expvar.Int)
	y.PendingWrites.Set(db.opt.Dir, reqLen)

	reqs := make([]*request, 0, 10)
	for {
		var r *request
		// Wait for the first request of a new batch (or shutdown).
		select {
		case r = <-db.writeCh:
		case <-lc.HasBeenClosed():
			goto closedCase
		}

		for {
			reqs = append(reqs, r)
			reqLen.Set(int64(len(reqs)))

			if len(reqs) >= 3*kvWriteChCapacity {
				pendingCh <- struct{}{} // blocking.
				goto writeCase
			}

			select {
			// Either push to pending, or continue to pick from writeCh.
			case r = <-db.writeCh:
			case pendingCh <- struct{}{}:
				goto writeCase
			case <-lc.HasBeenClosed():
				goto closedCase
			}
		}

	closedCase:
		close(db.writeCh)
		for r := range db.writeCh { // Flush the channel.
			reqs = append(reqs, r)
		}

		pendingCh <- struct{}{} // Push to pending before doing a write.
		writeRequests(reqs)
		return

	writeCase:
		// The pendingCh slot is already held; writeRequests releases it.
		go writeRequests(reqs)
		reqs = make([]*request, 0, 10)
		reqLen.Set(0)
	}
}
// batchSet applies a list of badger.Entry and blocks until the write has been
// processed. An error from queueing the batch, or a request-level write
// error, is returned.
//
//	Check(kv.BatchSet(entries))
func (db *DB) batchSet(entries []*Entry) error {
	req, err := db.sendToWriteCh(entries)
	if err == nil {
		err = req.Wait()
	}
	return err
}
// batchSetAsync is the asynchronous version of batchSet. It returns as soon as
// the batch is queued (or with the queueing error, in which case f is never
// called). Once the write completes, f is invoked on a separate goroutine with
// the request-level error, or nil.
//
//	err := kv.BatchSetAsync(entries, func(err error) {
//		Check(err)
//	})
func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
	req, err := db.sendToWriteCh(entries)
	if err != nil {
		return err
	}
	// Write is complete once Wait returns; hand the result to the callback.
	go func(r *request) {
		f(r.Wait())
	}(req)
	return nil
}
// errNoRoom is returned by ensureRoomForWrite when the memtable is full and
// flushChan has no free slot. Callers sleep briefly and retry.
var errNoRoom = errors.New("No room for write")

// ensureRoomForWrite makes sure db.mt can accept more writes. If the
// memtable has reached opt.MaxTableSize, it tries to queue the memtable for
// flushing without blocking, syncs the value log, moves the memtable to
// db.imm and installs a fresh one. It returns errNoRoom when flushChan is
// full, or the value log sync error.
//
// ensureRoomForWrite is always called serially.
func (db *DB) ensureRoomForWrite() error {
	db.Lock()
	defer db.Unlock()

	// A nil mt indicates that DB is being closed. Assert before calling
	// MemSize. Otherwise a nil mt would panic with a nil dereference there,
	// and the assertion could never fire.
	y.AssertTrue(db.mt != nil)
	if db.mt.MemSize() < db.opt.MaxTableSize {
		return nil
	}

	select {
	case db.flushChan <- flushTask{db.mt, db.vptr}:
		db.elog.Printf("Flushing value log to disk if async mode.")
		// Ensure value log is synced to disk so this memtable's contents wouldn't be lost.
		if err := db.vlog.sync(); err != nil {
			// NOTE(review): the task is already queued here, but imm/mt are not
			// updated, while flushMemtable asserts ft.mt == db.imm[0] — confirm
			// how this path should recover.
			return err
		}
		db.elog.Printf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
			db.mt.MemSize(), len(db.flushChan))
		// We manage to push this task. Let's modify imm.
		db.imm = append(db.imm, db.mt)
		db.mt = skl.NewSkiplist(arenaSize(db.opt))
		// New memtable is empty. We certainly have room.
		return nil
	default:
		// We need to do this to unlock and allow the flusher to modify imm.
		return errNoRoom
	}
}
// arenaSize returns the skiplist arena size allocated for each memtable:
// opt.MaxTableSize bytes plus one maximum-size batch, counted as
// opt.maxBatchSize bytes of data and opt.maxBatchCount nodes of
// skl.MaxNodeSize each. Presumably the extra room is for a batch written just
// before the memtable crosses MaxTableSize.
func arenaSize(opt Options) int64 {
	nodeOverhead := opt.maxBatchCount * int64(skl.MaxNodeSize)
	return opt.MaxTableSize + opt.maxBatchSize + nodeOverhead
}
// writeLevel0Table serializes every key/value pair of the skiplist s, in
// iteration order, into a table and writes it to f. Every entry is added,
// tombstones included.
func writeLevel0Table(s *skl.Skiplist, f *os.File) error {
	it := s.NewIterator()
	defer it.Close()
	builder := table.NewTableBuilder()
	defer builder.Close()

	it.SeekToFirst()
	for it.Valid() {
		if err := builder.Add(it.Key(), it.Value()); err != nil {
			return err
		}
		it.Next()
	}
	_, err := f.Write(builder.Finish())
	return err
}
// flushTask asks flushMemtable to write mt to level 0. vptr is the value log
// position stored under the head key in that table. A task with a nil mt
// (sent by Close) tells the flusher to exit.
type flushTask struct {
	mt   *skl.Skiplist
	vptr valuePointer
}
// flushMemtable is the flusher goroutine started by Open (unless ReadOnly). For
// each task on db.flushChan it:
//   - records the value log head in the memtable;
//   - writes the memtable as a new level-0 table file and registers it with
//     the levels controller;
//   - removes the memtable from db.imm.
//
// It returns nil when it receives a task with a nil memtable.
//
// TODO: Ensure that this function doesn't return, or is handled by another wrapper function.
// Otherwise, we would have no goroutine which can flush memtables. (The error
// result is discarded by the `go` statement in Open.)
func (db *DB) flushMemtable(lc *y.Closer) error {
	defer lc.Done()

	for ft := range db.flushChan {
		if ft.mt == nil {
			return nil
		}

		if !ft.mt.Empty() {
			// Store badger head even if vptr is zero, need it for readTs
			db.elog.Printf("Storing offset: %+v\n", ft.vptr)
			offset := make([]byte, vptrSize)
			ft.vptr.Encode(offset)
			// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
			// commits.
			headTs := y.KeyWithTs(head, db.orc.commitTs())
			ft.mt.Put(headTs, y.ValueStruct{Value: offset})
		}

		fileID := db.lc.reserveFileID()
		fd, err := y.CreateSyncedFile(table.NewFilename(fileID, db.opt.Dir), true)
		if err != nil {
			return y.Wrap(err)
		}

		// Don't block just to sync the directory entry.
		dirSyncCh := make(chan error)
		go func() { dirSyncCh <- syncDir(db.opt.Dir) }()

		err = writeLevel0Table(ft.mt, fd)
		dirSyncErr := <-dirSyncCh

		if err != nil {
			db.elog.Errorf("ERROR while writing to level 0: %v", err)
			_ = fd.Close() // We still own fd; the write error is what matters.
			return err
		}
		if dirSyncErr != nil {
			db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
			_ = fd.Close()
			// Previously this returned err, which is nil here, hiding the failure.
			return dirSyncErr
		}

		tbl, err := table.OpenTable(fd, db.opt.TableLoadingMode)
		if err != nil {
			db.elog.Errorf("ERROR while opening table: %v", err)
			return err
		}
		// We own a ref on tbl.
		err = db.lc.addLevel0Table(tbl) // This will incrRef (if we don't error, sure)
		tbl.DecrRef()                   // Releases our ref.
		if err != nil {
			return err
		}

		// Update s.imm. Need a lock.
		db.Lock()
		y.AssertTrue(ft.mt == db.imm[0]) //For now, single threaded.
		db.imm = db.imm[1:]
		ft.mt.DecrRef() // Return memory.
		db.Unlock()
	}
	return nil
}
// exists reports whether path exists. A "does not exist" error from os.Stat
// yields (false, nil). Any other Stat error is returned together with true,
// so callers must check err before trusting the boolean.
func exists(path string) (bool, error) {
	_, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return true, statErr
	}
}
// calculateSize walks opt.Dir, and opt.ValueDir when it differs, and sums the
// sizes of .sst and .vlog files. It publishes the totals in the y.LSMSize and
// y.VlogSize expvar maps, both keyed by opt.Dir.
//
// When ValueDir differs from Dir, the .vlog total comes from ValueDir only.
// Any .sst files found under ValueDir are ignored.
func (db *DB) calculateSize() {
	newInt := func(val int64) *expvar.Int {
		v := new(expvar.Int)
		v.Add(val)
		return v
	}

	// totalSize sums .sst and .vlog file sizes under dir. If the walk fails, it
	// logs the error (previously dropped from the message) and returns the
	// partial sums.
	totalSize := func(dir string) (int64, int64) {
		var lsmSize, vlogSize int64
		err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			switch filepath.Ext(path) {
			case ".sst":
				lsmSize += info.Size()
			case ".vlog":
				vlogSize += info.Size()
			}
			return nil
		})
		if err != nil {
			db.elog.Printf("Got error while calculating total size of directory: %s, error: %v", dir, err)
		}
		return lsmSize, vlogSize
	}

	lsmSize, vlogSize := totalSize(db.opt.Dir)
	y.LSMSize.Set(db.opt.Dir, newInt(lsmSize))
	// If valueDir is different from dir, we'd have to do another walk.
	if db.opt.ValueDir != db.opt.Dir {
		_, vlogSize = totalSize(db.opt.ValueDir)
	}
	y.VlogSize.Set(db.opt.Dir, newInt(vlogSize))
}
// updateSize recomputes the size metrics once a minute until lc is
// signalled. It runs as the goroutine tracked by db.closers.updateSize.
func (db *DB) updateSize(lc *y.Closer) {
	defer lc.Done()

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-lc.HasBeenClosed():
			return
		case <-ticker.C:
			db.calculateSize()
		}
	}
}
// PurgeVersionsBelow deletes every live version of key whose version is
// strictly below ts. It scans in a read-only transaction that is discarded
// afterwards; the deletions go through the regular write path.
func (db *DB) PurgeVersionsBelow(key []byte, ts uint64) error {
	readTxn := db.NewTransaction(false)
	defer readTxn.Discard()
	err := db.purgeVersionsBelow(readTxn, key, ts)
	return err
}
// purgeVersionsBelow scans all versions of key visible to txn. For each
// version below ts that is not already deleted or expired, it queues a delete
// marker and updates the value log GC statistics. It then writes the markers
// with batchSet.
func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {
	iterOpts := DefaultIteratorOptions
	iterOpts.AllVersions = true
	iterOpts.PrefetchValues = false
	it := txn.NewIterator(iterOpts)
	defer it.Close()

	var deletes []*Entry
	for it.Seek(key); it.ValidForPrefix(key); it.Next() {
		item := it.Item()
		sameKey := bytes.Equal(key, item.Key())
		if !sameKey || item.Version() >= ts || isDeletedOrExpired(item.meta, item.ExpiresAt()) {
			continue
		}
		// Found an older version. Mark for deletion
		deletes = append(deletes, &Entry{
			Key:  y.KeyWithTs(key, item.version),
			meta: bitDelete,
		})
		db.vlog.updateGCStats(item)
	}
	return db.batchSet(deletes)
}
// PurgeOlderVersions deletes older versions of all keys.
//
// This function could be called prior to doing garbage collection to clean up
// older versions that are no longer needed. The caller must make sure that
// there are no long-running read transactions running before this function is
// called, otherwise they will not work as expected.
//
// It iterates every version of every key inside a read transaction. For each
// key, all versions after the first one seen are marked for deletion, unless
// they are already deleted or expired. Delete markers are written
// asynchronously in batches of at most 1000 entries, also bounded by
// opt.maxBatchCount and opt.maxBatchSize. It waits for every batch and returns
// the first write error reported, if any.
func (db *DB) PurgeOlderVersions() error {
	return db.View(func(txn *Txn) error {
		opts := DefaultIteratorOptions
		opts.AllVersions = true
		opts.PrefetchValues = false
		it := txn.NewIterator(opts)
		defer it.Close()

		var entries []*Entry
		var lastKey []byte
		var count, size int
		var wg sync.WaitGroup
		// Holds at most one async write error; later errors are dropped.
		errChan := make(chan error, 1)

		// func to check for pending error before sending off a batch for writing
		batchSetAsyncIfNoErr := func(entries []*Entry) error {
			select {
			case err := <-errChan:
				return err
			default:
				wg.Add(1)
				return txn.db.batchSetAsync(entries, func(err error) {
					defer wg.Done()
					if err != nil {
						select {
						case errChan <- err:
						default:
						}
					}
				})
			}
		}

		for it.Rewind(); it.Valid(); it.Next() {
			item := it.Item()
			// First version seen for a key is kept; remember the key.
			if !bytes.Equal(lastKey, item.Key()) {
				lastKey = y.SafeCopy(lastKey, item.Key())
				continue
			}
			if isDeletedOrExpired(item.meta, item.ExpiresAt()) {
				continue
			}
			// Found an older version. Mark for deletion
			e := &Entry{
				Key:  y.KeyWithTs(lastKey, item.version),
				meta: bitDelete,
			}
			db.vlog.updateGCStats(item)
			curSize := e.estimateSize(db.opt.ValueThreshold)

			// Batch up min(1000, maxBatchCount) entries at a time and write
			// Ensure that total batch size doesn't exceed maxBatchSize
			if count == 1000 || count+1 >= int(db.opt.maxBatchCount) ||
				size+curSize >= int(db.opt.maxBatchSize) {
				if err := batchSetAsyncIfNoErr(entries); err != nil {
					return err
				}
				count = 0
				size = 0
				entries = []*Entry{}
			}
			size += curSize
			count++
			entries = append(entries, e)
		}

		// Write last batch pending deletes
		if count > 0 {
			if err := batchSetAsyncIfNoErr(entries); err != nil {
				return err
			}
		}

		// Wait for all queued batches before checking for a reported error.
		wg.Wait()

		select {
		case err := <-errChan:
			return err
		default:
			return nil
		}
	})
}
// RunValueLogGC triggers a value log garbage collection.
//
// It picks value log files to GC based on statistics collected during the
// session, whenever DB.PurgeOlderVersions() or DB.PurgeVersionsBelow() is
// called. If no such statistics are available, log files are picked in random
// order. The process stops as soon as it meets the first log file that does
// not yield any garbage collection.
//
// A picked log file is sampled first. If the sample shows that at least
// discardRatio of the file's space can be discarded, the file is rewritten.
//
// If a call to RunValueLogGC results in no rewrites, it returns ErrNoRewrite.
//
// We recommend setting discardRatio to 0.5, meaning a file is rewritten if
// half its space can be discarded. This gives a lifetime value log write
// amplification of 2 (1 from the original write + 0.5 rewrite + 0.25 +
// 0.125 + ... = 2). A higher value reclaims space less often; a lower value
// reclaims more space at the cost of more activity on the LSM tree.
// discardRatio must be in the open range (0.0, 1.0); otherwise
// ErrInvalidRequest is returned.
//
// Only one GC is allowed at a time. If another value log GC is running, or the
// DB has been closed, this returns ErrRejected.
//
// Note: Every run of GC produces a spike of activity on the LSM tree.
func (db *DB) RunValueLogGC(discardRatio float64) error {
	if discardRatio >= 1.0 || discardRatio <= 0.0 {
		return ErrInvalidRequest
	}

	// Find the head pointer in the on-disk LSM tree. It is looked up at
	// math.MaxUint64 because lsm get strips the trailing 8 timestamp bytes
	// before comparing keys.
	headKey := y.KeyWithTs(head, math.MaxUint64)
	stored, err := db.lc.get(headKey, y.ValueStruct{})
	if err != nil {
		return errors.Wrap(err, "Retrieving head from on-disk LSM")
	}

	var headPtr valuePointer
	if len(stored.Value) > 0 {
		headPtr.Decode(stored.Value)
	}

	// Pick a log file and run GC
	return db.vlog.runGC(discardRatio, headPtr)
}
// Size returns the size of lsm and value log files in bytes, as last computed
// by the periodic size walk. It returns (0, 0) if the metrics have not been
// published yet. It can be used to decide how often to call RunValueLogGC.
func (db *DB) Size() (lsm int64, vlog int64) {
	lsmVar := y.LSMSize.Get(db.opt.Dir)
	if lsmVar == nil {
		return 0, 0
	}
	lsm = lsmVar.(*expvar.Int).Value()
	vlog = y.VlogSize.Get(db.opt.Dir).(*expvar.Int).Value()
	return lsm, vlog
}
// Sequence represents a Badger sequence.
//
// It hands out integers from an in-memory range [next, leased). The upper
// bound of that range is persisted under key, and the range is extended by
// bandwidth whenever it is exhausted.
type Sequence struct {
	sync.Mutex // Guards next and leased.
	db         *DB
	key        []byte
	next       uint64 // Next integer Next will return.
	leased     uint64 // Exclusive upper bound of the current lease (the stored value).
	bandwidth  uint64 // How many integers each lease reserves.
}
// Next returns the next integer in the sequence. It runs a transaction to
// extend the lease first if the current one is exhausted.
func (seq *Sequence) Next() (uint64, error) {
	seq.Lock()
	defer seq.Unlock()

	exhausted := seq.next >= seq.leased
	if exhausted {
		if err := seq.updateLease(); err != nil {
			return 0, err
		}
	}
	current := seq.next
	seq.next = current + 1
	return current, nil
}
// Release persists seq.next as the stored lease so that unused integers of
// the current lease are not wasted. It should be called right before closing
// the associated DB. Using the sequence after Release is still valid; the
// next call to Next takes a fresh lease with full bandwidth.
func (seq *Sequence) Release() error {
	seq.Lock()
	defer seq.Unlock()

	if err := seq.db.Update(func(txn *Txn) error {
		encoded := make([]byte, 8)
		binary.BigEndian.PutUint64(encoded, seq.next)
		return txn.Set(seq.key, encoded)
	}); err != nil {
		return err
	}
	seq.leased = seq.next
	return nil
}
// updateLease reserves the next bandwidth integers for seq, all in one
// transaction:
//   - read the big-endian uint64 stored under seq.key (0 if the key is
//     missing);
//   - store value+bandwidth back as the new lease.
//
// After the commit, it sets seq.next to the value read and seq.leased to the
// new lease. The fields are updated only after Update succeeds. A failed
// commit therefore can't leave seq handing out integers whose lease was never
// persisted. A stored value shorter than 8 bytes is reported as an error
// instead of panicking.
//
// Callers must have exclusive access to seq: Next holds seq's mutex, and
// GetSequence calls this before seq is shared.
func (seq *Sequence) updateLease() error {
	var next, lease uint64
	err := seq.db.Update(func(txn *Txn) error {
		item, err := txn.Get(seq.key)
		switch {
		case err == ErrKeyNotFound:
			next = 0
		case err != nil:
			return err
		default:
			val, verr := item.Value()
			if verr != nil {
				return verr
			}
			if len(val) < 8 {
				return errors.Errorf("Sequence value for key %q has %d bytes, need 8", seq.key, len(val))
			}
			next = binary.BigEndian.Uint64(val)
		}

		lease = next + seq.bandwidth
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], lease)
		return txn.Set(seq.key, buf[:])
	})
	if err != nil {
		return err
	}
	seq.next = next
	seq.leased = lease
	return nil
}
// GetSequence creates a Sequence for key and takes its first lease, starting
// from the lease stored in the database if there is one. A Sequence yields
// monotonically increasing integers; use different keys for independent
// sequences. bandwidth is the lease size, i.e. how many Next() calls can be
// served from memory between transactions. The Sequence is returned even when
// taking the lease fails.
func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}
	if bandwidth == 0 {
		return nil, ErrZeroBandwidth
	}

	seq := &Sequence{db: db, key: key, bandwidth: bandwidth}
	leaseErr := seq.updateLease()
	return seq, leaseErr
}
// MergeOperator represents a Badger merge operator.
//
// Values added with Add are periodically folded by a background goroutine
// into a single value using f.
type MergeOperator struct {
	sync.RWMutex // Held exclusively by compact, shared by Get.
	f            MergeFunc
	db           *DB
	key          []byte
	// Versions at or below this have already been merged and are skipped.
	skipAtOrBelow uint64
	closer        *y.Closer // Stops runCompactions.
}

// MergeFunc accepts two byte slices, one representing an existing value, and
// another representing a new value that needs to be merged into it. MergeFunc
// contains the logic to perform the merge and return an updated value.
// MergeFunc could perform operations like integer addition, list appends etc.
// Note that the ordering of the operands is unspecified, so the merge func
// should either be agnostic to ordering or do additional handling if ordering
// is required.
type MergeFunc func(existing, val []byte) []byte
// GetMergeOperator creates a MergeOperator for key that merges values with f,
// and returns a pointer to it. It also starts a goroutine that runs a merge
// compaction every dur. Stop the goroutine with Stop.
func (db *DB) GetMergeOperator(key []byte,
	f MergeFunc, dur time.Duration) *MergeOperator {
	op := &MergeOperator{f: f, db: db, key: key, closer: y.NewCloser(1)}
	go op.runCompactions(dur)
	return op
}
// iterateAndMerge folds every live version of op.key newer than
// op.skipAtOrBelow into one value using op.f. It returns the highest version
// merged and the merged value, or ErrKeyNotFound if there is no such version.
//
// Fixes over the previous version:
//   - The iterator is closed.
//   - Iteration starts at op.key (Seek) instead of the first key in the DB.
//     With Rewind, any key sorting before op.key ended the loop immediately.
//   - Keys that merely share op.key as a prefix are ignored.
//   - Deleted or expired versions (such as those marked by
//     PurgeVersionsBelow) are not merged.
func (op *MergeOperator) iterateAndMerge(txn *Txn) (maxVersion uint64, val []byte, err error) {
	opt := DefaultIteratorOptions
	opt.AllVersions = true
	it := txn.NewIterator(opt)
	defer it.Close()

	var first bool
	for it.Seek(op.key); it.ValidForPrefix(op.key); it.Next() {
		item := it.Item()
		if !bytes.Equal(item.Key(), op.key) {
			continue
		}
		if item.Version() <= op.skipAtOrBelow {
			continue
		}
		if isDeletedOrExpired(item.meta, item.ExpiresAt()) {
			continue
		}
		if item.Version() > maxVersion {
			maxVersion = item.Version()
		}
		if !first {
			first = true
			// Copy: the merged value must outlive the iterator's buffers.
			if val, err = item.ValueCopy(val); err != nil {
				return 0, nil, err
			}
			continue
		}
		newVal, verr := item.Value()
		if verr != nil {
			return 0, nil, verr
		}
		val = op.f(val, newVal)
	}
	if !first {
		return 0, nil, ErrKeyNotFound
	}
	return maxVersion, val, nil
}
// compact merges all versions newer than op.skipAtOrBelow and writes the
// result back as a new version of op.key, in one transaction. On success it
// moves op.skipAtOrBelow up to the highest merged version.
//
// If there is nothing to merge (ErrKeyNotFound), it returns nil and leaves
// op.skipAtOrBelow unchanged. The previous version reset it to 0 in that case.
func (op *MergeOperator) compact() error {
	op.Lock()
	defer op.Unlock()

	var maxVersion uint64
	err := op.db.Update(func(txn *Txn) error {
		var (
			val []byte
			err error
		)
		maxVersion, val, err = op.iterateAndMerge(txn)
		if err != nil {
			return err
		}

		// Write value back to db
		if maxVersion > op.skipAtOrBelow {
			return txn.Set(op.key, val)
		}
		return nil
	})
	switch {
	case err == ErrKeyNotFound:
		// Ignore ErrKeyNotFound during compaction: nothing new to merge.
		return nil
	case err != nil:
		return err
	}

	// Update version
	op.skipAtOrBelow = maxVersion
	return nil
}
// runCompactions is the background loop started by GetMergeOperator. On every
// tick of dur it runs compact. When compact advanced skipAtOrBelow, it then
// purges the versions that were merged. After op.closer is signalled, it runs
// one final compaction and purge, then returns.
func (op *MergeOperator) runCompactions(dur time.Duration) {
	ticker := time.NewTicker(dur)
	defer op.closer.Done()
	defer ticker.Stop()

	for {
		shuttingDown := false
		select {
		case <-op.closer.HasBeenClosed():
			shuttingDown = true
		case <-ticker.C: // wait for tick
		}

		prevSkip := op.skipAtOrBelow
		if err := op.compact(); err != nil {
			log.Printf("Error while running merge operation: %s", err)
		}

		// Purge older versions if version has updated
		if op.skipAtOrBelow > prevSkip {
			if err := op.db.PurgeVersionsBelow(op.key, op.skipAtOrBelow+1); err != nil {
				log.Printf("Error purging merged keys: %s", err)
			}
		}

		if shuttingDown {
			return
		}
	}
}
// Add stores val as a new version of the operator's key. The background
// compaction later merges it into the values recorded by previous calls to
// Add.
func (op *MergeOperator) Add(val []byte) error {
	set := func(txn *Txn) error {
		return txn.Set(op.key, val)
	}
	return op.db.Update(set)
}
// Get returns the current merged value of the operator. It applies the merge
// function, in a read-only transaction, to every version not yet compacted
// away.
//
// If Add has not been called even once, Get returns ErrKeyNotFound.
func (op *MergeOperator) Get() ([]byte, error) {
	op.RLock()
	defer op.RUnlock()

	var merged []byte
	err := op.db.View(func(txn *Txn) error {
		var viewErr error
		_, merged, viewErr = op.iterateAndMerge(txn)
		return viewErr
	})
	return merged, err
}
// Stop signals the background compaction goroutine to exit. It blocks until
// that goroutine has run its final compaction and returned.
func (op *MergeOperator) Stop() {
	c := op.closer
	c.SignalAndWait()
}