mirror of
https://source.quilibrium.com/quilibrium/ceremonyclient.git
synced 2025-01-23 22:25:19 +00:00
772 lines
25 KiB
Go
772 lines
25 KiB
Go
// Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
|
|
// of this source code is governed by a BSD-style license that can be found in
|
|
// the LICENSE file.
|
|
|
|
package record
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"io"
|
|
"runtime/pprof"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/cockroachdb/errors"
|
|
"github.com/cockroachdb/pebble/internal/base"
|
|
"github.com/cockroachdb/pebble/internal/crc"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
var (
	// walSyncLabels is the pprof label set under which the background flush
	// loop of a LogWriter is run.
	walSyncLabels = pprof.Labels("pebble", "wal-sync")

	// errClosedWriter is the error recorded on a LogWriter by Close.
	errClosedWriter = errors.New("pebble/record: closed LogWriter")
)
|
|
|
|
type block struct {
|
|
// buf[:written] has already been filled with fragments. Updated atomically.
|
|
written atomic.Int32
|
|
// buf[:flushed] has already been flushed to w.
|
|
flushed int32
|
|
buf [blockSize]byte
|
|
}
|
|
|
|
// flusher is the interface that wraps the Flush method.
type flusher interface {
	// Flush reports any error encountered while flushing.
	Flush() error
}
|
|
|
|
// syncer is the interface that wraps the Sync method. A LogWriter uses it to
// sync its underlying writer when that writer provides the method.
type syncer interface {
	// Sync reports any error encountered while syncing.
	Sync() error
}
|
|
|
|
const (
	// syncConcurrencyBits is log2 of SyncConcurrency.
	syncConcurrencyBits = 12

	// SyncConcurrency bounds how many sync operations may be outstanding at
	// once. A sync operation is started either by SyncRecord or by Close. The
	// constant is exported because the same value also caps the commit
	// concurrency in commitPipeline.
	SyncConcurrency = 1 << syncConcurrencyBits
)
|
|
|
|
// syncSlot is one entry of a syncQueue: the wait group to signal once the sync
// request has been handled and the location in which to store its outcome.
type syncSlot struct {
	// wg is signalled via Done when the slot is popped.
	wg *sync.WaitGroup
	// err points at the variable that receives the result of the sync.
	err *error
}
|
|
|
|
// syncQueue is a lock-free fixed-size single-producer, single-consumer
|
|
// queue. The single-producer can push to the head, and the single-consumer can
|
|
// pop multiple values from the tail. Popping calls Done() on each of the
|
|
// available *sync.WaitGroup elements.
|
|
type syncQueue struct {
|
|
// headTail packs together a 32-bit head index and a 32-bit tail index. Both
|
|
// are indexes into slots modulo len(slots)-1.
|
|
//
|
|
// tail = index of oldest data in queue
|
|
// head = index of next slot to fill
|
|
//
|
|
// Slots in the range [tail, head) are owned by consumers. A consumer
|
|
// continues to own a slot outside this range until it nils the slot, at
|
|
// which point ownership passes to the producer.
|
|
//
|
|
// The head index is stored in the most-significant bits so that we can
|
|
// atomically add to it and the overflow is harmless.
|
|
headTail atomic.Uint64
|
|
|
|
// slots is a ring buffer of values stored in this queue. The size must be a
|
|
// power of 2. A slot is in use until the tail index has moved beyond it.
|
|
slots [SyncConcurrency]syncSlot
|
|
|
|
// blocked is an atomic boolean which indicates whether syncing is currently
|
|
// blocked or can proceed. It is used by the implementation of
|
|
// min-sync-interval to block syncing until the min interval has passed.
|
|
blocked atomic.Bool
|
|
}
|
|
|
|
// dequeueBits is the width in bits of each of the two indexes packed into
// syncQueue.headTail.
const dequeueBits = 32
|
|
|
|
func (q *syncQueue) unpack(ptrs uint64) (head, tail uint32) {
|
|
const mask = 1<<dequeueBits - 1
|
|
head = uint32((ptrs >> dequeueBits) & mask)
|
|
tail = uint32(ptrs & mask)
|
|
return
|
|
}
|
|
|
|
func (q *syncQueue) push(wg *sync.WaitGroup, err *error) {
|
|
ptrs := q.headTail.Load()
|
|
head, tail := q.unpack(ptrs)
|
|
if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
|
|
panic("pebble: queue is full")
|
|
}
|
|
|
|
slot := &q.slots[head&uint32(len(q.slots)-1)]
|
|
slot.wg = wg
|
|
slot.err = err
|
|
|
|
// Increment head. This passes ownership of slot to dequeue and acts as a
|
|
// store barrier for writing the slot.
|
|
q.headTail.Add(1 << dequeueBits)
|
|
}
|
|
|
|
func (q *syncQueue) setBlocked() {
|
|
q.blocked.Store(true)
|
|
}
|
|
|
|
func (q *syncQueue) clearBlocked() {
|
|
q.blocked.Store(false)
|
|
}
|
|
|
|
func (q *syncQueue) empty() bool {
|
|
head, tail, _ := q.load()
|
|
return head == tail
|
|
}
|
|
|
|
// load returns the head, tail of the queue for what should be synced to the
|
|
// caller. It can return a head, tail of zero if syncing is blocked due to
|
|
// min-sync-interval. It additionally returns the real length of this queue,
|
|
// regardless of whether syncing is blocked.
|
|
func (q *syncQueue) load() (head, tail, realLength uint32) {
|
|
ptrs := q.headTail.Load()
|
|
head, tail = q.unpack(ptrs)
|
|
realLength = head - tail
|
|
if q.blocked.Load() {
|
|
return 0, 0, realLength
|
|
}
|
|
return head, tail, realLength
|
|
}
|
|
|
|
// REQUIRES: queueSemChan is non-nil.
|
|
func (q *syncQueue) pop(head, tail uint32, err error, queueSemChan chan struct{}) error {
|
|
if tail == head {
|
|
// Queue is empty.
|
|
return nil
|
|
}
|
|
|
|
for ; tail != head; tail++ {
|
|
slot := &q.slots[tail&uint32(len(q.slots)-1)]
|
|
wg := slot.wg
|
|
if wg == nil {
|
|
return errors.Errorf("nil waiter at %d", errors.Safe(tail&uint32(len(q.slots)-1)))
|
|
}
|
|
*slot.err = err
|
|
slot.wg = nil
|
|
slot.err = nil
|
|
// We need to bump the tail count before signalling the wait group as
|
|
// signalling the wait group can trigger release a blocked goroutine which
|
|
// will try to enqueue before we've "freed" space in the queue.
|
|
q.headTail.Add(1)
|
|
wg.Done()
|
|
// Is always non-nil in production.
|
|
if queueSemChan != nil {
|
|
<-queueSemChan
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// flusherCond is a specialized condition variable that allows its condition to
|
|
// change and readiness be signalled without holding its associated mutex. In
|
|
// particular, when a waiter is added to syncQueue atomically, this condition
|
|
// variable can be signalled without holding flusher.Mutex.
|
|
type flusherCond struct {
|
|
mu *sync.Mutex
|
|
q *syncQueue
|
|
cond sync.Cond
|
|
}
|
|
|
|
func (c *flusherCond) init(mu *sync.Mutex, q *syncQueue) {
|
|
c.mu = mu
|
|
c.q = q
|
|
// Yes, this is a bit circular, but that is intentional. flusherCond.cond.L
|
|
// points flusherCond so that when cond.L.Unlock is called flusherCond.Unlock
|
|
// will be called and we can check the !syncQueue.empty() condition.
|
|
c.cond.L = c
|
|
}
|
|
|
|
func (c *flusherCond) Signal() {
|
|
// Pass-through to the cond var.
|
|
c.cond.Signal()
|
|
}
|
|
|
|
func (c *flusherCond) Wait() {
|
|
// Pass-through to the cond var. Note that internally the cond var implements
|
|
// Wait as:
|
|
//
|
|
// t := notifyListAdd()
|
|
// L.Unlock()
|
|
// notifyListWait(t)
|
|
// L.Lock()
|
|
//
|
|
// We've configured the cond var to call flusherReady.Unlock() which allows
|
|
// us to check the !syncQueue.empty() condition without a danger of missing a
|
|
// notification. Any call to flusherReady.Signal() after notifyListAdd() is
|
|
// called will cause the subsequent notifyListWait() to return immediately.
|
|
c.cond.Wait()
|
|
}
|
|
|
|
func (c *flusherCond) Lock() {
|
|
c.mu.Lock()
|
|
}
|
|
|
|
func (c *flusherCond) Unlock() {
|
|
c.mu.Unlock()
|
|
if !c.q.empty() {
|
|
// If the current goroutine is about to block on sync.Cond.Wait, this call
|
|
// to Signal will prevent that. The comment in Wait above explains a bit
|
|
// about what is going on here, but it is worth reiterating:
|
|
//
|
|
// flusherCond.Wait()
|
|
// sync.Cond.Wait()
|
|
// t := notifyListAdd()
|
|
// flusherCond.Unlock() <-- we are here
|
|
// notifyListWait(t)
|
|
// flusherCond.Lock()
|
|
//
|
|
// The call to Signal here results in:
|
|
//
|
|
// sync.Cond.Signal()
|
|
// notifyListNotifyOne()
|
|
//
|
|
// The call to notifyListNotifyOne() will prevent the call to
|
|
// notifyListWait(t) from blocking.
|
|
c.cond.Signal()
|
|
}
|
|
}
|
|
|
|
// durationFunc is a function that yields a duration each time it is called.
type durationFunc func() time.Duration
|
|
|
|
// syncTimer abstracts a timer in the closure-callback style of time.Timer
// (see time.AfterFunc and LogWriter.afterFunc). It exists so that tests can
// substitute a mock for the timer that drives min-sync-interval.
type syncTimer interface {
	Reset(time.Duration) bool
	Stop() bool
}
|
|
|
|
// LogWriter writes records to an underlying io.Writer. In order to support WAL
|
|
// file reuse, a LogWriter's records are tagged with the WAL's file
|
|
// number. When reading a log file a record from a previous incarnation of the
|
|
// file will return the error ErrInvalidLogNum.
|
|
type LogWriter struct {
|
|
// w is the underlying writer.
|
|
w io.Writer
|
|
// c is w as a closer.
|
|
c io.Closer
|
|
// s is w as a syncer.
|
|
s syncer
|
|
// logNum is the low 32-bits of the log's file number.
|
|
logNum uint32
|
|
// blockNum is the zero based block number for the current block.
|
|
blockNum int64
|
|
// err is any accumulated error. TODO(peter): This needs to be protected in
|
|
// some fashion. Perhaps using atomic.Value.
|
|
err error
|
|
// block is the current block being written. Protected by flusher.Mutex.
|
|
block *block
|
|
free struct {
|
|
sync.Mutex
|
|
blocks []*block
|
|
}
|
|
|
|
flusher struct {
|
|
sync.Mutex
|
|
// Flusher ready is a condition variable that is signalled when there are
|
|
// blocks to flush, syncing has been requested, or the LogWriter has been
|
|
// closed. For signalling of a sync, it is safe to call without holding
|
|
// flusher.Mutex.
|
|
ready flusherCond
|
|
// Set to true when the flush loop should be closed.
|
|
close bool
|
|
// Closed when the flush loop has terminated.
|
|
closed chan struct{}
|
|
// Accumulated flush error.
|
|
err error
|
|
// minSyncInterval is the minimum duration between syncs.
|
|
minSyncInterval durationFunc
|
|
fsyncLatency prometheus.Histogram
|
|
pending []*block
|
|
syncQ syncQueue
|
|
metrics *LogWriterMetrics
|
|
}
|
|
|
|
// afterFunc is a hook to allow tests to mock out the timer functionality
|
|
// used for min-sync-interval. In normal operation this points to
|
|
// time.AfterFunc.
|
|
afterFunc func(d time.Duration, f func()) syncTimer
|
|
|
|
// See the comment for LogWriterConfig.QueueSemChan.
|
|
queueSemChan chan struct{}
|
|
}
|
|
|
|
// LogWriterConfig is a struct used for configuring new LogWriters
|
|
type LogWriterConfig struct {
|
|
WALMinSyncInterval durationFunc
|
|
WALFsyncLatency prometheus.Histogram
|
|
// QueueSemChan is an optional channel to pop from when popping from
|
|
// LogWriter.flusher.syncQueue. It functions as a semaphore that prevents
|
|
// the syncQueue from overflowing (which will cause a panic). All production
|
|
// code ensures this is non-nil.
|
|
QueueSemChan chan struct{}
|
|
}
|
|
|
|
// initialAllocatedBlocksCap is the starting capacity of the slices that hold
// LogWriter blocks. It is not a limit: a LogWriter may end up allocating more
// blocks than this.
const initialAllocatedBlocksCap = 32
|
|
|
|
// blockPool pools *blocks to avoid allocations. Blocks are only added to the
|
|
// Pool when a LogWriter is closed. Before that, free blocks are maintained
|
|
// within a LogWriter's own internal free list `w.free.blocks`.
|
|
var blockPool = sync.Pool{
|
|
New: func() any { return &block{} },
|
|
}
|
|
|
|
// NewLogWriter returns a new LogWriter.
|
|
func NewLogWriter(
|
|
w io.Writer, logNum base.DiskFileNum, logWriterConfig LogWriterConfig,
|
|
) *LogWriter {
|
|
c, _ := w.(io.Closer)
|
|
s, _ := w.(syncer)
|
|
r := &LogWriter{
|
|
w: w,
|
|
c: c,
|
|
s: s,
|
|
// NB: we truncate the 64-bit log number to 32-bits. This is ok because a)
|
|
// we are very unlikely to reach a file number of 4 billion and b) the log
|
|
// number is used as a validation check and using only the low 32-bits is
|
|
// sufficient for that purpose.
|
|
logNum: uint32(logNum),
|
|
afterFunc: func(d time.Duration, f func()) syncTimer {
|
|
return time.AfterFunc(d, f)
|
|
},
|
|
queueSemChan: logWriterConfig.QueueSemChan,
|
|
}
|
|
r.free.blocks = make([]*block, 0, initialAllocatedBlocksCap)
|
|
r.block = blockPool.Get().(*block)
|
|
r.flusher.ready.init(&r.flusher.Mutex, &r.flusher.syncQ)
|
|
r.flusher.closed = make(chan struct{})
|
|
r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
|
|
r.flusher.metrics = &LogWriterMetrics{}
|
|
|
|
f := &r.flusher
|
|
f.minSyncInterval = logWriterConfig.WALMinSyncInterval
|
|
f.fsyncLatency = logWriterConfig.WALFsyncLatency
|
|
|
|
go func() {
|
|
pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
|
|
}()
|
|
return r
|
|
}
|
|
|
|
func (w *LogWriter) flushLoop(context.Context) {
|
|
f := &w.flusher
|
|
f.Lock()
|
|
|
|
// Initialize idleStartTime to when the loop starts.
|
|
idleStartTime := time.Now()
|
|
var syncTimer syncTimer
|
|
defer func() {
|
|
// Capture the idle duration between the last piece of work and when the
|
|
// loop terminated.
|
|
f.metrics.WriteThroughput.IdleDuration += time.Since(idleStartTime)
|
|
if syncTimer != nil {
|
|
syncTimer.Stop()
|
|
}
|
|
close(f.closed)
|
|
f.Unlock()
|
|
}()
|
|
|
|
// The flush loop performs flushing of full and partial data blocks to the
|
|
// underlying writer (LogWriter.w), syncing of the writer, and notification
|
|
// to sync requests that they have completed.
|
|
//
|
|
// - flusher.ready is a condition variable that is signalled when there is
|
|
// work to do. Full blocks are contained in flusher.pending. The current
|
|
// partial block is in LogWriter.block. And sync operations are held in
|
|
// flusher.syncQ.
|
|
//
|
|
// - The decision to sync is determined by whether there are any sync
|
|
// requests present in flusher.syncQ and whether enough time has elapsed
|
|
// since the last sync. If not enough time has elapsed since the last sync,
|
|
// flusher.syncQ.blocked will be set to 1. If syncing is blocked,
|
|
// syncQueue.empty() will return true and syncQueue.load() will return 0,0
|
|
// (i.e. an empty list).
|
|
//
|
|
// - flusher.syncQ.blocked is cleared by a timer that is initialized when
|
|
// blocked is set to 1. When blocked is 1, no syncing will take place, but
|
|
// flushing will continue to be performed. The on/off toggle for syncing
|
|
// does not need to be carefully synchronized with the rest of processing
|
|
// -- all we need to ensure is that after any transition to blocked=1 there
|
|
// is eventually a transition to blocked=0. syncTimer performs this
|
|
// transition. Note that any change to min-sync-interval will not take
|
|
// effect until the previous timer elapses.
|
|
//
|
|
// - Picking up the syncing work to perform requires coordination with
|
|
// picking up the flushing work. Specifically, flushing work is queued
|
|
// before syncing work. The guarantee of this code is that when a sync is
|
|
// requested, any previously queued flush work will be synced. This
|
|
// motivates reading the syncing work (f.syncQ.load()) before picking up
|
|
// the flush work (w.block.written.Load()).
|
|
|
|
// The list of full blocks that need to be written. This is copied from
|
|
// f.pending on every loop iteration, though the number of elements is
|
|
// usually small (most frequently 1). In the case of the WAL LogWriter, the
|
|
// number of blocks is bounded by the size of the WAL's corresponding
|
|
// memtable (MemtableSize/BlockSize). With the default 64 MiB memtables,
|
|
// this works out to at most 2048 elements if the entirety of the memtable's
|
|
// contents are queued.
|
|
pending := make([]*block, 0, cap(f.pending))
|
|
for {
|
|
for {
|
|
// Grab the portion of the current block that requires flushing. Note that
|
|
// the current block can be added to the pending blocks list after we release
|
|
// the flusher lock, but it won't be part of pending.
|
|
written := w.block.written.Load()
|
|
if len(f.pending) > 0 || written > w.block.flushed || !f.syncQ.empty() {
|
|
break
|
|
}
|
|
if f.close {
|
|
// If the writer is closed, pretend the sync timer fired immediately so
|
|
// that we can process any queued sync requests.
|
|
f.syncQ.clearBlocked()
|
|
if !f.syncQ.empty() {
|
|
break
|
|
}
|
|
return
|
|
}
|
|
f.ready.Wait()
|
|
continue
|
|
}
|
|
// Found work to do, so no longer idle.
|
|
workStartTime := time.Now()
|
|
idleDuration := workStartTime.Sub(idleStartTime)
|
|
pending = append(pending[:0], f.pending...)
|
|
f.pending = f.pending[:0]
|
|
f.metrics.PendingBufferLen.AddSample(int64(len(pending)))
|
|
|
|
// Grab the list of sync waiters. Note that syncQueue.load() will return
|
|
// 0,0 while we're waiting for the min-sync-interval to expire. This
|
|
// allows flushing to proceed even if we're not ready to sync.
|
|
head, tail, realSyncQLen := f.syncQ.load()
|
|
f.metrics.SyncQueueLen.AddSample(int64(realSyncQLen))
|
|
|
|
// Grab the portion of the current block that requires flushing. Note that
|
|
// the current block can be added to the pending blocks list after we
|
|
// release the flusher lock, but it won't be part of pending. This has to
|
|
// be ordered after we get the list of sync waiters from syncQ in order to
|
|
// prevent a race where a waiter adds itself to syncQ, but this thread
|
|
// picks up the entry in syncQ and not the buffered data.
|
|
written := w.block.written.Load()
|
|
data := w.block.buf[w.block.flushed:written]
|
|
w.block.flushed = written
|
|
|
|
// If flusher has an error, we propagate it to waiters. Note in spite of
|
|
// error we consume the pending list above to free blocks for writers.
|
|
if f.err != nil {
|
|
f.syncQ.pop(head, tail, f.err, w.queueSemChan)
|
|
// Update the idleStartTime if work could not be done, so that we don't
|
|
// include the duration we tried to do work as idle. We don't bother
|
|
// with the rest of the accounting, which means we will undercount.
|
|
idleStartTime = time.Now()
|
|
continue
|
|
}
|
|
f.Unlock()
|
|
synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, head, tail)
|
|
f.Lock()
|
|
if synced && f.fsyncLatency != nil {
|
|
f.fsyncLatency.Observe(float64(syncLatency))
|
|
}
|
|
f.err = err
|
|
if f.err != nil {
|
|
f.syncQ.clearBlocked()
|
|
// Update the idleStartTime if work could not be done, so that we don't
|
|
// include the duration we tried to do work as idle. We don't bother
|
|
// with the rest of the accounting, which means we will undercount.
|
|
idleStartTime = time.Now()
|
|
continue
|
|
}
|
|
|
|
if synced && f.minSyncInterval != nil {
|
|
// A sync was performed. Make sure we've waited for the min sync
|
|
// interval before syncing again.
|
|
if min := f.minSyncInterval(); min > 0 {
|
|
f.syncQ.setBlocked()
|
|
if syncTimer == nil {
|
|
syncTimer = w.afterFunc(min, func() {
|
|
f.syncQ.clearBlocked()
|
|
f.ready.Signal()
|
|
})
|
|
} else {
|
|
syncTimer.Reset(min)
|
|
}
|
|
}
|
|
}
|
|
// Finished work, and started idling.
|
|
idleStartTime = time.Now()
|
|
workDuration := idleStartTime.Sub(workStartTime)
|
|
f.metrics.WriteThroughput.Bytes += bytesWritten
|
|
f.metrics.WriteThroughput.WorkDuration += workDuration
|
|
f.metrics.WriteThroughput.IdleDuration += idleDuration
|
|
}
|
|
}
|
|
|
|
func (w *LogWriter) flushPending(
|
|
data []byte, pending []*block, head, tail uint32,
|
|
) (synced bool, syncLatency time.Duration, bytesWritten int64, err error) {
|
|
defer func() {
|
|
// Translate panics into errors. The errors will cause flushLoop to shut
|
|
// down, but allows us to do so in a controlled way and avoid swallowing
|
|
// the stack that created the panic if panic'ing itself hits a panic
|
|
// (e.g. unlock of unlocked mutex).
|
|
if r := recover(); r != nil {
|
|
err = errors.Newf("%v", r)
|
|
}
|
|
}()
|
|
|
|
for _, b := range pending {
|
|
bytesWritten += blockSize - int64(b.flushed)
|
|
if err = w.flushBlock(b); err != nil {
|
|
break
|
|
}
|
|
}
|
|
if n := len(data); err == nil && n > 0 {
|
|
bytesWritten += int64(n)
|
|
_, err = w.w.Write(data)
|
|
}
|
|
|
|
synced = head != tail
|
|
if synced {
|
|
if err == nil && w.s != nil {
|
|
syncLatency, err = w.syncWithLatency()
|
|
}
|
|
f := &w.flusher
|
|
if popErr := f.syncQ.pop(head, tail, err, w.queueSemChan); popErr != nil {
|
|
return synced, syncLatency, bytesWritten, popErr
|
|
}
|
|
}
|
|
|
|
return synced, syncLatency, bytesWritten, err
|
|
}
|
|
|
|
func (w *LogWriter) syncWithLatency() (time.Duration, error) {
|
|
start := time.Now()
|
|
err := w.s.Sync()
|
|
syncLatency := time.Since(start)
|
|
return syncLatency, err
|
|
}
|
|
|
|
func (w *LogWriter) flushBlock(b *block) error {
|
|
if _, err := w.w.Write(b.buf[b.flushed:]); err != nil {
|
|
return err
|
|
}
|
|
b.written.Store(0)
|
|
b.flushed = 0
|
|
w.free.Lock()
|
|
w.free.blocks = append(w.free.blocks, b)
|
|
w.free.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// queueBlock queues the current block for writing to the underlying writer,
|
|
// allocates a new block and reserves space for the next header.
|
|
func (w *LogWriter) queueBlock() {
|
|
// Allocate a new block, blocking until one is available. We do this first
|
|
// because w.block is protected by w.flusher.Mutex.
|
|
w.free.Lock()
|
|
if len(w.free.blocks) == 0 {
|
|
w.free.blocks = append(w.free.blocks, blockPool.Get().(*block))
|
|
}
|
|
nextBlock := w.free.blocks[len(w.free.blocks)-1]
|
|
w.free.blocks = w.free.blocks[:len(w.free.blocks)-1]
|
|
w.free.Unlock()
|
|
|
|
f := &w.flusher
|
|
f.Lock()
|
|
f.pending = append(f.pending, w.block)
|
|
w.block = nextBlock
|
|
f.ready.Signal()
|
|
w.err = w.flusher.err
|
|
f.Unlock()
|
|
|
|
w.blockNum++
|
|
}
|
|
|
|
// Close flushes and syncs any unwritten data and closes the writer.
|
|
// Where required, external synchronisation is provided by commitPipeline.mu.
|
|
func (w *LogWriter) Close() error {
|
|
f := &w.flusher
|
|
|
|
// Emit an EOF trailer signifying the end of this log. This helps readers
|
|
// differentiate between a corrupted entry in the middle of a log from
|
|
// garbage at the tail from a recycled log file.
|
|
w.emitEOFTrailer()
|
|
|
|
// Signal the flush loop to close.
|
|
f.Lock()
|
|
f.close = true
|
|
f.ready.Signal()
|
|
f.Unlock()
|
|
|
|
// Wait for the flush loop to close. The flush loop will not close until all
|
|
// pending data has been written or an error occurs.
|
|
<-f.closed
|
|
|
|
// Sync any flushed data to disk. NB: flushLoop will sync after flushing the
|
|
// last buffered data only if it was requested via syncQ, so we need to sync
|
|
// here to ensure that all the data is synced.
|
|
err := w.flusher.err
|
|
var syncLatency time.Duration
|
|
if err == nil && w.s != nil {
|
|
syncLatency, err = w.syncWithLatency()
|
|
}
|
|
f.Lock()
|
|
if f.fsyncLatency != nil {
|
|
f.fsyncLatency.Observe(float64(syncLatency))
|
|
}
|
|
free := w.free.blocks
|
|
f.Unlock()
|
|
|
|
if w.c != nil {
|
|
cerr := w.c.Close()
|
|
w.c = nil
|
|
if cerr != nil {
|
|
return cerr
|
|
}
|
|
}
|
|
|
|
for _, b := range free {
|
|
b.flushed = 0
|
|
b.written.Store(0)
|
|
blockPool.Put(b)
|
|
}
|
|
|
|
w.err = errClosedWriter
|
|
return err
|
|
}
|
|
|
|
// WriteRecord writes a complete record. Returns the offset just past the end
|
|
// of the record.
|
|
// External synchronisation provided by commitPipeline.mu.
|
|
func (w *LogWriter) WriteRecord(p []byte) (int64, error) {
|
|
logSize, err := w.SyncRecord(p, nil, nil)
|
|
return logSize, err
|
|
}
|
|
|
|
// SyncRecord writes a complete record. If wg != nil the record will be
|
|
// asynchronously persisted to the underlying writer and done will be called on
|
|
// the wait group upon completion. Returns the offset just past the end of the
|
|
// record.
|
|
// External synchronisation provided by commitPipeline.mu.
|
|
func (w *LogWriter) SyncRecord(
|
|
p []byte, wg *sync.WaitGroup, err *error,
|
|
) (logSize int64, err2 error) {
|
|
if w.err != nil {
|
|
return -1, w.err
|
|
}
|
|
|
|
// The `i == 0` condition ensures we handle empty records. Such records can
|
|
// possibly be generated for VersionEdits stored in the MANIFEST. While the
|
|
// MANIFEST is currently written using Writer, it is good to support the same
|
|
// semantics with LogWriter.
|
|
for i := 0; i == 0 || len(p) > 0; i++ {
|
|
p = w.emitFragment(i, p)
|
|
}
|
|
|
|
if wg != nil {
|
|
// If we've been asked to persist the record, add the WaitGroup to the sync
|
|
// queue and signal the flushLoop. Note that flushLoop will write partial
|
|
// blocks to the file if syncing has been requested. The contract is that
|
|
// any record written to the LogWriter to this point will be flushed to the
|
|
// OS and synced to disk.
|
|
f := &w.flusher
|
|
f.syncQ.push(wg, err)
|
|
f.ready.Signal()
|
|
}
|
|
|
|
offset := w.blockNum*blockSize + int64(w.block.written.Load())
|
|
// Note that we don't return w.err here as a concurrent call to Close would
|
|
// race with our read. That's ok because the only error we could be seeing is
|
|
// one to syncing for which the caller can receive notification of by passing
|
|
// in a non-nil err argument.
|
|
return offset, nil
|
|
}
|
|
|
|
// Size returns the current size of the file.
|
|
// External synchronisation provided by commitPipeline.mu.
|
|
func (w *LogWriter) Size() int64 {
|
|
return w.blockNum*blockSize + int64(w.block.written.Load())
|
|
}
|
|
|
|
func (w *LogWriter) emitEOFTrailer() {
|
|
// Write a recyclable chunk header with a different log number. Readers
|
|
// will treat the header as EOF when the log number does not match.
|
|
b := w.block
|
|
i := b.written.Load()
|
|
binary.LittleEndian.PutUint32(b.buf[i+0:i+4], 0) // CRC
|
|
binary.LittleEndian.PutUint16(b.buf[i+4:i+6], 0) // Size
|
|
b.buf[i+6] = recyclableFullChunkType
|
|
binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum+1) // Log number
|
|
b.written.Store(i + int32(recyclableHeaderSize))
|
|
}
|
|
|
|
func (w *LogWriter) emitFragment(n int, p []byte) (remainingP []byte) {
|
|
b := w.block
|
|
i := b.written.Load()
|
|
first := n == 0
|
|
last := blockSize-i-recyclableHeaderSize >= int32(len(p))
|
|
|
|
if last {
|
|
if first {
|
|
b.buf[i+6] = recyclableFullChunkType
|
|
} else {
|
|
b.buf[i+6] = recyclableLastChunkType
|
|
}
|
|
} else {
|
|
if first {
|
|
b.buf[i+6] = recyclableFirstChunkType
|
|
} else {
|
|
b.buf[i+6] = recyclableMiddleChunkType
|
|
}
|
|
}
|
|
|
|
binary.LittleEndian.PutUint32(b.buf[i+7:i+11], w.logNum)
|
|
|
|
r := copy(b.buf[i+recyclableHeaderSize:], p)
|
|
j := i + int32(recyclableHeaderSize+r)
|
|
binary.LittleEndian.PutUint32(b.buf[i+0:i+4], crc.New(b.buf[i+6:j]).Value())
|
|
binary.LittleEndian.PutUint16(b.buf[i+4:i+6], uint16(r))
|
|
b.written.Store(j)
|
|
|
|
if blockSize-b.written.Load() < recyclableHeaderSize {
|
|
// There is no room for another fragment in the block, so fill the
|
|
// remaining bytes with zeros and queue the block for flushing.
|
|
for i := b.written.Load(); i < blockSize; i++ {
|
|
b.buf[i] = 0
|
|
}
|
|
w.queueBlock()
|
|
}
|
|
return p[r:]
|
|
}
|
|
|
|
// Metrics must be called after Close. The callee will no longer modify the
|
|
// returned LogWriterMetrics.
|
|
func (w *LogWriter) Metrics() *LogWriterMetrics {
|
|
return w.flusher.metrics
|
|
}
|
|
|
|
// LogWriterMetrics contains misc metrics for the log writer.
|
|
type LogWriterMetrics struct {
|
|
WriteThroughput base.ThroughputMetric
|
|
PendingBufferLen base.GaugeSampleMetric
|
|
SyncQueueLen base.GaugeSampleMetric
|
|
}
|
|
|
|
// Merge merges metrics from x. Requires that x is non-nil.
|
|
func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
|
|
m.WriteThroughput.Merge(x.WriteThroughput)
|
|
m.PendingBufferLen.Merge(x.PendingBufferLen)
|
|
m.SyncQueueLen.Merge(x.SyncQueueLen)
|
|
return nil
|
|
}
|