0g-chain/cmd/kava/opendb/opendb_rocksdb.go
2023-09-14 15:08:26 -04:00

385 lines
13 KiB
Go

//go:build rocksdb
// +build rocksdb
// Copyright 2023 Kava Labs, Inc.
// Copyright 2023 Cronos Labs, Inc.
//
// Derived from https://github.com/crypto-org-chain/cronos@496ce7e
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package opendb
import (
"errors"
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"time"
"github.com/cosmos/cosmos-sdk/server/types"
"github.com/linxGnu/grocksdb"
"github.com/spf13/cast"
dbm "github.com/tendermint/tm-db"
)
var ErrUnexpectedConfiguration = errors.New("unexpected rocksdb configuration, rocksdb should have only one column family named default")
const (
// default tm-db block cache size for RocksDB
defaultBlockCacheSize = 1 << 30
defaultColumnFamilyName = "default"
enableMetricsOptName = "rocksdb.enable-metrics"
reportMetricsIntervalSecsOptName = "rocksdb.report-metrics-interval-secs"
defaultReportMetricsIntervalSecs = 15
maxOpenFilesDBOptName = "rocksdb.max-open-files"
maxFileOpeningThreadsDBOptName = "rocksdb.max-file-opening-threads"
tableCacheNumshardbitsDBOptName = "rocksdb.table_cache_numshardbits"
allowMMAPWritesDBOptName = "rocksdb.allow_mmap_writes"
allowMMAPReadsDBOptName = "rocksdb.allow_mmap_reads"
useFsyncDBOptName = "rocksdb.use_fsync"
useAdaptiveMutexDBOptName = "rocksdb.use_adaptive_mutex"
bytesPerSyncDBOptName = "rocksdb.bytes_per_sync"
maxBackgroundJobsDBOptName = "rocksdb.max-background-jobs"
writeBufferSizeCFOptName = "rocksdb.write-buffer-size"
numLevelsCFOptName = "rocksdb.num-levels"
maxWriteBufferNumberCFOptName = "rocksdb.max_write_buffer_number"
minWriteBufferNumberToMergeCFOptName = "rocksdb.min_write_buffer_number_to_merge"
maxBytesForLevelBaseCFOptName = "rocksdb.max_bytes_for_level_base"
maxBytesForLevelMultiplierCFOptName = "rocksdb.max_bytes_for_level_multiplier"
targetFileSizeBaseCFOptName = "rocksdb.target_file_size_base"
targetFileSizeMultiplierCFOptName = "rocksdb.target_file_size_multiplier"
level0FileNumCompactionTriggerCFOptName = "rocksdb.level0_file_num_compaction_trigger"
level0SlowdownWritesTriggerCFOptName = "rocksdb.level0_slowdown_writes_trigger"
blockCacheSizeBBTOOptName = "rocksdb.block_cache_size"
bitsPerKeyBBTOOptName = "rocksdb.bits_per_key"
blockSizeBBTOOptName = "rocksdb.block_size"
cacheIndexAndFilterBlocksBBTOOptName = "rocksdb.cache_index_and_filter_blocks"
pinL0FilterAndIndexBlocksInCacheBBTOOptName = "rocksdb.pin_l0_filter_and_index_blocks_in_cache"
formatVersionBBTOOptName = "rocksdb.format_version"
)
func OpenDB(appOpts types.AppOptions, home string, backendType dbm.BackendType) (dbm.DB, error) {
dataDir := filepath.Join(home, "data")
if backendType == dbm.RocksDBBackend {
return openRocksdb(filepath.Join(dataDir, "application.db"), appOpts)
}
return dbm.NewDB("application", backendType, dataDir)
}
// openRocksdb loads existing options, overrides some of them with appOpts and opens database
// option will be overridden only in case if it explicitly specified in appOpts
func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
dbOpts, cfOpts, err := loadLatestOptions(dir)
if err != nil {
return nil, err
}
// customize rocksdb options
bbtoOpts := bbtoFromAppOpts(appOpts)
dbOpts.SetBlockBasedTableFactory(bbtoOpts)
cfOpts.SetBlockBasedTableFactory(bbtoOpts)
dbOpts = overrideDBOpts(dbOpts, appOpts)
cfOpts = overrideCFOpts(cfOpts, appOpts)
enableMetrics := cast.ToBool(appOpts.Get(enableMetricsOptName))
reportMetricsIntervalSecs := cast.ToInt64(appOpts.Get(reportMetricsIntervalSecsOptName))
if reportMetricsIntervalSecs == 0 {
reportMetricsIntervalSecs = defaultReportMetricsIntervalSecs
}
return newRocksDBWithOptions("application", dir, dbOpts, cfOpts, enableMetrics, reportMetricsIntervalSecs)
}
// loadLatestOptions loads and returns database and column family options
// if options file not found, it means database isn't created yet, in such case default tm-db options will be returned
// if database exists it should have only one column family named default
func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
latestOpts, err := grocksdb.LoadLatestOptions(dir, grocksdb.NewDefaultEnv(), true, grocksdb.NewLRUCache(defaultBlockCacheSize))
if err != nil && strings.HasPrefix(err.Error(), "NotFound: ") {
return newDefaultOptions(), newDefaultOptions(), nil
}
if err != nil {
return nil, nil, err
}
cfNames := latestOpts.ColumnFamilyNames()
cfOpts := latestOpts.ColumnFamilyOpts()
// db should have only one column family named default
ok := len(cfNames) == 1 && cfNames[0] == defaultColumnFamilyName
if !ok {
return nil, nil, ErrUnexpectedConfiguration
}
// return db and cf opts
return latestOpts.Options(), &cfOpts[0], nil
}
// overrideDBOpts merges dbOpts and appOpts, appOpts takes precedence
func overrideDBOpts(dbOpts *grocksdb.Options, appOpts types.AppOptions) *grocksdb.Options {
maxOpenFiles := appOpts.Get(maxOpenFilesDBOptName)
if maxOpenFiles != nil {
dbOpts.SetMaxOpenFiles(cast.ToInt(maxOpenFiles))
}
maxFileOpeningThreads := appOpts.Get(maxFileOpeningThreadsDBOptName)
if maxFileOpeningThreads != nil {
dbOpts.SetMaxFileOpeningThreads(cast.ToInt(maxFileOpeningThreads))
}
tableCacheNumshardbits := appOpts.Get(tableCacheNumshardbitsDBOptName)
if tableCacheNumshardbits != nil {
dbOpts.SetTableCacheNumshardbits(cast.ToInt(tableCacheNumshardbits))
}
allowMMAPWrites := appOpts.Get(allowMMAPWritesDBOptName)
if allowMMAPWrites != nil {
dbOpts.SetAllowMmapWrites(cast.ToBool(allowMMAPWrites))
}
allowMMAPReads := appOpts.Get(allowMMAPReadsDBOptName)
if allowMMAPReads != nil {
dbOpts.SetAllowMmapReads(cast.ToBool(allowMMAPReads))
}
useFsync := appOpts.Get(useFsyncDBOptName)
if useFsync != nil {
dbOpts.SetUseFsync(cast.ToBool(useFsync))
}
useAdaptiveMutex := appOpts.Get(useAdaptiveMutexDBOptName)
if useAdaptiveMutex != nil {
dbOpts.SetUseAdaptiveMutex(cast.ToBool(useAdaptiveMutex))
}
bytesPerSync := appOpts.Get(bytesPerSyncDBOptName)
if bytesPerSync != nil {
dbOpts.SetBytesPerSync(cast.ToUint64(bytesPerSync))
}
maxBackgroundJobs := appOpts.Get(maxBackgroundJobsDBOptName)
if maxBackgroundJobs != nil {
dbOpts.SetMaxBackgroundJobs(cast.ToInt(maxBackgroundJobs))
}
return dbOpts
}
// overrideCFOpts merges cfOpts and appOpts, appOpts takes precedence
func overrideCFOpts(cfOpts *grocksdb.Options, appOpts types.AppOptions) *grocksdb.Options {
writeBufferSize := appOpts.Get(writeBufferSizeCFOptName)
if writeBufferSize != nil {
cfOpts.SetWriteBufferSize(cast.ToUint64(writeBufferSize))
}
numLevels := appOpts.Get(numLevelsCFOptName)
if numLevels != nil {
cfOpts.SetNumLevels(cast.ToInt(numLevels))
}
maxWriteBufferNumber := appOpts.Get(maxWriteBufferNumberCFOptName)
if maxWriteBufferNumber != nil {
cfOpts.SetMaxWriteBufferNumber(cast.ToInt(maxWriteBufferNumber))
}
minWriteBufferNumberToMerge := appOpts.Get(minWriteBufferNumberToMergeCFOptName)
if minWriteBufferNumberToMerge != nil {
cfOpts.SetMinWriteBufferNumberToMerge(cast.ToInt(minWriteBufferNumberToMerge))
}
maxBytesForLevelBase := appOpts.Get(maxBytesForLevelBaseCFOptName)
if maxBytesForLevelBase != nil {
cfOpts.SetMaxBytesForLevelBase(cast.ToUint64(maxBytesForLevelBase))
}
maxBytesForLevelMultiplier := appOpts.Get(maxBytesForLevelMultiplierCFOptName)
if maxBytesForLevelMultiplier != nil {
cfOpts.SetMaxBytesForLevelMultiplier(cast.ToFloat64(maxBytesForLevelMultiplier))
}
targetFileSizeBase := appOpts.Get(targetFileSizeBaseCFOptName)
if targetFileSizeBase != nil {
cfOpts.SetTargetFileSizeBase(cast.ToUint64(targetFileSizeBase))
}
targetFileSizeMultiplier := appOpts.Get(targetFileSizeMultiplierCFOptName)
if targetFileSizeMultiplier != nil {
cfOpts.SetTargetFileSizeMultiplier(cast.ToInt(targetFileSizeMultiplier))
}
level0FileNumCompactionTrigger := appOpts.Get(level0FileNumCompactionTriggerCFOptName)
if level0FileNumCompactionTrigger != nil {
cfOpts.SetLevel0FileNumCompactionTrigger(cast.ToInt(level0FileNumCompactionTrigger))
}
level0SlowdownWritesTrigger := appOpts.Get(level0SlowdownWritesTriggerCFOptName)
if level0SlowdownWritesTrigger != nil {
cfOpts.SetLevel0SlowdownWritesTrigger(cast.ToInt(level0SlowdownWritesTrigger))
}
return cfOpts
}
func bbtoFromAppOpts(appOpts types.AppOptions) *grocksdb.BlockBasedTableOptions {
bbto := defaultBBTO()
blockCacheSize := appOpts.Get(blockCacheSizeBBTOOptName)
if blockCacheSize != nil {
cache := grocksdb.NewLRUCache(cast.ToUint64(blockCacheSize))
bbto.SetBlockCache(cache)
}
bitsPerKey := appOpts.Get(bitsPerKeyBBTOOptName)
if bitsPerKey != nil {
filter := grocksdb.NewBloomFilter(cast.ToFloat64(bitsPerKey))
bbto.SetFilterPolicy(filter)
}
blockSize := appOpts.Get(blockSizeBBTOOptName)
if blockSize != nil {
bbto.SetBlockSize(cast.ToInt(blockSize))
}
cacheIndexAndFilterBlocks := appOpts.Get(cacheIndexAndFilterBlocksBBTOOptName)
if cacheIndexAndFilterBlocks != nil {
bbto.SetCacheIndexAndFilterBlocks(cast.ToBool(cacheIndexAndFilterBlocks))
}
pinL0FilterAndIndexBlocksInCache := appOpts.Get(pinL0FilterAndIndexBlocksInCacheBBTOOptName)
if pinL0FilterAndIndexBlocksInCache != nil {
bbto.SetPinL0FilterAndIndexBlocksInCache(cast.ToBool(pinL0FilterAndIndexBlocksInCache))
}
formatVersion := appOpts.Get(formatVersionBBTOOptName)
if formatVersion != nil {
bbto.SetFormatVersion(cast.ToInt(formatVersion))
}
return bbto
}
// newRocksDBWithOptions opens rocksdb with provided database and column family options
// newRocksDBWithOptions expects that db has only one column family named default
func newRocksDBWithOptions(
name string,
dir string,
dbOpts *grocksdb.Options,
cfOpts *grocksdb.Options,
enableMetrics bool,
reportMetricsIntervalSecs int64,
) (*dbm.RocksDB, error) {
dbPath := filepath.Join(dir, name+".db")
// Ensure path exists
if err := os.MkdirAll(dbPath, 0755); err != nil {
return nil, fmt.Errorf("failed to create db path: %w", err)
}
// EnableStatistics adds overhead so shouldn't be enabled in production
if enableMetrics {
dbOpts.EnableStatistics()
}
db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{defaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
if err != nil {
return nil, err
}
if enableMetrics {
registerMetrics()
go reportMetrics(db, time.Second*time.Duration(reportMetricsIntervalSecs))
}
ro := grocksdb.NewDefaultReadOptions()
wo := grocksdb.NewDefaultWriteOptions()
woSync := grocksdb.NewDefaultWriteOptions()
woSync.SetSync(true)
return dbm.NewRocksDBWithRawDB(db, ro, wo, woSync), nil
}
// newDefaultOptions returns default tm-db options for RocksDB, see for details:
// https://github.com/Kava-Labs/tm-db/blob/94ff76d31724965f8883cddebabe91e0d01bc03f/rocksdb.go#L30
func newDefaultOptions() *grocksdb.Options {
// default rocksdb option, good enough for most cases, including heavy workloads.
// 1GB table cache, 512MB write buffer(may use 50% more on heavy workloads).
// compression: snappy as default, need to -lsnappy to enable.
bbto := defaultBBTO()
opts := grocksdb.NewDefaultOptions()
opts.SetBlockBasedTableFactory(bbto)
// SetMaxOpenFiles to 4096 seems to provide a reliable performance boost
opts.SetMaxOpenFiles(4096)
opts.SetCreateIfMissing(true)
opts.IncreaseParallelism(runtime.NumCPU())
// 1.5GB maximum memory use for writebuffer.
opts.OptimizeLevelStyleCompaction(512 * 1024 * 1024)
return opts
}
// defaultBBTO returns default tm-db bbto options for RocksDB, see for details:
// https://github.com/Kava-Labs/tm-db/blob/94ff76d31724965f8883cddebabe91e0d01bc03f/rocksdb.go#L30
func defaultBBTO() *grocksdb.BlockBasedTableOptions {
bbto := grocksdb.NewDefaultBlockBasedTableOptions()
bbto.SetBlockCache(grocksdb.NewLRUCache(defaultBlockCacheSize))
bbto.SetFilterPolicy(grocksdb.NewBloomFilter(10))
return bbto
}
// reportMetrics periodically requests stats from rocksdb and reports to prometheus
// NOTE: should be launched as a goroutine
func reportMetrics(db *grocksdb.DB, interval time.Duration) {
ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
props, stats, err := getPropsAndStats(db)
if err != nil {
continue
}
rocksdbMetrics.report(props, stats)
}
}
}
// getPropsAndStats gets statistics from rocksdb
func getPropsAndStats(db *grocksdb.DB) (*properties, *stats, error) {
propsLoader := newPropsLoader(db)
props, err := propsLoader.load()
if err != nil {
return nil, nil, err
}
statMap, err := parseSerializedStats(props.OptionsStatistics)
if err != nil {
return nil, nil, err
}
statLoader := newStatLoader(statMap)
stats, err := statLoader.load()
if err != nil {
return nil, nil, err
}
return props, stats, nil
}