mirror of
https://source.quilibrium.com/quilibrium/ceremonyclient.git
synced 2025-01-26 15:47:11 +00:00
1558 lines
46 KiB
Go
1558 lines
46 KiB
Go
|
// Copyright 2019 The LevelDB-Go and Pebble Authors. All rights reserved. Use
|
||
|
// of this source code is governed by a BSD-style license that can be found in
|
||
|
// the LICENSE file.
|
||
|
|
||
|
package metamorphic
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"crypto/rand"
|
||
|
"encoding/binary"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"path"
|
||
|
"path/filepath"
|
||
|
"strings"
|
||
|
|
||
|
"github.com/cockroachdb/errors"
|
||
|
"github.com/cockroachdb/pebble"
|
||
|
"github.com/cockroachdb/pebble/internal/base"
|
||
|
"github.com/cockroachdb/pebble/internal/keyspan"
|
||
|
"github.com/cockroachdb/pebble/internal/private"
|
||
|
"github.com/cockroachdb/pebble/internal/rangekey"
|
||
|
"github.com/cockroachdb/pebble/internal/testkeys"
|
||
|
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
|
||
|
"github.com/cockroachdb/pebble/sstable"
|
||
|
"github.com/cockroachdb/pebble/vfs/errorfs"
|
||
|
)
|
||
|
|
||
|
// op defines the interface for a single operation, such as creating a batch,
|
||
|
// or advancing an iterator.
|
||
|
type op interface {
|
||
|
String() string
|
||
|
run(t *test, h historyRecorder)
|
||
|
|
||
|
// receiver returns the object ID of the object the operation is performed
|
||
|
// on. Every operation has a receiver (eg, batch0.Set(...) has `batch0` as
|
||
|
// its receiver). Receivers are used for synchronization when running with
|
||
|
// concurrency.
|
||
|
receiver() objID
|
||
|
|
||
|
// syncObjs returns an additional set of object IDs—excluding the
|
||
|
// receiver—that the operation must synchronize with. At execution time,
|
||
|
// the operation will run serially with respect to all other operations
|
||
|
// that return these objects from their own syncObjs or receiver methods.
|
||
|
syncObjs() objIDSlice
|
||
|
}
|
||
|
|
||
|
// initOp performs test initialization. Its fields carry the number of slots
// for each kind of object (DBs, batches, iterators, snapshots).
type initOp struct {
	dbSlots, batchSlots, iterSlots, snapshotSlots uint32
}
|
||
|
|
||
|
func (o *initOp) run(t *test, h historyRecorder) {
|
||
|
t.batches = make([]*pebble.Batch, o.batchSlots)
|
||
|
t.iters = make([]*retryableIter, o.iterSlots)
|
||
|
t.snapshots = make([]readerCloser, o.snapshotSlots)
|
||
|
h.Recordf("%s", o)
|
||
|
}
|
||
|
|
||
|
func (o *initOp) String() string {
|
||
|
return fmt.Sprintf("Init(%d /* dbs */, %d /* batches */, %d /* iters */, %d /* snapshots */)",
|
||
|
o.dbSlots, o.batchSlots, o.iterSlots, o.snapshotSlots)
|
||
|
}
|
||
|
|
||
|
func (o *initOp) receiver() objID { return makeObjID(dbTag, 1) }
|
||
|
func (o *initOp) syncObjs() objIDSlice {
|
||
|
syncObjs := make([]objID, 0)
|
||
|
// Add any additional DBs to syncObjs.
|
||
|
for i := uint32(2); i < o.dbSlots+1; i++ {
|
||
|
syncObjs = append(syncObjs, makeObjID(dbTag, i))
|
||
|
}
|
||
|
return syncObjs
|
||
|
}
|
||
|
|
||
|
// applyOp models a Writer.Apply operation.
|
||
|
type applyOp struct {
|
||
|
writerID objID
|
||
|
batchID objID
|
||
|
}
|
||
|
|
||
|
func (o *applyOp) run(t *test, h historyRecorder) {
|
||
|
b := t.getBatch(o.batchID)
|
||
|
w := t.getWriter(o.writerID)
|
||
|
var err error
|
||
|
if o.writerID.tag() == dbTag && t.testOpts.asyncApplyToDB && t.writeOpts.Sync {
|
||
|
err = w.(*pebble.DB).ApplyNoSyncWait(b, t.writeOpts)
|
||
|
if err == nil {
|
||
|
err = b.SyncWait()
|
||
|
}
|
||
|
} else {
|
||
|
err = w.Apply(b, t.writeOpts)
|
||
|
}
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
// batch will be closed by a closeOp which is guaranteed to be generated
|
||
|
}
|
||
|
|
||
|
func (o *applyOp) String() string { return fmt.Sprintf("%s.Apply(%s)", o.writerID, o.batchID) }
|
||
|
func (o *applyOp) receiver() objID { return o.writerID }
|
||
|
func (o *applyOp) syncObjs() objIDSlice {
|
||
|
// Apply should not be concurrent with operations that are mutating the
|
||
|
// batch.
|
||
|
return []objID{o.batchID}
|
||
|
}
|
||
|
|
||
|
// checkpointOp models a DB.Checkpoint operation.
|
||
|
type checkpointOp struct {
|
||
|
dbID objID
|
||
|
// If non-empty, the checkpoint is restricted to these spans.
|
||
|
spans []pebble.CheckpointSpan
|
||
|
}
|
||
|
|
||
|
func (o *checkpointOp) run(t *test, h historyRecorder) {
|
||
|
// TODO(josh): db.Checkpoint does not work with shared storage yet.
|
||
|
// It would be better to filter out ahead of calling run on the op,
|
||
|
// by setting the weight that generator.go uses to zero, or similar.
|
||
|
// But IIUC the ops are shared for ALL the metamorphic test runs, so
|
||
|
// not sure how to do that easily:
|
||
|
// https://github.com/cockroachdb/pebble/blob/master/metamorphic/meta.go#L177
|
||
|
if t.testOpts.sharedStorageEnabled {
|
||
|
h.Recordf("%s // %v", o, nil)
|
||
|
return
|
||
|
}
|
||
|
var opts []pebble.CheckpointOption
|
||
|
if len(o.spans) > 0 {
|
||
|
opts = append(opts, pebble.WithRestrictToSpans(o.spans))
|
||
|
}
|
||
|
db := t.getDB(o.dbID)
|
||
|
err := withRetries(func() error {
|
||
|
return db.Checkpoint(o.dir(t.dir, h.op), opts...)
|
||
|
})
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *checkpointOp) dir(dataDir string, idx int) string {
|
||
|
return filepath.Join(dataDir, "checkpoints", fmt.Sprintf("op-%06d", idx))
|
||
|
}
|
||
|
|
||
|
func (o *checkpointOp) String() string {
|
||
|
var spanStr bytes.Buffer
|
||
|
for i, span := range o.spans {
|
||
|
if i > 0 {
|
||
|
spanStr.WriteString(",")
|
||
|
}
|
||
|
fmt.Fprintf(&spanStr, "%q,%q", span.Start, span.End)
|
||
|
}
|
||
|
return fmt.Sprintf("%s.Checkpoint(%s)", o.dbID, spanStr.String())
|
||
|
}
|
||
|
|
||
|
func (o *checkpointOp) receiver() objID { return o.dbID }
|
||
|
func (o *checkpointOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// closeOp models a {Batch,Iterator,Snapshot}.Close operation.
|
||
|
type closeOp struct {
|
||
|
objID objID
|
||
|
derivedDBID objID
|
||
|
}
|
||
|
|
||
|
func (o *closeOp) run(t *test, h historyRecorder) {
|
||
|
c := t.getCloser(o.objID)
|
||
|
if o.objID.tag() == dbTag && t.opts.DisableWAL {
|
||
|
// Special case: If WAL is disabled, do a flush right before DB Close. This
|
||
|
// allows us to reuse this run's data directory as initial state for
|
||
|
// future runs without losing any mutations.
|
||
|
_ = t.getDB(o.objID).Flush()
|
||
|
}
|
||
|
t.clearObj(o.objID)
|
||
|
err := c.Close()
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *closeOp) String() string { return fmt.Sprintf("%s.Close()", o.objID) }
|
||
|
func (o *closeOp) receiver() objID { return o.objID }
|
||
|
func (o *closeOp) syncObjs() objIDSlice {
|
||
|
// Synchronize on the database so that we don't close the database before
|
||
|
// all its iterators, snapshots and batches are closed.
|
||
|
// TODO(jackson): It would be nice to relax this so that Close calls can
|
||
|
// execute in parallel.
|
||
|
if o.objID.tag() == dbTag {
|
||
|
return nil
|
||
|
}
|
||
|
if o.derivedDBID != 0 {
|
||
|
return []objID{o.derivedDBID}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// compactOp models a DB.Compact operation.
|
||
|
type compactOp struct {
|
||
|
dbID objID
|
||
|
start []byte
|
||
|
end []byte
|
||
|
parallelize bool
|
||
|
}
|
||
|
|
||
|
func (o *compactOp) run(t *test, h historyRecorder) {
|
||
|
err := withRetries(func() error {
|
||
|
return t.getDB(o.dbID).Compact(o.start, o.end, o.parallelize)
|
||
|
})
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *compactOp) String() string {
|
||
|
return fmt.Sprintf("%s.Compact(%q, %q, %t /* parallelize */)", o.dbID, o.start, o.end, o.parallelize)
|
||
|
}
|
||
|
|
||
|
func (o *compactOp) receiver() objID { return o.dbID }
|
||
|
func (o *compactOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// deleteOp models a Write.Delete operation.
|
||
|
type deleteOp struct {
|
||
|
writerID objID
|
||
|
key []byte
|
||
|
|
||
|
derivedDBID objID
|
||
|
}
|
||
|
|
||
|
func (o *deleteOp) run(t *test, h historyRecorder) {
|
||
|
w := t.getWriter(o.writerID)
|
||
|
var err error
|
||
|
if t.testOpts.deleteSized && t.isFMV(o.derivedDBID, pebble.FormatDeleteSizedAndObsolete) {
|
||
|
// Call DeleteSized with a deterministic size derived from the index.
|
||
|
// The size does not need to be accurate for correctness.
|
||
|
err = w.DeleteSized(o.key, hashSize(t.idx), t.writeOpts)
|
||
|
} else {
|
||
|
err = w.Delete(o.key, t.writeOpts)
|
||
|
}
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func hashSize(index int) uint32 {
|
||
|
// Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
|
||
|
return uint32((11400714819323198485 * uint64(index)) % maxValueSize)
|
||
|
}
|
||
|
|
||
|
func (o *deleteOp) String() string {
|
||
|
return fmt.Sprintf("%s.Delete(%q)", o.writerID, o.key)
|
||
|
}
|
||
|
func (o *deleteOp) receiver() objID { return o.writerID }
|
||
|
func (o *deleteOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// singleDeleteOp models a Write.SingleDelete operation.
|
||
|
type singleDeleteOp struct {
|
||
|
writerID objID
|
||
|
key []byte
|
||
|
maybeReplaceDelete bool
|
||
|
}
|
||
|
|
||
|
func (o *singleDeleteOp) run(t *test, h historyRecorder) {
|
||
|
w := t.getWriter(o.writerID)
|
||
|
var err error
|
||
|
if t.testOpts.replaceSingleDelete && o.maybeReplaceDelete {
|
||
|
err = w.Delete(o.key, t.writeOpts)
|
||
|
} else {
|
||
|
err = w.SingleDelete(o.key, t.writeOpts)
|
||
|
}
|
||
|
// NOTE: even if the SINGLEDEL was replaced with a DELETE, we must still
|
||
|
// write the former to the history log. The log line will indicate whether
|
||
|
// or not the delete *could* have been replaced. The OPTIONS file should
|
||
|
// also be consulted to determine what happened at runtime (i.e. by taking
|
||
|
// the logical AND).
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *singleDeleteOp) String() string {
|
||
|
return fmt.Sprintf("%s.SingleDelete(%q, %v /* maybeReplaceDelete */)", o.writerID, o.key, o.maybeReplaceDelete)
|
||
|
}
|
||
|
|
||
|
func (o *singleDeleteOp) receiver() objID { return o.writerID }
|
||
|
func (o *singleDeleteOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// deleteRangeOp models a Write.DeleteRange operation.
|
||
|
type deleteRangeOp struct {
|
||
|
writerID objID
|
||
|
start []byte
|
||
|
end []byte
|
||
|
}
|
||
|
|
||
|
func (o *deleteRangeOp) run(t *test, h historyRecorder) {
|
||
|
w := t.getWriter(o.writerID)
|
||
|
err := w.DeleteRange(o.start, o.end, t.writeOpts)
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *deleteRangeOp) String() string {
|
||
|
return fmt.Sprintf("%s.DeleteRange(%q, %q)", o.writerID, o.start, o.end)
|
||
|
}
|
||
|
|
||
|
func (o *deleteRangeOp) receiver() objID { return o.writerID }
|
||
|
func (o *deleteRangeOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// flushOp models a DB.Flush operation.
|
||
|
type flushOp struct {
|
||
|
db objID
|
||
|
}
|
||
|
|
||
|
func (o *flushOp) run(t *test, h historyRecorder) {
|
||
|
db := t.getDB(o.db)
|
||
|
err := db.Flush()
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *flushOp) String() string { return fmt.Sprintf("%s.Flush()", o.db) }
|
||
|
func (o *flushOp) receiver() objID { return o.db }
|
||
|
func (o *flushOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// mergeOp models a Write.Merge operation.
|
||
|
type mergeOp struct {
|
||
|
writerID objID
|
||
|
key []byte
|
||
|
value []byte
|
||
|
}
|
||
|
|
||
|
func (o *mergeOp) run(t *test, h historyRecorder) {
|
||
|
w := t.getWriter(o.writerID)
|
||
|
err := w.Merge(o.key, o.value, t.writeOpts)
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *mergeOp) String() string { return fmt.Sprintf("%s.Merge(%q, %q)", o.writerID, o.key, o.value) }
|
||
|
func (o *mergeOp) receiver() objID { return o.writerID }
|
||
|
func (o *mergeOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// setOp models a Write.Set operation.
|
||
|
type setOp struct {
|
||
|
writerID objID
|
||
|
key []byte
|
||
|
value []byte
|
||
|
}
|
||
|
|
||
|
func (o *setOp) run(t *test, h historyRecorder) {
|
||
|
w := t.getWriter(o.writerID)
|
||
|
err := w.Set(o.key, o.value, t.writeOpts)
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *setOp) String() string { return fmt.Sprintf("%s.Set(%q, %q)", o.writerID, o.key, o.value) }
|
||
|
func (o *setOp) receiver() objID { return o.writerID }
|
||
|
func (o *setOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// rangeKeyDeleteOp models a Write.RangeKeyDelete operation.
|
||
|
type rangeKeyDeleteOp struct {
|
||
|
writerID objID
|
||
|
start []byte
|
||
|
end []byte
|
||
|
}
|
||
|
|
||
|
func (o *rangeKeyDeleteOp) run(t *test, h historyRecorder) {
|
||
|
w := t.getWriter(o.writerID)
|
||
|
err := w.RangeKeyDelete(o.start, o.end, t.writeOpts)
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *rangeKeyDeleteOp) String() string {
|
||
|
return fmt.Sprintf("%s.RangeKeyDelete(%q, %q)", o.writerID, o.start, o.end)
|
||
|
}
|
||
|
|
||
|
func (o *rangeKeyDeleteOp) receiver() objID { return o.writerID }
|
||
|
func (o *rangeKeyDeleteOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// rangeKeySetOp models a Write.RangeKeySet operation.
|
||
|
type rangeKeySetOp struct {
|
||
|
writerID objID
|
||
|
start []byte
|
||
|
end []byte
|
||
|
suffix []byte
|
||
|
value []byte
|
||
|
}
|
||
|
|
||
|
func (o *rangeKeySetOp) run(t *test, h historyRecorder) {
|
||
|
w := t.getWriter(o.writerID)
|
||
|
err := w.RangeKeySet(o.start, o.end, o.suffix, o.value, t.writeOpts)
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *rangeKeySetOp) String() string {
|
||
|
return fmt.Sprintf("%s.RangeKeySet(%q, %q, %q, %q)",
|
||
|
o.writerID, o.start, o.end, o.suffix, o.value)
|
||
|
}
|
||
|
|
||
|
func (o *rangeKeySetOp) receiver() objID { return o.writerID }
|
||
|
func (o *rangeKeySetOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// rangeKeyUnsetOp models a Write.RangeKeyUnset operation.
|
||
|
type rangeKeyUnsetOp struct {
|
||
|
writerID objID
|
||
|
start []byte
|
||
|
end []byte
|
||
|
suffix []byte
|
||
|
}
|
||
|
|
||
|
func (o *rangeKeyUnsetOp) run(t *test, h historyRecorder) {
|
||
|
w := t.getWriter(o.writerID)
|
||
|
err := w.RangeKeyUnset(o.start, o.end, o.suffix, t.writeOpts)
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *rangeKeyUnsetOp) String() string {
|
||
|
return fmt.Sprintf("%s.RangeKeyUnset(%q, %q, %q)",
|
||
|
o.writerID, o.start, o.end, o.suffix)
|
||
|
}
|
||
|
|
||
|
func (o *rangeKeyUnsetOp) receiver() objID { return o.writerID }
|
||
|
func (o *rangeKeyUnsetOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// newBatchOp models a Write.NewBatch operation.
|
||
|
type newBatchOp struct {
|
||
|
dbID objID
|
||
|
batchID objID
|
||
|
}
|
||
|
|
||
|
func (o *newBatchOp) run(t *test, h historyRecorder) {
|
||
|
b := t.getDB(o.dbID).NewBatch()
|
||
|
t.setBatch(o.batchID, b)
|
||
|
h.Recordf("%s", o)
|
||
|
}
|
||
|
|
||
|
func (o *newBatchOp) String() string { return fmt.Sprintf("%s = %s.NewBatch()", o.batchID, o.dbID) }
|
||
|
func (o *newBatchOp) receiver() objID { return o.dbID }
|
||
|
func (o *newBatchOp) syncObjs() objIDSlice {
|
||
|
// NewBatch should not be concurrent with operations that interact with that
|
||
|
// same batch.
|
||
|
return []objID{o.batchID}
|
||
|
}
|
||
|
|
||
|
// newIndexedBatchOp models a Write.NewIndexedBatch operation.
|
||
|
type newIndexedBatchOp struct {
|
||
|
dbID objID
|
||
|
batchID objID
|
||
|
}
|
||
|
|
||
|
func (o *newIndexedBatchOp) run(t *test, h historyRecorder) {
|
||
|
b := t.getDB(o.dbID).NewIndexedBatch()
|
||
|
t.setBatch(o.batchID, b)
|
||
|
h.Recordf("%s", o)
|
||
|
}
|
||
|
|
||
|
func (o *newIndexedBatchOp) String() string {
|
||
|
return fmt.Sprintf("%s = %s.NewIndexedBatch()", o.batchID, o.dbID)
|
||
|
}
|
||
|
func (o *newIndexedBatchOp) receiver() objID { return o.dbID }
|
||
|
func (o *newIndexedBatchOp) syncObjs() objIDSlice {
|
||
|
// NewIndexedBatch should not be concurrent with operations that interact
|
||
|
// with that same batch.
|
||
|
return []objID{o.batchID}
|
||
|
}
|
||
|
|
||
|
// batchCommitOp models a Batch.Commit operation.
|
||
|
type batchCommitOp struct {
|
||
|
dbID objID
|
||
|
batchID objID
|
||
|
}
|
||
|
|
||
|
func (o *batchCommitOp) run(t *test, h historyRecorder) {
|
||
|
b := t.getBatch(o.batchID)
|
||
|
err := b.Commit(t.writeOpts)
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *batchCommitOp) String() string { return fmt.Sprintf("%s.Commit()", o.batchID) }
|
||
|
func (o *batchCommitOp) receiver() objID { return o.batchID }
|
||
|
func (o *batchCommitOp) syncObjs() objIDSlice {
|
||
|
// Synchronize on the database so that NewIters wait for the commit.
|
||
|
return []objID{o.dbID}
|
||
|
}
|
||
|
|
||
|
// ingestOp models a DB.Ingest operation.
|
||
|
type ingestOp struct {
|
||
|
dbID objID
|
||
|
batchIDs []objID
|
||
|
|
||
|
derivedDBIDs []objID
|
||
|
}
|
||
|
|
||
|
func (o *ingestOp) run(t *test, h historyRecorder) {
|
||
|
// We can only use apply as an alternative for ingestion if we are ingesting
|
||
|
// a single batch. If we are ingesting multiple batches, the batches may
|
||
|
// overlap which would cause ingestion to fail but apply would succeed.
|
||
|
if t.testOpts.ingestUsingApply && len(o.batchIDs) == 1 && o.derivedDBIDs[0] == o.dbID {
|
||
|
id := o.batchIDs[0]
|
||
|
b := t.getBatch(id)
|
||
|
iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
|
||
|
db := t.getDB(o.dbID)
|
||
|
c, err := o.collapseBatch(t, db, iter, rangeDelIter, rangeKeyIter, b)
|
||
|
if err == nil {
|
||
|
err = db.Apply(c, t.writeOpts)
|
||
|
}
|
||
|
_ = b.Close()
|
||
|
_ = c.Close()
|
||
|
t.clearObj(id)
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
var paths []string
|
||
|
var err error
|
||
|
for i, id := range o.batchIDs {
|
||
|
b := t.getBatch(id)
|
||
|
t.clearObj(id)
|
||
|
path, err2 := o.build(t, h, b, i)
|
||
|
if err2 != nil {
|
||
|
h.Recordf("Build(%s) // %v", id, err2)
|
||
|
}
|
||
|
err = firstError(err, err2)
|
||
|
if err2 == nil {
|
||
|
paths = append(paths, path)
|
||
|
}
|
||
|
err = firstError(err, b.Close())
|
||
|
}
|
||
|
|
||
|
err = firstError(err, withRetries(func() error {
|
||
|
return t.getDB(o.dbID).Ingest(paths)
|
||
|
}))
|
||
|
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
|
||
|
path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", o.dbID.slot(), i))
|
||
|
f, err := t.opts.FS.Create(path)
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
db := t.getDB(o.dbID)
|
||
|
|
||
|
iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
|
||
|
defer closeIters(iter, rangeDelIter, rangeKeyIter)
|
||
|
|
||
|
equal := t.opts.Comparer.Equal
|
||
|
tableFormat := db.FormatMajorVersion().MaxTableFormat()
|
||
|
w := sstable.NewWriter(
|
||
|
objstorageprovider.NewFileWritable(f),
|
||
|
t.opts.MakeWriterOptions(0, tableFormat),
|
||
|
)
|
||
|
|
||
|
var lastUserKey []byte
|
||
|
for key, value := iter.First(); key != nil; key, value = iter.Next() {
|
||
|
// Ignore duplicate keys.
|
||
|
if equal(lastUserKey, key.UserKey) {
|
||
|
continue
|
||
|
}
|
||
|
// NB: We don't have to copy the key or value since we're reading from a
|
||
|
// batch which doesn't do prefix compression.
|
||
|
lastUserKey = key.UserKey
|
||
|
|
||
|
key.SetSeqNum(base.SeqNumZero)
|
||
|
// It's possible that we wrote the key on a batch from a db that supported
|
||
|
// DeleteSized, but are now ingesting into a db that does not. Detect
|
||
|
// this case and translate the key to an InternalKeyKindDelete.
|
||
|
if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(o.dbID, pebble.FormatDeleteSizedAndObsolete) {
|
||
|
value = pebble.LazyValue{}
|
||
|
key.SetKind(pebble.InternalKeyKindDelete)
|
||
|
}
|
||
|
if err := w.Add(*key, value.InPlaceValue()); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
}
|
||
|
if err := iter.Close(); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
iter = nil
|
||
|
|
||
|
if rangeDelIter != nil {
|
||
|
// NB: The range tombstones have already been fragmented by the Batch.
|
||
|
for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
|
||
|
// NB: We don't have to copy the key or value since we're reading from a
|
||
|
// batch which doesn't do prefix compression.
|
||
|
if err := w.DeleteRange(t.Start, t.End); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
}
|
||
|
if err := rangeDelIter.Close(); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
rangeDelIter = nil
|
||
|
}
|
||
|
|
||
|
if rangeKeyIter != nil {
|
||
|
for span := rangeKeyIter.First(); span != nil; span = rangeKeyIter.Next() {
|
||
|
// Coalesce the keys of this span and then zero the sequence
|
||
|
// numbers. This is necessary in order to make the range keys within
|
||
|
// the ingested sstable internally consistent at the sequence number
|
||
|
// it's ingested at. The individual keys within a batch are
|
||
|
// committed at unique sequence numbers, whereas all the keys of an
|
||
|
// ingested sstable are given the same sequence number. A span
|
||
|
// contaning keys that both set and unset the same suffix at the
|
||
|
// same sequence number is nonsensical, so we "coalesce" or collapse
|
||
|
// the keys.
|
||
|
collapsed := keyspan.Span{
|
||
|
Start: span.Start,
|
||
|
End: span.End,
|
||
|
Keys: make([]keyspan.Key, 0, len(span.Keys)),
|
||
|
}
|
||
|
err = rangekey.Coalesce(t.opts.Comparer.Compare, equal, span.Keys, &collapsed.Keys)
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
for i := range collapsed.Keys {
|
||
|
collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind())
|
||
|
}
|
||
|
keyspan.SortKeysByTrailer(&collapsed.Keys)
|
||
|
if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
}
|
||
|
if err := rangeKeyIter.Error(); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
if err := rangeKeyIter.Close(); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
rangeKeyIter = nil
|
||
|
}
|
||
|
|
||
|
if err := w.Close(); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
return path, nil
|
||
|
}
|
||
|
|
||
|
func (o *ingestOp) receiver() objID { return o.dbID }
|
||
|
func (o *ingestOp) syncObjs() objIDSlice {
|
||
|
// Ingest should not be concurrent with mutating the batches that will be
|
||
|
// ingested as sstables.
|
||
|
objs := make([]objID, 0, len(o.batchIDs)+1)
|
||
|
objs = append(objs, o.batchIDs...)
|
||
|
addedDBs := make(map[objID]struct{})
|
||
|
for i := range o.derivedDBIDs {
|
||
|
_, ok := addedDBs[o.derivedDBIDs[i]]
|
||
|
if !ok && o.derivedDBIDs[i] != o.dbID {
|
||
|
objs = append(objs, o.derivedDBIDs[i])
|
||
|
addedDBs[o.derivedDBIDs[i]] = struct{}{}
|
||
|
}
|
||
|
}
|
||
|
return objs
|
||
|
}
|
||
|
|
||
|
func closeIters(
|
||
|
pointIter base.InternalIterator,
|
||
|
rangeDelIter keyspan.FragmentIterator,
|
||
|
rangeKeyIter keyspan.FragmentIterator,
|
||
|
) {
|
||
|
if pointIter != nil {
|
||
|
pointIter.Close()
|
||
|
}
|
||
|
if rangeDelIter != nil {
|
||
|
rangeDelIter.Close()
|
||
|
}
|
||
|
if rangeKeyIter != nil {
|
||
|
rangeKeyIter.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// collapseBatch collapses the mutations in a batch to be equivalent to an
|
||
|
// sstable ingesting those mutations. Duplicate updates to a key are collapsed
|
||
|
// so that only the latest update is performed. All range deletions are
|
||
|
// performed first in the batch to match the semantics of ingestion where a
|
||
|
// range deletion does not delete a point record contained in the sstable.
|
||
|
func (o *ingestOp) collapseBatch(
|
||
|
t *test,
|
||
|
db *pebble.DB,
|
||
|
pointIter base.InternalIterator,
|
||
|
rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
|
||
|
b *pebble.Batch,
|
||
|
) (*pebble.Batch, error) {
|
||
|
defer closeIters(pointIter, rangeDelIter, rangeKeyIter)
|
||
|
equal := t.opts.Comparer.Equal
|
||
|
collapsed := db.NewBatch()
|
||
|
|
||
|
if rangeDelIter != nil {
|
||
|
// NB: The range tombstones have already been fragmented by the Batch.
|
||
|
for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
|
||
|
// NB: We don't have to copy the key or value since we're reading from a
|
||
|
// batch which doesn't do prefix compression.
|
||
|
if err := collapsed.DeleteRange(t.Start, t.End, nil); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
if err := rangeDelIter.Close(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
rangeDelIter = nil
|
||
|
}
|
||
|
|
||
|
if pointIter != nil {
|
||
|
var lastUserKey []byte
|
||
|
for key, value := pointIter.First(); key != nil; key, value = pointIter.Next() {
|
||
|
// Ignore duplicate keys.
|
||
|
//
|
||
|
// Note: this is necessary due to MERGE keys, otherwise it would be
|
||
|
// fine to include all the keys in the batch and let the normal
|
||
|
// sequence number precedence determine which of the keys "wins".
|
||
|
// But the code to build the ingested sstable will only keep the
|
||
|
// most recent internal key and will not merge across internal keys.
|
||
|
if equal(lastUserKey, key.UserKey) {
|
||
|
continue
|
||
|
}
|
||
|
// NB: We don't have to copy the key or value since we're reading from a
|
||
|
// batch which doesn't do prefix compression.
|
||
|
lastUserKey = key.UserKey
|
||
|
|
||
|
var err error
|
||
|
switch key.Kind() {
|
||
|
case pebble.InternalKeyKindDelete:
|
||
|
err = collapsed.Delete(key.UserKey, nil)
|
||
|
case pebble.InternalKeyKindDeleteSized:
|
||
|
v, _ := binary.Uvarint(value.InPlaceValue())
|
||
|
// Batch.DeleteSized takes just the length of the value being
|
||
|
// deleted and adds the key's length to derive the overall entry
|
||
|
// size of the value being deleted. This has already been done
|
||
|
// to the key we're reading from the batch, so we must subtract
|
||
|
// the key length from the encoded value before calling
|
||
|
// collapsed.DeleteSized, which will again add the key length
|
||
|
// before encoding.
|
||
|
err = collapsed.DeleteSized(key.UserKey, uint32(v-uint64(len(key.UserKey))), nil)
|
||
|
case pebble.InternalKeyKindSingleDelete:
|
||
|
err = collapsed.SingleDelete(key.UserKey, nil)
|
||
|
case pebble.InternalKeyKindSet:
|
||
|
err = collapsed.Set(key.UserKey, value.InPlaceValue(), nil)
|
||
|
case pebble.InternalKeyKindMerge:
|
||
|
err = collapsed.Merge(key.UserKey, value.InPlaceValue(), nil)
|
||
|
case pebble.InternalKeyKindLogData:
|
||
|
err = collapsed.LogData(key.UserKey, nil)
|
||
|
default:
|
||
|
err = errors.Errorf("unknown batch record kind: %d", key.Kind())
|
||
|
}
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
if err := pointIter.Close(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
pointIter = nil
|
||
|
}
|
||
|
|
||
|
// There's no equivalent of a MERGE operator for range keys, so there's no
|
||
|
// need to collapse the range keys here. Rather than reading the range keys
|
||
|
// from `rangeKeyIter`, which will already be fragmented, read the range
|
||
|
// keys from the batch and copy them verbatim. This marginally improves our
|
||
|
// test coverage over the alternative approach of pre-fragmenting and
|
||
|
// pre-coalescing before writing to the batch.
|
||
|
//
|
||
|
// The `rangeKeyIter` is used only to determine if there are any range keys
|
||
|
// in the batch at all, and only because we already have it handy from
|
||
|
// private.BatchSort.
|
||
|
if rangeKeyIter != nil {
|
||
|
for r := b.Reader(); ; {
|
||
|
kind, key, value, ok, err := r.Next()
|
||
|
if !ok {
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
break
|
||
|
} else if !rangekey.IsRangeKey(kind) {
|
||
|
continue
|
||
|
}
|
||
|
ik := base.MakeInternalKey(key, 0, kind)
|
||
|
if err := collapsed.AddInternalKey(&ik, value, nil); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
if err := rangeKeyIter.Close(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
rangeKeyIter = nil
|
||
|
}
|
||
|
|
||
|
return collapsed, nil
|
||
|
}
|
||
|
|
||
|
func (o *ingestOp) String() string {
|
||
|
var buf strings.Builder
|
||
|
buf.WriteString(o.dbID.String())
|
||
|
buf.WriteString(".Ingest(")
|
||
|
for i, id := range o.batchIDs {
|
||
|
if i > 0 {
|
||
|
buf.WriteString(", ")
|
||
|
}
|
||
|
buf.WriteString(id.String())
|
||
|
}
|
||
|
buf.WriteString(")")
|
||
|
return buf.String()
|
||
|
}
|
||
|
|
||
|
// getOp models a Reader.Get operation.
|
||
|
type getOp struct {
|
||
|
readerID objID
|
||
|
key []byte
|
||
|
derivedDBID objID
|
||
|
}
|
||
|
|
||
|
func (o *getOp) run(t *test, h historyRecorder) {
|
||
|
r := t.getReader(o.readerID)
|
||
|
var val []byte
|
||
|
var closer io.Closer
|
||
|
err := withRetries(func() (err error) {
|
||
|
val, closer, err = r.Get(o.key)
|
||
|
return err
|
||
|
})
|
||
|
h.Recordf("%s // [%q] %v", o, val, err)
|
||
|
if closer != nil {
|
||
|
closer.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *getOp) String() string { return fmt.Sprintf("%s.Get(%q)", o.readerID, o.key) }
|
||
|
func (o *getOp) receiver() objID { return o.readerID }
|
||
|
func (o *getOp) syncObjs() objIDSlice {
|
||
|
if o.readerID.tag() == dbTag {
|
||
|
return nil
|
||
|
}
|
||
|
// batch.Get reads through to the current database state.
|
||
|
if o.derivedDBID != 0 {
|
||
|
return []objID{o.derivedDBID}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// newIterOp models a Reader.NewIter operation.
|
||
|
type newIterOp struct {
|
||
|
readerID objID
|
||
|
iterID objID
|
||
|
iterOpts
|
||
|
derivedDBID objID
|
||
|
}
|
||
|
|
||
|
func (o *newIterOp) run(t *test, h historyRecorder) {
|
||
|
r := t.getReader(o.readerID)
|
||
|
opts := iterOptions(o.iterOpts)
|
||
|
|
||
|
var i *pebble.Iterator
|
||
|
for {
|
||
|
i, _ = r.NewIter(opts)
|
||
|
if err := i.Error(); !errors.Is(err, errorfs.ErrInjected) {
|
||
|
break
|
||
|
}
|
||
|
// close this iter and retry NewIter
|
||
|
_ = i.Close()
|
||
|
}
|
||
|
t.setIter(o.iterID, i)
|
||
|
|
||
|
// Trash the bounds to ensure that Pebble doesn't rely on the stability of
|
||
|
// the user-provided bounds.
|
||
|
if opts != nil {
|
||
|
rand.Read(opts.LowerBound[:])
|
||
|
rand.Read(opts.UpperBound[:])
|
||
|
}
|
||
|
h.Recordf("%s // %v", o, i.Error())
|
||
|
}
|
||
|
|
||
|
func (o *newIterOp) String() string {
|
||
|
return fmt.Sprintf("%s = %s.NewIter(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
|
||
|
o.iterID, o.readerID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
|
||
|
}
|
||
|
|
||
|
func (o *newIterOp) receiver() objID { return o.readerID }
|
||
|
func (o *newIterOp) syncObjs() objIDSlice {
|
||
|
// Prevent o.iterID ops from running before it exists.
|
||
|
objs := []objID{o.iterID}
|
||
|
// If reading through a batch or snapshot, the new iterator will also observe database
|
||
|
// state, and we must synchronize on the database state for a consistent
|
||
|
// view.
|
||
|
if o.readerID.tag() == batchTag || o.readerID.tag() == snapTag {
|
||
|
objs = append(objs, o.derivedDBID)
|
||
|
}
|
||
|
return objs
|
||
|
}
|
||
|
|
||
|
// newIterUsingCloneOp models a Iterator.Clone operation.
|
||
|
type newIterUsingCloneOp struct {
|
||
|
existingIterID objID
|
||
|
iterID objID
|
||
|
refreshBatch bool
|
||
|
iterOpts
|
||
|
|
||
|
// derivedReaderID is the ID of the underlying reader that backs both the
|
||
|
// existing iterator and the new iterator. The derivedReaderID is NOT
|
||
|
// serialized by String and is derived from other operations during parse.
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *newIterUsingCloneOp) run(t *test, h historyRecorder) {
|
||
|
iter := t.getIter(o.existingIterID)
|
||
|
cloneOpts := pebble.CloneOptions{
|
||
|
IterOptions: iterOptions(o.iterOpts),
|
||
|
RefreshBatchView: o.refreshBatch,
|
||
|
}
|
||
|
i, err := iter.iter.Clone(cloneOpts)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
t.setIter(o.iterID, i)
|
||
|
h.Recordf("%s // %v", o, i.Error())
|
||
|
}
|
||
|
|
||
|
func (o *newIterUsingCloneOp) String() string {
|
||
|
return fmt.Sprintf("%s = %s.Clone(%t, %q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
|
||
|
o.iterID, o.existingIterID, o.refreshBatch, o.lower, o.upper,
|
||
|
o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
|
||
|
}
|
||
|
|
||
|
func (o *newIterUsingCloneOp) receiver() objID { return o.existingIterID }
|
||
|
|
||
|
func (o *newIterUsingCloneOp) syncObjs() objIDSlice {
|
||
|
objIDs := []objID{o.iterID}
|
||
|
// If the underlying reader is a batch, we must synchronize with the batch.
|
||
|
// If refreshBatch=true, synchronizing is necessary to observe all the
|
||
|
// mutations up to until this op and no more. Even when refreshBatch=false,
|
||
|
// we must synchronize because iterator construction may access state cached
|
||
|
// on the indexed batch to avoid refragmenting range tombstones or range
|
||
|
// keys.
|
||
|
if o.derivedReaderID.tag() == batchTag {
|
||
|
objIDs = append(objIDs, o.derivedReaderID)
|
||
|
}
|
||
|
return objIDs
|
||
|
}
|
||
|
|
||
|
// iterSetBoundsOp models an Iterator.SetBounds operation.
|
||
|
type iterSetBoundsOp struct {
|
||
|
iterID objID
|
||
|
lower []byte
|
||
|
upper []byte
|
||
|
}
|
||
|
|
||
|
func (o *iterSetBoundsOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
var lower, upper []byte
|
||
|
if o.lower != nil {
|
||
|
lower = append(lower, o.lower...)
|
||
|
}
|
||
|
if o.upper != nil {
|
||
|
upper = append(upper, o.upper...)
|
||
|
}
|
||
|
i.SetBounds(lower, upper)
|
||
|
|
||
|
// Trash the bounds to ensure that Pebble doesn't rely on the stability of
|
||
|
// the user-provided bounds.
|
||
|
rand.Read(lower[:])
|
||
|
rand.Read(upper[:])
|
||
|
|
||
|
h.Recordf("%s // %v", o, i.Error())
|
||
|
}
|
||
|
|
||
|
func (o *iterSetBoundsOp) String() string {
|
||
|
return fmt.Sprintf("%s.SetBounds(%q, %q)", o.iterID, o.lower, o.upper)
|
||
|
}
|
||
|
|
||
|
func (o *iterSetBoundsOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterSetBoundsOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
// iterSetOptionsOp models an Iterator.SetOptions operation.
|
||
|
type iterSetOptionsOp struct {
|
||
|
iterID objID
|
||
|
iterOpts
|
||
|
|
||
|
// derivedReaderID is the ID of the underlying reader that backs the
|
||
|
// iterator. The derivedReaderID is NOT serialized by String and is derived
|
||
|
// from other operations during parse.
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *iterSetOptionsOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
|
||
|
opts := iterOptions(o.iterOpts)
|
||
|
if opts == nil {
|
||
|
opts = &pebble.IterOptions{}
|
||
|
}
|
||
|
i.SetOptions(opts)
|
||
|
|
||
|
// Trash the bounds to ensure that Pebble doesn't rely on the stability of
|
||
|
// the user-provided bounds.
|
||
|
rand.Read(opts.LowerBound[:])
|
||
|
rand.Read(opts.UpperBound[:])
|
||
|
|
||
|
h.Recordf("%s // %v", o, i.Error())
|
||
|
}
|
||
|
|
||
|
func (o *iterSetOptionsOp) String() string {
|
||
|
return fmt.Sprintf("%s.SetOptions(%q, %q, %d /* key types */, %d, %d, %t /* use L6 filters */, %q /* masking suffix */)",
|
||
|
o.iterID, o.lower, o.upper, o.keyTypes, o.filterMin, o.filterMax, o.useL6Filters, o.maskSuffix)
|
||
|
}
|
||
|
|
||
|
func iterOptions(o iterOpts) *pebble.IterOptions {
|
||
|
if o.IsZero() {
|
||
|
return nil
|
||
|
}
|
||
|
var lower, upper []byte
|
||
|
if o.lower != nil {
|
||
|
lower = append(lower, o.lower...)
|
||
|
}
|
||
|
if o.upper != nil {
|
||
|
upper = append(upper, o.upper...)
|
||
|
}
|
||
|
opts := &pebble.IterOptions{
|
||
|
LowerBound: lower,
|
||
|
UpperBound: upper,
|
||
|
KeyTypes: pebble.IterKeyType(o.keyTypes),
|
||
|
RangeKeyMasking: pebble.RangeKeyMasking{
|
||
|
Suffix: o.maskSuffix,
|
||
|
},
|
||
|
UseL6Filters: o.useL6Filters,
|
||
|
}
|
||
|
if opts.RangeKeyMasking.Suffix != nil {
|
||
|
opts.RangeKeyMasking.Filter = func() pebble.BlockPropertyFilterMask {
|
||
|
return sstable.NewTestKeysMaskingFilter()
|
||
|
}
|
||
|
}
|
||
|
if o.filterMax > 0 {
|
||
|
opts.PointKeyFilters = []pebble.BlockPropertyFilter{
|
||
|
sstable.NewTestKeysBlockPropertyFilter(o.filterMin, o.filterMax),
|
||
|
}
|
||
|
// Enforce the timestamp bounds in SkipPoint, so that the iterator never
|
||
|
// returns a key outside the filterMin, filterMax bounds. This provides
|
||
|
// deterministic iteration.
|
||
|
opts.SkipPoint = func(k []byte) (skip bool) {
|
||
|
n := testkeys.Comparer.Split(k)
|
||
|
if n == len(k) {
|
||
|
// No suffix, don't skip it.
|
||
|
return false
|
||
|
}
|
||
|
v, err := testkeys.ParseSuffix(k[n:])
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
ts := uint64(v)
|
||
|
return ts < o.filterMin || ts >= o.filterMax
|
||
|
}
|
||
|
}
|
||
|
return opts
|
||
|
}
|
||
|
|
||
|
func (o *iterSetOptionsOp) receiver() objID { return o.iterID }
|
||
|
|
||
|
func (o *iterSetOptionsOp) syncObjs() objIDSlice {
|
||
|
if o.derivedReaderID.tag() == batchTag {
|
||
|
// If the underlying reader is a batch, we must synchronize with the
|
||
|
// batch so that we observe all the mutations up until this operation
|
||
|
// and no more.
|
||
|
return []objID{o.derivedReaderID}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// iterSeekGEOp models an Iterator.SeekGE[WithLimit] operation.
|
||
|
type iterSeekGEOp struct {
|
||
|
iterID objID
|
||
|
key []byte
|
||
|
limit []byte
|
||
|
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func iteratorPos(i *retryableIter) string {
|
||
|
var buf bytes.Buffer
|
||
|
fmt.Fprintf(&buf, "%q", i.Key())
|
||
|
hasPoint, hasRange := i.HasPointAndRange()
|
||
|
if hasPoint {
|
||
|
fmt.Fprintf(&buf, ",%q", i.Value())
|
||
|
} else {
|
||
|
fmt.Fprint(&buf, ",<no point>")
|
||
|
}
|
||
|
if hasRange {
|
||
|
start, end := i.RangeBounds()
|
||
|
fmt.Fprintf(&buf, ",[%q,%q)=>{", start, end)
|
||
|
for i, rk := range i.RangeKeys() {
|
||
|
if i > 0 {
|
||
|
fmt.Fprint(&buf, ",")
|
||
|
}
|
||
|
fmt.Fprintf(&buf, "%q=%q", rk.Suffix, rk.Value)
|
||
|
}
|
||
|
fmt.Fprint(&buf, "}")
|
||
|
} else {
|
||
|
fmt.Fprint(&buf, ",<no range>")
|
||
|
}
|
||
|
if i.RangeKeyChanged() {
|
||
|
fmt.Fprint(&buf, "*")
|
||
|
}
|
||
|
return buf.String()
|
||
|
}
|
||
|
|
||
|
// validBoolToStr renders an iterator validity flag as "true" or "false".
func validBoolToStr(valid bool) string {
	return fmt.Sprint(valid)
}
|
||
|
|
||
|
func validityStateToStr(validity pebble.IterValidityState) (bool, string) {
|
||
|
// We can't distinguish between IterExhausted and IterAtLimit in a
|
||
|
// deterministic manner.
|
||
|
switch validity {
|
||
|
case pebble.IterExhausted, pebble.IterAtLimit:
|
||
|
return false, "invalid"
|
||
|
case pebble.IterValid:
|
||
|
return true, "valid"
|
||
|
default:
|
||
|
panic("unknown validity")
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *iterSeekGEOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
var valid bool
|
||
|
var validStr string
|
||
|
if o.limit == nil {
|
||
|
valid = i.SeekGE(o.key)
|
||
|
validStr = validBoolToStr(valid)
|
||
|
} else {
|
||
|
valid, validStr = validityStateToStr(i.SeekGEWithLimit(o.key, o.limit))
|
||
|
}
|
||
|
if valid {
|
||
|
h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
|
||
|
} else {
|
||
|
h.Recordf("%s // [%s] %v", o, validStr, i.Error())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *iterSeekGEOp) String() string {
|
||
|
return fmt.Sprintf("%s.SeekGE(%q, %q)", o.iterID, o.key, o.limit)
|
||
|
}
|
||
|
func (o *iterSeekGEOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterSeekGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
|
||
|
|
||
|
func onlyBatchIDs(ids ...objID) objIDSlice {
|
||
|
var ret objIDSlice
|
||
|
for _, id := range ids {
|
||
|
if id.tag() == batchTag {
|
||
|
ret = append(ret, id)
|
||
|
}
|
||
|
}
|
||
|
return ret
|
||
|
}
|
||
|
|
||
|
// iterSeekPrefixGEOp models an Iterator.SeekPrefixGE operation.
|
||
|
type iterSeekPrefixGEOp struct {
|
||
|
iterID objID
|
||
|
key []byte
|
||
|
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *iterSeekPrefixGEOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
valid := i.SeekPrefixGE(o.key)
|
||
|
if valid {
|
||
|
h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
|
||
|
} else {
|
||
|
h.Recordf("%s // [%t] %v", o, valid, i.Error())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *iterSeekPrefixGEOp) String() string {
|
||
|
return fmt.Sprintf("%s.SeekPrefixGE(%q)", o.iterID, o.key)
|
||
|
}
|
||
|
func (o *iterSeekPrefixGEOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterSeekPrefixGEOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
|
||
|
|
||
|
// iterSeekLTOp models an Iterator.SeekLT[WithLimit] operation.
|
||
|
type iterSeekLTOp struct {
|
||
|
iterID objID
|
||
|
key []byte
|
||
|
limit []byte
|
||
|
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *iterSeekLTOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
var valid bool
|
||
|
var validStr string
|
||
|
if o.limit == nil {
|
||
|
valid = i.SeekLT(o.key)
|
||
|
validStr = validBoolToStr(valid)
|
||
|
} else {
|
||
|
valid, validStr = validityStateToStr(i.SeekLTWithLimit(o.key, o.limit))
|
||
|
}
|
||
|
if valid {
|
||
|
h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
|
||
|
} else {
|
||
|
h.Recordf("%s // [%s] %v", o, validStr, i.Error())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *iterSeekLTOp) String() string {
|
||
|
return fmt.Sprintf("%s.SeekLT(%q, %q)", o.iterID, o.key, o.limit)
|
||
|
}
|
||
|
|
||
|
func (o *iterSeekLTOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterSeekLTOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
|
||
|
|
||
|
// iterFirstOp models an Iterator.First operation.
|
||
|
type iterFirstOp struct {
|
||
|
iterID objID
|
||
|
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *iterFirstOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
valid := i.First()
|
||
|
if valid {
|
||
|
h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
|
||
|
} else {
|
||
|
h.Recordf("%s // [%t] %v", o, valid, i.Error())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *iterFirstOp) String() string { return fmt.Sprintf("%s.First()", o.iterID) }
|
||
|
func (o *iterFirstOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterFirstOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
|
||
|
|
||
|
// iterLastOp models an Iterator.Last operation.
|
||
|
type iterLastOp struct {
|
||
|
iterID objID
|
||
|
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *iterLastOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
valid := i.Last()
|
||
|
if valid {
|
||
|
h.Recordf("%s // [%t,%s] %v", o, valid, iteratorPos(i), i.Error())
|
||
|
} else {
|
||
|
h.Recordf("%s // [%t] %v", o, valid, i.Error())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *iterLastOp) String() string { return fmt.Sprintf("%s.Last()", o.iterID) }
|
||
|
func (o *iterLastOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterLastOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
|
||
|
|
||
|
// iterNextOp models an Iterator.Next[WithLimit] operation.
|
||
|
type iterNextOp struct {
|
||
|
iterID objID
|
||
|
limit []byte
|
||
|
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *iterNextOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
var valid bool
|
||
|
var validStr string
|
||
|
if o.limit == nil {
|
||
|
valid = i.Next()
|
||
|
validStr = validBoolToStr(valid)
|
||
|
} else {
|
||
|
valid, validStr = validityStateToStr(i.NextWithLimit(o.limit))
|
||
|
}
|
||
|
if valid {
|
||
|
h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
|
||
|
} else {
|
||
|
h.Recordf("%s // [%s] %v", o, validStr, i.Error())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *iterNextOp) String() string { return fmt.Sprintf("%s.Next(%q)", o.iterID, o.limit) }
|
||
|
func (o *iterNextOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterNextOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
|
||
|
|
||
|
// iterNextPrefixOp models an Iterator.NextPrefix operation.
|
||
|
type iterNextPrefixOp struct {
|
||
|
iterID objID
|
||
|
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *iterNextPrefixOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
valid := i.NextPrefix()
|
||
|
validStr := validBoolToStr(valid)
|
||
|
if valid {
|
||
|
h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
|
||
|
} else {
|
||
|
h.Recordf("%s // [%s] %v", o, validStr, i.Error())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *iterNextPrefixOp) String() string { return fmt.Sprintf("%s.NextPrefix()", o.iterID) }
|
||
|
func (o *iterNextPrefixOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterNextPrefixOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
|
||
|
|
||
|
// iterCanSingleDelOp models a call to CanDeterministicallySingleDelete with an
|
||
|
// Iterator.
|
||
|
type iterCanSingleDelOp struct {
|
||
|
iterID objID
|
||
|
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *iterCanSingleDelOp) run(t *test, h historyRecorder) {
|
||
|
// TODO(jackson): When we perform error injection, we'll need to rethink
|
||
|
// this.
|
||
|
_, err := pebble.CanDeterministicallySingleDelete(t.getIter(o.iterID).iter)
|
||
|
// The return value of CanDeterministicallySingleDelete is dependent on
|
||
|
// internal LSM state and non-deterministic, so we don't record it.
|
||
|
// Including the operation within the metamorphic test at all helps ensure
|
||
|
// that it does not change the result of any other Iterator operation that
|
||
|
// should be deterministic, regardless of its own outcome.
|
||
|
//
|
||
|
// We still record the value of the error because it's deterministic, at
|
||
|
// least for now. The possible error cases are:
|
||
|
// - The iterator was already in an error state when the operation ran.
|
||
|
// - The operation is deterministically invalid (like using an InternalNext
|
||
|
// to change directions.)
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *iterCanSingleDelOp) String() string { return fmt.Sprintf("%s.InternalNext()", o.iterID) }
|
||
|
func (o *iterCanSingleDelOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterCanSingleDelOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
|
||
|
|
||
|
// iterPrevOp models an Iterator.Prev[WithLimit] operation.
|
||
|
type iterPrevOp struct {
|
||
|
iterID objID
|
||
|
limit []byte
|
||
|
|
||
|
derivedReaderID objID
|
||
|
}
|
||
|
|
||
|
func (o *iterPrevOp) run(t *test, h historyRecorder) {
|
||
|
i := t.getIter(o.iterID)
|
||
|
var valid bool
|
||
|
var validStr string
|
||
|
if o.limit == nil {
|
||
|
valid = i.Prev()
|
||
|
validStr = validBoolToStr(valid)
|
||
|
} else {
|
||
|
valid, validStr = validityStateToStr(i.PrevWithLimit(o.limit))
|
||
|
}
|
||
|
if valid {
|
||
|
h.Recordf("%s // [%s,%s] %v", o, validStr, iteratorPos(i), i.Error())
|
||
|
} else {
|
||
|
h.Recordf("%s // [%s] %v", o, validStr, i.Error())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *iterPrevOp) String() string { return fmt.Sprintf("%s.Prev(%q)", o.iterID, o.limit) }
|
||
|
func (o *iterPrevOp) receiver() objID { return o.iterID }
|
||
|
func (o *iterPrevOp) syncObjs() objIDSlice { return onlyBatchIDs(o.derivedReaderID) }
|
||
|
|
||
|
// newSnapshotOp models a DB.NewSnapshot operation.
|
||
|
type newSnapshotOp struct {
|
||
|
dbID objID
|
||
|
snapID objID
|
||
|
// If nonempty, this snapshot must not be used to read any keys outside of
|
||
|
// the provided bounds. This allows some implementations to use 'Eventually
|
||
|
// file-only snapshots,' which require bounds.
|
||
|
bounds []pebble.KeyRange
|
||
|
}
|
||
|
|
||
|
func (o *newSnapshotOp) run(t *test, h historyRecorder) {
|
||
|
// Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
|
||
|
if len(t.dbs) > 1 || (len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1) {
|
||
|
s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(o.bounds)
|
||
|
t.setSnapshot(o.snapID, s)
|
||
|
} else {
|
||
|
s := t.getDB(o.dbID).NewSnapshot()
|
||
|
t.setSnapshot(o.snapID, s)
|
||
|
}
|
||
|
h.Recordf("%s", o)
|
||
|
}
|
||
|
|
||
|
func (o *newSnapshotOp) String() string {
|
||
|
var buf bytes.Buffer
|
||
|
fmt.Fprintf(&buf, "%s = %s.NewSnapshot(", o.snapID, o.dbID)
|
||
|
for i := range o.bounds {
|
||
|
if i > 0 {
|
||
|
fmt.Fprint(&buf, ", ")
|
||
|
}
|
||
|
fmt.Fprintf(&buf, "%q, %q", o.bounds[i].Start, o.bounds[i].End)
|
||
|
}
|
||
|
fmt.Fprint(&buf, ")")
|
||
|
return buf.String()
|
||
|
}
|
||
|
func (o *newSnapshotOp) receiver() objID { return o.dbID }
|
||
|
func (o *newSnapshotOp) syncObjs() objIDSlice { return []objID{o.snapID} }
|
||
|
|
||
|
type dbRatchetFormatMajorVersionOp struct {
|
||
|
dbID objID
|
||
|
vers pebble.FormatMajorVersion
|
||
|
}
|
||
|
|
||
|
func (o *dbRatchetFormatMajorVersionOp) run(t *test, h historyRecorder) {
|
||
|
var err error
|
||
|
// NB: We no-op the operation if we're already at or above the provided
|
||
|
// format major version. Different runs start at different format major
|
||
|
// versions, making the presence of an error and the error message itself
|
||
|
// non-deterministic if we attempt to upgrade to an older version.
|
||
|
//
|
||
|
//Regardless, subsequent operations should behave identically, which is what
|
||
|
//we're really aiming to test by including this format major version ratchet
|
||
|
//operation.
|
||
|
if t.getDB(o.dbID).FormatMajorVersion() < o.vers {
|
||
|
err = t.getDB(o.dbID).RatchetFormatMajorVersion(o.vers)
|
||
|
}
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
}
|
||
|
|
||
|
func (o *dbRatchetFormatMajorVersionOp) String() string {
|
||
|
return fmt.Sprintf("%s.RatchetFormatMajorVersion(%s)", o.dbID, o.vers)
|
||
|
}
|
||
|
func (o *dbRatchetFormatMajorVersionOp) receiver() objID { return o.dbID }
|
||
|
func (o *dbRatchetFormatMajorVersionOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
type dbRestartOp struct {
|
||
|
dbID objID
|
||
|
}
|
||
|
|
||
|
func (o *dbRestartOp) run(t *test, h historyRecorder) {
|
||
|
if err := t.restartDB(o.dbID); err != nil {
|
||
|
h.Recordf("%s // %v", o, err)
|
||
|
h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
|
||
|
} else {
|
||
|
h.Recordf("%s", o)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (o *dbRestartOp) String() string { return fmt.Sprintf("%s.Restart()", o.dbID) }
|
||
|
func (o *dbRestartOp) receiver() objID { return o.dbID }
|
||
|
func (o *dbRestartOp) syncObjs() objIDSlice { return nil }
|
||
|
|
||
|
func formatOps(ops []op) string {
|
||
|
var buf strings.Builder
|
||
|
for _, op := range ops {
|
||
|
fmt.Fprintf(&buf, "%s\n", op)
|
||
|
}
|
||
|
return buf.String()
|
||
|
}
|
||
|
|
||
|
// replicateOp models an operation that could copy keys from one db to
|
||
|
// another through either an IngestAndExcise, or an Ingest.
|
||
|
type replicateOp struct {
|
||
|
source, dest objID
|
||
|
start, end []byte
|
||
|
}
|
||
|
|
||
|
func (r *replicateOp) runSharedReplicate(
|
||
|
t *test, h historyRecorder, source, dest *pebble.DB, w *sstable.Writer, sstPath string,
|
||
|
) {
|
||
|
var sharedSSTs []pebble.SharedSSTMeta
|
||
|
var err error
|
||
|
err = source.ScanInternal(context.TODO(), sstable.CategoryAndQoS{}, r.start, r.end,
|
||
|
func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
|
||
|
val, _, err := value.Value(nil)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)
|
||
|
},
|
||
|
func(start, end []byte, seqNum uint64) error {
|
||
|
return w.DeleteRange(start, end)
|
||
|
},
|
||
|
func(start, end []byte, keys []keyspan.Key) error {
|
||
|
s := keyspan.Span{
|
||
|
Start: start,
|
||
|
End: end,
|
||
|
Keys: keys,
|
||
|
KeysOrder: 0,
|
||
|
}
|
||
|
return rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
|
||
|
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
|
||
|
})
|
||
|
},
|
||
|
func(sst *pebble.SharedSSTMeta) error {
|
||
|
sharedSSTs = append(sharedSSTs, *sst)
|
||
|
return nil
|
||
|
},
|
||
|
)
|
||
|
if err != nil {
|
||
|
h.Recordf("%s // %v", r, err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
_, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, pebble.KeyRange{Start: r.start, End: r.end})
|
||
|
h.Recordf("%s // %v", r, err)
|
||
|
}
|
||
|
|
||
|
func (r *replicateOp) run(t *test, h historyRecorder) {
|
||
|
// Shared replication only works if shared storage is enabled.
|
||
|
useSharedIngest := t.testOpts.useSharedReplicate
|
||
|
if !t.testOpts.sharedStorageEnabled {
|
||
|
useSharedIngest = false
|
||
|
}
|
||
|
|
||
|
source := t.getDB(r.source)
|
||
|
dest := t.getDB(r.dest)
|
||
|
sstPath := path.Join(t.tmpDir, fmt.Sprintf("ext-replicate%d.sst", t.idx))
|
||
|
f, err := t.opts.FS.Create(sstPath)
|
||
|
if err != nil {
|
||
|
h.Recordf("%s // %v", r, err)
|
||
|
return
|
||
|
}
|
||
|
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), t.opts.MakeWriterOptions(0, dest.FormatMajorVersion().MaxTableFormat()))
|
||
|
|
||
|
if useSharedIngest {
|
||
|
r.runSharedReplicate(t, h, source, dest, w, sstPath)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
iter, err := source.NewIter(&pebble.IterOptions{
|
||
|
LowerBound: r.start,
|
||
|
UpperBound: r.end,
|
||
|
KeyTypes: pebble.IterKeyTypePointsAndRanges,
|
||
|
})
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
defer iter.Close()
|
||
|
|
||
|
// Write rangedels and rangekeydels for the range. This mimics the Excise
|
||
|
// that runSharedReplicate would do.
|
||
|
if err := w.DeleteRange(r.start, r.end); err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
if err := w.RangeKeyDelete(r.start, r.end); err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
for ok := iter.SeekGE(r.start); ok && iter.Error() != nil; ok = iter.Next() {
|
||
|
hasPoint, hasRange := iter.HasPointAndRange()
|
||
|
if hasPoint {
|
||
|
val, err := iter.ValueAndErr()
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
if err := w.Set(iter.Key(), val); err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
}
|
||
|
if hasRange && iter.RangeKeyChanged() {
|
||
|
rangeKeys := iter.RangeKeys()
|
||
|
rkStart, rkEnd := iter.RangeBounds()
|
||
|
for i := range rangeKeys {
|
||
|
if err := w.RangeKeySet(rkStart, rkEnd, rangeKeys[i].Suffix, rangeKeys[i].Value); err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
if err := w.Close(); err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
err = dest.Ingest([]string{sstPath})
|
||
|
h.Recordf("%s // %v", r, err)
|
||
|
}
|
||
|
|
||
|
func (r *replicateOp) String() string {
|
||
|
return fmt.Sprintf("%s.Replicate(%s, %q, %q)", r.source, r.dest, r.start, r.end)
|
||
|
}
|
||
|
|
||
|
func (r *replicateOp) receiver() objID { return r.source }
|
||
|
func (r *replicateOp) syncObjs() objIDSlice { return objIDSlice{r.dest} }
|