// File: ceremonyclient/pebble/sstable/block_property_test.go
// (Code-viewer chrome from the original capture — line/size counts,
// "Raw Permalink Normal View History", timestamp — removed.)
// Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package sstable
import (
"bytes"
"context"
"fmt"
"io"
"math"
"math/rand"
"sort"
"strconv"
"strings"
"testing"
"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/stretchr/testify/require"
)
// TestIntervalEncodeDecode verifies that interval.encode and interval.decode
// round-trip: encoding into nil and into a preallocated scratch buffer must
// produce identical bytes, empty intervals (lower >= upper) must encode to
// zero bytes and decode to the canonical zero interval, and the encoded
// length must match the expected varint size.
func TestIntervalEncodeDecode(t *testing.T) {
	testCases := []struct {
		name  string
		lower uint64
		upper uint64
		len   int
	}{
		{
			name:  "empty zero",
			lower: 0,
			upper: 0,
			len:   0,
		},
		{
			name:  "empty non-zero",
			lower: 5,
			upper: 5,
			len:   0,
		},
		{
			name:  "empty lower > upper",
			lower: math.MaxUint64,
			upper: math.MaxUint64 - 1,
			len:   0,
		},
		{
			name:  "small",
			lower: 50,
			upper: 61,
			len:   2,
		},
		{
			name:  "big",
			lower: 0,
			upper: math.MaxUint64,
			len:   11,
		},
	}
	for _, tc := range testCases {
		buf := make([]byte, 100)
		t.Run(tc.name, func(t *testing.T) {
			i1 := interval{lower: tc.lower, upper: tc.upper}
			// Encoding into nil and into a scratch buffer must agree.
			b1 := i1.encode(nil)
			b2 := i1.encode(buf[:0])
			require.True(t, bytes.Equal(b1, b2), "%x != %x", b1, b2)
			expectedInterval := i1
			// Empty intervals decode back to the canonical zero interval.
			if expectedInterval.lower >= expectedInterval.upper {
				expectedInterval = interval{}
			}
			// Arbitrary initial value, to confirm decode fully overwrites it.
			arbitraryInterval := interval{lower: 1000, upper: 1000}
			i2 := arbitraryInterval
			i2.decode(b1)
			require.Equal(t, expectedInterval, i2)
			i2 = arbitraryInterval
			i2.decode(b2)
			require.Equal(t, expectedInterval, i2)
			require.Equal(t, tc.len, len(b1))
		})
	}
}
// TestIntervalUnionIntersects verifies interval.union and interval.intersects
// over empty, disjoint, adjacent, and overlapping interval pairs, checking
// both argument orders for symmetry/commutativity.
func TestIntervalUnionIntersects(t *testing.T) {
	testCases := []struct {
		name       string
		i1         interval
		i2         interval
		union      interval
		intersects bool
	}{
		{
			name:       "empty and empty",
			i1:         interval{},
			i2:         interval{},
			union:      interval{},
			intersects: false,
		},
		{
			name:       "empty and empty non-zero",
			i1:         interval{},
			i2:         interval{100, 99},
			union:      interval{},
			intersects: false,
		},
		{
			name:       "empty and non-empty",
			i1:         interval{},
			i2:         interval{80, 100},
			union:      interval{80, 100},
			intersects: false,
		},
		{
			name:       "disjoint sets",
			i1:         interval{50, 60},
			i2:         interval{math.MaxUint64 - 5, math.MaxUint64},
			union:      interval{50, math.MaxUint64},
			intersects: false,
		},
		{
			name:       "adjacent sets",
			i1:         interval{50, 60},
			i2:         interval{60, 100},
			union:      interval{50, 100},
			intersects: false,
		},
		{
			name:       "overlapping sets",
			i1:         interval{50, 60},
			i2:         interval{59, 120},
			union:      interval{50, 120},
			intersects: true,
		},
	}
	isEmpty := func(i interval) bool {
		return i.lower >= i.upper
	}
	// adjustUnionExpectation exists because union does not try to
	// canonicalize empty sets by turning them into [0, 0), since it is
	// unnecessary -- the higher level context of the BlockIntervalCollector
	// will do so when calling interval.encode.
	adjustUnionExpectation := func(expected interval, i1 interval, i2 interval) interval {
		if isEmpty(i2) {
			return i1
		}
		if isEmpty(i1) {
			return i2
		}
		return expected
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// intersects must be symmetric; any non-empty interval
			// intersects itself, and an empty one does not.
			require.Equal(t, tc.intersects, tc.i1.intersects(tc.i2))
			require.Equal(t, tc.intersects, tc.i2.intersects(tc.i1))
			require.Equal(t, !isEmpty(tc.i1), tc.i1.intersects(tc.i1))
			require.Equal(t, !isEmpty(tc.i2), tc.i2.intersects(tc.i2))
			// union must give the same result regardless of receiver order
			// (modulo the empty-set canonicalization handled above).
			union := tc.i1
			union.union(tc.i2)
			require.Equal(t, adjustUnionExpectation(tc.union, tc.i1, tc.i2), union)
			union = tc.i2
			union.union(tc.i1)
			require.Equal(t, adjustUnionExpectation(tc.union, tc.i2, tc.i1), union)
		})
	}
}
// testDataBlockIntervalCollector is a DataBlockIntervalCollector stub whose
// reported interval is set directly by the test via the i field; Add is a
// no-op.
type testDataBlockIntervalCollector struct {
	// i is the interval returned by FinishDataBlock.
	i interval
}

// Add implements DataBlockIntervalCollector; keys and values are ignored.
func (c *testDataBlockIntervalCollector) Add(key InternalKey, value []byte) error {
	return nil
}

// FinishDataBlock implements DataBlockIntervalCollector, returning the
// test-configured interval.
func (c *testDataBlockIntervalCollector) FinishDataBlock() (lower uint64, upper uint64, err error) {
	return c.i.lower, c.i.upper, nil
}
// TestBlockIntervalCollector steps a BlockIntervalCollector through a
// sequence of data blocks, index blocks, and a final table finish, checking
// the encoded interval at each level. The point-key collector drives the
// block- and index-level intervals; the range-key collector contributes only
// at the table level.
func TestBlockIntervalCollector(t *testing.T) {
	var points, ranges testDataBlockIntervalCollector
	bic := NewBlockIntervalCollector("foo", &points, &ranges)
	require.Equal(t, "foo", bic.Name())
	// Set up the point key collector with an initial (empty) interval.
	points.i = interval{1, 1}
	// First data block has empty point key interval; empty intervals encode
	// to zero bytes.
	encoded, err := bic.FinishDataBlock(nil)
	require.NoError(t, err)
	require.True(t, bytes.Equal(nil, encoded))
	bic.AddPrevDataBlockToIndexBlock()
	// Second data block contains a point and range key interval. The latter
	// should not contribute to the block interval.
	points.i = interval{20, 25}
	ranges.i = interval{5, 150}
	encoded, err = bic.FinishDataBlock(nil)
	require.NoError(t, err)
	var decoded interval
	require.NoError(t, decoded.decode(encoded))
	require.Equal(t, interval{20, 25}, decoded)
	var encodedIndexBlock []byte
	// Finish index block before including second data block; it is empty so
	// far, so it too encodes to zero bytes.
	encodedIndexBlock, err = bic.FinishIndexBlock(nil)
	require.NoError(t, err)
	require.True(t, bytes.Equal(nil, encodedIndexBlock))
	bic.AddPrevDataBlockToIndexBlock()
	// Third data block.
	points.i = interval{10, 15}
	encoded, err = bic.FinishDataBlock(nil)
	require.NoError(t, err)
	require.NoError(t, decoded.decode(encoded))
	require.Equal(t, interval{10, 15}, decoded)
	bic.AddPrevDataBlockToIndexBlock()
	// Fourth data block.
	points.i = interval{100, 105}
	encoded, err = bic.FinishDataBlock(nil)
	require.NoError(t, err)
	require.NoError(t, decoded.decode(encoded))
	require.Equal(t, interval{100, 105}, decoded)
	// Finish index block before including fourth data block. It covers the
	// second and third data blocks: [20, 25) union [10, 15) = [10, 25).
	encodedIndexBlock, err = bic.FinishIndexBlock(nil)
	require.NoError(t, err)
	require.NoError(t, decoded.decode(encodedIndexBlock))
	require.Equal(t, interval{10, 25}, decoded)
	bic.AddPrevDataBlockToIndexBlock()
	// Finish index block that contains only fourth data block.
	encodedIndexBlock, err = bic.FinishIndexBlock(nil)
	require.NoError(t, err)
	require.NoError(t, decoded.decode(encodedIndexBlock))
	require.Equal(t, interval{100, 105}, decoded)
	var encodedTable []byte
	// Finish table. The table interval is the union of the current point key
	// table interval [10, 105) and the range key interval [5, 150).
	encodedTable, err = bic.FinishTable(nil)
	require.NoError(t, err)
	require.NoError(t, decoded.decode(encodedTable))
	require.Equal(t, interval{5, 150}, decoded)
}
// TestBlockIntervalFilter exercises BlockIntervalFilter.Intersects against
// encoded properties produced by a BlockIntervalCollector, covering empty,
// disjoint, and overlapping filter/property interval pairs.
func TestBlockIntervalFilter(t *testing.T) {
	for _, tc := range []struct {
		name       string
		filter     interval
		prop       interval
		intersects bool
	}{
		{
			name:       "non-empty and empty",
			filter:     interval{10, 15},
			prop:       interval{},
			intersects: false,
		},
		{
			name:       "does not intersect",
			filter:     interval{10, 15},
			prop:       interval{15, 20},
			intersects: false,
		},
		{
			name:       "intersects",
			filter:     interval{10, 15},
			prop:       interval{14, 20},
			intersects: true,
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			const collectorName = "foo"
			var dataCollector testDataBlockIntervalCollector
			collector := NewBlockIntervalCollector(collectorName, &dataCollector, nil)
			intervalFilter := NewBlockIntervalFilter(collectorName, tc.filter.lower, tc.filter.upper)
			// Encode the test property through the collector, then check
			// the filter's verdict against it.
			dataCollector.i = tc.prop
			encodedProp, _ := collector.FinishDataBlock(nil)
			got, err := intervalFilter.Intersects(encodedProp)
			require.NoError(t, err)
			require.Equal(t, tc.intersects, got)
		})
	}
}
// TestBlockPropertiesEncoderDecoder round-trips properties through
// blockPropertiesEncoder and blockPropertiesDecoder, verifying shortID
// tagging, scratch-buffer capacity reuse across resetProps, and that props()
// (a stable copy) matches unsafeProps() (an aliased view) and remains valid
// after a subsequent reset.
func TestBlockPropertiesEncoderDecoder(t *testing.T) {
	var encoder blockPropertiesEncoder
	scratch := encoder.getScratchForProp()
	scratch = append(scratch, []byte("foo")...)
	encoder.addProp(1, scratch)
	scratch = encoder.getScratchForProp()
	// The scratch must retain at least the capacity of the prior prop.
	require.LessOrEqual(t, 3, cap(scratch))
	scratch = append(scratch, []byte("cockroach")...)
	encoder.addProp(10, scratch)
	props1 := encoder.props()
	unsafeProps := encoder.unsafeProps()
	require.True(t, bytes.Equal(props1, unsafeProps), "%x != %x", props1, unsafeProps)
	// decodeProps1 checks that props1 still decodes to the two props added
	// above; it is re-run after resetProps to prove props() is a safe copy.
	decodeProps1 := func() {
		decoder := blockPropertiesDecoder{props: props1}
		require.False(t, decoder.done())
		id, prop, err := decoder.next()
		require.NoError(t, err)
		require.Equal(t, shortID(1), id)
		require.Equal(t, string(prop), "foo")
		require.False(t, decoder.done())
		id, prop, err = decoder.next()
		require.NoError(t, err)
		require.Equal(t, shortID(10), id)
		require.Equal(t, string(prop), "cockroach")
		require.True(t, decoder.done())
	}
	decodeProps1()
	encoder.resetProps()
	scratch = encoder.getScratchForProp()
	require.LessOrEqual(t, 9, cap(scratch))
	scratch = append(scratch, []byte("bar")...)
	encoder.addProp(10, scratch)
	props2 := encoder.props()
	unsafeProps = encoder.unsafeProps()
	require.True(t, bytes.Equal(props2, unsafeProps), "%x != %x", props2, unsafeProps)
	// Safe props should still decode.
	decodeProps1()
	// Decode props2
	decoder := blockPropertiesDecoder{props: props2}
	require.False(t, decoder.done())
	id, prop, err := decoder.next()
	require.NoError(t, err)
	require.Equal(t, shortID(10), id)
	require.Equal(t, string(prop), "bar")
	require.True(t, decoder.done())
}
// filterWithTrueForEmptyProp is a wrapper for BlockPropertyFilter that
// delegates to it except when the property is empty, in which case it returns
// true.
type filterWithTrueForEmptyProp struct {
	BlockPropertyFilter
}

// Intersects returns true for an empty property and otherwise delegates to
// the wrapped filter.
func (b filterWithTrueForEmptyProp) Intersects(prop []byte) (bool, error) {
	if len(prop) == 0 {
		return true, nil
	}
	return b.BlockPropertyFilter.Intersects(prop)
}
// TestBlockPropertiesFilterer_IntersectsUserPropsAndFinishInit verifies that
// a filterer built from named interval filters matches table-level user
// properties correctly and constructs the expected shortID -> filter-index
// mapping used later for block-level filtering.
func TestBlockPropertiesFilterer_IntersectsUserPropsAndFinishInit(t *testing.T) {
	// props with id=0, interval [10, 20); id=10, interval [110, 120).
	var dbic testDataBlockIntervalCollector
	bic0 := NewBlockIntervalCollector("p0", &dbic, nil)
	bic0Id := byte(0)
	bic10 := NewBlockIntervalCollector("p10", &dbic, nil)
	bic10Id := byte(10)
	dbic.i = interval{10, 20}
	// User property values are the shortID byte followed by the encoded
	// table-level interval.
	prop0 := append([]byte(nil), bic0Id)
	_, err := bic0.FinishDataBlock(nil)
	require.NoError(t, err)
	prop0, err = bic0.FinishTable(prop0)
	require.NoError(t, err)
	dbic.i = interval{110, 120}
	prop10 := append([]byte(nil), bic10Id)
	_, err = bic10.FinishDataBlock(nil)
	require.NoError(t, err)
	prop10, err = bic10.FinishTable(prop10)
	require.NoError(t, err)
	prop0Str := string(prop0)
	prop10Str := string(prop10)
	type filter struct {
		name string
		i    interval
	}
	testCases := []struct {
		name      string
		userProps map[string]string
		filters   []filter
		// Expected results
		intersects            bool
		shortIDToFiltersIndex []int
	}{
		{
			name:       "no filter, no props",
			userProps:  map[string]string{},
			filters:    nil,
			intersects: true,
		},
		{
			name:      "no props",
			userProps: map[string]string{},
			filters: []filter{
				{name: "p0", i: interval{20, 30}},
				{name: "p10", i: interval{20, 30}},
			},
			intersects: true,
		},
		{
			name:      "prop0, does not intersect",
			userProps: map[string]string{"p0": prop0Str},
			filters: []filter{
				{name: "p0", i: interval{20, 30}},
				{name: "p10", i: interval{20, 30}},
			},
			intersects: false,
		},
		{
			name:      "prop0, intersects",
			userProps: map[string]string{"p0": prop0Str},
			filters: []filter{
				{name: "p0", i: interval{11, 21}},
				{name: "p10", i: interval{20, 30}},
			},
			intersects:            true,
			shortIDToFiltersIndex: []int{0},
		},
		{
			name:      "prop10, does not intersect",
			userProps: map[string]string{"p10": prop10Str},
			filters: []filter{
				{name: "p0", i: interval{11, 21}},
				{name: "p10", i: interval{20, 30}},
			},
			intersects: false,
		},
		{
			name:      "prop10, intersects",
			userProps: map[string]string{"p10": prop10Str},
			filters: []filter{
				{name: "p0", i: interval{11, 21}},
				{name: "p10", i: interval{115, 125}},
			},
			intersects:            true,
			shortIDToFiltersIndex: []int{-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 1},
		},
		{
			name:      "prop10, intersects",
			userProps: map[string]string{"p10": prop10Str},
			filters: []filter{
				{name: "p10", i: interval{115, 125}},
				{name: "p0", i: interval{11, 21}},
			},
			intersects:            true,
			shortIDToFiltersIndex: []int{-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0},
		},
		{
			name:      "prop0 and prop10, does not intersect",
			userProps: map[string]string{"p0": prop0Str, "p10": prop10Str},
			filters: []filter{
				{name: "p10", i: interval{115, 125}},
				{name: "p0", i: interval{20, 30}},
			},
			intersects:            false,
			shortIDToFiltersIndex: []int{-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0},
		},
		{
			name:      "prop0 and prop10, does not intersect",
			userProps: map[string]string{"p0": prop0Str, "p10": prop10Str},
			filters: []filter{
				{name: "p0", i: interval{10, 20}},
				{name: "p10", i: interval{125, 135}},
			},
			intersects:            false,
			shortIDToFiltersIndex: []int{0},
		},
		{
			name:      "prop0 and prop10, intersects",
			userProps: map[string]string{"p0": prop0Str, "p10": prop10Str},
			filters: []filter{
				{name: "p10", i: interval{115, 125}},
				{name: "p0", i: interval{10, 20}},
			},
			intersects:            true,
			shortIDToFiltersIndex: []int{1, -1, -1, -1, -1, -1, -1, -1, -1, -1, 0},
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var filters []BlockPropertyFilter
			for _, f := range tc.filters {
				filter := NewBlockIntervalFilter(f.name, f.i.lower, f.i.upper)
				filters = append(filters, filter)
			}
			filterer := newBlockPropertiesFilterer(filters, nil)
			intersects, err := filterer.intersectsUserPropsAndFinishInit(tc.userProps)
			require.NoError(t, err)
			require.Equal(t, tc.intersects, intersects)
			require.Equal(t, tc.shortIDToFiltersIndex, filterer.shortIDToFiltersIndex)
		})
	}
}
// TestBlockPropertiesFilterer_Intersects verifies block-level filtering:
// BlockPropertiesFilterer.intersects must report a block as relevant only if
// every registered filter intersects the block's encoded property (a filter
// with no corresponding property sees an empty prop). Filter order is then
// permuted to exercise the shortIDToFiltersIndex indirection.
func TestBlockPropertiesFilterer_Intersects(t *testing.T) {
	// Setup two different properties values to filter against.
	var emptyProps []byte
	// props with id=0, interval [10, 20); id=10, interval [110, 120).
	var encoder blockPropertiesEncoder
	var dbic testDataBlockIntervalCollector
	bic0 := NewBlockIntervalCollector("", &dbic, nil)
	bic0Id := shortID(0)
	bic10 := NewBlockIntervalCollector("", &dbic, nil)
	bic10Id := shortID(10)
	dbic.i = interval{10, 20}
	prop, err := bic0.FinishDataBlock(encoder.getScratchForProp())
	require.NoError(t, err)
	encoder.addProp(bic0Id, prop)
	dbic.i = interval{110, 120}
	prop, err = bic10.FinishDataBlock(encoder.getScratchForProp())
	require.NoError(t, err)
	encoder.addProp(bic10Id, prop)
	props0And10 := encoder.props()
	type filter struct {
		shortID shortID
		i       interval
		// intersectsForEmptyProp wraps the filter in
		// filterWithTrueForEmptyProp, so a missing property counts as a
		// match for that filter.
		intersectsForEmptyProp bool
	}
	testCases := []struct {
		name  string
		props []byte
		// filters must be in ascending order of shortID.
		filters    []filter
		intersects bool
	}{
		{
			name:       "no filter, empty props",
			props:      emptyProps,
			intersects: true,
		},
		{
			name:       "no filter",
			props:      props0And10,
			intersects: true,
		},
		{
			name:  "filter 0, empty props, does not intersect",
			props: emptyProps,
			filters: []filter{
				{
					shortID: 0,
					i:       interval{5, 15},
				},
			},
			intersects: false,
		},
		{
			name:  "filter 10, empty props, does not intersect",
			props: emptyProps,
			filters: []filter{
				{
					shortID: 0,
					i:       interval{105, 111},
				},
			},
			intersects: false,
		},
		{
			name:  "filter 0, intersects",
			props: props0And10,
			filters: []filter{
				{
					shortID: 0,
					i:       interval{5, 15},
				},
			},
			intersects: true,
		},
		{
			name:  "filter 0, does not intersect",
			props: props0And10,
			filters: []filter{
				{
					shortID: 0,
					i:       interval{20, 25},
				},
			},
			intersects: false,
		},
		{
			name:  "filter 10, intersects",
			props: props0And10,
			filters: []filter{
				{
					shortID: 10,
					i:       interval{105, 111},
				},
			},
			intersects: true,
		},
		{
			name:  "filter 10, does not intersect",
			props: props0And10,
			filters: []filter{
				{
					shortID: 10,
					i:       interval{105, 110},
				},
			},
			intersects: false,
		},
		{
			name:  "filter 5, does not intersect since no property",
			props: props0And10,
			filters: []filter{
				{
					shortID: 5,
					i:       interval{105, 110},
				},
			},
			intersects: false,
		},
		{
			name:  "filter 0 and 5, intersects and not intersects means overall not intersects",
			props: props0And10,
			filters: []filter{
				{
					shortID: 0,
					i:       interval{5, 15},
				},
				{
					shortID: 5,
					i:       interval{105, 110},
				},
			},
			intersects: false,
		},
		{
			name:  "filter 0, 5, 7, 11, all intersect",
			props: props0And10,
			filters: []filter{
				{
					shortID: 0,
					i:       interval{5, 15},
				},
				{
					shortID:                5,
					i:                      interval{105, 110},
					intersectsForEmptyProp: true,
				},
				{
					shortID:                7,
					i:                      interval{105, 110},
					intersectsForEmptyProp: true,
				},
				{
					shortID:                11,
					i:                      interval{105, 110},
					intersectsForEmptyProp: true,
				},
			},
			intersects: true,
		},
		{
			name:  "filter 0, 5, 7, 10, 11, all intersect",
			props: props0And10,
			filters: []filter{
				{
					shortID: 0,
					i:       interval{5, 15},
				},
				{
					shortID:                5,
					i:                      interval{105, 110},
					intersectsForEmptyProp: true,
				},
				{
					shortID:                7,
					i:                      interval{105, 110},
					intersectsForEmptyProp: true,
				},
				{
					shortID: 10,
					i:       interval{105, 111},
				},
				{
					shortID:                11,
					i:                      interval{105, 110},
					intersectsForEmptyProp: true,
				},
			},
			intersects: true,
		},
		{
			name:  "filter 0, 5, 7, 10, 11, all intersect except for 10",
			props: props0And10,
			filters: []filter{
				{
					shortID: 0,
					i:       interval{5, 15},
				},
				{
					shortID:                5,
					i:                      interval{105, 110},
					intersectsForEmptyProp: true,
				},
				{
					shortID:                7,
					i:                      interval{105, 110},
					intersectsForEmptyProp: true,
				},
				{
					shortID: 10,
					i:       interval{105, 110},
				},
				{
					shortID:                11,
					i:                      interval{105, 110},
					intersectsForEmptyProp: true,
				},
			},
			intersects: false,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			var filters []BlockPropertyFilter
			var shortIDToFiltersIndex []int
			if len(tc.filters) > 0 {
				// Size the mapping by the largest shortID; -1 marks slots
				// with no registered filter.
				shortIDToFiltersIndex = make([]int, tc.filters[len(tc.filters)-1].shortID+1)
				for i := range shortIDToFiltersIndex {
					shortIDToFiltersIndex[i] = -1
				}
			}
			for _, f := range tc.filters {
				filter := NewBlockIntervalFilter("", f.i.lower, f.i.upper)
				bpf := BlockPropertyFilter(filter)
				if f.intersectsForEmptyProp {
					bpf = filterWithTrueForEmptyProp{filter}
				}
				shortIDToFiltersIndex[f.shortID] = len(filters)
				filters = append(filters, bpf)
			}
			doFiltering := func() {
				bpFilterer := BlockPropertiesFilterer{
					filters:               filters,
					shortIDToFiltersIndex: shortIDToFiltersIndex,
					boundLimitedShortID:   -1,
				}
				intersects, err := bpFilterer.intersects(tc.props)
				require.NoError(t, err)
				require.Equal(t, tc.intersects, intersects == blockIntersects)
			}
			doFiltering()
			if len(filters) > 1 {
				// Permute the filters so that the use of
				// shortIDToFiltersIndex is better tested.
				permutation := rand.Perm(len(filters))
				filterPerm := make([]BlockPropertyFilter, len(filters))
				for i := range permutation {
					filterPerm[i] = filters[permutation[i]]
					shortIDToFiltersIndex[tc.filters[permutation[i]].shortID] = i
				}
				filters = filterPerm
				doFiltering()
			}
		})
	}
}
// valueCharBlockIntervalCollector implements DataBlockIntervalCollector by
// maintaining the (inclusive) lower and (exclusive) upper bound of a fixed
// character position in the value, when represented as an integer.
type valueCharBlockIntervalCollector struct {
	// charIdx selects the value byte to parse; -1 selects the last byte.
	charIdx int
	// initialized is false until the first value of the current block has
	// been observed.
	initialized  bool
	lower, upper uint64
}

var _ DataBlockIntervalCollector = &valueCharBlockIntervalCollector{}
// Add implements DataBlockIntervalCollector by maintaining the lower and upper
// bound of a fixed character position in the value. Returns an error (rather
// than panicking, as the previous version did) when the value is too short to
// contain the configured character position.
func (c *valueCharBlockIntervalCollector) Add(_ InternalKey, value []byte) error {
	charIdx := c.charIdx
	if charIdx == -1 {
		// -1 selects the last character of the value.
		charIdx = len(value) - 1
	}
	// Guard against empty/short values, which would otherwise cause an
	// index-out-of-range panic below.
	if charIdx < 0 || charIdx >= len(value) {
		return fmt.Errorf("value %q has no character at index %d", value, c.charIdx)
	}
	val, err := strconv.Atoi(string(value[charIdx]))
	if err != nil {
		return err
	}
	uval := uint64(val)
	// First observation of the block initializes the interval to [v, v+1).
	if !c.initialized {
		c.lower, c.upper = uval, uval+1
		c.initialized = true
		return nil
	}
	// Widen the interval: lower is inclusive, upper exclusive.
	if uval < c.lower {
		c.lower = uval
	}
	if uval >= c.upper {
		c.upper = uval + 1
	}
	return nil
}
// FinishDataBlock implements DataBlockIntervalCollector, returning the
// accumulated lower and upper bound for the block and resetting all state in
// anticipation of the next block.
func (c *valueCharBlockIntervalCollector) FinishDataBlock() (lower, upper uint64, err error) {
	lower, upper = c.lower, c.upper
	c.lower, c.upper = 0, 0
	c.initialized = false
	return lower, upper, nil
}
// suffixIntervalCollector maintains an interval over the timestamps in
// MVCC-like suffixes for keys (e.g. foo@123).
// (Comment previously referred to a stale name, testKeysSuffixIntervalCollector.)
type suffixIntervalCollector struct {
	// initialized is false until the first suffix of the current block has
	// been observed.
	initialized  bool
	lower, upper uint64
}
// Add implements DataBlockIntervalCollector by adding the timestamp(s) in the
// suffix(es) of this record to the current interval.
//
// Note that range sets and unsets may have multiple suffixes. Range key deletes
// do not have a suffix. All other point keys have a single suffix.
func (c *suffixIntervalCollector) Add(key InternalKey, value []byte) error {
	var bs [][]byte
	// Range keys have their suffixes encoded into the value.
	if rangekey.IsRangeKey(key.Kind()) {
		// Range key deletes carry no suffix at all; nothing to collect.
		if key.Kind() == base.InternalKeyKindRangeKeyDelete {
			return nil
		}
		s, err := rangekey.Decode(key, value, nil)
		if err != nil {
			return err
		}
		for _, k := range s.Keys {
			if len(k.Suffix) > 0 {
				bs = append(bs, k.Suffix)
			}
		}
	} else {
		// All other keys carry their single suffix within the user key
		// itself, so hand the whole user key to the suffix splitter below.
		bs = append(bs, key.UserKey)
	}
	for _, b := range bs {
		// Split locates the suffix; the byte at index i is the separator,
		// so the decimal timestamp starts at i+1.
		i := testkeys.Comparer.Split(b)
		ts, err := strconv.Atoi(string(b[i+1:]))
		if err != nil {
			return err
		}
		uts := uint64(ts)
		// First observation initializes the interval to [ts, ts+1).
		if !c.initialized {
			c.lower, c.upper = uts, uts+1
			c.initialized = true
			continue
		}
		// Widen: lower is inclusive, upper exclusive.
		if uts < c.lower {
			c.lower = uts
		}
		if uts >= c.upper {
			c.upper = uts + 1
		}
	}
	return nil
}
// FinishDataBlock implements DataBlockIntervalCollector, returning the
// interval accumulated for the block and resetting state for the next one.
func (c *suffixIntervalCollector) FinishDataBlock() (lower, upper uint64, err error) {
	lower, upper = c.lower, c.upper
	c.lower, c.upper = 0, 0
	c.initialized = false
	return lower, upper, nil
}
// TestBlockProperties is a datadriven test covering building sstables with
// block property collectors, inspecting the collected table- and block-level
// properties, and filtering data blocks and iterators with interval filters.
func TestBlockProperties(t *testing.T) {
	var r *Reader
	defer func() {
		if r != nil {
			require.NoError(t, r.Close())
		}
	}()
	var stats base.InternalIteratorStats
	datadriven.RunTest(t, "testdata/block_properties", func(t *testing.T, td *datadriven.TestData) string {
		switch td.Cmd {
		case "build":
			// Each build replaces the current reader.
			if r != nil {
				_ = r.Close()
				r = nil
			}
			var output string
			r, output = runBlockPropertiesBuildCmd(td)
			return output
		case "collectors":
			return runCollectorsCmd(r, td)
		case "table-props":
			return runTablePropsCmd(r, td)
		case "block-props":
			return runBlockPropsCmd(r, td)
		case "filter":
			var points, ranges []BlockPropertyFilter
			for _, cmd := range td.CmdArgs {
				filter, err := parseIntervalFilter(cmd)
				if err != nil {
					return err.Error()
				}
				switch cmd.Key {
				case "point-filter":
					points = append(points, filter)
				case "range-filter":
					ranges = append(ranges, filter)
				default:
					return fmt.Sprintf("unknown command: %s", td.Cmd)
				}
			}
			// Point keys filter matches.
			var buf bytes.Buffer
			var f *BlockPropertiesFilterer
			buf.WriteString("points: ")
			if len(points) > 0 {
				f = newBlockPropertiesFilterer(points, nil)
				ok, err := f.intersectsUserPropsAndFinishInit(r.Properties.UserProperties)
				if err != nil {
					return err.Error()
				}
				buf.WriteString(strconv.FormatBool(ok))
				if !ok {
					// Table-level reject: skip block enumeration.
					f = nil
				}
				// Enumerate point key data blocks encoded into the index.
				if f != nil {
					indexH, err := r.readIndex(context.Background(), nil, nil)
					if err != nil {
						return err.Error()
					}
					defer indexH.Release()
					buf.WriteString(", blocks=[")
					var blocks []int
					var i int
					iter, _ := newBlockIter(r.Compare, indexH.Get())
					for key, value := iter.First(); key != nil; key, value = iter.Next() {
						bh, err := decodeBlockHandleWithProperties(value.InPlaceValue())
						if err != nil {
							return err.Error()
						}
						intersects, err := f.intersects(bh.Props)
						if err != nil {
							return err.Error()
						}
						if intersects == blockIntersects {
							blocks = append(blocks, i)
						}
						i++
					}
					for i, b := range blocks {
						buf.WriteString(strconv.Itoa(b))
						if i < len(blocks)-1 {
							buf.WriteString(",")
						}
					}
					buf.WriteString("]")
				}
			} else {
				// Without filters, the table matches by default.
				buf.WriteString("true (no filters provided)")
			}
			buf.WriteString("\n")
			// Range key filter matches.
			buf.WriteString("ranges: ")
			if len(ranges) > 0 {
				f := newBlockPropertiesFilterer(ranges, nil)
				ok, err := f.intersectsUserPropsAndFinishInit(r.Properties.UserProperties)
				if err != nil {
					return err.Error()
				}
				buf.WriteString(strconv.FormatBool(ok))
			} else {
				// Without filters, the table matches by default.
				buf.WriteString("true (no filters provided)")
			}
			buf.WriteString("\n")
			return buf.String()
		case "iter":
			var lower, upper []byte
			var filters []BlockPropertyFilter
			for _, arg := range td.CmdArgs {
				switch arg.Key {
				case "lower":
					lower = []byte(arg.Vals[0])
				case "upper":
					upper = []byte(arg.Vals[0])
				case "point-key-filter":
					f, err := parseIntervalFilter(arg)
					if err != nil {
						return err.Error()
					}
					filters = append(filters, f)
				}
			}
			filterer := newBlockPropertiesFilterer(filters, nil)
			ok, err := filterer.intersectsUserPropsAndFinishInit(r.Properties.UserProperties)
			if err != nil {
				return err.Error()
			} else if !ok {
				return "filter excludes entire table"
			}
			iter, err := r.NewIterWithBlockPropertyFilters(
				lower, upper, filterer, false /* use (bloom) filter */, &stats,
				CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r})
			if err != nil {
				return err.Error()
			}
			return runIterCmd(td, iter, false, runIterCmdEveryOpAfter(func(w io.Writer) {
				// After every op, print the value of MaybeFilteredKeys.
				fmt.Fprintf(w, " MaybeFilteredKeys()=%t", iter.MaybeFilteredKeys())
			}))
		default:
			return fmt.Sprintf("unknown command: %s", td.Cmd)
		}
	})
}
// TestBlockProperties_BoundLimited is a datadriven test for bound-limited
// block property filters, using boundLimitedWrapper to interleave the
// filter's decision log with the iterator's output.
func TestBlockProperties_BoundLimited(t *testing.T) {
	var r *Reader
	defer func() {
		if r != nil {
			require.NoError(t, r.Close())
		}
	}()
	var stats base.InternalIteratorStats
	datadriven.RunTest(t, "testdata/block_properties_boundlimited", func(t *testing.T, td *datadriven.TestData) string {
		switch td.Cmd {
		case "build":
			// Each build replaces the current reader.
			if r != nil {
				_ = r.Close()
				r = nil
			}
			var output string
			r, output = runBlockPropertiesBuildCmd(td)
			return output
		case "collectors":
			return runCollectorsCmd(r, td)
		case "table-props":
			return runTablePropsCmd(r, td)
		case "block-props":
			return runBlockPropsCmd(r, td)
		case "iter":
			var buf bytes.Buffer
			var lower, upper []byte
			filter := boundLimitedWrapper{
				w:   &buf,
				cmp: testkeys.Comparer.Compare,
			}
			for _, arg := range td.CmdArgs {
				switch arg.Key {
				case "lower":
					lower = []byte(arg.Vals[0])
				case "upper":
					upper = []byte(arg.Vals[0])
				case "filter":
					f, err := parseIntervalFilter(arg)
					if err != nil {
						return err.Error()
					}
					filter.inner = f
				case "filter-upper":
					ik := base.MakeInternalKey([]byte(arg.Vals[0]), 0, base.InternalKeyKindSet)
					filter.upper = &ik
				case "filter-lower":
					ik := base.MakeInternalKey([]byte(arg.Vals[0]), 0, base.InternalKeyKindSet)
					filter.lower = &ik
				}
			}
			if filter.inner == nil {
				return "missing block property filter"
			}
			filterer := newBlockPropertiesFilterer(nil, &filter)
			ok, err := filterer.intersectsUserPropsAndFinishInit(r.Properties.UserProperties)
			if err != nil {
				return err.Error()
			} else if !ok {
				return "filter excludes entire table"
			}
			iter, err := r.NewIterWithBlockPropertyFilters(
				lower, upper, filterer, false /* use (bloom) filter */, &stats,
				CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r})
			if err != nil {
				return err.Error()
			}
			return runIterCmd(td, iter, false, runIterCmdEveryOp(func(w io.Writer) {
				// Copy the bound-limited-wrapper's accumulated output to the
				// iterator's writer. This interleaves its output with the
				// iterator output.
				io.Copy(w, &buf)
				buf.Reset()
			}), runIterCmdEveryOpAfter(func(w io.Writer) {
				// After every op, print the value of MaybeFilteredKeys.
				fmt.Fprintf(w, " MaybeFilteredKeys()=%t", iter.MaybeFilteredKeys())
			}))
		default:
			return fmt.Sprintf("unrecognized command %q", td.Cmd)
		}
	})
}
// boundLimitedWrapper wraps a BlockPropertyFilter and logs every
// Intersects/KeyIsWithinLowerBound/KeyIsWithinUpperBound call to w, so
// datadriven tests can observe the filter's decisions. lower/upper, when
// non-nil, define the key bounds reported by the KeyIsWithin* methods.
type boundLimitedWrapper struct {
	w     io.Writer
	cmp   base.Compare
	inner BlockPropertyFilter
	lower *InternalKey
	upper *InternalKey
}

// Name delegates to the wrapped filter.
func (bl *boundLimitedWrapper) Name() string { return bl.inner.Name() }
// Intersects delegates to the inner filter, logging the property (pretty
// printed as an interval when it decodes as one) and the result.
func (bl *boundLimitedWrapper) Intersects(prop []byte) (bool, error) {
	display := fmt.Sprintf("%x", prop)
	var iv interval
	if iv.decode(prop) == nil {
		// If it decodes as an interval, pretty print it as an interval.
		display = fmt.Sprintf("[%d, %d)", iv.lower, iv.upper)
	}
	v, err := bl.inner.Intersects(prop)
	if bl.w != nil {
		fmt.Fprintf(bl.w, " filter.Intersects(%s) = (%t, %v)\n", display, v, err)
	}
	return v, err
}
// KeyIsWithinLowerBound reports whether key is at or above the configured
// lower bound (vacuously true when no bound is set), logging the call.
func (bl *boundLimitedWrapper) KeyIsWithinLowerBound(key []byte) (ret bool) {
	ret = bl.lower == nil || bl.cmp(key, bl.lower.UserKey) >= 0
	if bl.w != nil {
		fmt.Fprintf(bl.w, " filter.KeyIsWithinLowerBound(%s) = %t\n", key, ret)
	}
	return ret
}
// KeyIsWithinUpperBound reports whether key is at or below the configured
// upper bound (vacuously true when no bound is set), logging the call.
func (bl *boundLimitedWrapper) KeyIsWithinUpperBound(key []byte) (ret bool) {
	ret = bl.upper == nil || bl.cmp(key, bl.upper.UserKey) <= 0
	if bl.w != nil {
		fmt.Fprintf(bl.w, " filter.KeyIsWithinUpperBound(%s) = %t\n", key, ret)
	}
	return ret
}
// parseIntervalFilter parses a datadriven argument of the form
// "<name> <lower> <upper>" into a block interval filter. The locals are named
// lo/hi rather than min/max to avoid shadowing the builtins.
func parseIntervalFilter(cmd datadriven.CmdArg) (BlockPropertyFilter, error) {
	name, loStr, hiStr := cmd.Vals[0], cmd.Vals[1], cmd.Vals[2]
	lo, err := strconv.ParseUint(loStr, 10, 64)
	if err != nil {
		return nil, err
	}
	hi, err := strconv.ParseUint(hiStr, 10, 64)
	if err != nil {
		return nil, err
	}
	return NewBlockIntervalFilter(name, lo, hi), nil
}
// runCollectorsCmd lists the block property collectors recorded in the
// reader's user properties, one "<shortID>: <name>" line per collector, in
// sorted order.
func runCollectorsCmd(r *Reader, td *datadriven.TestData) string {
	var lines []string
	for k, v := range r.Properties.UserProperties {
		// The first byte of each property value is the collector's shortID.
		lines = append(lines, fmt.Sprintf("%d: %s", v[0], k))
	}
	// Map iteration order is random; sort for deterministic output. (This
	// replaces the roundabout sort.StringSlice variable in the original.)
	sort.Strings(lines)
	return strings.Join(lines, "\n")
}
// runTablePropsCmd decodes each user property in the reader as a
// shortID-prefixed interval and prints "<shortID>: [lower, upper)" lines in
// sorted order.
func runTablePropsCmd(r *Reader, td *datadriven.TestData) string {
	var lines []string
	for _, val := range r.Properties.UserProperties {
		// First byte is the collector's shortID; the remainder is the
		// encoded table-level interval.
		id := shortID(val[0])
		var i interval
		if err := i.decode([]byte(val[1:])); err != nil {
			return err.Error()
		}
		lines = append(lines, fmt.Sprintf("%d: [%d, %d)", id, i.lower, i.upper))
	}
	// Map iteration order is random; sort for deterministic output. (This
	// replaces the roundabout sort.StringSlice variable in the original.)
	sort.Strings(lines)
	return strings.Join(lines, "\n")
}
// runBlockPropertiesBuildCmd builds an sstable from a datadriven "build"
// command, honoring the block-size, index-block-size, and collectors
// arguments, and returns a Reader over the result plus a summary of the
// written key ranges and seqnums (or an error string).
func runBlockPropertiesBuildCmd(td *datadriven.TestData) (r *Reader, out string) {
	opts := WriterOptions{
		TableFormat:    TableFormatPebblev2,
		IndexBlockSize: math.MaxInt32, // Default to a single level index for simplicity.
	}
	for _, cmd := range td.CmdArgs {
		switch cmd.Key {
		case "block-size":
			if len(cmd.Vals) != 1 {
				return r, fmt.Sprintf("%s: arg %s expects 1 value", td.Cmd, cmd.Key)
			}
			var err error
			opts.BlockSize, err = strconv.Atoi(cmd.Vals[0])
			if err != nil {
				return r, err.Error()
			}
		case "collectors":
			for _, c := range cmd.Vals {
				var points, ranges DataBlockIntervalCollector
				switch c {
				case "value-first":
					points = &valueCharBlockIntervalCollector{charIdx: 0}
				case "value-last":
					points = &valueCharBlockIntervalCollector{charIdx: -1}
				case "suffix":
					points, ranges = &suffixIntervalCollector{}, &suffixIntervalCollector{}
				case "suffix-point-keys-only":
					points = &suffixIntervalCollector{}
				case "suffix-range-keys-only":
					ranges = &suffixIntervalCollector{}
				case "nil-points-and-ranges":
					points, ranges = nil, nil
				default:
					return r, fmt.Sprintf("unknown collector: %s", c)
				}
				// Capture c for the deferred constructor closure below.
				name := c
				opts.BlockPropertyCollectors = append(
					opts.BlockPropertyCollectors,
					func() BlockPropertyCollector {
						return NewBlockIntervalCollector(name, points, ranges)
					})
			}
		case "index-block-size":
			var err error
			opts.IndexBlockSize, err = strconv.Atoi(cmd.Vals[0])
			if err != nil {
				return r, err.Error()
			}
		}
	}
	var meta *WriterMetadata
	var err error
	func() {
		defer func() {
			// Convert a panic during the build into an error. Note: the
			// original named the recover() result r, shadowing the named
			// return value *Reader r; use a distinct name for clarity.
			if rec := recover(); rec != nil {
				err = errors.Errorf("%v", rec)
			}
		}()
		meta, r, err = runBuildCmd(td, &opts, 0)
	}()
	if err != nil {
		return r, err.Error()
	}
	return r, fmt.Sprintf("point: [%s,%s]\nrangedel: [%s,%s]\nrangekey: [%s,%s]\nseqnums: [%d,%d]\n",
		meta.SmallestPoint, meta.LargestPoint,
		meta.SmallestRangeDel, meta.LargestRangeDel,
		meta.SmallestRangeKey, meta.LargestRangeKey,
		meta.SmallestSeqNum, meta.LargestSeqNum)
}
// runBlockPropsCmd prints, for every index entry in the reader, the block
// properties attached to the corresponding block handle. For two-level
// indexes it also descends into each lower-level index block and prints the
// per-data-block properties, indented.
func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
	bh, err := r.readIndex(context.Background(), nil, nil)
	if err != nil {
		return err.Error()
	}
	twoLevelIndex := r.Properties.IndexPartitions > 0
	i, err := newBlockIter(r.Compare, bh.Get())
	if err != nil {
		return err.Error()
	}
	defer bh.Release()
	var sb strings.Builder
	// decodeProps writes one "<shortID>: [lower, upper)" line (with the
	// given indent) per property in props, in sorted order.
	decodeProps := func(props []byte, indent string) error {
		d := blockPropertiesDecoder{props: props}
		var lines []string
		for !d.done() {
			id, prop, err := d.next()
			if err != nil {
				return err
			}
			var i interval
			if err := i.decode(prop); err != nil {
				return err
			}
			lines = append(lines, fmt.Sprintf("%s%d: [%d, %d)\n", indent, id, i.lower, i.upper))
		}
		// Sort for deterministic output. (Replaces the roundabout
		// sort.StringSlice variable in the original.)
		sort.Strings(lines)
		for _, line := range lines {
			sb.WriteString(line)
		}
		return nil
	}
	for key, val := i.First(); key != nil; key, val = i.Next() {
		sb.WriteString(fmt.Sprintf("%s:\n", key))
		bhp, err := decodeBlockHandleWithProperties(val.InPlaceValue())
		if err != nil {
			return err.Error()
		}
		if err := decodeProps(bhp.Props, " "); err != nil {
			return err.Error()
		}
		// If the table has a two-level index, also decode the index
		// block that bhp points to, along with its block properties.
		if twoLevelIndex {
			subiter := &blockIter{}
			subIndex, err := r.readBlock(
				context.Background(), bhp.BlockHandle, nil, nil, nil, nil, nil)
			if err != nil {
				return err.Error()
			}
			if err := subiter.init(
				r.Compare, subIndex.Get(), 0 /* globalSeqNum */, false); err != nil {
				return err.Error()
			}
			for key, value := subiter.First(); key != nil; key, value = subiter.Next() {
				sb.WriteString(fmt.Sprintf(" %s:\n", key))
				dataBH, err := decodeBlockHandleWithProperties(value.InPlaceValue())
				if err != nil {
					return err.Error()
				}
				if err := decodeProps(dataBH.Props, " "); err != nil {
					return err.Error()
				}
			}
			subIndex.Release()
		}
	}
	return sb.String()
}
// keyCountCollector is a test BlockPropertyCollector that encodes, as decimal
// ASCII, the number of keys added at block, index-block, and table
// granularity. Range keys are counted directly at the table level.
type keyCountCollector struct {
	name string
	// Running counts: block feeds table (in FinishDataBlock) and index (in
	// AddPrevDataBlockToIndexBlock); index resets per index block, table per
	// table.
	block, index, table int
}

var _ BlockPropertyCollector = &keyCountCollector{}
var _ SuffixReplaceableBlockCollector = &keyCountCollector{}

// keyCountCollectorFn returns a constructor for a keyCountCollector with the
// given property name.
func keyCountCollectorFn(name string) func() BlockPropertyCollector {
	return func() BlockPropertyCollector { return &keyCountCollector{name: name} }
}

// Name implements BlockPropertyCollector.
func (p *keyCountCollector) Name() string { return p.name }
// Add implements BlockPropertyCollector. Range keys are tallied directly into
// the table count; point keys into the current block count.
func (p *keyCountCollector) Add(k InternalKey, _ []byte) error {
	if !rangekey.IsRangeKey(k.Kind()) {
		p.block++
	} else {
		p.table++
	}
	return nil
}
// FinishDataBlock implements BlockPropertyCollector, appending the block's
// key count (decimal ASCII) to buf and folding it into the table count.
func (p *keyCountCollector) FinishDataBlock(buf []byte) ([]byte, error) {
	// append accepts a string directly; the int() and []byte() conversions
	// in the original were redundant (p.block is already an int).
	buf = append(buf, strconv.Itoa(p.block)...)
	p.table += p.block
	return buf, nil
}
// AddPrevDataBlockToIndexBlock implements BlockPropertyCollector, folding the
// finished block's count into the index-block count and resetting it.
func (p *keyCountCollector) AddPrevDataBlockToIndexBlock() {
	p.index += p.block
	p.block = 0
}
// FinishIndexBlock implements BlockPropertyCollector, appending the index
// block's key count (decimal ASCII) to buf and resetting it.
func (p *keyCountCollector) FinishIndexBlock(buf []byte) ([]byte, error) {
	// append accepts a string directly; the int() and []byte() conversions
	// in the original were redundant (p.index is already an int).
	buf = append(buf, strconv.Itoa(p.index)...)
	p.index = 0
	return buf, nil
}
// FinishTable implements BlockPropertyCollector, appending the table's key
// count (decimal ASCII) to buf and resetting it.
func (p *keyCountCollector) FinishTable(buf []byte) ([]byte, error) {
	// append accepts a string directly; the int() and []byte() conversions
	// in the original were redundant (p.table is already an int).
	buf = append(buf, strconv.Itoa(p.table)...)
	p.table = 0
	return buf, nil
}
// UpdateKeySuffixes implements SuffixReplaceableBlockCollector by restoring
// the block count from the previously encoded block property.
func (p *keyCountCollector) UpdateKeySuffixes(old []byte, _, _ []byte) error {
	count, err := strconv.Atoi(string(old))
	if err != nil {
		return err
	}
	p.block = count
	return nil
}
// intSuffixCollector is testing prop collector that collects the min and
// max value of numeric suffix of keys (interpreting suffixLen bytes as ascii
// for conversion with atoi).
type intSuffixCollector struct {
	suffixLen int
	// min and max are inclusive bounds; primed to MaxUint64/0 by
	// makeIntSuffixCollector so the first observation sets both.
	min, max uint64 // inclusive
}
// makeIntSuffixCollector returns an intSuffixCollector that parses the
// trailing suffixLen bytes of each key, with min/max primed (MaxUint64/0) so
// the first observation initializes both bounds.
func makeIntSuffixCollector(suffixLen int) intSuffixCollector {
	// The parameter was previously named len, shadowing the builtin.
	return intSuffixCollector{suffixLen, math.MaxUint64, 0}
}
// setFromSuffix resets min and max from the trailing suffixLen bytes of to,
// interpreted as decimal ASCII. Values shorter than suffixLen are ignored.
func (p *intSuffixCollector) setFromSuffix(to []byte) error {
	if len(to) < p.suffixLen {
		return nil
	}
	parsed, err := strconv.Atoi(string(to[len(to)-p.suffixLen:]))
	if err != nil {
		return err
	}
	p.min = uint64(parsed)
	p.max = uint64(parsed)
	return nil
}
// intSuffixTablePropCollector exposes intSuffixCollector as a table property
// collector, writing "<name>.min" / "<name>.max" user properties.
type intSuffixTablePropCollector struct {
	name string
	intSuffixCollector
}

var _ TablePropertyCollector = &intSuffixTablePropCollector{}
var _ SuffixReplaceableTableCollector = &intSuffixTablePropCollector{}

// intSuffixTablePropCollectorFn returns a constructor for an
// intSuffixTablePropCollector with the given name and suffix length.
func intSuffixTablePropCollectorFn(name string, len int) func() TablePropertyCollector {
	return func() TablePropertyCollector { return &intSuffixTablePropCollector{name, makeIntSuffixCollector(len)} }
}
// Add parses the trailing suffixLen bytes of the user key as a decimal
// integer and widens the inclusive [min, max] bounds to include it. Keys
// whose user key is not strictly longer than suffixLen are skipped.
func (p *intSuffixCollector) Add(key InternalKey, _ []byte) error {
	if len(key.UserKey) <= p.suffixLen {
		return nil
	}
	parsed, err := strconv.Atoi(string(key.UserKey[len(key.UserKey)-p.suffixLen:]))
	if err != nil {
		return err
	}
	v := uint64(parsed)
	// Both checks run independently: with the primed bounds
	// (min=MaxUint64, max=0) the first value updates both.
	if v > p.max {
		p.max = v
	}
	if v < p.min {
		p.min = v
	}
	return nil
}
// Finish implements TablePropertyCollector, recording the table-wide min and
// max suffix values as "<name>.min" / "<name>.max" user properties.
func (p *intSuffixTablePropCollector) Finish(userProps map[string]string) error {
	userProps[p.name+".min"] = fmt.Sprint(p.min)
	userProps[p.name+".max"] = fmt.Sprint(p.max)
	return nil
}

// Name implements TablePropertyCollector.
func (p *intSuffixTablePropCollector) Name() string { return p.name }

// UpdateKeySuffixes implements SuffixReplaceableTableCollector by resetting
// the bounds from the replacement suffix.
func (p *intSuffixTablePropCollector) UpdateKeySuffixes(
	oldProps map[string]string, from, to []byte,
) error {
	return p.setFromSuffix(to)
}
// intSuffixIntervalCollector is a wrapper for intSuffixCollector that uses it
// to implement a block interval collector. (Comment previously referred to
// stale testIntSuffix* names.)
type intSuffixIntervalCollector struct {
	intSuffixCollector
}

// intSuffixIntervalCollectorFn returns a constructor for a block property
// collector backed by an intSuffixIntervalCollector with the given name and
// suffix length.
func intSuffixIntervalCollectorFn(name string, length int) func() BlockPropertyCollector {
	return func() BlockPropertyCollector {
		return NewBlockIntervalCollector(name, &intSuffixIntervalCollector{makeIntSuffixCollector(length)}, nil)
	}
}

var _ DataBlockIntervalCollector = &intSuffixIntervalCollector{}
var _ SuffixReplaceableBlockCollector = &intSuffixIntervalCollector{}

// FinishDataBlock implements DataBlockIntervalCollector, converting the
// inclusive [min, max] bounds to a half-open [lower, upper) interval.
func (p *intSuffixIntervalCollector) FinishDataBlock() (lower uint64, upper uint64, err error) {
	return p.min, p.max + 1, nil
}

// UpdateKeySuffixes implements SuffixReplaceableBlockCollector by resetting
// the bounds from the replacement suffix.
func (p *intSuffixIntervalCollector) UpdateKeySuffixes(oldProp []byte, from, to []byte) error {
	return p.setFromSuffix(to)
}