// Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"runtime/debug"
	"sort"
	"strconv"
	"strings"
	"sync/atomic"
	"syscall"
	"testing"

	"github.com/cockroachdb/datadriven"
	"github.com/cockroachdb/errors"
	"github.com/cockroachdb/pebble/internal/base"
	"github.com/cockroachdb/pebble/internal/cache"
	"github.com/cockroachdb/pebble/internal/manifest"
	"github.com/cockroachdb/pebble/objstorage"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
	"github.com/cockroachdb/pebble/objstorage/remote"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/vfs/atomicfs"
	"github.com/cockroachdb/pebble/vfs/errorfs"
	"github.com/cockroachdb/redact"
	"github.com/kr/pretty"
	"github.com/stretchr/testify/require"
)

// TestOpenSharedTableCache verifies that when the TableCache option is set,
// Open uses the provided shared table cache rather than creating a private
// one, so two databases opened with the same option share a single cache.
func TestOpenSharedTableCache(t *testing.T) {
	c := cache.New(cacheDefaultSize)
	tc := NewTableCache(c, 16, 100)
	defer tc.Unref()
	defer c.Unref()

	d0, err := Open("", testingRandomized(t, &Options{
		FS:         vfs.NewMem(),
		Cache:      c,
		TableCache: tc,
	}))
	if err != nil {
		// Fatalf, not Errorf: continuing with a nil d0 would panic on the
		// deferred Close below.
		t.Fatalf("d0 Open: %s", err.Error())
	}
	defer d0.Close()

	d1, err := Open("", testingRandomized(t, &Options{
		FS:         vfs.NewMem(),
		Cache:      c,
		TableCache: tc,
	}))
	if err != nil {
		// Fatalf for the same reason as above.
		t.Fatalf("d1 Open: %s", err.Error())
	}
	defer d1.Close()

	// Make sure that the Open function is using the passed in table cache
	// when the TableCache option is set.
	require.Equalf(
		t, d0.tableCache.tableCache, d1.tableCache.tableCache,
		"expected tableCache for both d0 and d1 to be the same",
	)
}

// TestErrorIfExists exercises the ErrorIfExists option: opening an existing
// store with it set must fail with ErrDBAlreadyExists, and clearing it must
// allow the store to be reopened.
func TestErrorIfExists(t *testing.T) {
	opts := testingRandomized(t, &Options{
		FS:            vfs.NewMem(),
		ErrorIfExists: true,
	})
	defer ensureFilesClosed(t, opts)()

	// The store does not exist yet, so the first Open succeeds and creates it.
	d0, err := Open("", opts)
	require.NoError(t, err)
	require.NoError(t, d0.Close())

	// A second Open with ErrorIfExists still set must fail.
	_, err = Open("", opts)
	if !errors.Is(err, ErrDBAlreadyExists) {
		t.Fatalf("expected db-already-exists error, got %v", err)
	}

	// Clearing the option makes the existing store openable again.
	opts.ErrorIfExists = false
	d1, err := Open("", opts)
	require.NoError(t, err)
	require.NoError(t, d1.Close())
}

// TestErrorIfNotExists exercises the ErrorIfNotExists option: opening a
// missing store with it set must fail with ErrDBDoesNotExist, while opening
// an existing store with it set must succeed.
func TestErrorIfNotExists(t *testing.T) {
	opts := testingRandomized(t, &Options{
		FS:               vfs.NewMem(),
		ErrorIfNotExists: true,
	})
	defer ensureFilesClosed(t, opts)()

	// No store exists yet, so Open must fail.
	if _, err := Open("", opts); !errors.Is(err, ErrDBDoesNotExist) {
		t.Fatalf("expected db-does-not-exist error, got %v", err)
	}

	// Create the DB and try again.
	opts.ErrorIfNotExists = false
	d0, err := Open("", opts)
	require.NoError(t, err)
	require.NoError(t, d0.Close())

	// The store exists now, so the option is satisfied.
	opts.ErrorIfNotExists = true
	d1, err := Open("", opts)
	require.NoError(t, err)
	require.NoError(t, d1.Close())
}

// TestErrorIfNotPristine exercises the ErrorIfNotPristine option: a store
// that has ever had writes applied — even after compaction has rewritten the
// data — must fail to open with ErrDBNotPristine.
func TestErrorIfNotPristine(t *testing.T) {
	opts := testingRandomized(t, &Options{
		FS:                 vfs.NewMem(),
		ErrorIfNotPristine: true,
	})
	defer ensureFilesClosed(t, opts)()

	d0, err := Open("", opts)
	require.NoError(t, err)
	require.NoError(t, d0.Close())

	// Store is pristine; ok to open.
	d1, err := Open("", opts)
	require.NoError(t, err)
	require.NoError(t, d1.Set([]byte("foo"), []byte("bar"), Sync))
	require.NoError(t, d1.Close())

	if _, err := Open("", opts); !errors.Is(err, ErrDBNotPristine) {
		t.Fatalf("expected db-not-pristine error, got %v", err)
	}

	// Run compaction and make sure we're still not allowed to open.
	opts.ErrorIfNotPristine = false
	d2, err := Open("", opts)
	require.NoError(t, err)
	require.NoError(t, d2.Compact([]byte("a"), []byte("z"), false /* parallelize */))
	require.NoError(t, d2.Close())

	opts.ErrorIfNotPristine = true
	if _, err := Open("", opts); !errors.Is(err, ErrDBNotPristine) {
		// This message previously said "db-already-exists", which did not
		// match the ErrDBNotPristine assertion above.
		t.Fatalf("expected db-not-pristine error, got %v", err)
	}
}

// TestOpenAlreadyLocked verifies that a pre-acquired directory Lock passed
// through Options is honored: a second Open reusing the same Lock fails while
// the first database holds it, succeeds after that database closes, and the
// lock's reference count drops to zero once everything is released.
func TestOpenAlreadyLocked(t *testing.T) {
	runTest := func(t *testing.T, dirname string, fs vfs.FS) {
		opts := testingRandomized(t, &Options{FS: fs})
		lock, err := LockDirectory(dirname, fs)
		require.NoError(t, err)
		opts.Lock = lock

		db, err := Open(dirname, opts)
		require.NoError(t, err)
		require.NoError(t, db.Set([]byte("foo"), []byte("bar"), Sync))

		// Try to open the same database reusing the Options containing the same
		// Lock. It should error when it observes that it's already referenced.
		_, err = Open(dirname, opts)
		require.Error(t, err)

		// Close the database.
		require.NoError(t, db.Close())

		// Now Opening should succeed again.
		db, err = Open(dirname, opts)
		require.NoError(t, err)
		require.NoError(t, db.Close())

		require.NoError(t, opts.Lock.Close())
		// There should be no more remaining references.
		require.Equal(t, int32(0), opts.Lock.refs.Load())
	}
	for _, tc := range []struct {
		name    string
		dirname string
		fs      vfs.FS
	}{
		{name: "memfs", dirname: "", fs: vfs.NewMem()},
		{name: "disk", dirname: t.TempDir(), fs: vfs.Default},
	} {
		t.Run(tc.name, func(t *testing.T) {
			runTest(t, tc.dirname, tc.fs)
		})
	}
}

// TestNewDBFilenames checks the exact set of files created for a brand-new
// database at both the oldest and newest supported format major versions.
func TestNewDBFilenames(t *testing.T) {
	expectedFiles := map[FormatMajorVersion][]string{
		FormatMostCompatible: {
			"000002.log",
			"CURRENT",
			"LOCK",
			"MANIFEST-000001",
			"OPTIONS-000003",
		},
		internalFormatNewest: {
			"000002.log",
			"CURRENT",
			"LOCK",
			"MANIFEST-000001",
			"OPTIONS-000003",
			"marker.format-version.000015.016",
			"marker.manifest.000001.MANIFEST-000001",
		},
	}

	for formatVers, want := range expectedFiles {
		t.Run(fmt.Sprintf("vers=%s", formatVers), func(t *testing.T) {
			mem := vfs.NewMem()
			dirname := mem.PathJoin("foo", "bar")
			d, err := Open(dirname, &Options{
				FS:                 mem,
				FormatMajorVersion: formatVers,
			})
			if err != nil {
				t.Fatalf("Open: %v", err)
			}
			if err := d.Close(); err != nil {
				t.Fatalf("Close: %v", err)
			}
			got, err := mem.List(dirname)
			if err != nil {
				t.Fatalf("List: %v", err)
			}
			// List order is unspecified; sort for a deterministic compare.
			sort.Strings(got)
			if !reflect.DeepEqual(got, want) {
				t.Errorf("\ngot  %v\nwant %v", got, want)
			}
		})
	}
}

// testOpenCloseOpenClose cycles databases through open/write/close/reopen
// under every combination of starting state (shared dir vs. fresh dir per
// length), WAL placement (same dir or separate WAL dir), and value size
// (including length < 0, meaning "write nothing"). After each cycle it
// verifies the value read back matches what was written and that exactly one
// OPTIONS file remains in the data directory.
func testOpenCloseOpenClose(t *testing.T, fs vfs.FS, root string) {
	opts := testingRandomized(t, &Options{FS: fs})

	for _, startFromEmpty := range []bool{false, true} {
		for _, walDirname := range []string{"", "wal"} {
			for _, length := range []int{-1, 0, 1, 1000, 10000, 100000} {
				dirname := "sharedDatabase" + walDirname
				if startFromEmpty {
					dirname = "startFromEmpty" + walDirname + strconv.Itoa(length)
				}
				dirname = fs.PathJoin(root, dirname)
				if walDirname == "" {
					opts.WALDir = ""
				} else {
					opts.WALDir = fs.PathJoin(dirname, walDirname)
				}

				// A negative length means this iteration writes no key.
				got, xxx := []byte(nil), ""
				if length >= 0 {
					xxx = strings.Repeat("x", length)
				}

				d0, err := Open(dirname, opts)
				if err != nil {
					// Fatalf terminates the test run here; the `continue`
					// that used to follow it was unreachable dead code.
					t.Fatalf("sfe=%t, length=%d: Open #0: %v",
						startFromEmpty, length, err)
				}
				if length >= 0 {
					err = d0.Set([]byte("key"), []byte(xxx), nil)
					if err != nil {
						t.Errorf("sfe=%t, length=%d: Set: %v",
							startFromEmpty, length, err)
						continue
					}
				}
				err = d0.Close()
				if err != nil {
					t.Errorf("sfe=%t, length=%d: Close #0: %v",
						startFromEmpty, length, err)
					continue
				}

				d1, err := Open(dirname, opts)
				if err != nil {
					t.Errorf("sfe=%t, length=%d: Open #1: %v",
						startFromEmpty, length, err)
					continue
				}
				if length >= 0 {
					var closer io.Closer
					got, closer, err = d1.Get([]byte("key"))
					if err != nil {
						t.Errorf("sfe=%t, length=%d: Get: %v",
							startFromEmpty, length, err)
						continue
					}
					// Copy the value out before releasing the closer, which
					// may invalidate the returned slice.
					got = append([]byte(nil), got...)
					closer.Close()
				}
				err = d1.Close()
				if err != nil {
					t.Errorf("sfe=%t, length=%d: Close #1: %v",
						startFromEmpty, length, err)
					continue
				}

				if length >= 0 && string(got) != xxx {
					t.Errorf("sfe=%t, length=%d: got value differs from set value",
						startFromEmpty, length)
					continue
				}

				{
					got, err := opts.FS.List(dirname)
					if err != nil {
						t.Fatalf("List: %v", err)
					}
					// Each reopen rewrites the OPTIONS file; stale copies
					// should be removed, leaving exactly one.
					var optionsCount int
					for _, s := range got {
						// ft renamed from `t`, which shadowed the *testing.T
						// parameter of this function.
						if ft, _, ok := base.ParseFilename(opts.FS, s); ok && ft == fileTypeOptions {
							optionsCount++
						}
					}
					if optionsCount != 1 {
						t.Fatalf("expected 1 OPTIONS file, but found %d", optionsCount)
					}
				}
			}
		}
	}
}

// TestOpenCloseOpenClose runs the open/close cycling test against both the
// real filesystem and an in-memory filesystem.
func TestOpenCloseOpenClose(t *testing.T) {
	for _, fstype := range []string{"disk", "mem"} {
		t.Run(fstype, func(t *testing.T) {
			var fs vfs.FS
			var dir string
			switch fstype {
			case "disk":
				// t.TempDir handles creation and cleanup automatically,
				// consistent with TestOpenAlreadyLocked's disk subtest.
				dir = t.TempDir()
				fs = vfs.Default
			case "mem":
				dir = ""
				fs = vfs.NewMem()
			}
			testOpenCloseOpenClose(t, fs, dir)
		})
	}
}

// TestOpenOptionsCheck verifies that Open rejects a store whose persisted
// OPTIONS file records a different comparer or merger name than the one
// supplied at open time.
func TestOpenOptionsCheck(t *testing.T) {
	mem := vfs.NewMem()

	// Create the store with the default comparer and merger.
	d, err := Open("", &Options{FS: mem})
	require.NoError(t, err)
	require.NoError(t, d.Close())

	// Reopening with a mismatched comparer name must fail the options check.
	_, err = Open("", &Options{
		Comparer: &Comparer{Name: "foo"},
		FS:       mem,
	})
	require.Regexp(t, `comparer name from file.*!=.*`, err)

	// Likewise for a mismatched merger name.
	_, err = Open("", &Options{
		Merger: &Merger{Name: "bar"},
		FS:     mem,
	})
	require.Regexp(t, `merger name from file.*!=.*`, err)
}

// TestOpenCrashWritingOptions simulates a crash that tears the OPTIONS file
// mid-write during Open, and verifies the database still opens afterwards.
func TestOpenCrashWritingOptions(t *testing.T) {
	memFS := vfs.NewMem()

	{
		// Create the database.
		d, err := Open("", &Options{FS: memFS})
		require.NoError(t, err)
		require.NoError(t, d.Close())
	}

	// Open the database again, this time with a mocked filesystem that
	// will only succeed in partially writing the OPTIONS file.
	_, err := Open("", &Options{FS: optionsTornWriteFS{FS: memFS}})
	require.Error(t, err)

	// Re-opening the database must succeed.
	d, err := Open("", &Options{FS: memFS})
	require.NoError(t, err)
	require.NoError(t, d.Close())
}

// optionsTornWriteFS wraps a vfs.FS so that files it creates simulate a torn
// write of the OPTIONS file: a write containing the `comparer=` field is
// truncated partway through and fails (see optionsTornWriteFile.Write).
type optionsTornWriteFS struct {
	vfs.FS
}

// Create forwards to the wrapped FS, wrapping any returned file in
// optionsTornWriteFile so its writes can be torn.
func (fs optionsTornWriteFS) Create(name string) (vfs.File, error) {
	file, err := fs.FS.Create(name)
	if file != nil {
		file = optionsTornWriteFile{File: file}
	}
	return file, err
}

// optionsTornWriteFile wraps a vfs.File, tearing any write that contains the
// OPTIONS `comparer=` field: only the bytes through that key are written and
// an I/O error is returned, simulating a crash mid-write.
type optionsTornWriteFile struct {
	vfs.File
}

// Write passes through all writes except the one containing `comparer=`,
// which is truncated just after the key and fails with EIO.
func (f optionsTornWriteFile) Write(b []byte) (int, error) {
	// Look for the OPTIONS-XXXXXX file's `comparer=` field.
	comparerKey := []byte("comparer=")
	i := bytes.Index(b, comparerKey)
	if i == -1 {
		return f.File.Write(b)
	}
	// Write only the contents through `comparer=` and return an error.
	n, _ := f.File.Write(b[:i+len(comparerKey)])
	return n, syscall.EIO
}

// TestOpenReadOnly verifies read-only mode: opening a missing store performs
// no mutable filesystem operations, opening an existing store leaves the
// directory contents unchanged, every mutating API returns ErrReadOnly, and
// reads (Get, iterators, snapshots, indexed batches) still work.
func TestOpenReadOnly(t *testing.T) {
	mem := vfs.NewMem()

	{
		// Opening a non-existent DB in read-only mode should result in no mutable
		// filesystem operations.
		var memLog base.InMemLogger
		_, err := Open("non-existent", testingRandomized(t, &Options{
			FS:       vfs.WithLogging(mem, memLog.Infof),
			ReadOnly: true,
			WALDir:   "non-existent-waldir",
		}))
		if err == nil {
			t.Fatalf("expected error, but found success")
		}
		// The logged ops must be exactly one read-only directory open.
		const expected = `open-dir: non-existent`
		if trimmed := strings.TrimSpace(memLog.String()); expected != trimmed {
			t.Fatalf("expected %q, but found %q", expected, trimmed)
		}
	}

	{
		// Opening a DB with a non-existent WAL dir in read-only mode should result
		// in no mutable filesystem operations other than the LOCK.
		var memLog base.InMemLogger
		_, err := Open("", testingRandomized(t, &Options{
			FS:       vfs.WithLogging(mem, memLog.Infof),
			ReadOnly: true,
			WALDir:   "non-existent-waldir",
		}))
		if err == nil {
			t.Fatalf("expected error, but found success")
		}
		const expected = "open-dir: \nopen-dir: non-existent-waldir\nclose:"
		if trimmed := strings.TrimSpace(memLog.String()); expected != trimmed {
			t.Fatalf("expected %q, but found %q", expected, trimmed)
		}
	}

	var contents []string
	{
		// Create a new DB and populate it with a small amount of data.
		d, err := Open("", testingRandomized(t, &Options{
			FS: mem,
		}))
		require.NoError(t, err)
		require.NoError(t, d.Set([]byte("test"), nil, nil))
		require.NoError(t, d.Close())
		// Snapshot the directory listing for comparison after the read-only
		// reopen below.
		contents, err = mem.List("")
		require.NoError(t, err)
		sort.Strings(contents)
	}

	{
		// Re-open the DB read-only. The directory contents should be unchanged.
		d, err := Open("", testingRandomized(t, &Options{
			FS:       mem,
			ReadOnly: true,
		}))
		require.NoError(t, err)

		// Verify various write operations fail in read-only mode.
		require.EqualValues(t, ErrReadOnly, d.Compact(nil, []byte("\xff"), false))
		require.EqualValues(t, ErrReadOnly, d.Flush())
		require.EqualValues(t, ErrReadOnly, func() error { _, err := d.AsyncFlush(); return err }())

		require.EqualValues(t, ErrReadOnly, d.Delete(nil, nil))
		require.EqualValues(t, ErrReadOnly, d.DeleteRange(nil, nil, nil))
		require.EqualValues(t, ErrReadOnly, d.Ingest(nil))
		require.EqualValues(t, ErrReadOnly, d.LogData(nil, nil))
		require.EqualValues(t, ErrReadOnly, d.Merge(nil, nil, nil))
		require.EqualValues(t, ErrReadOnly, d.Set(nil, nil, nil))

		// Verify we can still read in read-only mode.
		require.NoError(t, func() error {
			_, closer, err := d.Get([]byte("test"))
			if closer != nil {
				closer.Close()
			}
			return err
		}())

		// checkIter asserts that an iterator yields exactly the key "test".
		checkIter := func(iter *Iterator, err error) {
			t.Helper()

			var keys []string
			for valid := iter.First(); valid; valid = iter.Next() {
				keys = append(keys, string(iter.Key()))
			}
			require.NoError(t, iter.Close())
			expectedKeys := []string{"test"}
			if diff := pretty.Diff(keys, expectedKeys); diff != nil {
				t.Fatalf("%s\n%s", strings.Join(diff, "\n"), keys)
			}
		}

		checkIter(d.NewIter(nil))

		// Indexed batches can be read but not committed or applied.
		b := d.NewIndexedBatch()
		checkIter(b.NewIter(nil))
		require.EqualValues(t, ErrReadOnly, b.Commit(nil))
		require.EqualValues(t, ErrReadOnly, d.Apply(b, nil))

		s := d.NewSnapshot()
		checkIter(s.NewIter(nil))
		require.NoError(t, s.Close())

		require.NoError(t, d.Close())

		// The read-only session must not have changed the directory contents.
		newContents, err := mem.List("")
		require.NoError(t, err)

		sort.Strings(newContents)
		if diff := pretty.Diff(contents, newContents); diff != nil {
			t.Fatalf("%s", strings.Join(diff, "\n"))
		}
	}
}

// TestOpenWALReplay writes several values (all held in WALs, none flushed to
// sstables), then re-opens the database — in both read-write and read-only
// mode — with a much smaller memtable so that replay must span multiple
// memtables, and verifies all keys survive.
func TestOpenWALReplay(t *testing.T) {
	largeValue := []byte(strings.Repeat("a", 100<<10))
	hugeValue := []byte(strings.Repeat("b", 10<<20))
	// checkIter asserts that an iterator yields exactly the keys 1..5.
	checkIter := func(iter *Iterator, err error) {
		t.Helper()

		var keys []string
		for valid := iter.First(); valid; valid = iter.Next() {
			keys = append(keys, string(iter.Key()))
		}
		require.NoError(t, iter.Close())
		expectedKeys := []string{"1", "2", "3", "4", "5"}
		if diff := pretty.Diff(keys, expectedKeys); diff != nil {
			t.Fatalf("%s\n%s", strings.Join(diff, "\n"), keys)
		}
	}

	for _, readOnly := range []bool{false, true} {
		t.Run(fmt.Sprintf("read-only=%t", readOnly), func(t *testing.T) {
			// Create a new DB and populate it with some data.
			const dir = ""
			mem := vfs.NewMem()
			d, err := Open(dir, testingRandomized(t, &Options{
				FS:           mem,
				MemTableSize: 32 << 20,
			}))
			require.NoError(t, err)
			// All these values will fit in a single memtable, so on closing the db there
			// will be no sst and all the data is in a single WAL.
			require.NoError(t, d.Set([]byte("1"), largeValue, nil))
			require.NoError(t, d.Set([]byte("2"), largeValue, nil))
			require.NoError(t, d.Set([]byte("3"), largeValue, nil))
			require.NoError(t, d.Set([]byte("4"), hugeValue, nil))
			require.NoError(t, d.Set([]byte("5"), largeValue, nil))
			checkIter(d.NewIter(nil))
			require.NoError(t, d.Close())
			files, err := mem.List(dir)
			require.NoError(t, err)
			sort.Strings(files)
			logCount, sstCount := 0, 0
			for _, fname := range files {
				if strings.HasSuffix(fname, ".sst") {
					sstCount++
				}
				if strings.HasSuffix(fname, ".log") {
					logCount++
				}
			}
			require.Equal(t, 0, sstCount)
			// The memtable size starts at 256KB and doubles up to 32MB, so we
			// expect one log per doubling — 7 logs in total (256KB * 2^7 = 32MB).
			// (An earlier version of this comment said 5, which did not match
			// the assertion below.)
			require.Equal(t, 7, logCount)

			// Re-open the DB with a smaller memtable. Values for 1, 2 will fit in the first memtable;
			// value for 3 will go in the next memtable; value for 4 will be in a flushable batch
			// which will cause the previous memtable to be flushed; value for 5 will go in the next
			// memtable
			d, err = Open(dir, testingRandomized(t, &Options{
				FS:           mem,
				MemTableSize: 300 << 10,
				ReadOnly:     readOnly,
			}))
			require.NoError(t, err)

			if readOnly {
				// In read-only mode no WAL can be deleted, so the file count
				// must match, and a mutable memtable must still be set up.
				m := d.Metrics()
				require.Equal(t, int64(logCount), m.WAL.Files)
				d.mu.Lock()
				require.NotNil(t, d.mu.mem.mutable)
				d.mu.Unlock()
			}
			checkIter(d.NewIter(nil))
			require.NoError(t, d.Close())
		})
	}
}

// Reproduction for https://github.com/cockroachdb/pebble/issues/2234.
//
// The test arranges three unflushed flushables (a memtable, a flushable large
// batch, and another memtable) at close time, then replays them via a
// read-only open and checks that the final write for key "1" (the empty
// value) wins over the earlier large value.
func TestWALReplaySequenceNumBug(t *testing.T) {
	mem := vfs.NewMem()
	d, err := Open("", testingRandomized(t, &Options{
		FS: mem,
	}))
	require.NoError(t, err)

	d.mu.Lock()
	// Disable any flushes.
	d.mu.compact.flushing = true
	d.mu.Unlock()

	require.NoError(t, d.Set([]byte("1"), nil, nil))
	require.NoError(t, d.Set([]byte("2"), nil, nil))

	// Write a large batch. This should go to a separate memtable.
	largeValue := []byte(strings.Repeat("a", int(d.largeBatchThreshold)))
	require.NoError(t, d.Set([]byte("1"), largeValue, nil))

	// This write should go to the mutable memtable after the large batch in
	// the memtable queue. (Error was previously ignored here.)
	require.NoError(t, d.Set([]byte("1"), nil, nil))

	d.mu.Lock()
	d.mu.compact.flushing = false
	d.mu.Unlock()

	// Make sure none of the flushables have been flushed.
	require.Equal(t, 3, len(d.mu.mem.queue))

	// Close the db. This doesn't cause a flush of the memtables, so they'll
	// have to be replayed when the db is reopened.
	require.NoError(t, d.Close())

	files, err := mem.List("")
	require.NoError(t, err)
	sort.Strings(files)
	sstCount := 0
	for _, fname := range files {
		if strings.HasSuffix(fname, ".sst") {
			sstCount++
		}
	}
	require.Equal(t, 0, sstCount)

	// Reopen db in read only mode to force read only wal replay.
	d, err = Open("", &Options{
		FS:       mem,
		ReadOnly: true,
	})
	require.NoError(t, err)
	// The last write must win: key "1" holds the empty value, not largeValue.
	val, c, _ := d.Get([]byte("1"))
	require.Equal(t, []byte{}, val)
	c.Close()
	require.NoError(t, d.Close())
}

// Similar to TestOpenWALReplay, except we test replay behavior after a
// memtable has been flushed. We test all 3 reasons for flushing: forced, size,
// and large-batch.
func TestOpenWALReplay2(t *testing.T) {
	for _, readOnly := range []bool{false, true} {
		t.Run(fmt.Sprintf("read-only=%t", readOnly), func(t *testing.T) {
			for _, reason := range []string{"forced", "size", "large-batch"} {
				t.Run(reason, func(t *testing.T) {
					mem := vfs.NewMem()
					d, err := Open("", testingRandomized(t, &Options{
						FS:           mem,
						MemTableSize: 256 << 10,
					}))
					require.NoError(t, err)

					// Trigger exactly one flush by the named mechanism, then
					// leave at least one unflushed write behind in the WAL.
					switch reason {
					case "forced":
						require.NoError(t, d.Set([]byte("1"), nil, nil))
						require.NoError(t, d.Flush())
						require.NoError(t, d.Set([]byte("2"), nil, nil))
					case "size":
						largeValue := []byte(strings.Repeat("a", 100<<10))
						require.NoError(t, d.Set([]byte("1"), largeValue, nil))
						require.NoError(t, d.Set([]byte("2"), largeValue, nil))
						require.NoError(t, d.Set([]byte("3"), largeValue, nil))
					case "large-batch":
						largeValue := []byte(strings.Repeat("a", int(d.largeBatchThreshold)))
						require.NoError(t, d.Set([]byte("1"), nil, nil))
						require.NoError(t, d.Set([]byte("2"), largeValue, nil))
						require.NoError(t, d.Set([]byte("3"), nil, nil))
					}
					require.NoError(t, d.Close())

					// Exactly one sstable should exist: the single flush above.
					files, err := mem.List("")
					require.NoError(t, err)
					sort.Strings(files)
					sstCount := 0
					for _, fname := range files {
						if strings.HasSuffix(fname, ".sst") {
							sstCount++
						}
					}
					require.Equal(t, 1, sstCount)

					// Re-open the DB (optionally read-only) with a slightly
					// larger memtable so the unflushed WAL tail is replayed.
					// (The previous comment here was copy-pasted from
					// TestOpenWALReplay and described keys 4 and 5 that this
					// test never writes.)
					d, err = Open("", testingRandomized(t, &Options{
						FS:           mem,
						MemTableSize: 300 << 10,
						ReadOnly:     readOnly,
					}))
					require.NoError(t, err)
					require.NoError(t, d.Close())
				})
			}
		})
	}
}

// TestTwoWALReplayCorrupt tests WAL-replay behavior when the first of the two
// WALs is corrupted with an sstable checksum error. Replay must stop at the
// first WAL because otherwise we may violate point-in-time recovery
// semantics. See #864.
func TestTwoWALReplayCorrupt(t *testing.T) {
	// Use the real filesystem so that we can seek and overwrite WAL data
	// easily.
	dir, err := os.MkdirTemp("", "wal-replay")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// A tiny memtable forces multiple WALs from just two writes.
	d, err := Open(dir, testingRandomized(t, &Options{
		MemTableStopWritesThreshold: 4,
		MemTableSize:                2048,
	}))
	require.NoError(t, err)
	// Block flushes so both writes remain in WALs at close.
	d.mu.Lock()
	d.mu.compact.flushing = true
	d.mu.Unlock()
	require.NoError(t, d.Set([]byte("1"), []byte(strings.Repeat("a", 1024)), nil))
	require.NoError(t, d.Set([]byte("2"), nil, nil))
	d.mu.Lock()
	d.mu.compact.flushing = false
	d.mu.Unlock()
	require.NoError(t, d.Close())

	// We should have two WALs.
	var logs []string
	ls, err := vfs.Default.List(dir)
	require.NoError(t, err)
	for _, name := range ls {
		if filepath.Ext(name) == ".log" {
			logs = append(logs, name)
		}
	}
	sort.Strings(logs)
	if len(logs) < 2 {
		t.Fatalf("expected at least two log files, found %d", len(logs))
	}

	// Corrupt the (n-1)th WAL by zeroing four bytes, 100 bytes from the end
	// of the file.
	f, err := os.OpenFile(filepath.Join(dir, logs[len(logs)-2]), os.O_RDWR, os.ModePerm)
	require.NoError(t, err)
	// io.SeekEnd replaces the magic whence constant 2.
	off, err := f.Seek(-100, io.SeekEnd)
	require.NoError(t, err)
	_, err = f.Write([]byte{0, 0, 0, 0})
	require.NoError(t, err)
	require.NoError(t, f.Close())
	// "zeroed" was previously misspelled "zeored".
	t.Logf("zeroed four bytes in %s at offset %d\n", logs[len(logs)-2], off)

	// Re-opening the database should detect and report the corruption.
	_, err = Open(dir, nil)
	require.Error(t, err, "pebble: corruption")
}

// TestTwoWALReplayPermissive tests WAL-replay behavior when the first of the
// two WALs is corrupted with an sstable checksum error and the OPTIONS file
// does not enable the private strict_wal_tail option, indicating that the WAL
// was produced by a database that did not guarantee clean WAL tails. See
// #864. (The comment previously named TestTwoWALReplayCorrupt by mistake.)
func TestTwoWALReplayPermissive(t *testing.T) {
	// Use the real filesystem so that we can seek and overwrite WAL data
	// easily.
	dir, err := os.MkdirTemp("", "wal-replay")
	require.NoError(t, err)
	defer os.RemoveAll(dir)

	// A tiny memtable forces multiple WALs from just two writes.
	opts := &Options{
		MemTableStopWritesThreshold: 4,
		MemTableSize:                2048,
	}
	opts.testingRandomized(t)
	opts.EnsureDefaults()
	d, err := Open(dir, opts)
	require.NoError(t, err)
	// Block flushes so both writes remain in WALs at close.
	d.mu.Lock()
	d.mu.compact.flushing = true
	d.mu.Unlock()
	require.NoError(t, d.Set([]byte("1"), []byte(strings.Repeat("a", 1024)), nil))
	require.NoError(t, d.Set([]byte("2"), nil, nil))
	d.mu.Lock()
	d.mu.compact.flushing = false
	d.mu.Unlock()
	require.NoError(t, d.Close())

	// We should have two WALs.
	var logs []string
	var optionFilename string
	ls, err := vfs.Default.List(dir)
	require.NoError(t, err)
	for _, name := range ls {
		if filepath.Ext(name) == ".log" {
			logs = append(logs, name)
		}
		if strings.HasPrefix(filepath.Base(name), "OPTIONS") {
			optionFilename = name
		}
	}
	sort.Strings(logs)
	if len(logs) < 2 {
		t.Fatalf("expected at least two log files, found %d", len(logs))
	}

	// Corrupt the (n-1)th WAL by zeroing four bytes, 100 bytes from the end
	// of the file.
	f, err := os.OpenFile(filepath.Join(dir, logs[len(logs)-2]), os.O_RDWR, os.ModePerm)
	require.NoError(t, err)
	// io.SeekEnd replaces the magic whence constant 2.
	off, err := f.Seek(-100, io.SeekEnd)
	require.NoError(t, err)
	_, err = f.Write([]byte{0, 0, 0, 0})
	require.NoError(t, err)
	require.NoError(t, f.Close())
	// "zeroed" was previously misspelled "zeored".
	t.Logf("zeroed four bytes in %s at offset %d\n", logs[len(logs)-2], off)

	// Remove the OPTIONS file containing the strict_wal_tail option.
	require.NoError(t, vfs.Default.Remove(filepath.Join(dir, optionFilename)))

	// Re-opening the database should not report the corruption.
	d, err = Open(dir, nil)
	require.NoError(t, err)
	require.NoError(t, d.Close())
}

// TestCrashOpenCrashAfterWALCreation tests a database that exits
// ungracefully, begins recovery, creates the new WAL but promptly exits
// ungracefully again.
//
// This sequence has the potential to be problematic with the strict_wal_tail
// behavior because the first crash's WAL has an unclean tail. By the time the
// new WAL is created, the current manifest's MinUnflushedLogNum must be
// higher than the previous WAL.
func TestCrashOpenCrashAfterWALCreation(t *testing.T) {
	fs := vfs.NewStrictMem()

	// getLogs lists the .log files in the root directory.
	getLogs := func() (logs []string) {
		ls, err := fs.List("")
		require.NoError(t, err)
		for _, name := range ls {
			if filepath.Ext(name) == ".log" {
				logs = append(logs, name)
			}
		}
		return logs
	}

	{
		d, err := Open("", testingRandomized(t, &Options{FS: fs}))
		require.NoError(t, err)
		require.NoError(t, d.Set([]byte("abc"), nil, Sync))

		// Ignore syncs during close to simulate a crash. This will leave the WAL
		// without an EOF trailer. It won't be an 'unclean tail' yet since the
		// log file was not recycled, but we'll fix that down below.
		fs.SetIgnoreSyncs(true)
		require.NoError(t, d.Close())
		fs.ResetToSyncedState()
		fs.SetIgnoreSyncs(false)
	}

	// There should be one WAL.
	logs := getLogs()
	if len(logs) != 1 {
		t.Fatalf("expected one log file, found %d", len(logs))
	}

	// The one WAL file doesn't have an EOF trailer, but since it wasn't
	// recycled it won't have garbage at the end. Rewrite it so that it has
	// the same contents it currently has, followed by garbage.
	{
		f, err := fs.Open(logs[0])
		require.NoError(t, err)
		b, err := io.ReadAll(f)
		require.NoError(t, err)
		require.NoError(t, f.Close())
		f, err = fs.Create(logs[0])
		require.NoError(t, err)
		_, err = f.Write(b)
		require.NoError(t, err)
		// Append garbage bytes to simulate an unclean (recycled) WAL tail.
		_, err = f.Write([]byte{0xde, 0xad, 0xbe, 0xef})
		require.NoError(t, err)
		require.NoError(t, f.Sync())
		require.NoError(t, f.Close())
		// Sync the directory so the rewritten file survives the simulated
		// crash below.
		dir, err := fs.OpenDir("")
		require.NoError(t, err)
		require.NoError(t, dir.Sync())
		require.NoError(t, dir.Close())
	}

	// Open the database again (with syncs respected again). Wrap the
	// filesystem with an errorfs that will turn off syncs after a new .log
	// file is created and after a subsequent directory sync occurs. This
	// simulates a crash after the new log file is created and synced.
	{
		var walCreated, dirSynced atomic.Bool
		d, err := Open("", &Options{
			FS: errorfs.Wrap(fs, errorfs.InjectorFunc(func(op errorfs.Op) error {
				if dirSynced.Load() {
					fs.SetIgnoreSyncs(true)
				}
				if op.Kind == errorfs.OpCreate && filepath.Ext(op.Path) == ".log" {
					walCreated.Store(true)
				}
				// Record when there's a sync of the data directory after the
				// WAL was created. The data directory will have an empty
				// path because that's what we passed into Open.
				if op.Kind == errorfs.OpFileSync && op.Path == "" && walCreated.Load() {
					dirSynced.Store(true)
				}
				return nil
			})),
		})
		require.NoError(t, err)
		require.NoError(t, d.Close())
	}

	// Roll back to the state as of the simulated second crash.
	fs.ResetToSyncedState()
	fs.SetIgnoreSyncs(false)

	// Both the original WAL and the newly created WAL must have survived.
	if n := len(getLogs()); n != 2 {
		t.Fatalf("expected two logs, found %d\n", n)
	}

	// Finally, open the database with syncs enabled.
	d, err := Open("", testingRandomized(t, &Options{FS: fs}))
	require.NoError(t, err)
	require.NoError(t, d.Close())
}

// TestOpenWALReplayReadOnlySeqNums tests opening a database:
//   - in read-only mode
//   - with multiple unflushed log files that must replayed
//   - a MANIFEST that sets the last sequence number to a number greater than
//     the unflushed log files
//
// See cockroachdb/cockroach#48660.
func TestOpenWALReplayReadOnlySeqNums(t *testing.T) {
	const root = ""
	mem := vfs.NewMem()

	// copyFiles clones every file in srcDir into dstDir, capturing a
	// point-in-time snapshot of the database state.
	copyFiles := func(srcDir, dstDir string) {
		files, err := mem.List(srcDir)
		require.NoError(t, err)
		for _, f := range files {
			require.NoError(t, vfs.Copy(mem, mem.PathJoin(srcDir, f), mem.PathJoin(dstDir, f)))
		}
	}

	// Create a new database under `/original` with a couple sstables.
	dir := mem.PathJoin(root, "original")
	d, err := Open(dir, testingRandomized(t, &Options{FS: mem}))
	require.NoError(t, err)
	require.NoError(t, d.Set([]byte("a"), nil, nil))
	require.NoError(t, d.Flush())
	require.NoError(t, d.Set([]byte("a"), nil, nil))
	require.NoError(t, d.Flush())

	// Prevent flushes so that multiple unflushed log files build up.
	d.mu.Lock()
	d.mu.compact.flushing = true
	d.mu.Unlock()

	// NOTE(review): AsyncFlush return values are deliberately ignored here —
	// flushing is blocked above, so these only rotate the WAL; presumably the
	// errors are nil. Confirm before tightening to require.NoError.
	require.NoError(t, d.Set([]byte("b"), nil, nil))
	d.AsyncFlush()
	require.NoError(t, d.Set([]byte("c"), nil, nil))
	d.AsyncFlush()
	require.NoError(t, d.Set([]byte("e"), nil, nil))

	// Manually compact some of the key space so that the latest `logSeqNum` is
	// written to the MANIFEST. This produces a MANIFEST where the `logSeqNum`
	// is greater than the sequence numbers contained in the
	// `minUnflushedLogNum` log file
	require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false))
	// Wait for all in-flight compactions to finish before snapshotting.
	d.mu.Lock()
	for d.mu.compact.compactingCount > 0 {
		d.mu.compact.cond.Wait()
	}
	d.mu.Unlock()

	d.TestOnlyWaitForCleaning()
	// While the MANIFEST is still in this state, copy all the files in the
	// database to a new directory.
	replayDir := mem.PathJoin(root, "replay")
	require.NoError(t, mem.MkdirAll(replayDir, os.ModePerm))
	copyFiles(dir, replayDir)

	d.mu.Lock()
	d.mu.compact.flushing = false
	d.mu.Unlock()
	require.NoError(t, d.Close())

	// Open the copy of the database in read-only mode. Since we copied all
	// the files before the flushes were allowed to complete, there should be
	// multiple unflushed log files that need to replay. Since the manual
	// compaction completed, the `logSeqNum` read from the manifest should be
	// greater than the unflushed log files' sequence numbers.
	d, err = Open(replayDir, testingRandomized(t, &Options{
		FS:       mem,
		ReadOnly: true,
	}))
	require.NoError(t, err)
	require.NoError(t, d.Close())
}

// TestOpenWALReplayMemtableGrowth writes a single 16MB batch entry under a
// 64MB memtable configuration, then re-opens the database so the WAL must be
// replayed through the growing memtable sizes.
func TestOpenWALReplayMemtableGrowth(t *testing.T) {
	mem := vfs.NewMem()
	const memTableSize = 64 * 1024 * 1024
	opts := &Options{
		MemTableSize: memTableSize,
		FS:           mem,
	}
	opts.testingRandomized(t)
	func() {
		db, err := Open("", opts)
		require.NoError(t, err)
		defer db.Close()
		b := db.NewBatch()
		defer b.Close()
		key := make([]byte, 8)
		val := make([]byte, 16*1024*1024)
		// The Set error was previously discarded.
		require.NoError(t, b.Set(key, val, nil))
		require.NoError(t, db.Apply(b, Sync))
	}()
	db, err := Open("", opts)
	require.NoError(t, err)
	// The final Close error was previously discarded.
	require.NoError(t, db.Close())
}

// TestGetVersion covers GetVersion against: no OPTIONS file, a Pebble-created
// OPTIONS file, a manually written OPTIONS file with a higher file number,
// and a RocksDB-style OPTIONS file.
func TestGetVersion(t *testing.T) {
	mem := vfs.NewMem()
	opts := &Options{
		FS: mem,
	}
	opts.testingRandomized(t)

	// Case 1: No options file.
	version, err := GetVersion("", mem)
	require.NoError(t, err)
	require.Empty(t, version)

	// Case 2: Pebble created file.
	db, err := Open("", opts)
	require.NoError(t, err)
	require.NoError(t, db.Close())
	version, err = GetVersion("", mem)
	require.NoError(t, err)
	require.Equal(t, "0.1", version)

	// Case 3: Manually created OPTIONS file with a higher number.
	// GetVersion must read the OPTIONS file with the highest file number, so
	// find the current maximum first.
	highestOptionsNum := FileNum(0)
	ls, err := mem.List("")
	require.NoError(t, err)
	for _, filename := range ls {
		ft, fn, ok := base.ParseFilename(mem, filename)
		if !ok {
			continue
		}
		switch ft {
		case fileTypeOptions:
			if fn.FileNum() > highestOptionsNum {
				highestOptionsNum = fn.FileNum()
			}
		}
	}
	// Create errors were previously discarded with `f, _ :=`.
	f, err := mem.Create(fmt.Sprintf("OPTIONS-%d", highestOptionsNum+1))
	require.NoError(t, err)
	_, err = f.Write([]byte("[Version]\n  pebble_version=0.2\n"))
	require.NoError(t, err)
	err = f.Close()
	require.NoError(t, err)
	version, err = GetVersion("", mem)
	require.NoError(t, err)
	require.Equal(t, "0.2", version)

	// Case 4: Manually created OPTIONS file with a RocksDB number.
	f, err = mem.Create(fmt.Sprintf("OPTIONS-%d", highestOptionsNum+2))
	require.NoError(t, err)
	_, err = f.Write([]byte("[Version]\n  rocksdb_version=6.2.1\n"))
	require.NoError(t, err)
	err = f.Close()
	require.NoError(t, err)
	version, err = GetVersion("", mem)
	require.NoError(t, err)
	require.Equal(t, "rocksdb v6.2.1", version)
}

// TestRocksDBNoFlushManifest verifies Pebble can open a RocksDB-generated
// directory whose manifest has a next-log-num but no log-num entry.
func TestRocksDBNoFlushManifest(t *testing.T) {
	fs := vfs.NewMem()

	// Use comparer and merger names matching those recorded in the testdata
	// directory's manifest.
	cmp := *DefaultComparer
	cmp.Name = "cockroach_comparator"
	mrg := *DefaultMerger
	mrg.Name = "cockroach_merge_operator"

	// rocksdb-ingest-only is a RocksDB-generated db directory that has not had
	// a single flush yet, only ingestion operations. The manifest contains
	// a next-log-num but no log-num entry. Ensure that pebble can read these
	// directories without an issue.
	_, err := vfs.Clone(vfs.Default, fs, "testdata/rocksdb-ingest-only", "testdata")
	require.NoError(t, err)

	db, err := Open("testdata", &Options{
		FS:       fs,
		Comparer: &cmp,
		Merger:   &mrg,
	})
	require.NoError(t, err)
	defer db.Close()

	// A key ingested by RocksDB must be readable.
	val, closer, err := db.Get([]byte("ajulxeiombjiyw\x00\x00\x00\x00\x00\x00\x00\x01\x12\x09"))
	require.NoError(t, err)
	require.NotEmpty(t, val)
	require.NoError(t, closer.Close())
}

// TestOpen_ErrorIfUnknownFormatVersion verifies that opening a store whose
// format-version marker names a nonexistent version fails with a descriptive
// error.
func TestOpen_ErrorIfUnknownFormatVersion(t *testing.T) {
	mem := vfs.NewMem()

	// Create a database at a known format major version.
	d, err := Open("", &Options{
		FS:                 mem,
		FormatMajorVersion: FormatVersioned,
	})
	require.NoError(t, err)
	require.NoError(t, d.Close())

	// Move the marker to a version that does not exist.
	marker, _, err := atomicfs.LocateMarker(mem, "", formatVersionMarkerName)
	require.NoError(t, err)
	require.NoError(t, marker.Move("999999"))
	require.NoError(t, marker.Close())

	// Reopening must now fail, naming the unknown version.
	_, err = Open("", &Options{
		FS:                 mem,
		FormatMajorVersion: FormatVersioned,
	})
	require.Error(t, err)
	require.EqualError(t, err, `pebble: database "" written in format major version 999999`)
}

// ensureFilesClosed updates the provided Options to wrap the filesystem. It
// returns a closure that when invoked fails the test if any files opened by the
// filesystem are not closed.
//
// This function is intended to be used in tests with defer.
//
//	opts := &Options{FS: vfs.NewMem()}
//	defer ensureFilesClosed(t, opts)()
//	/* test code */
// ensureFilesClosed updates the provided Options to wrap the filesystem. It
// returns a closure that when invoked fails the test if any files opened by the
// filesystem are not closed.
//
// This function is intended to be used in tests with defer.
//
//	opts := &Options{FS: vfs.NewMem()}
//	defer ensureFilesClosed(t, opts)()
//	/* test code */
func ensureFilesClosed(t *testing.T, o *Options) func() {
	tracking := &closeTrackingFS{
		FS:    o.FS,
		files: make(map[*closeTrackingFile]struct{}),
	}
	o.FS = tracking
	return func() {
		// Any file still present in the map was opened but never closed;
		// report the stack captured at open time to make the leak findable.
		for f := range tracking.files {
			t.Errorf("An open file was never closed. Opened at:\n%s", f.stack)
		}
	}
}

// closeTrackingFS wraps a vfs.FS and records every file it opens in the
// files map; entries are removed by closeTrackingFile.Close, so any entry
// remaining at the end of a test identifies a leaked file.
// NOTE(review): the files map is unguarded, so this appears to assume
// single-goroutine use within a test — confirm before reusing elsewhere.
type closeTrackingFS struct {
	vfs.FS
	files map[*closeTrackingFile]struct{}
}

// wrap registers a successfully opened file in the tracking map and returns
// a closeTrackingFile that unregisters itself on Close. Errors from the
// underlying FS pass through unchanged.
func (fs *closeTrackingFS) wrap(file vfs.File, err error) (vfs.File, error) {
	if err != nil {
		return nil, err
	}
	tracked := &closeTrackingFile{
		File:  file,
		fs:    fs,
		stack: debug.Stack(),
	}
	fs.files[tracked] = struct{}{}
	return tracked, nil
}

// Create creates the named file via the wrapped FS and tracks it so the test
// can verify it is eventually closed.
func (fs *closeTrackingFS) Create(name string) (vfs.File, error) {
	return fs.wrap(fs.FS.Create(name))
}

// Open opens the named file via the wrapped FS and tracks it so the test can
// verify it is eventually closed. The OpenOptions are forwarded to the
// underlying FS; the previous implementation accepted them but silently
// dropped them.
func (fs *closeTrackingFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) {
	return fs.wrap(fs.FS.Open(name, opts...))
}

// OpenDir opens the named directory via the wrapped FS and tracks it so the
// test can verify it is eventually closed.
func (fs *closeTrackingFS) OpenDir(name string) (vfs.File, error) {
	return fs.wrap(fs.FS.OpenDir(name))
}

// ReuseForWrite delegates to the wrapped FS and tracks the returned file so
// the test can verify it is eventually closed.
func (fs *closeTrackingFS) ReuseForWrite(oldname, newname string) (vfs.File, error) {
	return fs.wrap(fs.FS.ReuseForWrite(oldname, newname))
}

// closeTrackingFile wraps a vfs.File opened through a closeTrackingFS. It
// retains the stack captured at open time so an unclosed file can be traced
// back to its origin.
type closeTrackingFile struct {
	vfs.File
	fs    *closeTrackingFS
	stack []byte
}

// Close unregisters the file from its tracking FS and closes the underlying
// file, returning the underlying Close error.
func (f *closeTrackingFile) Close() error {
	err := f.File.Close()
	delete(f.fs.files, f)
	return err
}

// TestCheckConsistency is a datadriven test for checkConsistency. The "build"
// command creates table files of specified sizes on the in-memory filesystem;
// the "check-consistency" command builds a version from per-level file specs
// and verifies it against the files on disk, optionally redacting the error.
func TestCheckConsistency(t *testing.T) {
	const dir = "./test"
	mem := vfs.NewMem()
	mem.MkdirAll(dir, 0755)

	provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(mem, dir))
	require.NoError(t, err)
	defer provider.Close()

	cmp := base.DefaultComparer.Compare
	fmtKey := base.DefaultComparer.FormatKey
	// parseMeta parses a "fileNum:size" spec into file metadata. An empty
	// string yields (nil, nil) so blank input lines are skipped.
	parseMeta := func(s string) (*manifest.FileMetadata, error) {
		if len(s) == 0 {
			return nil, nil
		}
		parts := strings.Split(s, ":")
		if len(parts) != 2 {
			return nil, errors.Errorf("malformed table spec: %q", s)
		}
		fileNum, err := strconv.Atoi(strings.TrimSpace(parts[0]))
		if err != nil {
			return nil, err
		}
		size, err := strconv.Atoi(strings.TrimSpace(parts[1]))
		if err != nil {
			return nil, err
		}
		m := &manifest.FileMetadata{
			FileNum: base.FileNum(fileNum),
			Size:    uint64(size),
		}
		m.InitPhysicalBacking()
		return m, nil
	}

	datadriven.RunTest(t, "testdata/version_check_consistency",
		func(t *testing.T, d *datadriven.TestData) string {
			switch d.Cmd {
			case "check-consistency":
				var filesByLevel [manifest.NumLevels][]*manifest.FileMetadata
				// files points at the slice for the level named by the most
				// recent "L<n>" header line; subsequent spec lines append to
				// it. Input is expected to start with a level header.
				var files *[]*manifest.FileMetadata

				for _, data := range strings.Split(d.Input, "\n") {
					switch data {
					case "L0", "L1", "L2", "L3", "L4", "L5", "L6":
						level, err := strconv.Atoi(data[1:])
						if err != nil {
							return err.Error()
						}
						files = &filesByLevel[level]

					default:
						m, err := parseMeta(data)
						if err != nil {
							return err.Error()
						}
						if m != nil {
							*files = append(*files, m)
						}
					}
				}

				// The optional "redact" argument exercises the redacted
				// rendering of consistency errors.
				redactErr := false
				for _, arg := range d.CmdArgs {
					switch v := arg.String(); v {
					case "redact":
						redactErr = true
					default:
						return fmt.Sprintf("unknown argument: %q", v)
					}
				}

				v := manifest.NewVersion(cmp, fmtKey, 0, filesByLevel)
				err := checkConsistency(v, dir, provider)
				if err != nil {
					if redactErr {
						redacted := redact.Sprint(err).Redact()
						return string(redacted)
					}
					return err.Error()
				}
				return "OK"

			case "build":
				// Create an sstable file of the specified size for each spec
				// line, removing any stale file of the same name first.
				for _, data := range strings.Split(d.Input, "\n") {
					m, err := parseMeta(data)
					if err != nil {
						return err.Error()
					}
					path := base.MakeFilepath(mem, dir, base.FileTypeTable, m.FileBacking.DiskFileNum)
					_ = mem.Remove(path)
					f, err := mem.Create(path)
					if err != nil {
						return err.Error()
					}
					_, err = f.Write(make([]byte, m.Size))
					if err != nil {
						return err.Error()
					}
					f.Close()
				}
				return ""

			default:
				return fmt.Sprintf("unknown command: %s", d.Cmd)
			}
		})
}

// TestOpenRatchetsNextFileNum verifies that Open ratchets the next file
// number past any existing shared object, so compactions after reopen do not
// collide with a pre-existing shared file.
func TestOpenRatchetsNextFileNum(t *testing.T) {
	mem := vfs.NewMem()
	memShared := remote.NewInMem()

	opts := &Options{FS: mem}
	opts.Experimental.CreateOnShared = remote.CreateOnSharedAll
	opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
		"": memShared,
	})
	d, err := Open("", opts)
	require.NoError(t, err)
	// Check the SetCreatorID error rather than silently ignoring it.
	require.NoError(t, d.SetCreatorID(1))

	require.NoError(t, d.Set([]byte("foo"), []byte("value"), nil))
	require.NoError(t, d.Set([]byte("bar"), []byte("value"), nil))
	require.NoError(t, d.Flush())
	require.NoError(t, d.Compact([]byte("a"), []byte("z"), false))

	// Create a shared file with the newest file num and then close the db.
	d.mu.Lock()
	nextFileNum := d.mu.versions.getNextFileNum()
	w, _, err := d.objProvider.Create(context.TODO(), fileTypeTable, nextFileNum.DiskFileNum(), objstorage.CreateOptions{PreferSharedStorage: true})
	require.NoError(t, err)
	require.NoError(t, w.Write([]byte("foobar")))
	require.NoError(t, w.Finish())
	require.NoError(t, d.objProvider.Sync())
	d.mu.Unlock()

	// Write one key and then close the db. This write will stay in the memtable,
	// forcing the reopen to do a compaction on open.
	require.NoError(t, d.Set([]byte("foo1"), []byte("value"), nil))
	require.NoError(t, d.Close())

	// Reopen db. Compactions should happen without error.
	d, err = Open("", opts)
	require.NoError(t, err)
	require.NoError(t, d.Set([]byte("foo2"), []byte("value"), nil))
	require.NoError(t, d.Set([]byte("bar2"), []byte("value"), nil))
	require.NoError(t, d.Flush())
	require.NoError(t, d.Compact([]byte("a"), []byte("z"), false))

	// Close the reopened database; the original test leaked it.
	require.NoError(t, d.Close())
}