cli: refactor shard command & add recovery options (#1837)

* expose SkipLoadLatest override via AppOpts

* add --force-app-version option to shard command

* refactor sharding of application.db

* refactor sharding of blockstore & state.db

* add --only-cometbft-state flag

* add comment divisions

* update usage doc

* prevent infinite loop during cometbft rollback
This commit is contained in:
Robert Pirtle 2024-02-29 16:22:09 -08:00 committed by GitHub
parent 2a1e9a6631
commit f5384a1f11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 188 additions and 94 deletions

View File

@ -31,6 +31,7 @@ import (
const (
flagMempoolEnableAuth = "mempool.enable-authentication"
flagMempoolAuthAddresses = "mempool.authorized-addresses"
flagSkipLoadLatest = "skip-load-latest"
)
// appCreator holds functions used by the sdk server to control the kava app.
@ -101,10 +102,15 @@ func (ac appCreator) newApp(
chainID = appGenesis.ChainID
}
skipLoadLatest := false
if appOpts.Get(flagSkipLoadLatest) != nil {
skipLoadLatest = cast.ToBool(appOpts.Get(flagSkipLoadLatest))
}
return app.NewApp(
logger, db, homeDir, traceStore, ac.encodingConfig,
app.Options{
SkipLoadLatest: false,
SkipLoadLatest: skipLoadLatest,
SkipUpgradeHeights: skipUpgradeHeights,
SkipGenesisInvariants: cast.ToBool(appOpts.Get(crisis.FlagSkipGenesisInvariants)),
InvariantCheckPeriod: cast.ToUint(appOpts.Get(server.FlagInvCheckPeriod)),

View File

@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"github.com/kava-labs/kava/app"
"github.com/spf13/cobra"
dbm "github.com/cometbft/cometbft-db"
@ -23,9 +24,11 @@ import (
)
const (
flagShardStartBlock = "start"
flagShardEndBlock = "end"
flagShardOnlyAppState = "only-app-state"
flagShardStartBlock = "start"
flagShardEndBlock = "end"
flagShardOnlyAppState = "only-app-state"
flagShardForceAppVersion = "force-app-version"
flagShardOnlyCometbftState = "only-cometbft-state"
// TODO: --preserve flag for creating & operating on a copy?
// allow using -1 to mean "latest" (perform no rollbacks)
@ -34,7 +37,7 @@ const (
func newShardCmd(opts ethermintserver.StartOptions) *cobra.Command {
cmd := &cobra.Command{
Use: "shard --home <path-to-home-dir> --start <start-block> --end <end-block> [--only-app-state]",
Use: "shard --home <path-to-home-dir> --start <start-block> --end <end-block> [--only-app-state] [--only-cometbft-state] [--force-app-version <app-version>]",
Short: "Strip all blocks from the database outside of a given range",
Long: `shard opens a local kava home directory's databases and removes all blocks outside a range defined by --start and --end. The range is inclusive of the end block.
@ -42,8 +45,14 @@ It works by first rolling back the latest state to the block before the end bloc
Setting the end block to -1 signals to keep the latest block (no rollbacks).
The application.db can be loaded at a particular height via the --force-app-version option. This is useful if the sharding process is prematurely terminated while the application.db is being sharded.
The --only-app-state flag can be used to skip the pruning of the blockstore and cometbft state. This matches the functionality of the cosmos-sdk's "prune" command. Note that rolled back blocks will still affect all stores.
Similarly, the --only-cometbft-state flag skips pruning app state. This can be useful if the shard command is prematurely terminated during the shard process.
The shard command only flags the iavl tree nodes for deletion. Actual removal from the databases will be performed when each database is compacted.
WARNING: this is a destructive action.`,
Example: `Create a 1M block data shard (keeps blocks kava 1,000,000 to 2,000,000)
$ kava shard --home path/to/.kava --start 1000000 --end 2000000
@ -54,7 +63,9 @@ $ kava shard --home path/to/.kava --start 5000000 --end -1
Prune first 1M blocks _without_ affecting blockstore or cometBFT state:
$ kava shard --home path/to/.kava --start 1000000 --end -1 --only-app-state`,
RunE: func(cmd *cobra.Command, args []string) error {
// read & validate flags
//////////////////////////
// parse & validate flags
//////////////////////////
startBlock, err := cmd.Flags().GetInt64(flagShardStartBlock)
if err != nil {
return err
@ -70,16 +81,23 @@ $ kava shard --home path/to/.kava --start 1000000 --end -1 --only-app-state`,
if err != nil {
return err
}
forceAppVersion, err := cmd.Flags().GetInt64(flagShardForceAppVersion)
if err != nil {
return err
}
onlyCometbftState, err := cmd.Flags().GetBool(flagShardOnlyCometbftState)
if err != nil {
return err
}
clientCtx := client.GetClientContextFromCmd(cmd)
ctx := server.GetServerContextFromCmd(cmd)
ctx.Config.SetRoot(clientCtx.HomeDir)
//////////////////////////////
// Rollback state to endBlock
//////////////////////////////
////////////////////////
// manage db connection
////////////////////////
// connect to database
db, err := opts.DBOpener(ctx.Viper, clientCtx.HomeDir, server.GetAppDBBackend(ctx.Viper))
if err != nil {
@ -93,109 +111,59 @@ $ kava shard --home path/to/.kava --start 1000000 --end -1 --only-app-state`,
}
}()
///////////////////
// load multistore
///////////////////
// create app in order to load the multistore
// skip loading the latest version so the desired height can be manually loaded
ctx.Viper.Set("skip-load-latest", true)
app := opts.AppCreator(ctx.Logger, db, nil, ctx.Viper).(*app.App)
if forceAppVersion == shardEndBlockLatest {
if err := app.LoadLatestVersion(); err != nil {
return err
}
} else {
if err := app.LoadVersion(forceAppVersion); err != nil {
return err
}
}
// get the multistore
app := opts.AppCreator(ctx.Logger, db, nil, ctx.Viper)
cms := app.CommitMultiStore()
multistore, ok := cms.(*rootmulti.Store)
if !ok {
return fmt.Errorf("only sharding of rootmulti.Store type is supported")
}
// handle desired endblock being latest
latest := multistore.LatestVersion()
fmt.Printf("latest height: %d\n", latest)
if endBlock == shardEndBlockLatest {
endBlock = latest
}
shardSize := endBlock - startBlock + 1
// error if requesting block range the database does not have
if endBlock > latest {
return fmt.Errorf("data does not contain end block (%d): latest version is %d", endBlock, latest)
}
fmt.Printf("pruning data in %s down to heights %d - %d (%d blocks)\n", clientCtx.HomeDir, startBlock, endBlock, shardSize)
// set pruning options to prevent no-ops from `PruneStores`
multistore.SetPruning(pruningtypes.PruningOptions{KeepRecent: uint64(shardSize), Interval: 0})
// rollback application state
if err = multistore.RollbackToVersion(endBlock); err != nil {
return fmt.Errorf("failed to rollback application state: %s", err)
////////////////////////
// shard application.db
////////////////////////
if !onlyCometbftState {
if err := shardApplicationDb(multistore, startBlock, endBlock); err != nil {
return err
}
} else {
fmt.Printf("[%s] skipping sharding of application.db\n", flagShardOnlyCometbftState)
}
//////////////////////////////////
// shard blockstore.db & state.db
//////////////////////////////////
// open block store & cometbft state
blockStore, stateStore, err := openCometBftDbs(ctx.Config)
if err != nil {
return fmt.Errorf("failed to open cometbft dbs: %s", err)
}
// prep for outputting progress repeatedly to same line
needsRollback := endBlock < latest
progress := "rolling back blockstore & cometbft state to height %d"
numChars := len(fmt.Sprintf(progress, latest))
clearLine := fmt.Sprintf("\r%s\r", strings.Repeat(" ", numChars))
printRollbackProgress := func(h int64) {
fmt.Print(clearLine)
fmt.Printf(progress, h)
}
// rollback tendermint db
height := latest
for height > endBlock {
printRollbackProgress(height - 1)
height, _, err = tmstate.Rollback(blockStore, stateStore, true)
if err != nil {
return fmt.Errorf("failed to rollback tendermint state: %w", err)
}
}
if needsRollback {
fmt.Println()
} else {
fmt.Printf("latest store height is already %d\n", latest)
}
//////////////////////////////
// Prune blocks to startBlock
//////////////////////////////
// enumerate all heights to prune
pruneHeights := make([]int64, 0, latest-shardSize)
for i := int64(1); i < startBlock; i++ {
pruneHeights = append(pruneHeights, i)
}
if len(pruneHeights) > 0 {
// prune application state
fmt.Printf("pruning application state to height %d\n", startBlock)
if err := multistore.PruneStores(true, pruneHeights); err != nil {
return fmt.Errorf("failed to prune application state: %s", err)
}
}
// get starting block of block store
baseBlock := blockStore.Base()
// only prune if data exists, otherwise blockStore.PruneBlocks will panic
if !onlyAppState && baseBlock < startBlock {
// prune block store
fmt.Printf("pruning block store from %d - %d\n", baseBlock, startBlock)
if _, err := blockStore.PruneBlocks(startBlock); err != nil {
return fmt.Errorf("failed to prune block store (retainHeight=%d): %s", startBlock, err)
}
// prune cometbft state
fmt.Printf("pruning cometbft state from %d - %d\n", baseBlock, startBlock)
if err := stateStore.PruneStates(baseBlock, startBlock); err != nil {
return fmt.Errorf("failed to prune cometbft state store (%d - %d): %s", baseBlock, startBlock, err)
if !onlyAppState {
if err := shardCometBftDbs(blockStore, stateStore, startBlock, endBlock); err != nil {
return err
}
} else {
fmt.Printf("blockstore and cometbft state begins at block %d\n", baseBlock)
fmt.Printf("[%s] skipping sharding of blockstore.db and state.db\n", flagShardOnlyAppState)
fmt.Printf("blockstore contains blocks %d - %d\n", blockStore.Base(), blockStore.Height())
}
// TODO: db compaction
return nil
},
}
@ -204,10 +172,130 @@ $ kava shard --home path/to/.kava --start 1000000 --end -1 --only-app-state`,
cmd.Flags().Int64(flagShardStartBlock, 1, "Start block of data shard (inclusive)")
cmd.Flags().Int64(flagShardEndBlock, 0, "End block of data shard (inclusive)")
cmd.Flags().Bool(flagShardOnlyAppState, false, "Skip pruning of blockstore & cometbft state")
cmd.Flags().Bool(flagShardOnlyCometbftState, false, "Skip pruning of application state")
cmd.Flags().Int64(flagShardForceAppVersion, shardEndBlockLatest, "Instead of loading latest, force set the version of the multistore that is loaded")
return cmd
}
// shardApplicationDb shrinks the application multistore down to the block range
// [startBlock, endBlock]: it first rolls the store back to endBlock and then
// prunes every version below startBlock.
//
// An endBlock equal to shardEndBlockLatest is replaced with the multistore's
// last committed version before anything else happens.
//
// An error is returned when the store reports a last committed version of 0,
// when endBlock is greater than the latest version, when startBlock is greater
// than endBlock, or when the rollback or the pruning fails.
func shardApplicationDb(multistore *rootmulti.Store, startBlock, endBlock int64) error {
	//////////////////////////////
	// Rollback state to endBlock
	//////////////////////////////
	// handle desired endblock being latest
	latest := multistore.LastCommitID().Version
	if latest == 0 {
		return fmt.Errorf("failed to find latest height >0")
	}
	fmt.Printf("latest height: %d\n", latest)
	if endBlock == shardEndBlockLatest {
		endBlock = latest
	}

	// error if requesting block range the database does not have
	if endBlock > latest {
		return fmt.Errorf("data does not contain end block (%d): latest version is %d", endBlock, latest)
	}
	// an inverted range would make shardSize <= 0, which wraps around to a huge
	// value when converted to uint64 for KeepRecent below
	if startBlock > endBlock {
		return fmt.Errorf("start block (%d) must not be greater than end block (%d)", startBlock, endBlock)
	}

	shardSize := endBlock - startBlock + 1
	fmt.Printf("pruning data down to heights %d - %d (%d blocks)\n", startBlock, endBlock, shardSize)

	// set pruning options to prevent no-ops from `PruneStores`
	multistore.SetPruning(pruningtypes.PruningOptions{KeepRecent: uint64(shardSize), Interval: 0})

	// rollback application state
	if err := multistore.RollbackToVersion(endBlock); err != nil {
		return fmt.Errorf("failed to rollback application state: %w", err)
	}

	//////////////////////////////
	// Prune blocks to startBlock
	//////////////////////////////
	// enumerate all heights to prune: versions 1 through startBlock-1.
	// the capacity is clamped at 0 because make panics on a negative capacity,
	// which would otherwise happen for a startBlock <= 0.
	numPruneHeights := startBlock - 1
	if numPruneHeights < 0 {
		numPruneHeights = 0
	}
	pruneHeights := make([]int64, 0, numPruneHeights)
	for i := int64(1); i < startBlock; i++ {
		pruneHeights = append(pruneHeights, i)
	}

	if len(pruneHeights) > 0 {
		// prune application state
		fmt.Printf("pruning application state to height %d\n", startBlock)
		if err := multistore.PruneStores(true, pruneHeights); err != nil {
			return fmt.Errorf("failed to prune application state: %w", err)
		}
	}

	return nil
}
// shardCometBftDbs shrinks blockstore.db & state.db down to the desired block range.
//
// It repeatedly calls tmstate.Rollback until the reported height is no longer
// above endBlock, then prunes the block store and the cometbft state store up
// to startBlock. An endBlock equal to shardEndBlockLatest is replaced with the
// block store's current height, so no rollback is performed in that case.
//
// An error is returned when a rollback fails, when a rollback call leaves the
// height unchanged (which would otherwise loop forever), or when pruning either
// store fails.
func shardCometBftDbs(blockStore *store.BlockStore, stateStore tmstate.Store, startBlock, endBlock int64) error {
	latest := blockStore.Height()
	if endBlock == shardEndBlockLatest {
		endBlock = latest
	}

	//////////////////////////////
	// Rollback state to endBlock
	//////////////////////////////
	// prep for outputting progress repeatedly to same line
	needsRollback := endBlock < latest
	progress := "rolling back blockstore & cometbft state to height %d"
	// the widest message is the one for the largest height, so blanking that
	// many characters clears any previously printed progress line
	numChars := len(fmt.Sprintf(progress, latest))
	clearLine := fmt.Sprintf("\r%s\r", strings.Repeat(" ", numChars))
	printRollbackProgress := func(h int64) {
		fmt.Print(clearLine)
		fmt.Printf(progress, h)
	}

	// rollback tendermint db
	var err error
	height := latest
	for height > endBlock {
		beforeRollbackHeight := height
		printRollbackProgress(height - 1)
		// NOTE(review): the final `true` presumably asks Rollback to also remove
		// the block from the block store — confirm against the cometbft API.
		height, _, err = tmstate.Rollback(blockStore, stateStore, true)
		if err != nil {
			return fmt.Errorf("failed to rollback cometbft state: %w", err)
		}
		// guard against an infinite loop: bail out if the height did not change
		if beforeRollbackHeight == height {
			return fmt.Errorf("attempting to rollback cometbft state height %d failed (no rollback performed)", height)
		}
	}

	if needsRollback {
		// terminate the in-place progress line
		fmt.Println()
	} else {
		fmt.Printf("latest store height is already %d\n", latest)
	}

	//////////////////////////////
	// Prune blocks to startBlock
	//////////////////////////////
	// get starting block of block store
	baseBlock := blockStore.Base()

	// only prune if data exists, otherwise blockStore.PruneBlocks will panic
	if baseBlock < startBlock {
		// prune block store
		fmt.Printf("pruning block store from %d - %d\n", baseBlock, startBlock)
		if _, err := blockStore.PruneBlocks(startBlock); err != nil {
			return fmt.Errorf("failed to prune block store (retainHeight=%d): %w", startBlock, err)
		}

		// prune cometbft state
		fmt.Printf("pruning cometbft state from %d - %d\n", baseBlock, startBlock)
		if err := stateStore.PruneStates(baseBlock, startBlock); err != nil {
			return fmt.Errorf("failed to prune cometbft state store (%d - %d): %w", baseBlock, startBlock, err)
		}
	} else {
		fmt.Printf("blockstore and cometbft state begins at block %d\n", baseBlock)
	}

	return nil
}
// inspired by https://github.com/Kava-Labs/cometbft/blob/277b0853db3f67865a55aa1c54f59790b5f591be/node/node.go#L234
func openCometBftDbs(config *tmconfig.Config) (blockStore *store.BlockStore, stateStore tmstate.Store, err error) {
dbProvider := node.DefaultDBProvider