gorush/vendor/github.com/dgraph-io/badger/value.go

1115 lines
27 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"log"
"math"
"math/rand"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/y"
"github.com/pkg/errors"
"golang.org/x/net/trace"
)
// Values have their first byte being byteData or byteDelete. This helps us distinguish between
// a key that has never been seen and a key that has been explicitly deleted.
const (
bitDelete byte = 1 << 0 // Set if the key has been deleted.
bitValuePointer byte = 1 << 1 // Set if the value is NOT stored directly next to key.
// The MSB 2 bits are for transactions.
bitTxn byte = 1 << 6 // Set if the entry is part of a txn.
bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.
mi int64 = 1 << 20
)
type logFile struct {
path string
// This is a lock on the log file. It guards the fds value, the files
// existence and the files memory map.
//
// Use shared ownership when reading/writing the file or memory map, use
// exclusive ownership to open/close the descriptor, unmap or remove the file.
lock sync.RWMutex
fd *os.File
fid uint32
fmap []byte
size uint32
loadingMode options.FileLoadingMode
}
// openReadOnly assumes that we have a write lock on logFile.
func (lf *logFile) openReadOnly() error {
var err error
lf.fd, err = os.OpenFile(lf.path, os.O_RDONLY, 0666)
if err != nil {
return errors.Wrapf(err, "Unable to open %q as RDONLY.", lf.path)
}
fi, err := lf.fd.Stat()
if err != nil {
return errors.Wrapf(err, "Unable to check stat for %q", lf.path)
}
lf.size = uint32(fi.Size())
if err = lf.mmap(fi.Size()); err != nil {
_ = lf.fd.Close()
return y.Wrapf(err, "Unable to map file")
}
return nil
}
func (lf *logFile) mmap(size int64) (err error) {
if lf.loadingMode != options.MemoryMap {
// Nothing to do
return nil
}
lf.fmap, err = y.Mmap(lf.fd, false, size)
if err == nil {
err = y.Madvise(lf.fmap, false) // Disable readahead
}
return err
}
func (lf *logFile) munmap() (err error) {
if lf.loadingMode != options.MemoryMap {
// Nothing to do
return nil
}
if err := y.Munmap(lf.fmap); err != nil {
return errors.Wrapf(err, "Unable to munmap value log: %q", lf.path)
}
return nil
}
// Acquire lock on mmap/file if you are calling this
func (lf *logFile) read(p valuePointer, s *y.Slice) (buf []byte, err error) {
var nbr int64
offset := p.Offset
if lf.loadingMode == options.FileIO {
buf = s.Resize(int(p.Len))
var n int
n, err = lf.fd.ReadAt(buf, int64(offset))
nbr = int64(n)
} else {
size := uint32(len(lf.fmap))
valsz := p.Len
if offset >= size || offset+valsz > size {
err = y.ErrEOF
} else {
buf = lf.fmap[offset : offset+valsz]
nbr = int64(valsz)
}
}
y.NumReads.Add(1)
y.NumBytesRead.Add(nbr)
return buf, err
}
func (lf *logFile) doneWriting(offset uint32) error {
// Sync before acquiring lock. (We call this from write() and thus know we have shared access
// to the fd.)
if err := lf.fd.Sync(); err != nil {
return errors.Wrapf(err, "Unable to sync value log: %q", lf.path)
}
// Close and reopen the file read-only. Acquire lock because fd will become invalid for a bit.
// Acquiring the lock is bad because, while we don't hold the lock for a long time, it forces
// one batch of readers wait for the preceding batch of readers to finish.
//
// If there's a benefit to reopening the file read-only, it might be on Windows. I don't know
// what the benefit is. Consider keeping the file read-write, or use fcntl to change
// permissions.
lf.lock.Lock()
defer lf.lock.Unlock()
if err := lf.munmap(); err != nil {
return err
}
// TODO: Confirm if we need to run a file sync after truncation.
// Truncation must run after unmapping, otherwise Windows would crap itself.
if err := lf.fd.Truncate(int64(offset)); err != nil {
return errors.Wrapf(err, "Unable to truncate file: %q", lf.path)
}
if err := lf.fd.Close(); err != nil {
return errors.Wrapf(err, "Unable to close value log: %q", lf.path)
}
return lf.openReadOnly()
}
// You must hold lf.lock to sync()
func (lf *logFile) sync() error {
return lf.fd.Sync()
}
var errStop = errors.New("Stop iteration")
type logEntry func(e Entry, vp valuePointer) error
// iterate iterates over log file. It doesn't not allocate new memory for every kv pair.
// Therefore, the kv pair is only valid for the duration of fn call.
func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) error {
_, err := lf.fd.Seek(int64(offset), io.SeekStart)
if err != nil {
return y.Wrap(err)
}
reader := bufio.NewReader(lf.fd)
var hbuf [headerBufSize]byte
var h header
k := make([]byte, 1<<10)
v := make([]byte, 1<<20)
truncate := false
recordOffset := offset
var lastCommit uint64
var validEndOffset uint32
for {
hash := crc32.New(y.CastagnoliCrcTable)
tee := io.TeeReader(reader, hash)
// TODO: Move this entry decode into structs.go
if _, err = io.ReadFull(tee, hbuf[:]); err != nil {
if err == io.EOF {
break
} else if err == io.ErrUnexpectedEOF {
truncate = true
break
}
return err
}
var e Entry
e.offset = recordOffset
h.Decode(hbuf[:])
if h.klen > maxKeySize {
truncate = true
break
}
vl := int(h.vlen)
if cap(v) < vl {
v = make([]byte, 2*vl)
}
kl := int(h.klen)
if cap(k) < kl {
k = make([]byte, 2*kl)
}
e.Key = k[:kl]
e.Value = v[:vl]
if _, err = io.ReadFull(tee, e.Key); err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
truncate = true
break
}
return err
}
if _, err = io.ReadFull(tee, e.Value); err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
truncate = true
break
}
return err
}
var crcBuf [4]byte
if _, err = io.ReadFull(reader, crcBuf[:]); err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
truncate = true
break
}
return err
}
crc := binary.BigEndian.Uint32(crcBuf[:])
if crc != hash.Sum32() {
truncate = true
break
}
e.meta = h.meta
e.UserMeta = h.userMeta
e.ExpiresAt = h.expiresAt
var vp valuePointer
vp.Len = headerBufSize + h.klen + h.vlen + uint32(len(crcBuf))
recordOffset += vp.Len
vp.Offset = e.offset
vp.Fid = lf.fid
if e.meta&bitFinTxn > 0 {
txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
if err != nil || lastCommit != txnTs {
truncate = true
break
}
// Got the end of txn. Now we can store them.
lastCommit = 0
validEndOffset = recordOffset
} else if e.meta&bitTxn == 0 {
// We shouldn't get this entry in the middle of a transaction.
if lastCommit != 0 {
truncate = true
break
}
validEndOffset = recordOffset
} else {
// TODO: Remove this once we merge v2.0-candidate branch. This shouldn't
// happen in 2.0 because we are no longer moving entries within the value
// logs, everything should be either txn or txnfin.
txnTs := y.ParseTs(e.Key)
if lastCommit == 0 {
lastCommit = txnTs
}
if lastCommit != txnTs {
truncate = true
break
}
}
if vlog.opt.ReadOnly {
return ErrReplayNeeded
}
if err := fn(e, vp); err != nil {
if err == errStop {
break
}
return y.Wrap(err)
}
}
if vlog.opt.Truncate && truncate && len(lf.fmap) == 0 {
// Only truncate if the file isn't mmaped. Otherwise, Windows would puke.
if err := lf.fd.Truncate(int64(validEndOffset)); err != nil {
return err
}
} else if truncate {
return ErrTruncateNeeded
}
return nil
}
func (vlog *valueLog) rewrite(f *logFile) error {
maxFid := atomic.LoadUint32(&vlog.maxFid)
y.AssertTruef(uint32(f.fid) < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
elog := trace.NewEventLog("badger", "vlog-rewrite")
defer elog.Finish()
elog.Printf("Rewriting fid: %d", f.fid)
wb := make([]*Entry, 0, 1000)
var size int64
y.AssertTrue(vlog.kv != nil)
var count int
fe := func(e Entry) error {
count++
if count%10000 == 0 {
elog.Printf("Processing entry %d", count)
}
vs, err := vlog.kv.get(e.Key)
if err != nil {
return err
}
if discardEntry(e, vs) {
return nil
}
// Value is still present in value log.
if len(vs.Value) == 0 {
return errors.Errorf("Empty value: %+v", vs)
}
var vp valuePointer
vp.Decode(vs.Value)
if vp.Fid > f.fid {
return nil
}
if vp.Offset > e.offset {
return nil
}
if vp.Fid == f.fid && vp.Offset == e.offset {
// This new entry only contains the key, and a pointer to the value.
ne := new(Entry)
ne.meta = 0 // Remove all bits.
ne.UserMeta = e.UserMeta
ne.Key = make([]byte, len(e.Key))
copy(ne.Key, e.Key)
ne.Value = make([]byte, len(e.Value))
copy(ne.Value, e.Value)
wb = append(wb, ne)
size += int64(e.estimateSize(vlog.opt.ValueThreshold))
if size >= 64*mi {
elog.Printf("request has %d entries, size %d", len(wb), size)
if err := vlog.kv.batchSet(wb); err != nil {
return err
}
size = 0
wb = wb[:0]
}
} else {
log.Printf("WARNING: This entry should have been caught. %+v\n", e)
}
return nil
}
err := vlog.iterate(f, 0, func(e Entry, vp valuePointer) error {
return fe(e)
})
if err != nil {
return err
}
elog.Printf("request has %d entries, size %d", len(wb), size)
batchSize := 1024
var loops int
for i := 0; i < len(wb); {
loops++
if batchSize == 0 {
log.Printf("WARNING: We shouldn't reach batch size of zero.")
return ErrNoRewrite
}
end := i + batchSize
if end > len(wb) {
end = len(wb)
}
if err := vlog.kv.batchSet(wb[i:end]); err != nil {
if err == ErrTxnTooBig {
// Decrease the batch size to half.
batchSize = batchSize / 2
elog.Printf("Dropped batch size to %d", batchSize)
continue
}
return err
}
i += batchSize
}
elog.Printf("Processed %d entries in %d loops", len(wb), loops)
elog.Printf("Removing fid: %d", f.fid)
var deleteFileNow bool
// Entries written to LSM. Remove the older file now.
{
vlog.filesLock.Lock()
// Just a sanity-check.
if _, ok := vlog.filesMap[f.fid]; !ok {
vlog.filesLock.Unlock()
return errors.Errorf("Unable to find fid: %d", f.fid)
}
if vlog.numActiveIterators == 0 {
delete(vlog.filesMap, f.fid)
deleteFileNow = true
} else {
vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
}
vlog.filesLock.Unlock()
}
if deleteFileNow {
vlog.deleteLogFile(f)
}
return nil
}
func (vlog *valueLog) incrIteratorCount() {
atomic.AddInt32(&vlog.numActiveIterators, 1)
}
func (vlog *valueLog) decrIteratorCount() error {
num := atomic.AddInt32(&vlog.numActiveIterators, -1)
if num != 0 {
return nil
}
vlog.filesLock.Lock()
lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
for _, id := range vlog.filesToBeDeleted {
lfs = append(lfs, vlog.filesMap[id])
delete(vlog.filesMap, id)
}
vlog.filesToBeDeleted = nil
vlog.filesLock.Unlock()
for _, lf := range lfs {
if err := vlog.deleteLogFile(lf); err != nil {
return err
}
}
return nil
}
func (vlog *valueLog) deleteLogFile(lf *logFile) error {
path := vlog.fpath(lf.fid)
if err := lf.munmap(); err != nil {
_ = lf.fd.Close()
return err
}
if err := lf.fd.Close(); err != nil {
return err
}
return os.Remove(path)
}
// lfDiscardStats keeps track of the amount of data that could be discarded for
// a given logfile.
type lfDiscardStats struct {
sync.Mutex
m map[uint32]int64
}
type valueLog struct {
buf bytes.Buffer
dirPath string
elog trace.EventLog
// guards our view of which files exist, which to be deleted, how many active iterators
filesLock sync.RWMutex
filesMap map[uint32]*logFile
filesToBeDeleted []uint32
// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
numActiveIterators int32
kv *DB
maxFid uint32
writableLogOffset uint32
opt Options
garbageCh chan struct{}
lfDiscardStats *lfDiscardStats
}
func vlogFilePath(dirPath string, fid uint32) string {
return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
}
func (vlog *valueLog) fpath(fid uint32) string {
return vlogFilePath(vlog.dirPath, fid)
}
func (vlog *valueLog) openOrCreateFiles(readOnly bool) error {
files, err := ioutil.ReadDir(vlog.dirPath)
if err != nil {
return errors.Wrapf(err, "Error while opening value log")
}
found := make(map[uint64]struct{})
var maxFid uint32 // Beware len(files) == 0 case, this starts at 0.
for _, file := range files {
if !strings.HasSuffix(file.Name(), ".vlog") {
continue
}
fsz := len(file.Name())
fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
if err != nil {
return errors.Wrapf(err, "Error while parsing value log id for file: %q", file.Name())
}
if _, ok := found[fid]; ok {
return errors.Errorf("Found the same value log file twice: %d", fid)
}
found[fid] = struct{}{}
lf := &logFile{
fid: uint32(fid),
path: vlog.fpath(uint32(fid)),
loadingMode: vlog.opt.ValueLogLoadingMode,
}
vlog.filesMap[uint32(fid)] = lf
if uint32(fid) > maxFid {
maxFid = uint32(fid)
}
}
vlog.maxFid = uint32(maxFid)
// Open all previous log files as read only. Open the last log file
// as read write (unless the DB is read only).
for fid, lf := range vlog.filesMap {
if fid == maxFid {
var flags uint32
if vlog.opt.SyncWrites {
flags |= y.Sync
}
if readOnly {
flags |= y.ReadOnly
}
if lf.fd, err = y.OpenExistingFile(vlog.fpath(fid), flags); err != nil {
return errors.Wrapf(err, "Unable to open value log file")
}
} else {
if err := lf.openReadOnly(); err != nil {
return err
}
}
}
// If no files are found, then create a new file.
if len(vlog.filesMap) == 0 {
// We already set vlog.maxFid above
_, err := vlog.createVlogFile(0)
if err != nil {
return err
}
}
return nil
}
func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) {
path := vlog.fpath(fid)
lf := &logFile{fid: fid, path: path, loadingMode: vlog.opt.ValueLogLoadingMode}
vlog.writableLogOffset = 0
var err error
if lf.fd, err = y.CreateSyncedFile(path, vlog.opt.SyncWrites); err != nil {
return nil, errors.Wrapf(err, "Unable to create value log file")
}
if err = syncDir(vlog.dirPath); err != nil {
return nil, errors.Wrapf(err, "Unable to sync value log file dir")
}
vlog.filesLock.Lock()
vlog.filesMap[fid] = lf
vlog.filesLock.Unlock()
return lf, nil
}
func (vlog *valueLog) Open(kv *DB, opt Options) error {
vlog.dirPath = opt.ValueDir
vlog.opt = opt
vlog.kv = kv
vlog.filesMap = make(map[uint32]*logFile)
if err := vlog.openOrCreateFiles(kv.opt.ReadOnly); err != nil {
return errors.Wrapf(err, "Unable to open value log")
}
vlog.elog = trace.NewEventLog("Badger", "Valuelog")
vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
vlog.lfDiscardStats = &lfDiscardStats{m: make(map[uint32]int64)}
return nil
}
func (vlog *valueLog) Close() error {
vlog.elog.Printf("Stopping garbage collection of values.")
defer vlog.elog.Finish()
var err error
for id, f := range vlog.filesMap {
f.lock.Lock() // We wont release the lock.
if munmapErr := f.munmap(); munmapErr != nil && err == nil {
err = munmapErr
}
if !vlog.opt.ReadOnly && id == vlog.maxFid {
// truncate writable log file to correct offset.
if truncErr := f.fd.Truncate(
int64(vlog.writableLogOffset)); truncErr != nil && err == nil {
err = truncErr
}
}
if closeErr := f.fd.Close(); closeErr != nil && err == nil {
err = closeErr
}
}
return err
}
// sortedFids returns the file id's not pending deletion, sorted. Assumes we have shared access to
// filesMap.
func (vlog *valueLog) sortedFids() []uint32 {
toBeDeleted := make(map[uint32]struct{})
for _, fid := range vlog.filesToBeDeleted {
toBeDeleted[fid] = struct{}{}
}
ret := make([]uint32, 0, len(vlog.filesMap))
for fid := range vlog.filesMap {
if _, ok := toBeDeleted[fid]; !ok {
ret = append(ret, fid)
}
}
sort.Slice(ret, func(i, j int) bool {
return ret[i] < ret[j]
})
return ret
}
// Replay replays the value log. The kv provided is only valid for the lifetime of function call.
func (vlog *valueLog) Replay(ptr valuePointer, fn logEntry) error {
fid := ptr.Fid
offset := ptr.Offset + ptr.Len
vlog.elog.Printf("Seeking at value pointer: %+v\n", ptr)
fids := vlog.sortedFids()
for _, id := range fids {
if id < fid {
continue
}
of := offset
if id > fid {
of = 0
}
f := vlog.filesMap[id]
err := vlog.iterate(f, of, fn)
if err != nil {
return errors.Wrapf(err, "Unable to replay value log: %q", f.path)
}
}
// Seek to the end to start writing.
var err error
last := vlog.filesMap[vlog.maxFid]
lastOffset, err := last.fd.Seek(0, io.SeekEnd)
atomic.AddUint32(&vlog.writableLogOffset, uint32(lastOffset))
return errors.Wrapf(err, "Unable to seek to end of value log: %q", last.path)
}
type request struct {
// Input values
Entries []*Entry
// Output values and wait group stuff below
Ptrs []valuePointer
Wg sync.WaitGroup
Err error
}
func (req *request) Wait() error {
req.Wg.Wait()
req.Entries = nil
err := req.Err
requestPool.Put(req)
return err
}
// sync is thread-unsafe and should not be called concurrently with write.
func (vlog *valueLog) sync() error {
if vlog.opt.SyncWrites {
return nil
}
vlog.filesLock.RLock()
if len(vlog.filesMap) == 0 {
vlog.filesLock.RUnlock()
return nil
}
curlf := vlog.filesMap[vlog.maxFid]
curlf.lock.RLock()
vlog.filesLock.RUnlock()
dirSyncCh := make(chan error)
go func() { dirSyncCh <- syncDir(vlog.opt.ValueDir) }()
err := curlf.sync()
curlf.lock.RUnlock()
dirSyncErr := <-dirSyncCh
if err != nil {
err = dirSyncErr
}
return err
}
func (vlog *valueLog) writableOffset() uint32 {
return atomic.LoadUint32(&vlog.writableLogOffset)
}
// write is thread-unsafe by design and should not be called concurrently.
func (vlog *valueLog) write(reqs []*request) error {
vlog.filesLock.RLock()
curlf := vlog.filesMap[vlog.maxFid]
vlog.filesLock.RUnlock()
toDisk := func() error {
if vlog.buf.Len() == 0 {
return nil
}
vlog.elog.Printf("Flushing %d blocks of total size: %d", len(reqs), vlog.buf.Len())
n, err := curlf.fd.Write(vlog.buf.Bytes())
if err != nil {
return errors.Wrapf(err, "Unable to write to value log file: %q", curlf.path)
}
y.NumWrites.Add(1)
y.NumBytesWritten.Add(int64(n))
vlog.elog.Printf("Done")
atomic.AddUint32(&vlog.writableLogOffset, uint32(n))
vlog.buf.Reset()
if vlog.writableOffset() > uint32(vlog.opt.ValueLogFileSize) {
var err error
if err = curlf.doneWriting(vlog.writableLogOffset); err != nil {
return err
}
newid := atomic.AddUint32(&vlog.maxFid, 1)
y.AssertTruef(newid <= math.MaxUint32, "newid will overflow uint32: %v", newid)
newlf, err := vlog.createVlogFile(newid)
if err != nil {
return err
}
if err = newlf.mmap(2 * vlog.opt.ValueLogFileSize); err != nil {
return err
}
curlf = newlf
}
return nil
}
for i := range reqs {
b := reqs[i]
b.Ptrs = b.Ptrs[:0]
for j := range b.Entries {
e := b.Entries[j]
var p valuePointer
p.Fid = curlf.fid
// Use the offset including buffer length so far.
p.Offset = vlog.writableOffset() + uint32(vlog.buf.Len())
plen, err := encodeEntry(e, &vlog.buf) // Now encode the entry into buffer.
if err != nil {
return err
}
p.Len = uint32(plen)
b.Ptrs = append(b.Ptrs, p)
}
// We write to disk here so that all entries that are part of the same transaction are
// written to the same vlog file.
if vlog.writableOffset()+uint32(vlog.buf.Len()) > uint32(vlog.opt.ValueLogFileSize) {
if err := toDisk(); err != nil {
return err
}
}
}
return toDisk()
// Acquire mutex locks around this manipulation, so that the reads don't try to use
// an invalid file descriptor.
}
// Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
// (if non-nil)
func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) {
vlog.filesLock.RLock()
defer vlog.filesLock.RUnlock()
ret, ok := vlog.filesMap[fid]
if !ok {
// log file has gone away, will need to retry the operation.
return nil, ErrRetry
}
ret.lock.RLock()
return ret, nil
}
// Read reads the value log at a given location.
// TODO: Make this read private.
func (vlog *valueLog) Read(vp valuePointer, s *y.Slice) ([]byte, func(), error) {
// Check for valid offset if we are reading to writable log.
if vp.Fid == vlog.maxFid && vp.Offset >= vlog.writableOffset() {
return nil, nil, errors.Errorf(
"Invalid value pointer offset: %d greater than current offset: %d",
vp.Offset, vlog.writableOffset())
}
buf, cb, err := vlog.readValueBytes(vp, s)
if err != nil {
return nil, cb, err
}
var h header
h.Decode(buf)
n := uint32(headerBufSize) + h.klen
return buf[n : n+h.vlen], cb, nil
}
func (vlog *valueLog) readValueBytes(vp valuePointer, s *y.Slice) ([]byte, func(), error) {
lf, err := vlog.getFileRLocked(vp.Fid)
if err != nil {
return nil, nil, errors.Wrapf(err, "Unable to read from value log: %+v", vp)
}
buf, err := lf.read(vp, s)
if vlog.opt.ValueLogLoadingMode == options.MemoryMap {
return buf, lf.lock.RUnlock, err
}
// If we are using File I/O we unlock the file immediately
// and return an empty function as callback.
lf.lock.RUnlock()
return buf, nil, err
}
// Test helper
func valueBytesToEntry(buf []byte) (e Entry) {
var h header
h.Decode(buf)
n := uint32(headerBufSize)
e.Key = buf[n : n+h.klen]
n += h.klen
e.meta = h.meta
e.UserMeta = h.userMeta
e.Value = buf[n : n+h.vlen]
return
}
func (vlog *valueLog) pickLog(head valuePointer) *logFile {
vlog.filesLock.RLock()
defer vlog.filesLock.RUnlock()
fids := vlog.sortedFids()
if len(fids) <= 1 || head.Fid == 0 {
return nil
}
// Pick a candidate that contains the largest amount of discardable data
candidate := struct {
fid uint32
discard int64
}{math.MaxUint32, 0}
vlog.lfDiscardStats.Lock()
for _, fid := range fids {
if fid >= head.Fid {
break
}
if vlog.lfDiscardStats.m[fid] > candidate.discard {
candidate.fid = fid
candidate.discard = vlog.lfDiscardStats.m[fid]
}
}
vlog.lfDiscardStats.Unlock()
if candidate.fid != math.MaxUint32 { // Found a candidate
return vlog.filesMap[candidate.fid]
}
// Fallback to randomly picking a log file
var idxHead int
for i, fid := range fids {
if fid == head.Fid {
idxHead = i
break
}
}
if idxHead == 0 { // Not found or first file
return nil
}
idx := rand.Intn(idxHead) // Dont include head.Fid. We pick a random file before it.
if idx > 0 {
idx = rand.Intn(idx + 1) // Another level of rand to favor smaller fids.
}
return vlog.filesMap[fids[idx]]
}
func discardEntry(e Entry, vs y.ValueStruct) bool {
if vs.Version != y.ParseTs(e.Key) {
// Version not found. Discard.
return true
}
if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
return true
}
if (vs.Meta & bitValuePointer) == 0 {
// Key also stores the value in LSM. Discard.
return true
}
if (vs.Meta & bitFinTxn) > 0 {
// Just a txn finish entry. Discard.
return true
}
return false
}
func (vlog *valueLog) doRunGC(gcThreshold float64, head valuePointer) (err error) {
// Pick a log file for GC
lf := vlog.pickLog(head)
if lf == nil {
return ErrNoRewrite
}
// Update stats before exiting
defer func() {
if err == nil {
vlog.lfDiscardStats.Lock()
delete(vlog.lfDiscardStats.m, lf.fid)
vlog.lfDiscardStats.Unlock()
}
}()
type reason struct {
total float64
keep float64
discard float64
}
var r reason
var window = 100.0
count := 0
// Pick a random start point for the log.
skipFirstM := float64(rand.Intn(int(vlog.opt.ValueLogFileSize/mi))) - window
var skipped float64
start := time.Now()
y.AssertTrue(vlog.kv != nil)
s := new(y.Slice)
err = vlog.iterate(lf, 0, func(e Entry, vp valuePointer) error {
esz := float64(vp.Len) / (1 << 20) // in MBs. +4 for the CAS stuff.
skipped += esz
if skipped < skipFirstM {
return nil
}
count++
if count%100 == 0 {
time.Sleep(time.Millisecond)
}
r.total += esz
if r.total > window {
return errStop
}
if time.Since(start) > 10*time.Second {
return errStop
}
vs, err := vlog.kv.get(e.Key)
if err != nil {
return err
}
if discardEntry(e, vs) {
r.discard += esz
return nil
}
// Value is still present in value log.
y.AssertTrue(len(vs.Value) > 0)
vp.Decode(vs.Value)
if vp.Fid > lf.fid {
// Value is present in a later log. Discard.
r.discard += esz
return nil
}
if vp.Offset > e.offset {
// Value is present in a later offset, but in the same log.
r.discard += esz
return nil
}
if vp.Fid == lf.fid && vp.Offset == e.offset {
// This is still the active entry. This would need to be rewritten.
r.keep += esz
} else {
vlog.elog.Printf("Reason=%+v\n", r)
buf, cb, err := vlog.readValueBytes(vp, s)
if err != nil {
return errStop
}
ne := valueBytesToEntry(buf)
ne.offset = vp.Offset
ne.print("Latest Entry Header in LSM")
e.print("Latest Entry in Log")
runCallback(cb)
return errors.Errorf("This shouldn't happen. Latest Pointer:%+v. Meta:%v.",
vp, vs.Meta)
}
return nil
})
if err != nil {
vlog.elog.Errorf("Error while iterating for RunGC: %v", err)
return err
}
vlog.elog.Printf("Fid: %d Data status=%+v\n", lf.fid, r)
if r.total < 10.0 || r.discard < gcThreshold*r.total {
vlog.elog.Printf("Skipping GC on fid: %d\n\n", lf.fid)
return ErrNoRewrite
}
vlog.elog.Printf("REWRITING VLOG %d\n", lf.fid)
if err = vlog.rewrite(lf); err != nil {
return err
}
vlog.elog.Printf("Done rewriting.")
return nil
}
func (vlog *valueLog) waitOnGC(lc *y.Closer) {
defer lc.Done()
<-lc.HasBeenClosed() // Wait for lc to be closed.
// Block any GC in progress to finish, and don't allow any more writes to runGC by filling up
// the channel of size 1.
vlog.garbageCh <- struct{}{}
}
func (vlog *valueLog) runGC(gcThreshold float64, head valuePointer) error {
select {
case vlog.garbageCh <- struct{}{}:
// Run GC
var (
err error
count int
)
for {
err = vlog.doRunGC(gcThreshold, head)
if err != nil {
break
}
count++
}
<-vlog.garbageCh
if err == ErrNoRewrite && count > 0 {
return nil
}
return err
default:
return ErrRejected
}
}
func (vlog *valueLog) updateGCStats(item *Item) {
if item.meta&bitValuePointer > 0 {
var vp valuePointer
vp.Decode(item.vptr)
vlog.lfDiscardStats.Lock()
vlog.lfDiscardStats.m[vp.Fid] += int64(vp.Len)
vlog.lfDiscardStats.Unlock()
}
}