mirror of
https://source.quilibrium.com/quilibrium/ceremonyclient.git
synced 2025-01-23 14:15:18 +00:00
2118 lines
60 KiB
Go
2118 lines
60 KiB
Go
|
// Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
|
||
|
// of this source code is governed by a BSD-style license that can be found in
|
||
|
// the LICENSE file.
|
||
|
|
||
|
package sstable
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"encoding/binary"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"math"
|
||
|
"os"
|
||
|
"path"
|
||
|
"path/filepath"
|
||
|
"strings"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"github.com/cockroachdb/datadriven"
|
||
|
"github.com/cockroachdb/errors"
|
||
|
"github.com/cockroachdb/pebble/bloom"
|
||
|
"github.com/cockroachdb/pebble/internal/base"
|
||
|
"github.com/cockroachdb/pebble/internal/cache"
|
||
|
"github.com/cockroachdb/pebble/internal/humanize"
|
||
|
"github.com/cockroachdb/pebble/internal/manifest"
|
||
|
"github.com/cockroachdb/pebble/internal/testkeys"
|
||
|
"github.com/cockroachdb/pebble/objstorage"
|
||
|
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
|
||
|
"github.com/cockroachdb/pebble/vfs"
|
||
|
"github.com/cockroachdb/pebble/vfs/errorfs"
|
||
|
"github.com/stretchr/testify/require"
|
||
|
"golang.org/x/exp/rand"
|
||
|
)
|
||
|
|
||
|
// get is a testing helper that simulates a read and helps verify bloom filters
|
||
|
// until they are available through iterators.
|
||
|
func (r *Reader) get(key []byte) (value []byte, err error) {
|
||
|
if r.err != nil {
|
||
|
return nil, r.err
|
||
|
}
|
||
|
|
||
|
if r.tableFilter != nil {
|
||
|
dataH, err := r.readFilter(context.Background(), nil /* stats */, nil)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
var lookupKey []byte
|
||
|
if r.Split != nil {
|
||
|
lookupKey = key[:r.Split(key)]
|
||
|
} else {
|
||
|
lookupKey = key
|
||
|
}
|
||
|
mayContain := r.tableFilter.mayContain(dataH.Get(), lookupKey)
|
||
|
dataH.Release()
|
||
|
if !mayContain {
|
||
|
return nil, base.ErrNotFound
|
||
|
}
|
||
|
}
|
||
|
|
||
|
i, err := r.NewIter(nil /* lower */, nil /* upper */)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
var v base.LazyValue
|
||
|
ikey, v := i.SeekGE(key, base.SeekGEFlagsNone)
|
||
|
value, _, err = v.Value(nil)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if ikey == nil || r.Compare(key, ikey.UserKey) != 0 {
|
||
|
err := i.Close()
|
||
|
if err == nil {
|
||
|
err = base.ErrNotFound
|
||
|
}
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// The value will be "freed" when the iterator is closed, so make a copy
|
||
|
// which will outlast the lifetime of the iterator.
|
||
|
newValue := make([]byte, len(value))
|
||
|
copy(newValue, value)
|
||
|
if err := i.Close(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return newValue, nil
|
||
|
}
|
||
|
|
||
|
// iterAdapter adapts the new Iterator API which returns the key and value from
// positioning methods (Seek*, First, Last, Next, Prev) to the old API which
// returned a boolean corresponding to Valid. Only used by test code.
type iterAdapter struct {
	// Iterator is the wrapped iterator that all positioning calls delegate to.
	Iterator
	// key is the key recorded by the most recent positioning call; it is nil
	// when the adapter holds no valid entry.
	key *InternalKey
	// val is the value recorded alongside key.
	val []byte
}
|
||
|
|
||
|
func newIterAdapter(iter Iterator) *iterAdapter {
|
||
|
return &iterAdapter{
|
||
|
Iterator: iter,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (i *iterAdapter) update(key *InternalKey, val base.LazyValue) bool {
|
||
|
i.key = key
|
||
|
if v, _, err := val.Value(nil); err != nil {
|
||
|
i.key = nil
|
||
|
i.val = nil
|
||
|
} else {
|
||
|
i.val = v
|
||
|
}
|
||
|
return i.key != nil
|
||
|
}
|
||
|
|
||
|
// String returns a fixed, human-readable name for the adapter.
func (i *iterAdapter) String() string {
	return "iter-adapter"
}
|
||
|
|
||
|
func (i *iterAdapter) SeekGE(key []byte, flags base.SeekGEFlags) bool {
|
||
|
return i.update(i.Iterator.SeekGE(key, flags))
|
||
|
}
|
||
|
|
||
|
func (i *iterAdapter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) bool {
|
||
|
return i.update(i.Iterator.SeekPrefixGE(prefix, key, flags))
|
||
|
}
|
||
|
|
||
|
func (i *iterAdapter) SeekLT(key []byte, flags base.SeekLTFlags) bool {
|
||
|
return i.update(i.Iterator.SeekLT(key, flags))
|
||
|
}
|
||
|
|
||
|
func (i *iterAdapter) First() bool {
|
||
|
return i.update(i.Iterator.First())
|
||
|
}
|
||
|
|
||
|
func (i *iterAdapter) Last() bool {
|
||
|
return i.update(i.Iterator.Last())
|
||
|
}
|
||
|
|
||
|
func (i *iterAdapter) Next() bool {
|
||
|
return i.update(i.Iterator.Next())
|
||
|
}
|
||
|
|
||
|
func (i *iterAdapter) NextPrefix(succKey []byte) bool {
|
||
|
return i.update(i.Iterator.NextPrefix(succKey))
|
||
|
}
|
||
|
|
||
|
func (i *iterAdapter) NextIgnoreResult() {
|
||
|
i.Iterator.Next()
|
||
|
i.update(nil, base.LazyValue{})
|
||
|
}
|
||
|
|
||
|
func (i *iterAdapter) Prev() bool {
|
||
|
return i.update(i.Iterator.Prev())
|
||
|
}
|
||
|
|
||
|
// Key returns the key cached by the most recent positioning call, or nil if
// the adapter holds no valid entry.
func (i *iterAdapter) Key() *InternalKey {
	return i.key
}
|
||
|
|
||
|
// Value returns the value cached by the most recent positioning call.
func (i *iterAdapter) Value() []byte {
	return i.val
}
|
||
|
|
||
|
// Valid reports whether the adapter currently holds a non-nil cached key.
func (i *iterAdapter) Valid() bool {
	return i.key != nil
}
|
||
|
|
||
|
// SetBounds forwards the new bounds to the wrapped Iterator and clears the
// cached key, so the adapter reports itself as invalid until it is
// repositioned.
func (i *iterAdapter) SetBounds(lower, upper []byte) {
	i.Iterator.SetBounds(lower, upper)
	i.key = nil
}
|
||
|
|
||
|
// SetContext forwards ctx to the wrapped Iterator.
func (i *iterAdapter) SetContext(ctx context.Context) {
	i.Iterator.SetContext(ctx)
}
|
||
|
|
||
|
// TestVirtualReader is a data-driven test (testdata/virtual_reader) that
// builds a physical sstable, wraps it in a VirtualReader restricted to
// caller-supplied bounds, and then exercises the virtual reader through the
// commands build, virtualize, citer, constrain, scan-range-del,
// scan-range-key and iter.
func TestVirtualReader(t *testing.T) {
	// A faux filenum used to create fake filemetadata for testing.
	var fileNum int = 1
	// nextFileNum hands out the current file number and advances the counter.
	nextFileNum := func() base.FileNum {
		fileNum++
		return base.FileNum(fileNum - 1)
	}

	// Set during the latest build command.
	var r *Reader
	var meta manifest.PhysicalFileMeta
	var bp BufferPool

	// Set during the latest virtualize command.
	var vMeta1 manifest.VirtualFileMeta
	var v VirtualReader

	// Close the reader from the last build command (if any) and release the
	// buffer pool when the test finishes.
	defer func() {
		if r != nil {
			require.NoError(t, r.Close())
			bp.Release()
		}
	}()

	// createPhysicalMeta builds and validates a fake physical file metadata
	// from the writer metadata w, using r's comparer to extend the key bounds.
	createPhysicalMeta := func(w *WriterMetadata, r *Reader) (manifest.PhysicalFileMeta, error) {
		meta := &manifest.FileMetadata{}
		meta.FileNum = nextFileNum()
		meta.CreationTime = time.Now().Unix()
		meta.Size = w.Size
		meta.SmallestSeqNum = w.SmallestSeqNum
		meta.LargestSeqNum = w.LargestSeqNum

		if w.HasPointKeys {
			meta.ExtendPointKeyBounds(r.Compare, w.SmallestPoint, w.LargestPoint)
		}
		// Range deletions are folded into the point key bounds.
		if w.HasRangeDelKeys {
			meta.ExtendPointKeyBounds(r.Compare, w.SmallestRangeDel, w.LargestRangeDel)
		}
		if w.HasRangeKeys {
			meta.ExtendRangeKeyBounds(r.Compare, w.SmallestRangeKey, w.LargestRangeKey)
		}
		meta.InitPhysicalBacking()

		if err := meta.Validate(r.Compare, r.opts.Comparer.FormatKey); err != nil {
			return manifest.PhysicalFileMeta{}, err
		}

		return meta.PhysicalMeta(), nil
	}

	// formatWMeta renders the key bounds and sequence number range recorded in
	// the writer metadata; this is the output of the build command.
	formatWMeta := func(m *WriterMetadata) string {
		var b bytes.Buffer
		if m.HasPointKeys {
			fmt.Fprintf(&b, "point: [%s-%s]\n", m.SmallestPoint, m.LargestPoint)
		}
		if m.HasRangeDelKeys {
			fmt.Fprintf(&b, "rangedel: [%s-%s]\n", m.SmallestRangeDel, m.LargestRangeDel)
		}
		if m.HasRangeKeys {
			fmt.Fprintf(&b, "rangekey: [%s-%s]\n", m.SmallestRangeKey, m.LargestRangeKey)
		}
		fmt.Fprintf(&b, "seqnums: [%d-%d]\n", m.SmallestSeqNum, m.LargestSeqNum)
		return b.String()
	}

	// formatVirtualReader renders the virtual reader's bounds, file number and
	// a selection of its properties; this is the output of the virtualize
	// command.
	formatVirtualReader := func(v *VirtualReader) string {
		var b bytes.Buffer
		fmt.Fprintf(&b, "bounds: [%s-%s]\n", v.vState.lower, v.vState.upper)
		fmt.Fprintf(&b, "filenum: %s\n", v.vState.fileNum.String())
		fmt.Fprintf(
			&b, "props: %s: %d, %s: %d, %s: %d, %s: %d, %s: %d, %s: %d, %s: %d, %s: %d, %s: %d, %s: %d, %s: %d\n",
			"NumEntries",
			v.Properties.NumEntries,
			"RawKeySize",
			v.Properties.RawKeySize,
			"RawValueSize",
			v.Properties.RawValueSize,
			"RawPointTombstoneKeySize",
			v.Properties.RawPointTombstoneKeySize,
			"RawPointTombstoneValueSize",
			v.Properties.RawPointTombstoneValueSize,
			"NumSizedDeletions",
			v.Properties.NumSizedDeletions,
			"NumDeletions",
			v.Properties.NumDeletions,
			"NumRangeDeletions",
			v.Properties.NumRangeDeletions,
			"NumRangeKeyDels",
			v.Properties.NumRangeKeyDels,
			"NumRangeKeySets",
			v.Properties.NumRangeKeySets,
			"ValueBlocksSize",
			v.Properties.ValueBlocksSize,
		)
		return b.String()
	}

	datadriven.RunTest(t, "testdata/virtual_reader", func(t *testing.T, td *datadriven.TestData) string {
		switch td.Cmd {
		case "build":
			// Tear down everything left over from a previous build before
			// creating the new table.
			if r != nil {
				bp.Release()
				_ = r.Close()
				r = nil
				meta.FileMetadata = nil
				vMeta1.FileMetadata = nil
				v = VirtualReader{}
			}
			var wMeta *WriterMetadata
			var err error
			writerOpts := &WriterOptions{
				TableFormat: TableFormatMax,
			}
			// Use a single level index by default.
			writerOpts.IndexBlockSize = 100000
			if len(td.CmdArgs) == 1 {
				if td.CmdArgs[0].String() == "twoLevel" {
					// Force a two level index.
					writerOpts.IndexBlockSize = 1
					writerOpts.BlockSize = 1
				}
			}
			wMeta, r, err = runBuildCmd(td, writerOpts, 0)
			if err != nil {
				return err.Error()
			}
			bp.Init(5)

			// Create a fake file metadata using the writer meta.
			meta, err = createPhysicalMeta(wMeta, r)
			if err != nil {
				return err.Error()
			}
			r.fileNum = meta.FileBacking.DiskFileNum
			return formatWMeta(wMeta)

		case "virtualize":
			// virtualize will split the previously built physical sstable into
			// a single sstable with virtual bounds. The command assumes that
			// the bounds for the virtual sstable are valid. For the purposes of
			// this command the bounds must be valid keys. In general, and for
			// this command, range key/range del spans must also not span across
			// virtual sstable bounds.
			if meta.FileMetadata == nil {
				return "build must be called at least once before virtualize"
			}
			// Discard the state of any previous virtualize command.
			if vMeta1.FileMetadata != nil {
				vMeta1.FileMetadata = nil
				v = VirtualReader{}
			}
			vMeta := &manifest.FileMetadata{
				FileBacking:    meta.FileBacking,
				SmallestSeqNum: meta.SmallestSeqNum,
				LargestSeqNum:  meta.LargestSeqNum,
				Virtual:        true,
			}
			// Parse the virtualization bounds.
			// The first argument is expected to have the form
			// "<smallest>-<largest>"; a missing argument or separator will
			// panic here.
			bounds := strings.Split(td.CmdArgs[0].String(), "-")
			vMeta.Smallest = base.ParseInternalKey(bounds[0])
			vMeta.Largest = base.ParseInternalKey(bounds[1])
			vMeta.FileNum = nextFileNum()
			var err error
			vMeta.Size, err = r.EstimateDiskUsage(vMeta.Smallest.UserKey, vMeta.Largest.UserKey)
			if err != nil {
				return err.Error()
			}
			vMeta.ValidateVirtual(meta.FileMetadata)

			vMeta1 = vMeta.VirtualMeta()
			v = MakeVirtualReader(r, vMeta1, false /* isForeign */)
			return formatVirtualReader(&v)

		case "citer":
			// Creates a compaction iterator from the virtual reader, and then
			// just scans the keyspace. Which is all a compaction iterator is
			// used for. This tests the First and Next calls.
			if vMeta1.FileMetadata == nil {
				return "virtualize must be called before creating compaction iters"
			}

			// rp is intentionally left as the zero (nil) ReaderProvider.
			var rp ReaderProvider
			var bytesIterated uint64
			iter, err := v.NewCompactionIter(&bytesIterated, CategoryAndQoS{}, nil, rp, &bp)
			if err != nil {
				return err.Error()
			}

			var buf bytes.Buffer
			for key, val := iter.First(); key != nil; key, val = iter.Next() {
				fmt.Fprintf(&buf, "%s:%s\n", key.String(), val.InPlaceValue())
			}
			err = iter.Close()
			if err != nil {
				return err.Error()
			}
			return buf.String()

		case "constrain":
			if vMeta1.FileMetadata == nil {
				return "virtualize must be called before constrain"
			}
			// The argument is "<lower>,<upper>,<bool>"; the third field is
			// passed to constrainBounds as true only when it is exactly "true".
			splits := strings.Split(td.CmdArgs[0].String(), ",")
			of, ol := []byte(splits[0]), []byte(splits[1])
			inclusive, f, l := v.vState.constrainBounds(of, ol, splits[2] == "true")
			// Output mirrors the input format: "<lower>,<upper>,<bool>".
			var buf bytes.Buffer
			buf.Write(f)
			buf.WriteByte(',')
			buf.Write(l)
			buf.WriteByte(',')
			if inclusive {
				buf.WriteString("true")
			} else {
				buf.WriteString("false")
			}
			buf.WriteByte('\n')
			return buf.String()

		case "scan-range-del":
			if vMeta1.FileMetadata == nil {
				return "virtualize must be called before scan-range-del"
			}
			iter, err := v.NewRawRangeDelIter()
			if err != nil {
				return err.Error()
			}
			// A nil iterator produces empty output.
			if iter == nil {
				return ""
			}
			defer iter.Close()

			var buf bytes.Buffer
			for s := iter.First(); s != nil; s = iter.Next() {
				fmt.Fprintf(&buf, "%s\n", s)
			}
			return buf.String()

		case "scan-range-key":
			if vMeta1.FileMetadata == nil {
				return "virtualize must be called before scan-range-key"
			}
			iter, err := v.NewRawRangeKeyIter()
			if err != nil {
				return err.Error()
			}
			// A nil iterator produces empty output.
			if iter == nil {
				return ""
			}
			defer iter.Close()

			var buf bytes.Buffer
			for s := iter.First(); s != nil; s = iter.Next() {
				fmt.Fprintf(&buf, "%s\n", s)
			}
			return buf.String()

		case "iter":
			if vMeta1.FileMetadata == nil {
				return "virtualize must be called before iter"
			}
			// Optional first argument "<lower>-<upper>" supplies iterator
			// bounds; without it both bounds stay nil.
			var lower, upper []byte
			if len(td.CmdArgs) > 0 {
				splits := strings.Split(td.CmdArgs[0].String(), "-")
				lower, upper = []byte(splits[0]), []byte(splits[1])
			}

			var stats base.InternalIteratorStats
			iter, err := v.NewIterWithBlockPropertyFiltersAndContextEtc(
				context.Background(), lower, upper, nil, false, false,
				&stats, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r})
			if err != nil {
				return err.Error()
			}
			return runIterCmd(td, iter, true, runIterCmdStats(&stats))

		default:
			return fmt.Sprintf("unknown command: %s", td.Cmd)
		}
	})
}
|
||
|
|
||
|
func TestReader(t *testing.T) {
|
||
|
writerOpts := map[string]WriterOptions{
|
||
|
// No bloom filters.
|
||
|
"default": {},
|
||
|
"bloom10bit": {
|
||
|
// The standard policy.
|
||
|
FilterPolicy: bloom.FilterPolicy(10),
|
||
|
FilterType: base.TableFilter,
|
||
|
},
|
||
|
"bloom1bit": {
|
||
|
// A policy with many false positives.
|
||
|
FilterPolicy: bloom.FilterPolicy(1),
|
||
|
FilterType: base.TableFilter,
|
||
|
},
|
||
|
"bloom100bit": {
|
||
|
// A policy unlikely to have false positives.
|
||
|
FilterPolicy: bloom.FilterPolicy(100),
|
||
|
FilterType: base.TableFilter,
|
||
|
},
|
||
|
}
|
||
|
|
||
|
blockSizes := map[string]int{
|
||
|
"1bytes": 1,
|
||
|
"5bytes": 5,
|
||
|
"10bytes": 10,
|
||
|
"25bytes": 25,
|
||
|
"Maxbytes": math.MaxInt32,
|
||
|
}
|
||
|
|
||
|
opts := map[string]*Comparer{
|
||
|
"default": testkeys.Comparer,
|
||
|
"prefixFilter": fixtureComparer,
|
||
|
}
|
||
|
|
||
|
testDirs := map[string]string{
|
||
|
"default": "testdata/reader",
|
||
|
"prefixFilter": "testdata/prefixreader",
|
||
|
}
|
||
|
|
||
|
for format := TableFormatPebblev2; format <= TableFormatMax; format++ {
|
||
|
for dName, blockSize := range blockSizes {
|
||
|
for iName, indexBlockSize := range blockSizes {
|
||
|
for lName, tableOpt := range writerOpts {
|
||
|
for oName, cmp := range opts {
|
||
|
tableOpt.BlockSize = blockSize
|
||
|
tableOpt.Comparer = cmp
|
||
|
tableOpt.IndexBlockSize = indexBlockSize
|
||
|
tableOpt.TableFormat = format
|
||
|
|
||
|
t.Run(
|
||
|
fmt.Sprintf("format=%d,opts=%s,writerOpts=%s,blockSize=%s,indexSize=%s",
|
||
|
format, oName, lName, dName, iName),
|
||
|
func(t *testing.T) {
|
||
|
runTestReader(
|
||
|
t, tableOpt, testDirs[oName], nil /* Reader */, true)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestReaderHideObsolete(t *testing.T) {
|
||
|
blockSizes := map[string]int{
|
||
|
"1bytes": 1,
|
||
|
"5bytes": 5,
|
||
|
"10bytes": 10,
|
||
|
"25bytes": 25,
|
||
|
"Maxbytes": math.MaxInt32,
|
||
|
}
|
||
|
for dName, blockSize := range blockSizes {
|
||
|
opts := WriterOptions{
|
||
|
TableFormat: TableFormatPebblev4,
|
||
|
BlockSize: blockSize,
|
||
|
IndexBlockSize: blockSize,
|
||
|
Comparer: testkeys.Comparer,
|
||
|
}
|
||
|
t.Run(fmt.Sprintf("blockSize=%s", dName), func(t *testing.T) {
|
||
|
runTestReader(
|
||
|
t, opts, "testdata/reader_hide_obsolete",
|
||
|
nil /* Reader */, true)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestHamletReader(t *testing.T) {
|
||
|
prebuiltSSTs := []string{
|
||
|
"testdata/h.ldb",
|
||
|
"testdata/h.sst",
|
||
|
"testdata/h.no-compression.sst",
|
||
|
"testdata/h.no-compression.two_level_index.sst",
|
||
|
"testdata/h.block-bloom.no-compression.sst",
|
||
|
"testdata/h.table-bloom.no-compression.prefix_extractor.no_whole_key_filter.sst",
|
||
|
"testdata/h.table-bloom.no-compression.sst",
|
||
|
}
|
||
|
|
||
|
for _, prebuiltSST := range prebuiltSSTs {
|
||
|
f, err := os.Open(filepath.FromSlash(prebuiltSST))
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
r, err := newReader(f, ReaderOptions{})
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
t.Run(
|
||
|
fmt.Sprintf("sst=%s", prebuiltSST),
|
||
|
func(t *testing.T) {
|
||
|
runTestReader(t, WriterOptions{}, "testdata/hamletreader", r, false)
|
||
|
},
|
||
|
)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func forEveryTableFormat[I any](
|
||
|
t *testing.T, formatTable [NumTableFormats]I, runTest func(*testing.T, TableFormat, I),
|
||
|
) {
|
||
|
t.Helper()
|
||
|
for tf := TableFormatUnspecified + 1; tf <= TableFormatMax; tf++ {
|
||
|
t.Run(tf.String(), func(t *testing.T) {
|
||
|
runTest(t, tf, formatTable[tf])
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestReaderStats(t *testing.T) {
|
||
|
forEveryTableFormat[string](t,
|
||
|
[NumTableFormats]string{
|
||
|
TableFormatUnspecified: "",
|
||
|
TableFormatLevelDB: "testdata/readerstats_LevelDB",
|
||
|
TableFormatRocksDBv2: "testdata/readerstats_LevelDB",
|
||
|
TableFormatPebblev1: "testdata/readerstats_LevelDB",
|
||
|
TableFormatPebblev2: "testdata/readerstats_LevelDB",
|
||
|
TableFormatPebblev3: "testdata/readerstats_Pebblev3",
|
||
|
TableFormatPebblev4: "testdata/readerstats_Pebblev3",
|
||
|
}, func(t *testing.T, format TableFormat, dir string) {
|
||
|
if dir == "" {
|
||
|
t.Skip()
|
||
|
}
|
||
|
writerOpt := WriterOptions{
|
||
|
BlockSize: 32 << 10,
|
||
|
IndexBlockSize: 32 << 10,
|
||
|
Comparer: testkeys.Comparer,
|
||
|
TableFormat: format,
|
||
|
}
|
||
|
runTestReader(t, writerOpt, dir, nil /* Reader */, false /* printValue */)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func TestReaderWithBlockPropertyFilter(t *testing.T) {
|
||
|
// Some of these tests examine internal iterator state, so they require
|
||
|
// determinism. When the invariants tag is set, disableBoundsOpt may disable
|
||
|
// the bounds optimization depending on the iterator pointer address. This
|
||
|
// can add nondeterminism to the internal iterator statae. Disable this
|
||
|
// nondeterminism for the duration of this test.
|
||
|
ensureBoundsOptDeterminism = true
|
||
|
defer func() { ensureBoundsOptDeterminism = false }()
|
||
|
|
||
|
forEveryTableFormat[string](t,
|
||
|
[NumTableFormats]string{
|
||
|
TableFormatUnspecified: "", // Block properties unsupported
|
||
|
TableFormatLevelDB: "", // Block properties unsupported
|
||
|
TableFormatRocksDBv2: "", // Block properties unsupported
|
||
|
TableFormatPebblev1: "", // Block properties unsupported
|
||
|
TableFormatPebblev2: "testdata/reader_bpf/Pebblev2",
|
||
|
TableFormatPebblev3: "testdata/reader_bpf/Pebblev3",
|
||
|
TableFormatPebblev4: "testdata/reader_bpf/Pebblev3",
|
||
|
}, func(t *testing.T, format TableFormat, dir string) {
|
||
|
if dir == "" {
|
||
|
t.Skip("Block-properties unsupported")
|
||
|
}
|
||
|
writerOpt := WriterOptions{
|
||
|
Comparer: testkeys.Comparer,
|
||
|
TableFormat: format,
|
||
|
BlockPropertyCollectors: []func() BlockPropertyCollector{NewTestKeysBlockPropertyCollector},
|
||
|
}
|
||
|
runTestReader(t, writerOpt, dir, nil /* Reader */, false)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func TestInjectedErrors(t *testing.T) {
|
||
|
prebuiltSSTs := []string{
|
||
|
"testdata/h.ldb",
|
||
|
"testdata/h.sst",
|
||
|
"testdata/h.no-compression.sst",
|
||
|
"testdata/h.no-compression.two_level_index.sst",
|
||
|
"testdata/h.block-bloom.no-compression.sst",
|
||
|
"testdata/h.table-bloom.no-compression.prefix_extractor.no_whole_key_filter.sst",
|
||
|
"testdata/h.table-bloom.no-compression.sst",
|
||
|
}
|
||
|
|
||
|
for _, prebuiltSST := range prebuiltSSTs {
|
||
|
run := func(i int) (reterr error) {
|
||
|
f, err := vfs.Default.Open(filepath.FromSlash(prebuiltSST))
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
r, err := newReader(errorfs.WrapFile(f, errorfs.ErrInjected.If(errorfs.OnIndex(int32(i)))), ReaderOptions{})
|
||
|
if err != nil {
|
||
|
return firstError(err, f.Close())
|
||
|
}
|
||
|
defer func() { reterr = firstError(reterr, r.Close()) }()
|
||
|
|
||
|
_, err = r.EstimateDiskUsage([]byte("borrower"), []byte("lender"))
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
iter, err := r.NewIter(nil, nil)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer func() { reterr = firstError(reterr, iter.Close()) }()
|
||
|
for k, v := iter.First(); k != nil; k, v = iter.Next() {
|
||
|
val, _, err := v.Value(nil)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if val == nil {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
if err = iter.Error(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
for i := 0; ; i++ {
|
||
|
err := run(i)
|
||
|
if errors.Is(err, errorfs.ErrInjected) {
|
||
|
t.Logf("%q, index %d: %s", prebuiltSST, i, err)
|
||
|
continue
|
||
|
}
|
||
|
if err != nil {
|
||
|
t.Errorf("%q, index %d: non-injected error: %+v", prebuiltSST, i, err)
|
||
|
break
|
||
|
}
|
||
|
t.Logf("%q: no error at index %d", prebuiltSST, i)
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestInvalidReader(t *testing.T) {
|
||
|
invalid, err := NewSimpleReadable(vfs.NewMemFile([]byte("invalid sst bytes")))
|
||
|
if err != nil {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
testCases := []struct {
|
||
|
readable objstorage.Readable
|
||
|
expected string
|
||
|
}{
|
||
|
{nil, "nil file"},
|
||
|
{invalid, "invalid table"},
|
||
|
}
|
||
|
for _, tc := range testCases {
|
||
|
r, err := NewReader(tc.readable, ReaderOptions{})
|
||
|
if !strings.Contains(err.Error(), tc.expected) {
|
||
|
t.Fatalf("expected %q, but found %q", tc.expected, err.Error())
|
||
|
}
|
||
|
if r != nil {
|
||
|
t.Fatalf("found non-nil reader returned with non-nil error %q", err.Error())
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func indexLayoutString(t *testing.T, r *Reader) string {
|
||
|
indexH, err := r.readIndex(context.Background(), nil, nil)
|
||
|
require.NoError(t, err)
|
||
|
defer indexH.Release()
|
||
|
var buf strings.Builder
|
||
|
twoLevelIndex := r.Properties.IndexType == twoLevelIndex
|
||
|
buf.WriteString("index entries:\n")
|
||
|
iter, err := newBlockIter(r.Compare, indexH.Get())
|
||
|
defer func() {
|
||
|
require.NoError(t, iter.Close())
|
||
|
}()
|
||
|
require.NoError(t, err)
|
||
|
for key, value := iter.First(); key != nil; key, value = iter.Next() {
|
||
|
bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
|
||
|
require.NoError(t, err)
|
||
|
fmt.Fprintf(&buf, " %s: size %d\n", string(key.UserKey), bh.Length)
|
||
|
if twoLevelIndex {
|
||
|
b, err := r.readBlock(
|
||
|
context.Background(), bh.BlockHandle, nil, nil, nil, nil, nil)
|
||
|
require.NoError(t, err)
|
||
|
defer b.Release()
|
||
|
iter2, err := newBlockIter(r.Compare, b.Get())
|
||
|
defer func() {
|
||
|
require.NoError(t, iter2.Close())
|
||
|
}()
|
||
|
require.NoError(t, err)
|
||
|
for key, value := iter2.First(); key != nil; key, value = iter2.Next() {
|
||
|
bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
|
||
|
require.NoError(t, err)
|
||
|
fmt.Fprintf(&buf, " %s: size %d\n", string(key.UserKey), bh.Length)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return buf.String()
|
||
|
}
|
||
|
|
||
|
func runTestReader(t *testing.T, o WriterOptions, dir string, r *Reader, printValue bool) {
|
||
|
datadriven.Walk(t, dir, func(t *testing.T, path string) {
|
||
|
defer func() {
|
||
|
if r != nil {
|
||
|
r.Close()
|
||
|
r = nil
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
|
||
|
switch d.Cmd {
|
||
|
case "build":
|
||
|
if r != nil {
|
||
|
r.Close()
|
||
|
r = nil
|
||
|
}
|
||
|
var cacheSize int
|
||
|
var printLayout bool
|
||
|
d.MaybeScanArgs(t, "cache-size", &cacheSize)
|
||
|
d.MaybeScanArgs(t, "print-layout", &printLayout)
|
||
|
d.MaybeScanArgs(t, "block-size", &o.BlockSize)
|
||
|
d.MaybeScanArgs(t, "index-block-size", &o.IndexBlockSize)
|
||
|
|
||
|
var err error
|
||
|
_, r, err = runBuildCmd(d, &o, cacheSize)
|
||
|
if err != nil {
|
||
|
return err.Error()
|
||
|
}
|
||
|
if printLayout {
|
||
|
return indexLayoutString(t, r)
|
||
|
}
|
||
|
return ""
|
||
|
|
||
|
case "iter":
|
||
|
seqNum, err := scanGlobalSeqNum(d)
|
||
|
if err != nil {
|
||
|
return err.Error()
|
||
|
}
|
||
|
var stats base.InternalIteratorStats
|
||
|
r.Properties.GlobalSeqNum = seqNum
|
||
|
var bpfs []BlockPropertyFilter
|
||
|
if d.HasArg("block-property-filter") {
|
||
|
var filterMin, filterMax uint64
|
||
|
d.ScanArgs(t, "block-property-filter", &filterMin, &filterMax)
|
||
|
bpf := NewTestKeysBlockPropertyFilter(filterMin, filterMax)
|
||
|
bpfs = append(bpfs, bpf)
|
||
|
}
|
||
|
hideObsoletePoints := false
|
||
|
if d.HasArg("hide-obsolete-points") {
|
||
|
d.ScanArgs(t, "hide-obsolete-points", &hideObsoletePoints)
|
||
|
if hideObsoletePoints {
|
||
|
hideObsoletePoints, bpfs = r.TryAddBlockPropertyFilterForHideObsoletePoints(
|
||
|
InternalKeySeqNumMax, InternalKeySeqNumMax-1, bpfs)
|
||
|
require.True(t, hideObsoletePoints)
|
||
|
}
|
||
|
}
|
||
|
var filterer *BlockPropertiesFilterer
|
||
|
if len(bpfs) > 0 {
|
||
|
filterer = newBlockPropertiesFilterer(bpfs, nil)
|
||
|
intersects, err :=
|
||
|
filterer.intersectsUserPropsAndFinishInit(r.Properties.UserProperties)
|
||
|
if err != nil {
|
||
|
return err.Error()
|
||
|
}
|
||
|
if !intersects {
|
||
|
return "table does not intersect BlockPropertyFilter"
|
||
|
}
|
||
|
}
|
||
|
iter, err := r.NewIterWithBlockPropertyFiltersAndContextEtc(
|
||
|
context.Background(),
|
||
|
nil, /* lower */
|
||
|
nil, /* upper */
|
||
|
filterer,
|
||
|
hideObsoletePoints,
|
||
|
true, /* use filter block */
|
||
|
&stats,
|
||
|
CategoryAndQoS{},
|
||
|
nil,
|
||
|
TrivialReaderProvider{Reader: r},
|
||
|
)
|
||
|
if err != nil {
|
||
|
return err.Error()
|
||
|
}
|
||
|
return runIterCmd(d, iter, printValue, runIterCmdStats(&stats))
|
||
|
|
||
|
case "get":
|
||
|
var b bytes.Buffer
|
||
|
for _, k := range strings.Split(d.Input, "\n") {
|
||
|
v, err := r.get([]byte(k))
|
||
|
if err != nil {
|
||
|
fmt.Fprintf(&b, "<err: %s>\n", err)
|
||
|
} else {
|
||
|
fmt.Fprintln(&b, string(v))
|
||
|
}
|
||
|
}
|
||
|
return b.String()
|
||
|
default:
|
||
|
return fmt.Sprintf("unknown command: %s", d.Cmd)
|
||
|
}
|
||
|
})
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func TestReaderCheckComparerMerger(t *testing.T) {
|
||
|
const testTable = "test"
|
||
|
|
||
|
testComparer := &base.Comparer{
|
||
|
Name: "test.comparer",
|
||
|
Compare: base.DefaultComparer.Compare,
|
||
|
Equal: base.DefaultComparer.Equal,
|
||
|
Separator: base.DefaultComparer.Separator,
|
||
|
Successor: base.DefaultComparer.Successor,
|
||
|
}
|
||
|
testMerger := &base.Merger{
|
||
|
Name: "test.merger",
|
||
|
Merge: base.DefaultMerger.Merge,
|
||
|
}
|
||
|
writerOpts := WriterOptions{
|
||
|
Comparer: testComparer,
|
||
|
MergerName: "test.merger",
|
||
|
}
|
||
|
|
||
|
mem := vfs.NewMem()
|
||
|
f0, err := mem.Create(testTable)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
w := NewWriter(objstorageprovider.NewFileWritable(f0), writerOpts)
|
||
|
require.NoError(t, w.Set([]byte("test"), nil))
|
||
|
require.NoError(t, w.Close())
|
||
|
|
||
|
testCases := []struct {
|
||
|
comparers []*base.Comparer
|
||
|
mergers []*base.Merger
|
||
|
expected string
|
||
|
}{
|
||
|
{
|
||
|
[]*base.Comparer{testComparer},
|
||
|
[]*base.Merger{testMerger},
|
||
|
"",
|
||
|
},
|
||
|
{
|
||
|
[]*base.Comparer{testComparer, base.DefaultComparer},
|
||
|
[]*base.Merger{testMerger, base.DefaultMerger},
|
||
|
"",
|
||
|
},
|
||
|
{
|
||
|
[]*base.Comparer{},
|
||
|
[]*base.Merger{testMerger},
|
||
|
"unknown comparer test.comparer",
|
||
|
},
|
||
|
{
|
||
|
[]*base.Comparer{base.DefaultComparer},
|
||
|
[]*base.Merger{testMerger},
|
||
|
"unknown comparer test.comparer",
|
||
|
},
|
||
|
{
|
||
|
[]*base.Comparer{testComparer},
|
||
|
[]*base.Merger{},
|
||
|
"unknown merger test.merger",
|
||
|
},
|
||
|
{
|
||
|
[]*base.Comparer{testComparer},
|
||
|
[]*base.Merger{base.DefaultMerger},
|
||
|
"unknown merger test.merger",
|
||
|
},
|
||
|
}
|
||
|
|
||
|
for _, c := range testCases {
|
||
|
t.Run("", func(t *testing.T) {
|
||
|
f1, err := mem.Open(testTable)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
comparers := make(Comparers)
|
||
|
for _, comparer := range c.comparers {
|
||
|
comparers[comparer.Name] = comparer
|
||
|
}
|
||
|
mergers := make(Mergers)
|
||
|
for _, merger := range c.mergers {
|
||
|
mergers[merger.Name] = merger
|
||
|
}
|
||
|
|
||
|
r, err := newReader(f1, ReaderOptions{}, comparers, mergers)
|
||
|
if err != nil {
|
||
|
if r != nil {
|
||
|
t.Fatalf("found non-nil reader returned with non-nil error %q", err.Error())
|
||
|
}
|
||
|
if !strings.HasSuffix(err.Error(), c.expected) {
|
||
|
t.Fatalf("expected %q, but found %q", c.expected, err.Error())
|
||
|
}
|
||
|
} else if c.expected != "" {
|
||
|
t.Fatalf("expected %q, but found success", c.expected)
|
||
|
}
|
||
|
if r != nil {
|
||
|
_ = r.Close()
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
// checkValidPrefix reports whether key is acceptable for the given prefix: a
// nil prefix accepts every key, otherwise key must start with prefix.
func checkValidPrefix(prefix, key []byte) bool {
	if prefix == nil {
		return true
	}
	return bytes.HasPrefix(key, prefix)
}
|
||
|
|
||
|
func testBytesIteratedWithCompression(
|
||
|
t *testing.T,
|
||
|
compression Compression,
|
||
|
allowedSizeDeviationPercent uint64,
|
||
|
blockSizes []int,
|
||
|
maxNumEntries []uint64,
|
||
|
) {
|
||
|
for i, blockSize := range blockSizes {
|
||
|
for _, indexBlockSize := range blockSizes {
|
||
|
for _, numEntries := range []uint64{0, 1, maxNumEntries[i]} {
|
||
|
r := buildTestTable(t, numEntries, blockSize, indexBlockSize, compression)
|
||
|
var bytesIterated, prevIterated uint64
|
||
|
var pool BufferPool
|
||
|
pool.Init(5)
|
||
|
citer, err := r.NewCompactionIter(
|
||
|
&bytesIterated, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r}, &pool)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
for key, _ := citer.First(); key != nil; key, _ = citer.Next() {
|
||
|
if bytesIterated < prevIterated {
|
||
|
t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated)
|
||
|
}
|
||
|
prevIterated = bytesIterated
|
||
|
}
|
||
|
|
||
|
expected := r.Properties.DataSize
|
||
|
allowedSizeDeviation := expected * allowedSizeDeviationPercent / 100
|
||
|
// There is some inaccuracy due to compression estimation.
|
||
|
if bytesIterated < expected-allowedSizeDeviation || bytesIterated > expected+allowedSizeDeviation {
|
||
|
t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected)
|
||
|
}
|
||
|
|
||
|
require.NoError(t, citer.Close())
|
||
|
require.NoError(t, r.Close())
|
||
|
pool.Release()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestBytesIterated(t *testing.T) {
|
||
|
blockSizes := []int{10, 100, 1000, 4096, math.MaxInt32}
|
||
|
t.Run("Compressed", func(t *testing.T) {
|
||
|
testBytesIteratedWithCompression(t, SnappyCompression, 1, blockSizes, []uint64{1e5, 1e5, 1e5, 1e5, 1e5})
|
||
|
})
|
||
|
t.Run("Uncompressed", func(t *testing.T) {
|
||
|
testBytesIteratedWithCompression(t, NoCompression, 0, blockSizes, []uint64{1e5, 1e5, 1e5, 1e5, 1e5})
|
||
|
})
|
||
|
t.Run("Zstd", func(t *testing.T) {
|
||
|
// compression with zstd is extremely slow with small block size (esp the nocgo version).
|
||
|
// use less numEntries to make the test run at reasonable speed (under 10 seconds).
|
||
|
maxNumEntries := []uint64{1e2, 1e2, 1e3, 4e3, 1e5}
|
||
|
if useStandardZstdLib {
|
||
|
maxNumEntries = []uint64{1e3, 1e3, 1e4, 4e4, 1e5}
|
||
|
}
|
||
|
testBytesIteratedWithCompression(t, ZstdCompression, 1, blockSizes, maxNumEntries)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func TestCompactionIteratorSetupForCompaction(t *testing.T) {
|
||
|
tmpDir := path.Join(t.TempDir())
|
||
|
provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(vfs.Default, tmpDir))
|
||
|
require.NoError(t, err)
|
||
|
defer provider.Close()
|
||
|
blockSizes := []int{10, 100, 1000, 4096, math.MaxInt32}
|
||
|
for _, blockSize := range blockSizes {
|
||
|
for _, indexBlockSize := range blockSizes {
|
||
|
for _, numEntries := range []uint64{0, 1, 1e5} {
|
||
|
r := buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, DefaultCompression)
|
||
|
var bytesIterated uint64
|
||
|
var pool BufferPool
|
||
|
pool.Init(5)
|
||
|
citer, err := r.NewCompactionIter(
|
||
|
&bytesIterated, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r}, &pool)
|
||
|
require.NoError(t, err)
|
||
|
switch i := citer.(type) {
|
||
|
case *compactionIterator:
|
||
|
require.True(t, objstorageprovider.TestingCheckMaxReadahead(i.dataRH))
|
||
|
// Each key has one version, so no value block, regardless of
|
||
|
// sstable version.
|
||
|
require.Nil(t, i.vbRH)
|
||
|
case *twoLevelCompactionIterator:
|
||
|
require.True(t, objstorageprovider.TestingCheckMaxReadahead(i.dataRH))
|
||
|
// Each key has one version, so no value block, regardless of
|
||
|
// sstable version.
|
||
|
require.Nil(t, i.vbRH)
|
||
|
default:
|
||
|
require.Failf(t, fmt.Sprintf("unknown compaction iterator type: %T", citer), "")
|
||
|
}
|
||
|
require.NoError(t, citer.Close())
|
||
|
require.NoError(t, r.Close())
|
||
|
pool.Release()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestReadaheadSetupForV3TablesWithMultipleVersions(t *testing.T) {
|
||
|
tmpDir := path.Join(t.TempDir())
|
||
|
provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(vfs.Default, tmpDir))
|
||
|
require.NoError(t, err)
|
||
|
defer provider.Close()
|
||
|
f0, _, err := provider.Create(context.Background(), base.FileTypeTable, base.FileNum(0).DiskFileNum(), objstorage.CreateOptions{})
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
w := NewWriter(f0, WriterOptions{
|
||
|
TableFormat: TableFormatPebblev3,
|
||
|
Comparer: testkeys.Comparer,
|
||
|
})
|
||
|
keys := testkeys.Alpha(1)
|
||
|
keyBuf := make([]byte, 1+testkeys.MaxSuffixLen)
|
||
|
// Write a few keys with multiple timestamps (MVCC versions).
|
||
|
for i := int64(0); i < 2; i++ {
|
||
|
for j := int64(2); j >= 1; j-- {
|
||
|
n := testkeys.WriteKeyAt(keyBuf[:], keys, i, j)
|
||
|
key := keyBuf[:n]
|
||
|
require.NoError(t, w.Set(key, key))
|
||
|
}
|
||
|
}
|
||
|
require.NoError(t, w.Close())
|
||
|
f1, err := provider.OpenForReading(context.Background(), base.FileTypeTable, base.FileNum(0).DiskFileNum(), objstorage.OpenOptions{})
|
||
|
require.NoError(t, err)
|
||
|
r, err := NewReader(f1, ReaderOptions{Comparer: testkeys.Comparer})
|
||
|
require.NoError(t, err)
|
||
|
defer r.Close()
|
||
|
{
|
||
|
var pool BufferPool
|
||
|
pool.Init(5)
|
||
|
citer, err := r.NewCompactionIter(
|
||
|
nil, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r}, &pool)
|
||
|
require.NoError(t, err)
|
||
|
defer citer.Close()
|
||
|
i := citer.(*compactionIterator)
|
||
|
require.True(t, objstorageprovider.TestingCheckMaxReadahead(i.dataRH))
|
||
|
require.True(t, objstorageprovider.TestingCheckMaxReadahead(i.vbRH))
|
||
|
}
|
||
|
{
|
||
|
iter, err := r.NewIter(nil, nil)
|
||
|
require.NoError(t, err)
|
||
|
defer iter.Close()
|
||
|
i := iter.(*singleLevelIterator)
|
||
|
require.False(t, objstorageprovider.TestingCheckMaxReadahead(i.dataRH))
|
||
|
require.False(t, objstorageprovider.TestingCheckMaxReadahead(i.vbRH))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestReaderChecksumErrors(t *testing.T) {
|
||
|
for _, checksumType := range []ChecksumType{ChecksumTypeCRC32c, ChecksumTypeXXHash64} {
|
||
|
t.Run(fmt.Sprintf("checksum-type=%d", checksumType), func(t *testing.T) {
|
||
|
for _, twoLevelIndex := range []bool{false, true} {
|
||
|
t.Run(fmt.Sprintf("two-level-index=%t", twoLevelIndex), func(t *testing.T) {
|
||
|
mem := vfs.NewMem()
|
||
|
|
||
|
{
|
||
|
// Create an sstable with 3 data blocks.
|
||
|
f, err := mem.Create("test")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
const blockSize = 32
|
||
|
indexBlockSize := 4096
|
||
|
if twoLevelIndex {
|
||
|
indexBlockSize = 1
|
||
|
}
|
||
|
|
||
|
w := NewWriter(objstorageprovider.NewFileWritable(f), WriterOptions{
|
||
|
BlockSize: blockSize,
|
||
|
IndexBlockSize: indexBlockSize,
|
||
|
Checksum: checksumType,
|
||
|
})
|
||
|
require.NoError(t, w.Set(bytes.Repeat([]byte("a"), blockSize), nil))
|
||
|
require.NoError(t, w.Set(bytes.Repeat([]byte("b"), blockSize), nil))
|
||
|
require.NoError(t, w.Set(bytes.Repeat([]byte("c"), blockSize), nil))
|
||
|
require.NoError(t, w.Close())
|
||
|
}
|
||
|
|
||
|
// Load the layout so that we no the location of the data blocks.
|
||
|
var layout *Layout
|
||
|
{
|
||
|
f, err := mem.Open("test")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
r, err := newReader(f, ReaderOptions{})
|
||
|
require.NoError(t, err)
|
||
|
layout, err = r.Layout()
|
||
|
require.NoError(t, err)
|
||
|
require.EqualValues(t, len(layout.Data), 3)
|
||
|
require.NoError(t, r.Close())
|
||
|
}
|
||
|
|
||
|
for _, bh := range layout.Data {
|
||
|
// Read the sstable and corrupt the first byte in the target data
|
||
|
// block.
|
||
|
orig, err := mem.Open("test")
|
||
|
require.NoError(t, err)
|
||
|
data, err := io.ReadAll(orig)
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, orig.Close())
|
||
|
|
||
|
// Corrupt the first byte in the block.
|
||
|
data[bh.Offset] ^= 0xff
|
||
|
|
||
|
corrupted, err := mem.Create("corrupted")
|
||
|
require.NoError(t, err)
|
||
|
_, err = corrupted.Write(data)
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, corrupted.Close())
|
||
|
|
||
|
// Verify that we encounter a checksum mismatch error while iterating
|
||
|
// over the sstable.
|
||
|
corrupted, err = mem.Open("corrupted")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
r, err := newReader(corrupted, ReaderOptions{})
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
iter, err := r.NewIter(nil, nil)
|
||
|
require.NoError(t, err)
|
||
|
for k, _ := iter.First(); k != nil; k, _ = iter.Next() {
|
||
|
}
|
||
|
require.Regexp(t, `checksum mismatch`, iter.Error())
|
||
|
require.Regexp(t, `checksum mismatch`, iter.Close())
|
||
|
|
||
|
iter, err = r.NewIter(nil, nil)
|
||
|
require.NoError(t, err)
|
||
|
for k, _ := iter.Last(); k != nil; k, _ = iter.Prev() {
|
||
|
}
|
||
|
require.Regexp(t, `checksum mismatch`, iter.Error())
|
||
|
require.Regexp(t, `checksum mismatch`, iter.Close())
|
||
|
|
||
|
require.NoError(t, r.Close())
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestValidateBlockChecksums(t *testing.T) {
|
||
|
seed := uint64(time.Now().UnixNano())
|
||
|
rng := rand.New(rand.NewSource(seed))
|
||
|
t.Logf("using seed = %d", seed)
|
||
|
|
||
|
allFiles := []string{
|
||
|
"testdata/h.no-compression.sst",
|
||
|
"testdata/h.no-compression.two_level_index.sst",
|
||
|
"testdata/h.sst",
|
||
|
"testdata/h.table-bloom.no-compression.prefix_extractor.no_whole_key_filter.sst",
|
||
|
"testdata/h.table-bloom.no-compression.sst",
|
||
|
"testdata/h.table-bloom.sst",
|
||
|
"testdata/h.zstd-compression.sst",
|
||
|
}
|
||
|
|
||
|
type corruptionLocation int
|
||
|
const (
|
||
|
corruptionLocationData corruptionLocation = iota
|
||
|
corruptionLocationIndex
|
||
|
corruptionLocationTopIndex
|
||
|
corruptionLocationFilter
|
||
|
corruptionLocationRangeDel
|
||
|
corruptionLocationProperties
|
||
|
corruptionLocationMetaIndex
|
||
|
)
|
||
|
|
||
|
testCases := []struct {
|
||
|
name string
|
||
|
files []string
|
||
|
corruptionLocations []corruptionLocation
|
||
|
}{
|
||
|
{
|
||
|
name: "no corruption",
|
||
|
corruptionLocations: []corruptionLocation{},
|
||
|
},
|
||
|
{
|
||
|
name: "data block corruption",
|
||
|
corruptionLocations: []corruptionLocation{
|
||
|
corruptionLocationData,
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "index block corruption",
|
||
|
corruptionLocations: []corruptionLocation{
|
||
|
corruptionLocationIndex,
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "top index block corruption",
|
||
|
files: []string{
|
||
|
"testdata/h.no-compression.two_level_index.sst",
|
||
|
},
|
||
|
corruptionLocations: []corruptionLocation{
|
||
|
corruptionLocationTopIndex,
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "filter block corruption",
|
||
|
files: []string{
|
||
|
"testdata/h.table-bloom.no-compression.prefix_extractor.no_whole_key_filter.sst",
|
||
|
"testdata/h.table-bloom.no-compression.sst",
|
||
|
"testdata/h.table-bloom.sst",
|
||
|
},
|
||
|
corruptionLocations: []corruptionLocation{
|
||
|
corruptionLocationFilter,
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "range deletion block corruption",
|
||
|
corruptionLocations: []corruptionLocation{
|
||
|
corruptionLocationRangeDel,
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "properties block corruption",
|
||
|
corruptionLocations: []corruptionLocation{
|
||
|
corruptionLocationProperties,
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "metaindex block corruption",
|
||
|
corruptionLocations: []corruptionLocation{
|
||
|
corruptionLocationMetaIndex,
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "multiple blocks corrupted",
|
||
|
corruptionLocations: []corruptionLocation{
|
||
|
corruptionLocationData,
|
||
|
corruptionLocationIndex,
|
||
|
corruptionLocationRangeDel,
|
||
|
corruptionLocationProperties,
|
||
|
corruptionLocationMetaIndex,
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
|
||
|
testFn := func(t *testing.T, file string, corruptionLocations []corruptionLocation) {
|
||
|
// Create a copy of the SSTable that we can freely corrupt.
|
||
|
f, err := os.Open(filepath.FromSlash(file))
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
pathCopy := path.Join(t.TempDir(), path.Base(file))
|
||
|
fCopy, err := os.OpenFile(pathCopy, os.O_CREATE|os.O_RDWR, 0600)
|
||
|
require.NoError(t, err)
|
||
|
defer fCopy.Close()
|
||
|
|
||
|
_, err = io.Copy(fCopy, f)
|
||
|
require.NoError(t, err)
|
||
|
err = fCopy.Sync()
|
||
|
require.NoError(t, err)
|
||
|
require.NoError(t, f.Close())
|
||
|
|
||
|
filter := bloom.FilterPolicy(10)
|
||
|
r, err := newReader(fCopy, ReaderOptions{
|
||
|
Filters: map[string]FilterPolicy{
|
||
|
filter.Name(): filter,
|
||
|
},
|
||
|
})
|
||
|
require.NoError(t, err)
|
||
|
defer func() { require.NoError(t, r.Close()) }()
|
||
|
|
||
|
// Prior to corruption, validation is successful.
|
||
|
require.NoError(t, r.ValidateBlockChecksums())
|
||
|
|
||
|
// If we are not testing for corruption, we can stop here.
|
||
|
if len(corruptionLocations) == 0 {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Perform bit flips in various corruption locations.
|
||
|
layout, err := r.Layout()
|
||
|
require.NoError(t, err)
|
||
|
for _, location := range corruptionLocations {
|
||
|
var bh BlockHandle
|
||
|
switch location {
|
||
|
case corruptionLocationData:
|
||
|
bh = layout.Data[rng.Intn(len(layout.Data))].BlockHandle
|
||
|
case corruptionLocationIndex:
|
||
|
bh = layout.Index[rng.Intn(len(layout.Index))]
|
||
|
case corruptionLocationTopIndex:
|
||
|
bh = layout.TopIndex
|
||
|
case corruptionLocationFilter:
|
||
|
bh = layout.Filter
|
||
|
case corruptionLocationRangeDel:
|
||
|
bh = layout.RangeDel
|
||
|
case corruptionLocationProperties:
|
||
|
bh = layout.Properties
|
||
|
case corruptionLocationMetaIndex:
|
||
|
bh = layout.MetaIndex
|
||
|
default:
|
||
|
t.Fatalf("unknown location")
|
||
|
}
|
||
|
|
||
|
// Corrupt a random byte within the selected block.
|
||
|
pos := int64(bh.Offset) + rng.Int63n(int64(bh.Length))
|
||
|
t.Logf("altering file=%s @ offset = %d", file, pos)
|
||
|
|
||
|
b := make([]byte, 1)
|
||
|
n, err := fCopy.ReadAt(b, pos)
|
||
|
require.NoError(t, err)
|
||
|
require.Equal(t, 1, n)
|
||
|
t.Logf("data (before) = %08b", b)
|
||
|
|
||
|
b[0] ^= 0xff
|
||
|
t.Logf("data (after) = %08b", b)
|
||
|
|
||
|
_, err = fCopy.WriteAt(b, pos)
|
||
|
require.NoError(t, err)
|
||
|
}
|
||
|
|
||
|
// Write back to the file.
|
||
|
err = fCopy.Sync()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
// Confirm that checksum validation fails.
|
||
|
err = r.ValidateBlockChecksums()
|
||
|
require.Error(t, err)
|
||
|
require.Regexp(t, `checksum mismatch`, err.Error())
|
||
|
}
|
||
|
|
||
|
for _, tc := range testCases {
|
||
|
// By default, test across all files, unless overridden.
|
||
|
files := tc.files
|
||
|
if files == nil {
|
||
|
files = allFiles
|
||
|
}
|
||
|
for _, file := range files {
|
||
|
t.Run(tc.name+" "+path.Base(file), func(t *testing.T) {
|
||
|
testFn(t, file, tc.corruptionLocations)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func TestReader_TableFormat(t *testing.T) {
|
||
|
test := func(t *testing.T, want TableFormat) {
|
||
|
fs := vfs.NewMem()
|
||
|
f, err := fs.Create("test")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
opts := WriterOptions{TableFormat: want}
|
||
|
w := NewWriter(objstorageprovider.NewFileWritable(f), opts)
|
||
|
err = w.Close()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
f, err = fs.Open("test")
|
||
|
require.NoError(t, err)
|
||
|
r, err := newReader(f, ReaderOptions{})
|
||
|
require.NoError(t, err)
|
||
|
defer r.Close()
|
||
|
|
||
|
got, err := r.TableFormat()
|
||
|
require.NoError(t, err)
|
||
|
require.Equal(t, want, got)
|
||
|
}
|
||
|
|
||
|
for tf := TableFormatLevelDB; tf <= TableFormatMax; tf++ {
|
||
|
t.Run(tf.String(), func(t *testing.T) {
|
||
|
test(t, tf)
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func buildTestTable(
|
||
|
t *testing.T, numEntries uint64, blockSize, indexBlockSize int, compression Compression,
|
||
|
) *Reader {
|
||
|
provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(vfs.NewMem(), "" /* dirName */))
|
||
|
require.NoError(t, err)
|
||
|
defer provider.Close()
|
||
|
return buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, compression)
|
||
|
}
|
||
|
|
||
|
func buildTestTableWithProvider(
|
||
|
t *testing.T,
|
||
|
provider objstorage.Provider,
|
||
|
numEntries uint64,
|
||
|
blockSize, indexBlockSize int,
|
||
|
compression Compression,
|
||
|
) *Reader {
|
||
|
f0, _, err := provider.Create(context.Background(), base.FileTypeTable, base.FileNum(0).DiskFileNum(), objstorage.CreateOptions{})
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
w := NewWriter(f0, WriterOptions{
|
||
|
BlockSize: blockSize,
|
||
|
IndexBlockSize: indexBlockSize,
|
||
|
Compression: compression,
|
||
|
FilterPolicy: nil,
|
||
|
})
|
||
|
|
||
|
var ikey InternalKey
|
||
|
for i := uint64(0); i < numEntries; i++ {
|
||
|
key := make([]byte, 8+i%3)
|
||
|
value := make([]byte, i%100)
|
||
|
binary.BigEndian.PutUint64(key, i)
|
||
|
ikey.UserKey = key
|
||
|
w.Add(ikey, value)
|
||
|
}
|
||
|
|
||
|
require.NoError(t, w.Close())
|
||
|
|
||
|
// Re-open that filename for reading.
|
||
|
f1, err := provider.OpenForReading(context.Background(), base.FileTypeTable, base.FileNum(0).DiskFileNum(), objstorage.OpenOptions{})
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
c := cache.New(128 << 20)
|
||
|
defer c.Unref()
|
||
|
r, err := NewReader(f1, ReaderOptions{
|
||
|
Cache: c,
|
||
|
})
|
||
|
require.NoError(t, err)
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
func buildBenchmarkTable(
|
||
|
b *testing.B, options WriterOptions, confirmTwoLevelIndex bool, offset int,
|
||
|
) (*Reader, [][]byte) {
|
||
|
mem := vfs.NewMem()
|
||
|
f0, err := mem.Create("bench")
|
||
|
if err != nil {
|
||
|
b.Fatal(err)
|
||
|
}
|
||
|
|
||
|
w := NewWriter(objstorageprovider.NewFileWritable(f0), options)
|
||
|
|
||
|
var keys [][]byte
|
||
|
var ikey InternalKey
|
||
|
for i := uint64(0); i < 1e6; i++ {
|
||
|
key := make([]byte, 8)
|
||
|
binary.BigEndian.PutUint64(key, i+uint64(offset))
|
||
|
keys = append(keys, key)
|
||
|
ikey.UserKey = key
|
||
|
w.Add(ikey, nil)
|
||
|
}
|
||
|
|
||
|
if err := w.Close(); err != nil {
|
||
|
b.Fatal(err)
|
||
|
}
|
||
|
|
||
|
// Re-open that filename for reading.
|
||
|
f1, err := mem.Open("bench")
|
||
|
if err != nil {
|
||
|
b.Fatal(err)
|
||
|
}
|
||
|
c := cache.New(128 << 20)
|
||
|
defer c.Unref()
|
||
|
r, err := newReader(f1, ReaderOptions{
|
||
|
Cache: c,
|
||
|
})
|
||
|
if err != nil {
|
||
|
b.Fatal(err)
|
||
|
}
|
||
|
if confirmTwoLevelIndex && r.Properties.IndexPartitions == 0 {
|
||
|
b.Fatalf("should have constructed two level index")
|
||
|
}
|
||
|
return r, keys
|
||
|
}
|
||
|
|
||
|
var basicBenchmarks = []struct {
|
||
|
name string
|
||
|
options WriterOptions
|
||
|
}{
|
||
|
{
|
||
|
name: "restart=16,compression=Snappy",
|
||
|
options: WriterOptions{
|
||
|
BlockSize: 32 << 10,
|
||
|
BlockRestartInterval: 16,
|
||
|
FilterPolicy: nil,
|
||
|
Compression: SnappyCompression,
|
||
|
TableFormat: TableFormatPebblev2,
|
||
|
},
|
||
|
},
|
||
|
{
|
||
|
name: "restart=16,compression=ZSTD",
|
||
|
options: WriterOptions{
|
||
|
BlockSize: 32 << 10,
|
||
|
BlockRestartInterval: 16,
|
||
|
FilterPolicy: nil,
|
||
|
Compression: ZstdCompression,
|
||
|
TableFormat: TableFormatPebblev2,
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
|
||
|
func BenchmarkTableIterSeekGE(b *testing.B) {
|
||
|
for _, bm := range basicBenchmarks {
|
||
|
b.Run(bm.name,
|
||
|
func(b *testing.B) {
|
||
|
r, keys := buildBenchmarkTable(b, bm.options, false, 0)
|
||
|
it, err := r.NewIter(nil /* lower */, nil /* upper */)
|
||
|
require.NoError(b, err)
|
||
|
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
|
||
|
|
||
|
b.ResetTimer()
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
it.SeekGE(keys[rng.Intn(len(keys))], base.SeekGEFlagsNone)
|
||
|
}
|
||
|
|
||
|
b.StopTimer()
|
||
|
it.Close()
|
||
|
r.Close()
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func BenchmarkTableIterSeekLT(b *testing.B) {
|
||
|
for _, bm := range basicBenchmarks {
|
||
|
b.Run(bm.name,
|
||
|
func(b *testing.B) {
|
||
|
r, keys := buildBenchmarkTable(b, bm.options, false, 0)
|
||
|
it, err := r.NewIter(nil /* lower */, nil /* upper */)
|
||
|
require.NoError(b, err)
|
||
|
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
|
||
|
|
||
|
b.ResetTimer()
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
it.SeekLT(keys[rng.Intn(len(keys))], base.SeekLTFlagsNone)
|
||
|
}
|
||
|
|
||
|
b.StopTimer()
|
||
|
it.Close()
|
||
|
r.Close()
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func BenchmarkTableIterNext(b *testing.B) {
|
||
|
for _, bm := range basicBenchmarks {
|
||
|
b.Run(bm.name,
|
||
|
func(b *testing.B) {
|
||
|
r, _ := buildBenchmarkTable(b, bm.options, false, 0)
|
||
|
it, err := r.NewIter(nil /* lower */, nil /* upper */)
|
||
|
require.NoError(b, err)
|
||
|
|
||
|
b.ResetTimer()
|
||
|
var sum int64
|
||
|
var key *InternalKey
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
if key == nil {
|
||
|
key, _ = it.First()
|
||
|
}
|
||
|
sum += int64(binary.BigEndian.Uint64(key.UserKey))
|
||
|
key, _ = it.Next()
|
||
|
}
|
||
|
if testing.Verbose() {
|
||
|
fmt.Fprint(io.Discard, sum)
|
||
|
}
|
||
|
|
||
|
b.StopTimer()
|
||
|
it.Close()
|
||
|
r.Close()
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func BenchmarkTableIterPrev(b *testing.B) {
|
||
|
for _, bm := range basicBenchmarks {
|
||
|
b.Run(bm.name,
|
||
|
func(b *testing.B) {
|
||
|
r, _ := buildBenchmarkTable(b, bm.options, false, 0)
|
||
|
it, err := r.NewIter(nil /* lower */, nil /* upper */)
|
||
|
require.NoError(b, err)
|
||
|
|
||
|
b.ResetTimer()
|
||
|
var sum int64
|
||
|
var key *InternalKey
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
if key == nil {
|
||
|
key, _ = it.Last()
|
||
|
}
|
||
|
sum += int64(binary.BigEndian.Uint64(key.UserKey))
|
||
|
key, _ = it.Prev()
|
||
|
}
|
||
|
if testing.Verbose() {
|
||
|
fmt.Fprint(io.Discard, sum)
|
||
|
}
|
||
|
|
||
|
b.StopTimer()
|
||
|
it.Close()
|
||
|
r.Close()
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func BenchmarkLayout(b *testing.B) {
|
||
|
r, _ := buildBenchmarkTable(b, WriterOptions{}, false, 0)
|
||
|
b.ResetTimer()
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
r.Layout()
|
||
|
}
|
||
|
b.StopTimer()
|
||
|
r.Close()
|
||
|
}
|
||
|
|
||
|
func BenchmarkSeqSeekGEExhausted(b *testing.B) {
|
||
|
// Snappy with no bloom filter.
|
||
|
options := basicBenchmarks[0].options
|
||
|
|
||
|
for _, twoLevelIndex := range []bool{false, true} {
|
||
|
switch twoLevelIndex {
|
||
|
case false:
|
||
|
options.IndexBlockSize = 0
|
||
|
case true:
|
||
|
options.IndexBlockSize = 512
|
||
|
}
|
||
|
const offsetCount = 5000
|
||
|
reader, keys := buildBenchmarkTable(b, options, twoLevelIndex, offsetCount)
|
||
|
var preKeys [][]byte
|
||
|
for i := 0; i < offsetCount; i++ {
|
||
|
key := make([]byte, 8)
|
||
|
binary.BigEndian.PutUint64(key, uint64(i))
|
||
|
preKeys = append(preKeys, key)
|
||
|
}
|
||
|
var postKeys [][]byte
|
||
|
for i := 0; i < offsetCount; i++ {
|
||
|
key := make([]byte, 8)
|
||
|
binary.BigEndian.PutUint64(key, uint64(i+offsetCount+len(keys)))
|
||
|
postKeys = append(postKeys, key)
|
||
|
}
|
||
|
for _, exhaustedBounds := range []bool{false, true} {
|
||
|
for _, prefixSeek := range []bool{false, true} {
|
||
|
exhausted := "file"
|
||
|
if exhaustedBounds {
|
||
|
exhausted = "bounds"
|
||
|
}
|
||
|
seekKind := "ge"
|
||
|
if prefixSeek {
|
||
|
seekKind = "prefix-ge"
|
||
|
}
|
||
|
b.Run(fmt.Sprintf(
|
||
|
"two-level=%t/exhausted=%s/seek=%s", twoLevelIndex, exhausted, seekKind),
|
||
|
func(b *testing.B) {
|
||
|
var upper []byte
|
||
|
var seekKeys [][]byte
|
||
|
if exhaustedBounds {
|
||
|
seekKeys = preKeys
|
||
|
upper = keys[0]
|
||
|
} else {
|
||
|
seekKeys = postKeys
|
||
|
}
|
||
|
it, err := reader.NewIter(nil /* lower */, upper)
|
||
|
require.NoError(b, err)
|
||
|
b.ResetTimer()
|
||
|
pos := 0
|
||
|
var seekGEFlags SeekGEFlags
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
seekKey := seekKeys[0]
|
||
|
var k *InternalKey
|
||
|
if prefixSeek {
|
||
|
k, _ = it.SeekPrefixGE(seekKey, seekKey, seekGEFlags)
|
||
|
} else {
|
||
|
k, _ = it.SeekGE(seekKey, seekGEFlags)
|
||
|
}
|
||
|
if k != nil {
|
||
|
b.Fatal("found a key")
|
||
|
}
|
||
|
if it.Error() != nil {
|
||
|
b.Fatalf("%s", it.Error().Error())
|
||
|
}
|
||
|
pos++
|
||
|
if pos == len(seekKeys) {
|
||
|
pos = 0
|
||
|
seekGEFlags = seekGEFlags.DisableTrySeekUsingNext()
|
||
|
} else {
|
||
|
seekGEFlags = seekGEFlags.EnableTrySeekUsingNext()
|
||
|
}
|
||
|
}
|
||
|
b.StopTimer()
|
||
|
it.Close()
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
reader.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func BenchmarkIteratorScanManyVersions(b *testing.B) {
|
||
|
options := WriterOptions{
|
||
|
BlockSize: 32 << 10,
|
||
|
BlockRestartInterval: 16,
|
||
|
FilterPolicy: nil,
|
||
|
Compression: SnappyCompression,
|
||
|
Comparer: testkeys.Comparer,
|
||
|
}
|
||
|
// 10,000 key prefixes, each with 100 versions.
|
||
|
const keyCount = 10000
|
||
|
const sharedPrefixLen = 32
|
||
|
const unsharedPrefixLen = 8
|
||
|
const versionCount = 100
|
||
|
|
||
|
// Take the very large keyspace consisting of alphabetic characters of
|
||
|
// lengths up to unsharedPrefixLen and reduce it down to keyCount keys by
|
||
|
// picking every 1 key every keyCount keys.
|
||
|
keys := testkeys.Alpha(unsharedPrefixLen)
|
||
|
keys = keys.EveryN(keys.Count() / keyCount)
|
||
|
if keys.Count() < keyCount {
|
||
|
b.Fatalf("expected %d keys, found %d", keyCount, keys.Count())
|
||
|
}
|
||
|
keyBuf := make([]byte, sharedPrefixLen+unsharedPrefixLen+testkeys.MaxSuffixLen)
|
||
|
for i := 0; i < sharedPrefixLen; i++ {
|
||
|
keyBuf[i] = 'A' + byte(i)
|
||
|
}
|
||
|
// v2 sstable is 115,178,070 bytes. v3 sstable is 107,181,105 bytes with
|
||
|
// 99,049,269 bytes in value blocks.
|
||
|
setupBench := func(b *testing.B, tableFormat TableFormat, cacheSize int64) *Reader {
|
||
|
mem := vfs.NewMem()
|
||
|
f0, err := mem.Create("bench")
|
||
|
require.NoError(b, err)
|
||
|
options.TableFormat = tableFormat
|
||
|
w := NewWriter(objstorageprovider.NewFileWritable(f0), options)
|
||
|
val := make([]byte, 100)
|
||
|
rng := rand.New(rand.NewSource(100))
|
||
|
for i := int64(0); i < keys.Count(); i++ {
|
||
|
for v := 0; v < versionCount; v++ {
|
||
|
n := testkeys.WriteKeyAt(keyBuf[sharedPrefixLen:], keys, i, int64(versionCount-v+1))
|
||
|
key := keyBuf[:n+sharedPrefixLen]
|
||
|
rng.Read(val)
|
||
|
require.NoError(b, w.Set(key, val))
|
||
|
}
|
||
|
}
|
||
|
require.NoError(b, w.Close())
|
||
|
c := cache.New(cacheSize)
|
||
|
defer c.Unref()
|
||
|
// Re-open the filename for reading.
|
||
|
f0, err = mem.Open("bench")
|
||
|
require.NoError(b, err)
|
||
|
r, err := newReader(f0, ReaderOptions{
|
||
|
Cache: c,
|
||
|
Comparer: testkeys.Comparer,
|
||
|
})
|
||
|
require.NoError(b, err)
|
||
|
return r
|
||
|
}
|
||
|
for _, format := range []TableFormat{TableFormatPebblev2, TableFormatPebblev3} {
|
||
|
b.Run(fmt.Sprintf("format=%s", format.String()), func(b *testing.B) {
|
||
|
// 150MiB results in a high cache hit rate for both formats. 20MiB
|
||
|
// results in a high cache hit rate for the data blocks in
|
||
|
// TableFormatPebblev3.
|
||
|
for _, cacheSize := range []int64{20 << 20, 150 << 20} {
|
||
|
b.Run(fmt.Sprintf("cache-size=%s", humanize.Bytes.Int64(cacheSize)),
|
||
|
func(b *testing.B) {
|
||
|
r := setupBench(b, format, cacheSize)
|
||
|
defer func() {
|
||
|
require.NoError(b, r.Close())
|
||
|
}()
|
||
|
for _, readValue := range []bool{false, true} {
|
||
|
b.Run(fmt.Sprintf("read-value=%t", readValue), func(b *testing.B) {
|
||
|
iter, err := r.NewIter(nil, nil)
|
||
|
require.NoError(b, err)
|
||
|
var k *InternalKey
|
||
|
var v base.LazyValue
|
||
|
var valBuf [100]byte
|
||
|
b.ResetTimer()
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
if k == nil {
|
||
|
k, _ = iter.First()
|
||
|
if k == nil {
|
||
|
b.Fatalf("k is nil")
|
||
|
}
|
||
|
}
|
||
|
k, v = iter.Next()
|
||
|
if k != nil && readValue {
|
||
|
_, callerOwned, err := v.Value(valBuf[:])
|
||
|
if err != nil {
|
||
|
b.Fatal(err)
|
||
|
} else if callerOwned {
|
||
|
b.Fatalf("unexpected callerOwned: %t", callerOwned)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func BenchmarkIteratorScanNextPrefix(b *testing.B) {
|
||
|
options := WriterOptions{
|
||
|
BlockSize: 32 << 10,
|
||
|
BlockRestartInterval: 16,
|
||
|
FilterPolicy: nil,
|
||
|
Compression: SnappyCompression,
|
||
|
TableFormat: TableFormatPebblev3,
|
||
|
Comparer: testkeys.Comparer,
|
||
|
}
|
||
|
const keyCount = 10000
|
||
|
const sharedPrefixLen = 32
|
||
|
const unsharedPrefixLen = 8
|
||
|
val := make([]byte, 100)
|
||
|
rand.New(rand.NewSource(100)).Read(val)
|
||
|
|
||
|
// Take the very large keyspace consisting of alphabetic characters of
|
||
|
// lengths up to unsharedPrefixLen and reduce it down to keyCount keys by
|
||
|
// picking every 1 key every keyCount keys.
|
||
|
keys := testkeys.Alpha(unsharedPrefixLen)
|
||
|
keys = keys.EveryN(keys.Count() / keyCount)
|
||
|
if keys.Count() < keyCount {
|
||
|
b.Fatalf("expected %d keys, found %d", keyCount, keys.Count())
|
||
|
}
|
||
|
keyBuf := make([]byte, sharedPrefixLen+unsharedPrefixLen+testkeys.MaxSuffixLen)
|
||
|
for i := 0; i < sharedPrefixLen; i++ {
|
||
|
keyBuf[i] = 'A' + byte(i)
|
||
|
}
|
||
|
setupBench := func(b *testing.B, versCount int) (r *Reader, succKeys [][]byte) {
|
||
|
mem := vfs.NewMem()
|
||
|
f0, err := mem.Create("bench")
|
||
|
require.NoError(b, err)
|
||
|
w := NewWriter(objstorageprovider.NewFileWritable(f0), options)
|
||
|
for i := int64(0); i < keys.Count(); i++ {
|
||
|
for v := 0; v < versCount; v++ {
|
||
|
n := testkeys.WriteKeyAt(keyBuf[sharedPrefixLen:], keys, i, int64(versCount-v+1))
|
||
|
key := keyBuf[:n+sharedPrefixLen]
|
||
|
require.NoError(b, w.Set(key, val))
|
||
|
if v == 0 {
|
||
|
prefixLen := testkeys.Comparer.Split(key)
|
||
|
prefixKey := key[:prefixLen]
|
||
|
succKey := testkeys.Comparer.ImmediateSuccessor(nil, prefixKey)
|
||
|
succKeys = append(succKeys, succKey)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
require.NoError(b, w.Close())
|
||
|
// NB: This 200MiB cache is sufficient for even the largest file: 10,000
|
||
|
// keys * 100 versions = 1M keys, where each key-value pair is ~140 bytes
|
||
|
// = 140MB. So we are not measuring the caching benefit of
|
||
|
// TableFormatPebblev3 storing older values in value blocks.
|
||
|
c := cache.New(200 << 20)
|
||
|
defer c.Unref()
|
||
|
// Re-open the filename for reading.
|
||
|
f0, err = mem.Open("bench")
|
||
|
require.NoError(b, err)
|
||
|
r, err = newReader(f0, ReaderOptions{
|
||
|
Cache: c,
|
||
|
Comparer: testkeys.Comparer,
|
||
|
})
|
||
|
require.NoError(b, err)
|
||
|
return r, succKeys
|
||
|
}
|
||
|
// Analysis of some sample results with TableFormatPebblev2:
|
||
|
// versions=1/method=seek-ge-10 22107622 53.57 ns/op
|
||
|
// versions=1/method=next-prefix-10 36292837 33.07 ns/op
|
||
|
// versions=2/method=seek-ge-10 14429138 82.92 ns/op
|
||
|
// versions=2/method=next-prefix-10 19676055 60.78 ns/op
|
||
|
// versions=10/method=seek-ge-10 1453726 825.2 ns/op
|
||
|
// versions=10/method=next-prefix-10 2450498 489.6 ns/op
|
||
|
// versions=100/method=seek-ge-10 965143 1257 ns/op
|
||
|
// versions=100/method=next-prefix-10 1000000 1054 ns/op
|
||
|
//
|
||
|
// With 1 version, both SeekGE and NextPrefix will be able to complete after
|
||
|
// doing a single call to blockIter.Next. However, SeekGE has to do two key
|
||
|
// comparisons unlike the one key comparison in NextPrefix. This is because
|
||
|
// SeekGE also compares *before* calling Next since it is possible that the
|
||
|
// preceding SeekGE is already at the right place.
|
||
|
//
|
||
|
// With 2 versions, both will do two calls to blockIter.Next. The difference
|
||
|
// in the cost is the same as in the 1 version case.
|
||
|
//
|
||
|
// With 10 versions, it is still likely that the desired key is in the same
|
||
|
// data block. NextPrefix will seek only the blockIter. And in the rare case
|
||
|
// that the key is in the next data block, it will step the index block (not
|
||
|
// seek). In comparison, SeekGE will seek the index block too.
|
||
|
//
|
||
|
// With 100 versions we more often cross from one data block to the next, so
|
||
|
// the difference in cost declines.
|
||
|
//
|
||
|
// Some sample results with TableFormatPebblev3:
|
||
|
|
||
|
// versions=1/method=seek-ge-10 18702609 53.90 ns/op
|
||
|
// versions=1/method=next-prefix-10 77440167 15.41 ns/op
|
||
|
// versions=2/method=seek-ge-10 13554286 87.91 ns/op
|
||
|
// versions=2/method=next-prefix-10 62148526 19.25 ns/op
|
||
|
// versions=10/method=seek-ge-10 1316676 910.5 ns/op
|
||
|
// versions=10/method=next-prefix-10 18829448 62.61 ns/op
|
||
|
// versions=100/method=seek-ge-10 1166139 1025 ns/op
|
||
|
// versions=100/method=next-prefix-10 4443386 265.3 ns/op
|
||
|
//
|
||
|
// NextPrefix is much cheaper than in TableFormatPebblev2 with larger number
|
||
|
// of versions. It is also cheaper with 1 and 2 versions since
|
||
|
// setHasSamePrefix=false eliminates a key comparison.
|
||
|
for _, versionCount := range []int{1, 2, 10, 100} {
|
||
|
b.Run(fmt.Sprintf("versions=%d", versionCount), func(b *testing.B) {
|
||
|
r, succKeys := setupBench(b, versionCount)
|
||
|
defer func() {
|
||
|
require.NoError(b, r.Close())
|
||
|
}()
|
||
|
for _, method := range []string{"seek-ge", "next-prefix"} {
|
||
|
b.Run(fmt.Sprintf("method=%s", method), func(b *testing.B) {
|
||
|
for _, readValue := range []bool{false, true} {
|
||
|
b.Run(fmt.Sprintf("read-value=%t", readValue), func(b *testing.B) {
|
||
|
iter, err := r.NewIter(nil, nil)
|
||
|
require.NoError(b, err)
|
||
|
var nextFunc func(index int) (*InternalKey, base.LazyValue)
|
||
|
switch method {
|
||
|
case "seek-ge":
|
||
|
nextFunc = func(index int) (*InternalKey, base.LazyValue) {
|
||
|
var flags base.SeekGEFlags
|
||
|
return iter.SeekGE(succKeys[index], flags.EnableTrySeekUsingNext())
|
||
|
}
|
||
|
case "next-prefix":
|
||
|
nextFunc = func(index int) (*InternalKey, base.LazyValue) {
|
||
|
return iter.NextPrefix(succKeys[index])
|
||
|
}
|
||
|
default:
|
||
|
b.Fatalf("unknown method %s", method)
|
||
|
}
|
||
|
n := keys.Count()
|
||
|
j := n
|
||
|
var k *InternalKey
|
||
|
var v base.LazyValue
|
||
|
var valBuf [100]byte
|
||
|
b.ResetTimer()
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
if k == nil {
|
||
|
if j != n {
|
||
|
b.Fatalf("unexpected %d != %d", j, n)
|
||
|
}
|
||
|
k, _ = iter.First()
|
||
|
j = 0
|
||
|
} else {
|
||
|
k, v = nextFunc(int(j - 1))
|
||
|
if k != nil && readValue {
|
||
|
_, callerOwned, err := v.Value(valBuf[:])
|
||
|
if err != nil {
|
||
|
b.Fatal(err)
|
||
|
} else if callerOwned {
|
||
|
b.Fatalf("unexpected callerOwned: %t", callerOwned)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
}
|
||
|
if k != nil {
|
||
|
j++
|
||
|
}
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func BenchmarkIteratorScanObsolete(b *testing.B) {
|
||
|
options := WriterOptions{
|
||
|
BlockSize: 32 << 10,
|
||
|
BlockRestartInterval: 16,
|
||
|
FilterPolicy: nil,
|
||
|
Compression: SnappyCompression,
|
||
|
Comparer: testkeys.Comparer,
|
||
|
}
|
||
|
const keyCount = 1 << 20
|
||
|
const keyLen = 10
|
||
|
|
||
|
// Take the very large keyspace consisting of alphabetic characters of
|
||
|
// lengths up to unsharedPrefixLen and reduce it down to keyCount keys by
|
||
|
// picking every 1 key every keyCount keys.
|
||
|
keys := testkeys.Alpha(keyLen)
|
||
|
keys = keys.EveryN(keys.Count() / keyCount)
|
||
|
if keys.Count() < keyCount {
|
||
|
b.Fatalf("expected %d keys, found %d", keyCount, keys.Count())
|
||
|
}
|
||
|
expectedKeyCount := keys.Count()
|
||
|
keyBuf := make([]byte, keyLen)
|
||
|
setupBench := func(b *testing.B, tableFormat TableFormat, cacheSize int64) *Reader {
|
||
|
mem := vfs.NewMem()
|
||
|
f0, err := mem.Create("bench")
|
||
|
require.NoError(b, err)
|
||
|
options.TableFormat = tableFormat
|
||
|
w := NewWriter(objstorageprovider.NewFileWritable(f0), options)
|
||
|
val := make([]byte, 100)
|
||
|
rng := rand.New(rand.NewSource(100))
|
||
|
for i := int64(0); i < keys.Count(); i++ {
|
||
|
n := testkeys.WriteKey(keyBuf, keys, i)
|
||
|
key := keyBuf[:n]
|
||
|
rng.Read(val)
|
||
|
forceObsolete := true
|
||
|
if i == 0 {
|
||
|
forceObsolete = false
|
||
|
}
|
||
|
require.NoError(b, w.AddWithForceObsolete(
|
||
|
base.MakeInternalKey(key, 0, InternalKeyKindSet), val, forceObsolete))
|
||
|
}
|
||
|
require.NoError(b, w.Close())
|
||
|
c := cache.New(cacheSize)
|
||
|
defer c.Unref()
|
||
|
// Re-open the filename for reading.
|
||
|
f0, err = mem.Open("bench")
|
||
|
require.NoError(b, err)
|
||
|
r, err := newReader(f0, ReaderOptions{
|
||
|
Cache: c,
|
||
|
Comparer: testkeys.Comparer,
|
||
|
})
|
||
|
require.NoError(b, err)
|
||
|
return r
|
||
|
}
|
||
|
for _, format := range []TableFormat{TableFormatPebblev3, TableFormatPebblev4} {
|
||
|
b.Run(fmt.Sprintf("format=%s", format.String()), func(b *testing.B) {
|
||
|
// 150MiB results in a high cache hit rate for both formats.
|
||
|
for _, cacheSize := range []int64{1, 150 << 20} {
|
||
|
b.Run(fmt.Sprintf("cache-size=%s", humanize.Bytes.Int64(cacheSize)),
|
||
|
func(b *testing.B) {
|
||
|
r := setupBench(b, format, cacheSize)
|
||
|
defer func() {
|
||
|
require.NoError(b, r.Close())
|
||
|
}()
|
||
|
for _, hideObsoletePoints := range []bool{false, true} {
|
||
|
b.Run(fmt.Sprintf("hide-obsolete=%t", hideObsoletePoints), func(b *testing.B) {
|
||
|
var filterer *BlockPropertiesFilterer
|
||
|
if format == TableFormatPebblev4 && hideObsoletePoints {
|
||
|
filterer = newBlockPropertiesFilterer(
|
||
|
[]BlockPropertyFilter{obsoleteKeyBlockPropertyFilter{}}, nil)
|
||
|
intersects, err :=
|
||
|
filterer.intersectsUserPropsAndFinishInit(r.Properties.UserProperties)
|
||
|
if err != nil {
|
||
|
b.Fatalf("%s", err.Error())
|
||
|
}
|
||
|
if !intersects {
|
||
|
b.Fatalf("sstable does not intersect")
|
||
|
}
|
||
|
}
|
||
|
iter, err := r.NewIterWithBlockPropertyFiltersAndContextEtc(
|
||
|
context.Background(), nil, nil, filterer, hideObsoletePoints,
|
||
|
true, nil, CategoryAndQoS{}, nil,
|
||
|
TrivialReaderProvider{Reader: r})
|
||
|
require.NoError(b, err)
|
||
|
b.ResetTimer()
|
||
|
for i := 0; i < b.N; i++ {
|
||
|
count := int64(0)
|
||
|
k, _ := iter.First()
|
||
|
for k != nil {
|
||
|
count++
|
||
|
k, _ = iter.Next()
|
||
|
}
|
||
|
if format == TableFormatPebblev4 && hideObsoletePoints {
|
||
|
if count != 1 {
|
||
|
b.Fatalf("found %d points", count)
|
||
|
}
|
||
|
} else {
|
||
|
if count != expectedKeyCount {
|
||
|
b.Fatalf("found %d points", count)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func newReader(r ReadableFile, o ReaderOptions, extraOpts ...ReaderOption) (*Reader, error) {
|
||
|
readable, err := NewSimpleReadable(r)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return NewReader(readable, o, extraOpts...)
|
||
|
}
|