mirror of
https://github.com/0glabs/0g-chain.git
synced 2024-11-20 15:05:21 +00:00
feat(cli): Add rocksdb compact
command (#1804)
* Add rocksdb compact command * Increase compaction log output to 1 min * Use GetClient/ServerContextFromCmd * Update cmd info * Add doc to logColumnFamilyMetadata * Update RocksDBCmd docs * Add changelog entry * Load latest options from rocksdb * Allow application.db to be compacted * Rename more store -> db * Ensure compaction stats output does not run when db is closed * Add flag for custom stat output interval, return error
This commit is contained in:
parent
58621577ae
commit
3767030005
@ -39,6 +39,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
### Features
|
||||
|
||||
- (cli) [#1785] Add `shard` CLI command to support creating partitions of data for standalone nodes
|
||||
- (cli) [#1804] Add `rocksdb compact` command for manual DB compaction of state or blockstore.
|
||||
|
||||
## [v0.25.0]
|
||||
|
||||
@ -320,6 +321,7 @@ the [changelog](https://github.com/cosmos/cosmos-sdk/blob/v0.38.4/CHANGELOG.md).
|
||||
- [#257](https://github.com/Kava-Labs/kava/pulls/257) Include scripts to run
|
||||
large-scale simulations remotely using aws-batch
|
||||
|
||||
[#1804]: https://github.com/Kava-Labs/kava/pull/1804
|
||||
[#1785]: https://github.com/Kava-Labs/kava/pull/1785
|
||||
[#1784]: https://github.com/Kava-Labs/kava/pull/1784
|
||||
[#1776]: https://github.com/Kava-Labs/kava/pull/1776
|
||||
|
216
cmd/kava/cmd/rocksdb/compact.go
Normal file
216
cmd/kava/cmd/rocksdb/compact.go
Normal file
@ -0,0 +1,216 @@
|
||||
//go:build rocksdb
|
||||
// +build rocksdb
|
||||
|
||||
package rocksdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/client"
|
||||
"github.com/cosmos/cosmos-sdk/server"
|
||||
"github.com/kava-labs/kava/cmd/kava/opendb"
|
||||
"github.com/linxGnu/grocksdb"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
const (
|
||||
flagPrintStatsInterval = "print-stats-interval"
|
||||
)
|
||||
|
||||
var allowedDBs = []string{"application", "blockstore", "state"}
|
||||
|
||||
func CompactRocksDBCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: fmt.Sprintf(
|
||||
"compact <%s>",
|
||||
strings.Join(allowedDBs, "|"),
|
||||
),
|
||||
Short: "force compacts RocksDB",
|
||||
Long: `This is a utility command that performs a force compaction on the state or
|
||||
blockstore. This should only be run once the node has stopped.`,
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
|
||||
|
||||
statsIntervalStr, err := cmd.Flags().GetString(flagPrintStatsInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
statsInterval, err := time.ParseDuration(statsIntervalStr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse duration for --%s: %w", flagPrintStatsInterval, err)
|
||||
}
|
||||
|
||||
clientCtx := client.GetClientContextFromCmd(cmd)
|
||||
ctx := server.GetServerContextFromCmd(cmd)
|
||||
|
||||
if server.GetAppDBBackend(ctx.Viper) != "rocksdb" {
|
||||
return errors.New("compaction is currently only supported with rocksdb")
|
||||
}
|
||||
|
||||
if !slices.Contains(allowedDBs, args[0]) {
|
||||
return fmt.Errorf(
|
||||
"invalid db name, must be one of the following: %s",
|
||||
strings.Join(allowedDBs, ", "),
|
||||
)
|
||||
}
|
||||
|
||||
return compactRocksDBs(clientCtx.HomeDir, logger, args[0], statsInterval)
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().String(flagPrintStatsInterval, "1m", "duration string for how often to print compaction stats")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// compactRocksDBs performs a manual compaction on the given db.
|
||||
func compactRocksDBs(
|
||||
rootDir string,
|
||||
logger log.Logger,
|
||||
dbName string,
|
||||
statsInterval time.Duration,
|
||||
) error {
|
||||
dbPath := filepath.Join(rootDir, "data", dbName+".db")
|
||||
|
||||
dbOpts, cfOpts, err := opendb.LoadLatestOptions(dbPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("opening db", "path", dbPath)
|
||||
db, _, err := grocksdb.OpenDbColumnFamilies(
|
||||
dbOpts,
|
||||
dbPath,
|
||||
[]string{opendb.DefaultColumnFamilyName},
|
||||
[]*grocksdb.Options{cfOpts},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Error("failed to initialize cometbft db", "path", dbPath, "err", err)
|
||||
return fmt.Errorf("failed to open db %s %w", dbPath, err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
logColumnFamilyMetadata(db, logger)
|
||||
|
||||
logger.Info("starting compaction...", "db", dbPath)
|
||||
|
||||
done := make(chan bool)
|
||||
registerSignalHandler(db, logger, done)
|
||||
startCompactionStatsOutput(db, logger, done, statsInterval)
|
||||
|
||||
// Actually run the compaction
|
||||
db.CompactRange(grocksdb.Range{Start: nil, Limit: nil})
|
||||
logger.Info("done compaction", "db", dbPath)
|
||||
|
||||
done <- true
|
||||
return nil
|
||||
}
|
||||
|
||||
// bytesToMB converts bytes to megabytes.
|
||||
func bytesToMB(bytes uint64) float64 {
|
||||
return float64(bytes) / 1024 / 1024
|
||||
}
|
||||
|
||||
// logColumnFamilyMetadata outputs the column family and level metadata.
|
||||
func logColumnFamilyMetadata(
|
||||
db *grocksdb.DB,
|
||||
logger log.Logger,
|
||||
) {
|
||||
metadata := db.GetColumnFamilyMetadata()
|
||||
|
||||
logger.Info(
|
||||
"column family metadata",
|
||||
"name", metadata.Name(),
|
||||
"sizeMB", bytesToMB(metadata.Size()),
|
||||
"fileCount", metadata.FileCount(),
|
||||
"levels", len(metadata.LevelMetas()),
|
||||
)
|
||||
|
||||
for _, level := range metadata.LevelMetas() {
|
||||
logger.Info(
|
||||
fmt.Sprintf("level %d metadata", level.Level()),
|
||||
"sstMetas", strconv.Itoa(len(level.SstMetas())),
|
||||
"sizeMB", strconv.FormatFloat(bytesToMB(level.Size()), 'f', 2, 64),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// startCompactionStatsOutput starts a goroutine that outputs compaction stats
|
||||
// every minute.
|
||||
func startCompactionStatsOutput(
|
||||
db *grocksdb.DB,
|
||||
logger log.Logger,
|
||||
done chan bool,
|
||||
statsInterval time.Duration,
|
||||
) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(statsInterval)
|
||||
isClosed := false
|
||||
|
||||
for {
|
||||
select {
|
||||
// Make sure we don't try reading from the closed db.
|
||||
// We continue the loop so that we can make sure the done channel
|
||||
// does not stall indefinitely from repeated writes and no reader.
|
||||
case <-done:
|
||||
logger.Debug("stopping compaction stats output")
|
||||
isClosed = true
|
||||
case <-ticker.C:
|
||||
if !isClosed {
|
||||
compactionStats := db.GetProperty("rocksdb.stats")
|
||||
fmt.Printf("%s\n", compactionStats)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// registerSignalHandler registers a signal handler that will cancel any running
|
||||
// compaction when the user presses Ctrl+C.
|
||||
func registerSignalHandler(
|
||||
db *grocksdb.DB,
|
||||
logger log.Logger,
|
||||
done chan bool,
|
||||
) {
|
||||
// https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
|
||||
// Q: Can I close the DB when a manual compaction is in progress?
|
||||
//
|
||||
// A: No, it's not safe to do that. However, you call
|
||||
// CancelAllBackgroundWork(db, true) in another thread to abort the
|
||||
// running compactions, so that you can close the DB sooner. Since
|
||||
// 6.5, you can also speed it up using
|
||||
// DB::DisableManualCompaction().
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
for sig := range c {
|
||||
logger.Info(fmt.Sprintf(
|
||||
"received %s signal, aborting running compaction... Do NOT kill me before compaction is cancelled. I will exit when compaction is cancelled.",
|
||||
sig,
|
||||
))
|
||||
db.DisableManualCompaction()
|
||||
logger.Info("manual compaction disabled")
|
||||
|
||||
// Stop the logging
|
||||
done <- true
|
||||
}
|
||||
}()
|
||||
}
|
19
cmd/kava/cmd/rocksdb/rocksdb.go
Normal file
19
cmd/kava/cmd/rocksdb/rocksdb.go
Normal file
@ -0,0 +1,19 @@
|
||||
//go:build rocksdb
|
||||
// +build rocksdb
|
||||
|
||||
package rocksdb
|
||||
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// RocksDBCmd defines the root command containing subcommands that assist in
|
||||
// rocksdb related tasks such as manual compaction.
|
||||
var RocksDBCmd = &cobra.Command{
|
||||
Use: "rocksdb",
|
||||
Short: "RocksDB util commands",
|
||||
}
|
||||
|
||||
func init() {
|
||||
RocksDBCmd.AddCommand(CompactRocksDBCmd())
|
||||
}
|
14
cmd/kava/cmd/rocksdb/rocksdb_dummy.go
Normal file
14
cmd/kava/cmd/rocksdb/rocksdb_dummy.go
Normal file
@ -0,0 +1,14 @@
|
||||
//go:build !rocksdb
|
||||
// +build !rocksdb
|
||||
|
||||
package rocksdb
|
||||
|
||||
import (
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// RocksDBCmd defines the root command when the rocksdb build tag is not set.
|
||||
var RocksDBCmd = &cobra.Command{
|
||||
Use: "rocksdb",
|
||||
Short: "RocksDB util commands, disabled because rocksdb build tag not set",
|
||||
}
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
"github.com/kava-labs/kava/app"
|
||||
"github.com/kava-labs/kava/app/params"
|
||||
"github.com/kava-labs/kava/cmd/kava/cmd/rocksdb"
|
||||
"github.com/kava-labs/kava/cmd/kava/opendb"
|
||||
)
|
||||
|
||||
@ -123,6 +124,7 @@ func addSubCmds(rootCmd *cobra.Command, encodingConfig params.EncodingConfig, de
|
||||
newQueryCmd(),
|
||||
newTxCmd(),
|
||||
keyCommands(app.DefaultNodeHome),
|
||||
rocksdb.RocksDBCmd,
|
||||
newShardCmd(opts),
|
||||
)
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ const (
|
||||
// default tm-db block cache size for RocksDB
|
||||
defaultBlockCacheSize = 1 << 30
|
||||
|
||||
defaultColumnFamilyName = "default"
|
||||
DefaultColumnFamilyName = "default"
|
||||
|
||||
enableMetricsOptName = "rocksdb.enable-metrics"
|
||||
reportMetricsIntervalSecsOptName = "rocksdb.report-metrics-interval-secs"
|
||||
@ -91,7 +91,7 @@ func OpenDB(appOpts types.AppOptions, home string, backendType dbm.BackendType)
|
||||
// option will be overridden only in case if it explicitly specified in appOpts
|
||||
func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
|
||||
optionsPath := filepath.Join(dir, "application.db")
|
||||
dbOpts, cfOpts, err := loadLatestOptions(optionsPath)
|
||||
dbOpts, cfOpts, err := LoadLatestOptions(optionsPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -112,10 +112,10 @@ func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
|
||||
return newRocksDBWithOptions("application", dir, dbOpts, cfOpts, readOpts, enableMetrics, reportMetricsIntervalSecs)
|
||||
}
|
||||
|
||||
// loadLatestOptions loads and returns database and column family options
|
||||
// LoadLatestOptions loads and returns database and column family options
|
||||
// if options file not found, it means database isn't created yet, in such case default tm-db options will be returned
|
||||
// if database exists it should have only one column family named default
|
||||
func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
|
||||
func LoadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
|
||||
latestOpts, err := grocksdb.LoadLatestOptions(dir, grocksdb.NewDefaultEnv(), true, grocksdb.NewLRUCache(defaultBlockCacheSize))
|
||||
if err != nil && strings.HasPrefix(err.Error(), "NotFound: ") {
|
||||
return newDefaultOptions(), newDefaultOptions(), nil
|
||||
@ -127,7 +127,7 @@ func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error)
|
||||
cfNames := latestOpts.ColumnFamilyNames()
|
||||
cfOpts := latestOpts.ColumnFamilyOpts()
|
||||
// db should have only one column family named default
|
||||
ok := len(cfNames) == 1 && cfNames[0] == defaultColumnFamilyName
|
||||
ok := len(cfNames) == 1 && cfNames[0] == DefaultColumnFamilyName
|
||||
if !ok {
|
||||
return nil, nil, ErrUnexpectedConfiguration
|
||||
}
|
||||
@ -312,7 +312,7 @@ func newRocksDBWithOptions(
|
||||
dbOpts.EnableStatistics()
|
||||
}
|
||||
|
||||
db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{defaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
|
||||
db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{DefaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -83,7 +83,7 @@ func TestOpenRocksdb(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
|
||||
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
|
||||
@ -108,7 +108,7 @@ func TestOpenRocksdb(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
|
||||
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
|
||||
@ -190,7 +190,7 @@ func TestLoadLatestOptions(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
|
||||
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
|
||||
@ -210,7 +210,7 @@ func TestLoadLatestOptions(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
dbOpts, cfOpts, err := LoadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
|
||||
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
|
||||
@ -368,7 +368,7 @@ func TestNewRocksDBWithOptions(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.Close())
|
||||
|
||||
dbOpts, cfOpts, err = loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
dbOpts, cfOpts, err = LoadLatestOptions(filepath.Join(dir, "application.db"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 999, dbOpts.GetMaxOpenFiles())
|
||||
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
|
||||
|
@ -1 +1 @@
|
||||
Subproject commit 075328bbb54010f2dcddd29fe0a0a0cfccaf41e2
|
||||
Subproject commit e1085562d203fd81c5dd8576170b29715b2de9ef
|
Loading…
Reference in New Issue
Block a user