mirror of
https://github.com/0glabs/0g-chain.git
synced 2024-12-27 00:35:18 +00:00
* Make rocksdb configurable (#1658)
* Make rocksdb configurable
* Make sure rocksdb tests are running in CI
* Updating ci-rocksdb-build workflow
* Remove test.sh
* Update tm-db dependency
(cherry picked from commit 90fbe1aad7
)
* Rocksdb Metrics (#1692)
* Rocksdb Metrics
* Add rocksdb namespace for options
* Adding help to the metrics
* CR's fixes
* CR's fixes
* CR's fixes
* Increase number of options to configure rocksdb (#1696)
---------
Co-authored-by: Evgeniy Scherbina <evgeniy.shcherbina.es@gmail.com>
This commit is contained in:
parent
bdfcf56fba
commit
e35747a5f5
29
.github/workflows/ci-rocksdb-build.yml
vendored
29
.github/workflows/ci-rocksdb-build.yml
vendored
@ -1,5 +1,8 @@
|
|||||||
name: Continuous Integration (Rocksdb Build)
|
name: Continuous Integration (Rocksdb Build)
|
||||||
|
|
||||||
|
env:
|
||||||
|
ROCKSDB_VERSION: v8.1.1
|
||||||
|
|
||||||
on:
|
on:
|
||||||
workflow_call:
|
workflow_call:
|
||||||
jobs:
|
jobs:
|
||||||
@ -16,7 +19,29 @@ jobs:
|
|||||||
cache: true
|
cache: true
|
||||||
- name: build rocksdb dependency
|
- name: build rocksdb dependency
|
||||||
run: bash ${GITHUB_WORKSPACE}/.github/scripts/install-rocksdb.sh
|
run: bash ${GITHUB_WORKSPACE}/.github/scripts/install-rocksdb.sh
|
||||||
env:
|
|
||||||
ROCKSDB_VERSION: v7.10.2
|
|
||||||
- name: build application
|
- name: build application
|
||||||
run: make build COSMOS_BUILD_OPTIONS=rocksdb
|
run: make build COSMOS_BUILD_OPTIONS=rocksdb
|
||||||
|
test:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- name: install RocksDB dependencies
|
||||||
|
run: sudo apt-get update
|
||||||
|
&& sudo apt-get install -y git make gcc libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev liblz4-dev libzstd-dev
|
||||||
|
- name: install RocksDB as shared library
|
||||||
|
run: git clone https://github.com/facebook/rocksdb.git
|
||||||
|
&& cd rocksdb
|
||||||
|
&& git checkout $ROCKSDB_VERSION
|
||||||
|
&& sudo make -j$(nproc) install-shared
|
||||||
|
&& sudo ldconfig
|
||||||
|
- name: checkout repo from current commit
|
||||||
|
uses: actions/checkout@v3
|
||||||
|
with:
|
||||||
|
submodules: true
|
||||||
|
- name: Set up Go
|
||||||
|
uses: actions/setup-go@v3
|
||||||
|
with:
|
||||||
|
go-version: "1.20"
|
||||||
|
check-latest: true
|
||||||
|
cache: true
|
||||||
|
- name: run unit tests
|
||||||
|
run: make test-rocksdb
|
||||||
|
@ -10,7 +10,7 @@ WORKDIR /root
|
|||||||
# default home directory is /root
|
# default home directory is /root
|
||||||
|
|
||||||
# install rocksdb
|
# install rocksdb
|
||||||
ARG rocksdb_version=v7.10.2
|
ARG rocksdb_version=v8.1.1
|
||||||
ENV ROCKSDB_VERSION=$rocksdb_version
|
ENV ROCKSDB_VERSION=$rocksdb_version
|
||||||
|
|
||||||
RUN git clone https://github.com/facebook/rocksdb.git \
|
RUN git clone https://github.com/facebook/rocksdb.git \
|
||||||
|
3
Makefile
3
Makefile
@ -302,6 +302,9 @@ test-e2e: docker-build
|
|||||||
test:
|
test:
|
||||||
@go test $$(go list ./... | grep -v 'contrib' | grep -v 'tests/e2e')
|
@go test $$(go list ./... | grep -v 'contrib' | grep -v 'tests/e2e')
|
||||||
|
|
||||||
|
test-rocksdb:
|
||||||
|
@go test -tags=rocksdb ./cmd/kava/opendb
|
||||||
|
|
||||||
# Run cli integration tests
|
# Run cli integration tests
|
||||||
# `-p 4` to use 4 cores, `-tags cli_test` to tell go not to ignore the cli package
|
# `-p 4` to use 4 cores, `-tags cli_test` to tell go not to ignore the cli package
|
||||||
# These tests use the `kvd` or `kvcli` binaries in the build dir, or in `$BUILDDIR` if that env var is set.
|
# These tests use the `kvd` or `kvcli` binaries in the build dir, or in `$BUILDDIR` if that env var is set.
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/kava-labs/kava/app"
|
"github.com/kava-labs/kava/app"
|
||||||
"github.com/kava-labs/kava/app/params"
|
"github.com/kava-labs/kava/app/params"
|
||||||
kavaclient "github.com/kava-labs/kava/client"
|
kavaclient "github.com/kava-labs/kava/client"
|
||||||
|
"github.com/kava-labs/kava/cmd/kava/opendb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// EnvPrefix is the prefix environment variables must have to configure the app.
|
// EnvPrefix is the prefix environment variables must have to configure the app.
|
||||||
@ -105,13 +106,15 @@ func addSubCmds(rootCmd *cobra.Command, encodingConfig params.EncodingConfig, de
|
|||||||
encodingConfig: encodingConfig,
|
encodingConfig: encodingConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
opts := ethermintserver.StartOptions{
|
||||||
|
AppCreator: ac.newApp,
|
||||||
|
DefaultNodeHome: app.DefaultNodeHome,
|
||||||
|
DBOpener: opendb.OpenDB,
|
||||||
|
}
|
||||||
// ethermintserver adds additional flags to start the JSON-RPC server for evm support
|
// ethermintserver adds additional flags to start the JSON-RPC server for evm support
|
||||||
ethermintserver.AddCommands(
|
ethermintserver.AddCommands(
|
||||||
rootCmd,
|
rootCmd,
|
||||||
ethermintserver.NewDefaultStartOptions(
|
opts,
|
||||||
ac.newApp,
|
|
||||||
app.DefaultNodeHome,
|
|
||||||
),
|
|
||||||
ac.appExport,
|
ac.appExport,
|
||||||
ac.addStartCmdFlags,
|
ac.addStartCmdFlags,
|
||||||
)
|
)
|
||||||
|
166
cmd/kava/opendb/metrics.go
Normal file
166
cmd/kava/opendb/metrics.go
Normal file
@ -0,0 +1,166 @@
|
|||||||
|
//go:build rocksdb
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/go-kit/kit/metrics"
|
||||||
|
"github.com/go-kit/kit/metrics/prometheus"
|
||||||
|
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// rocksdbMetrics will be initialized in registerMetrics() if enableRocksdbMetrics flag set to true
|
||||||
|
var rocksdbMetrics *Metrics
|
||||||
|
|
||||||
|
// Metrics contains all rocksdb metrics which will be reported to prometheus
|
||||||
|
type Metrics struct {
|
||||||
|
// Keys
|
||||||
|
NumberKeysWritten metrics.Gauge
|
||||||
|
NumberKeysRead metrics.Gauge
|
||||||
|
NumberKeysUpdated metrics.Gauge
|
||||||
|
EstimateNumKeys metrics.Gauge
|
||||||
|
|
||||||
|
// Files
|
||||||
|
NumberFileOpens metrics.Gauge
|
||||||
|
NumberFileErrors metrics.Gauge
|
||||||
|
|
||||||
|
// Memory
|
||||||
|
BlockCacheUsage metrics.Gauge
|
||||||
|
EstimateTableReadersMem metrics.Gauge
|
||||||
|
CurSizeAllMemTables metrics.Gauge
|
||||||
|
BlockCachePinnedUsage metrics.Gauge
|
||||||
|
|
||||||
|
// Cache
|
||||||
|
BlockCacheMiss metrics.Gauge
|
||||||
|
BlockCacheHit metrics.Gauge
|
||||||
|
BlockCacheAdd metrics.Gauge
|
||||||
|
BlockCacheAddFailures metrics.Gauge
|
||||||
|
}
|
||||||
|
|
||||||
|
// registerMetrics registers metrics in prometheus and initializes rocksdbMetrics variable
|
||||||
|
func registerMetrics() {
|
||||||
|
if rocksdbMetrics != nil {
|
||||||
|
// metrics already registered
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
labels := make([]string, 0)
|
||||||
|
rocksdbMetrics = &Metrics{
|
||||||
|
// Keys
|
||||||
|
NumberKeysWritten: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "key",
|
||||||
|
Name: "number_keys_written",
|
||||||
|
Help: "",
|
||||||
|
}, labels),
|
||||||
|
NumberKeysRead: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "key",
|
||||||
|
Name: "number_keys_read",
|
||||||
|
Help: "",
|
||||||
|
}, labels),
|
||||||
|
NumberKeysUpdated: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "key",
|
||||||
|
Name: "number_keys_updated",
|
||||||
|
Help: "",
|
||||||
|
}, labels),
|
||||||
|
EstimateNumKeys: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "key",
|
||||||
|
Name: "estimate_num_keys",
|
||||||
|
Help: "estimated number of total keys in the active and unflushed immutable memtables and storage",
|
||||||
|
}, labels),
|
||||||
|
|
||||||
|
// Files
|
||||||
|
NumberFileOpens: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "file",
|
||||||
|
Name: "number_file_opens",
|
||||||
|
Help: "",
|
||||||
|
}, labels),
|
||||||
|
NumberFileErrors: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "file",
|
||||||
|
Name: "number_file_errors",
|
||||||
|
Help: "",
|
||||||
|
}, labels),
|
||||||
|
|
||||||
|
// Memory
|
||||||
|
BlockCacheUsage: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "memory",
|
||||||
|
Name: "block_cache_usage",
|
||||||
|
Help: "memory size for the entries residing in block cache",
|
||||||
|
}, labels),
|
||||||
|
EstimateTableReadersMem: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "memory",
|
||||||
|
Name: "estimate_table_readers_mem",
|
||||||
|
Help: "estimated memory used for reading SST tables, excluding memory used in block cache (e.g., filter and index blocks)",
|
||||||
|
}, labels),
|
||||||
|
CurSizeAllMemTables: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "memory",
|
||||||
|
Name: "cur_size_all_mem_tables",
|
||||||
|
Help: "approximate size of active and unflushed immutable memtables (bytes)",
|
||||||
|
}, labels),
|
||||||
|
BlockCachePinnedUsage: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "memory",
|
||||||
|
Name: "block_cache_pinned_usage",
|
||||||
|
Help: "returns the memory size for the entries being pinned",
|
||||||
|
}, labels),
|
||||||
|
|
||||||
|
// Cache
|
||||||
|
BlockCacheMiss: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "cache",
|
||||||
|
Name: "block_cache_miss",
|
||||||
|
Help: "block_cache_miss == block_cache_index_miss + block_cache_filter_miss + block_cache_data_miss",
|
||||||
|
}, labels),
|
||||||
|
BlockCacheHit: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "cache",
|
||||||
|
Name: "block_cache_hit",
|
||||||
|
Help: "block_cache_hit == block_cache_index_hit + block_cache_filter_hit + block_cache_data_hit",
|
||||||
|
}, labels),
|
||||||
|
BlockCacheAdd: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "cache",
|
||||||
|
Name: "block_cache_add",
|
||||||
|
Help: "number of blocks added to block cache",
|
||||||
|
}, labels),
|
||||||
|
BlockCacheAddFailures: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
|
Namespace: "rocksdb",
|
||||||
|
Subsystem: "cache",
|
||||||
|
Name: "block_cache_add_failures",
|
||||||
|
Help: "number of failures when adding blocks to block cache",
|
||||||
|
}, labels),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// report reports metrics to prometheus based on rocksdb props and stats
|
||||||
|
func (m *Metrics) report(props *properties, stats *stats) {
|
||||||
|
// Keys
|
||||||
|
m.NumberKeysWritten.Set(float64(stats.NumberKeysWritten))
|
||||||
|
m.NumberKeysRead.Set(float64(stats.NumberKeysRead))
|
||||||
|
m.NumberKeysUpdated.Set(float64(stats.NumberKeysUpdated))
|
||||||
|
m.EstimateNumKeys.Set(float64(props.EstimateNumKeys))
|
||||||
|
|
||||||
|
// Files
|
||||||
|
m.NumberFileOpens.Set(float64(stats.NumberFileOpens))
|
||||||
|
m.NumberFileErrors.Set(float64(stats.NumberFileErrors))
|
||||||
|
|
||||||
|
// Memory
|
||||||
|
m.BlockCacheUsage.Set(float64(props.BlockCacheUsage))
|
||||||
|
m.EstimateTableReadersMem.Set(float64(props.EstimateTableReadersMem))
|
||||||
|
m.CurSizeAllMemTables.Set(float64(props.CurSizeAllMemTables))
|
||||||
|
m.BlockCachePinnedUsage.Set(float64(props.BlockCachePinnedUsage))
|
||||||
|
|
||||||
|
// Cache
|
||||||
|
m.BlockCacheMiss.Set(float64(stats.BlockCacheMiss))
|
||||||
|
m.BlockCacheHit.Set(float64(stats.BlockCacheHit))
|
||||||
|
m.BlockCacheAdd.Set(float64(stats.BlockCacheAdd))
|
||||||
|
m.BlockCacheAddFailures.Set(float64(stats.BlockCacheAddFailures))
|
||||||
|
}
|
18
cmd/kava/opendb/opendb.go
Normal file
18
cmd/kava/opendb/opendb.go
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
//go:build !rocksdb
|
||||||
|
// +build !rocksdb
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/cosmos/cosmos-sdk/server/types"
|
||||||
|
dbm "github.com/tendermint/tm-db"
|
||||||
|
)
|
||||||
|
|
||||||
|
// OpenDB is a copy of default DBOpener function used by ethermint, see for details:
|
||||||
|
// https://github.com/evmos/ethermint/blob/07cf2bd2b1ce9bdb2e44ec42a39e7239292a14af/server/start.go#L647
|
||||||
|
func OpenDB(_ types.AppOptions, home string, backendType dbm.BackendType) (dbm.DB, error) {
|
||||||
|
dataDir := filepath.Join(home, "data")
|
||||||
|
return dbm.NewDB("application", backendType, dataDir)
|
||||||
|
}
|
378
cmd/kava/opendb/opendb_rocksdb.go
Normal file
378
cmd/kava/opendb/opendb_rocksdb.go
Normal file
@ -0,0 +1,378 @@
|
|||||||
|
//go:build rocksdb
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
// Copyright 2023 Kava Labs, Inc.
|
||||||
|
// Copyright 2023 Cronos Labs, Inc.
|
||||||
|
//
|
||||||
|
// Derived from https://github.com/crypto-org-chain/cronos@496ce7e
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/cosmos/cosmos-sdk/server/types"
|
||||||
|
"github.com/linxGnu/grocksdb"
|
||||||
|
"github.com/spf13/cast"
|
||||||
|
dbm "github.com/tendermint/tm-db"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrUnexpectedConfiguration = errors.New("unexpected rocksdb configuration, rocksdb should have only one column family named default")
|
||||||
|
|
||||||
|
const (
|
||||||
|
// default tm-db block cache size for RocksDB
|
||||||
|
defaultBlockCacheSize = 1 << 30
|
||||||
|
|
||||||
|
defaultColumnFamilyName = "default"
|
||||||
|
|
||||||
|
enableMetricsOptName = "rocksdb.enable-metrics"
|
||||||
|
reportMetricsIntervalSecsOptName = "rocksdb.report-metrics-interval-secs"
|
||||||
|
defaultReportMetricsIntervalSecs = 15
|
||||||
|
|
||||||
|
maxOpenFilesDBOptName = "rocksdb.max-open-files"
|
||||||
|
maxFileOpeningThreadsDBOptName = "rocksdb.max-file-opening-threads"
|
||||||
|
tableCacheNumshardbitsDBOptName = "rocksdb.table_cache_numshardbits"
|
||||||
|
allowMMAPWritesDBOptName = "rocksdb.allow_mmap_writes"
|
||||||
|
allowMMAPReadsDBOptName = "rocksdb.allow_mmap_reads"
|
||||||
|
useFsyncDBOptName = "rocksdb.use_fsync"
|
||||||
|
useAdaptiveMutexDBOptName = "rocksdb.use_adaptive_mutex"
|
||||||
|
bytesPerSyncDBOptName = "rocksdb.bytes_per_sync"
|
||||||
|
|
||||||
|
writeBufferSizeCFOptName = "rocksdb.write-buffer-size"
|
||||||
|
numLevelsCFOptName = "rocksdb.num-levels"
|
||||||
|
maxWriteBufferNumberCFOptName = "rocksdb.max_write_buffer_number"
|
||||||
|
minWriteBufferNumberToMergeCFOptName = "rocksdb.min_write_buffer_number_to_merge"
|
||||||
|
maxBytesForLevelBaseCFOptName = "rocksdb.max_bytes_for_level_base"
|
||||||
|
maxBytesForLevelMultiplierCFOptName = "rocksdb.max_bytes_for_level_multiplier"
|
||||||
|
targetFileSizeBaseCFOptName = "rocksdb.target_file_size_base"
|
||||||
|
targetFileSizeMultiplierCFOptName = "rocksdb.target_file_size_multiplier"
|
||||||
|
level0FileNumCompactionTriggerCFOptName = "rocksdb.level0_file_num_compaction_trigger"
|
||||||
|
level0SlowdownWritesTriggerCFOptName = "rocksdb.level0_slowdown_writes_trigger"
|
||||||
|
|
||||||
|
blockCacheSizeBBTOOptName = "rocksdb.block_cache_size"
|
||||||
|
bitsPerKeyBBTOOptName = "rocksdb.bits_per_key"
|
||||||
|
blockSizeBBTOOptName = "rocksdb.block_size"
|
||||||
|
cacheIndexAndFilterBlocksBBTOOptName = "rocksdb.cache_index_and_filter_blocks"
|
||||||
|
pinL0FilterAndIndexBlocksInCacheBBTOOptName = "rocksdb.pin_l0_filter_and_index_blocks_in_cache"
|
||||||
|
formatVersionBBTOOptName = "rocksdb.format_version"
|
||||||
|
)
|
||||||
|
|
||||||
|
func OpenDB(appOpts types.AppOptions, home string, backendType dbm.BackendType) (dbm.DB, error) {
|
||||||
|
dataDir := filepath.Join(home, "data")
|
||||||
|
if backendType == dbm.RocksDBBackend {
|
||||||
|
return openRocksdb(filepath.Join(dataDir, "application.db"), appOpts)
|
||||||
|
}
|
||||||
|
|
||||||
|
return dbm.NewDB("application", backendType, dataDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
// openRocksdb loads existing options, overrides some of them with appOpts and opens database
|
||||||
|
// option will be overridden only in case if it explicitly specified in appOpts
|
||||||
|
func openRocksdb(dir string, appOpts types.AppOptions) (dbm.DB, error) {
|
||||||
|
dbOpts, cfOpts, err := loadLatestOptions(dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// customize rocksdb options
|
||||||
|
bbtoOpts := bbtoFromAppOpts(appOpts)
|
||||||
|
dbOpts.SetBlockBasedTableFactory(bbtoOpts)
|
||||||
|
cfOpts.SetBlockBasedTableFactory(bbtoOpts)
|
||||||
|
dbOpts = overrideDBOpts(dbOpts, appOpts)
|
||||||
|
cfOpts = overrideCFOpts(cfOpts, appOpts)
|
||||||
|
|
||||||
|
enableMetrics := cast.ToBool(appOpts.Get(enableMetricsOptName))
|
||||||
|
reportMetricsIntervalSecs := cast.ToInt64(appOpts.Get(reportMetricsIntervalSecsOptName))
|
||||||
|
if reportMetricsIntervalSecs == 0 {
|
||||||
|
reportMetricsIntervalSecs = defaultReportMetricsIntervalSecs
|
||||||
|
}
|
||||||
|
|
||||||
|
return newRocksDBWithOptions("application", dir, dbOpts, cfOpts, enableMetrics, reportMetricsIntervalSecs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadLatestOptions loads and returns database and column family options
|
||||||
|
// if options file not found, it means database isn't created yet, in such case default tm-db options will be returned
|
||||||
|
// if database exists it should have only one column family named default
|
||||||
|
func loadLatestOptions(dir string) (*grocksdb.Options, *grocksdb.Options, error) {
|
||||||
|
latestOpts, err := grocksdb.LoadLatestOptions(dir, grocksdb.NewDefaultEnv(), true, grocksdb.NewLRUCache(defaultBlockCacheSize))
|
||||||
|
if err != nil && strings.HasPrefix(err.Error(), "NotFound: ") {
|
||||||
|
return newDefaultOptions(), newDefaultOptions(), nil
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
cfNames := latestOpts.ColumnFamilyNames()
|
||||||
|
cfOpts := latestOpts.ColumnFamilyOpts()
|
||||||
|
// db should have only one column family named default
|
||||||
|
ok := len(cfNames) == 1 && cfNames[0] == defaultColumnFamilyName
|
||||||
|
if !ok {
|
||||||
|
return nil, nil, ErrUnexpectedConfiguration
|
||||||
|
}
|
||||||
|
|
||||||
|
// return db and cf opts
|
||||||
|
return latestOpts.Options(), &cfOpts[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// overrideDBOpts merges dbOpts and appOpts, appOpts takes precedence
|
||||||
|
func overrideDBOpts(dbOpts *grocksdb.Options, appOpts types.AppOptions) *grocksdb.Options {
|
||||||
|
maxOpenFiles := appOpts.Get(maxOpenFilesDBOptName)
|
||||||
|
if maxOpenFiles != nil {
|
||||||
|
dbOpts.SetMaxOpenFiles(cast.ToInt(maxOpenFiles))
|
||||||
|
}
|
||||||
|
|
||||||
|
maxFileOpeningThreads := appOpts.Get(maxFileOpeningThreadsDBOptName)
|
||||||
|
if maxFileOpeningThreads != nil {
|
||||||
|
dbOpts.SetMaxFileOpeningThreads(cast.ToInt(maxFileOpeningThreads))
|
||||||
|
}
|
||||||
|
|
||||||
|
tableCacheNumshardbits := appOpts.Get(tableCacheNumshardbitsDBOptName)
|
||||||
|
if tableCacheNumshardbits != nil {
|
||||||
|
dbOpts.SetTableCacheNumshardbits(cast.ToInt(tableCacheNumshardbits))
|
||||||
|
}
|
||||||
|
|
||||||
|
allowMMAPWrites := appOpts.Get(allowMMAPWritesDBOptName)
|
||||||
|
if allowMMAPWrites != nil {
|
||||||
|
dbOpts.SetAllowMmapWrites(cast.ToBool(allowMMAPWrites))
|
||||||
|
}
|
||||||
|
|
||||||
|
allowMMAPReads := appOpts.Get(allowMMAPReadsDBOptName)
|
||||||
|
if allowMMAPReads != nil {
|
||||||
|
dbOpts.SetAllowMmapReads(cast.ToBool(allowMMAPReads))
|
||||||
|
}
|
||||||
|
|
||||||
|
useFsync := appOpts.Get(useFsyncDBOptName)
|
||||||
|
if useFsync != nil {
|
||||||
|
dbOpts.SetUseFsync(cast.ToBool(useFsync))
|
||||||
|
}
|
||||||
|
|
||||||
|
useAdaptiveMutex := appOpts.Get(useAdaptiveMutexDBOptName)
|
||||||
|
if useAdaptiveMutex != nil {
|
||||||
|
dbOpts.SetUseAdaptiveMutex(cast.ToBool(useAdaptiveMutex))
|
||||||
|
}
|
||||||
|
|
||||||
|
bytesPerSync := appOpts.Get(bytesPerSyncDBOptName)
|
||||||
|
if bytesPerSync != nil {
|
||||||
|
dbOpts.SetBytesPerSync(cast.ToUint64(bytesPerSync))
|
||||||
|
}
|
||||||
|
|
||||||
|
return dbOpts
|
||||||
|
}
|
||||||
|
|
||||||
|
// overrideCFOpts merges cfOpts and appOpts, appOpts takes precedence
|
||||||
|
func overrideCFOpts(cfOpts *grocksdb.Options, appOpts types.AppOptions) *grocksdb.Options {
|
||||||
|
writeBufferSize := appOpts.Get(writeBufferSizeCFOptName)
|
||||||
|
if writeBufferSize != nil {
|
||||||
|
cfOpts.SetWriteBufferSize(cast.ToUint64(writeBufferSize))
|
||||||
|
}
|
||||||
|
|
||||||
|
numLevels := appOpts.Get(numLevelsCFOptName)
|
||||||
|
if numLevels != nil {
|
||||||
|
cfOpts.SetNumLevels(cast.ToInt(numLevels))
|
||||||
|
}
|
||||||
|
|
||||||
|
maxWriteBufferNumber := appOpts.Get(maxWriteBufferNumberCFOptName)
|
||||||
|
if maxWriteBufferNumber != nil {
|
||||||
|
cfOpts.SetMaxWriteBufferNumber(cast.ToInt(maxWriteBufferNumber))
|
||||||
|
}
|
||||||
|
|
||||||
|
minWriteBufferNumberToMerge := appOpts.Get(minWriteBufferNumberToMergeCFOptName)
|
||||||
|
if minWriteBufferNumberToMerge != nil {
|
||||||
|
cfOpts.SetMinWriteBufferNumberToMerge(cast.ToInt(minWriteBufferNumberToMerge))
|
||||||
|
}
|
||||||
|
|
||||||
|
maxBytesForLevelBase := appOpts.Get(maxBytesForLevelBaseCFOptName)
|
||||||
|
if maxBytesForLevelBase != nil {
|
||||||
|
cfOpts.SetMaxBytesForLevelBase(cast.ToUint64(maxBytesForLevelBase))
|
||||||
|
}
|
||||||
|
|
||||||
|
maxBytesForLevelMultiplier := appOpts.Get(maxBytesForLevelMultiplierCFOptName)
|
||||||
|
if maxBytesForLevelMultiplier != nil {
|
||||||
|
cfOpts.SetMaxBytesForLevelMultiplier(cast.ToFloat64(maxBytesForLevelMultiplier))
|
||||||
|
}
|
||||||
|
|
||||||
|
targetFileSizeBase := appOpts.Get(targetFileSizeBaseCFOptName)
|
||||||
|
if targetFileSizeBase != nil {
|
||||||
|
cfOpts.SetTargetFileSizeBase(cast.ToUint64(targetFileSizeBase))
|
||||||
|
}
|
||||||
|
|
||||||
|
targetFileSizeMultiplier := appOpts.Get(targetFileSizeMultiplierCFOptName)
|
||||||
|
if targetFileSizeMultiplier != nil {
|
||||||
|
cfOpts.SetTargetFileSizeMultiplier(cast.ToInt(targetFileSizeMultiplier))
|
||||||
|
}
|
||||||
|
|
||||||
|
level0FileNumCompactionTrigger := appOpts.Get(level0FileNumCompactionTriggerCFOptName)
|
||||||
|
if level0FileNumCompactionTrigger != nil {
|
||||||
|
cfOpts.SetLevel0FileNumCompactionTrigger(cast.ToInt(level0FileNumCompactionTrigger))
|
||||||
|
}
|
||||||
|
|
||||||
|
level0SlowdownWritesTrigger := appOpts.Get(level0SlowdownWritesTriggerCFOptName)
|
||||||
|
if level0SlowdownWritesTrigger != nil {
|
||||||
|
cfOpts.SetLevel0SlowdownWritesTrigger(cast.ToInt(level0SlowdownWritesTrigger))
|
||||||
|
}
|
||||||
|
|
||||||
|
return cfOpts
|
||||||
|
}
|
||||||
|
|
||||||
|
func bbtoFromAppOpts(appOpts types.AppOptions) *grocksdb.BlockBasedTableOptions {
|
||||||
|
bbto := defaultBBTO()
|
||||||
|
|
||||||
|
blockCacheSize := appOpts.Get(blockCacheSizeBBTOOptName)
|
||||||
|
if blockCacheSize != nil {
|
||||||
|
cache := grocksdb.NewLRUCache(cast.ToUint64(blockCacheSize))
|
||||||
|
bbto.SetBlockCache(cache)
|
||||||
|
}
|
||||||
|
|
||||||
|
bitsPerKey := appOpts.Get(bitsPerKeyBBTOOptName)
|
||||||
|
if bitsPerKey != nil {
|
||||||
|
filter := grocksdb.NewBloomFilter(cast.ToFloat64(bitsPerKey))
|
||||||
|
bbto.SetFilterPolicy(filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
blockSize := appOpts.Get(blockSizeBBTOOptName)
|
||||||
|
if blockSize != nil {
|
||||||
|
bbto.SetBlockSize(cast.ToInt(blockSize))
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheIndexAndFilterBlocks := appOpts.Get(cacheIndexAndFilterBlocksBBTOOptName)
|
||||||
|
if cacheIndexAndFilterBlocks != nil {
|
||||||
|
bbto.SetCacheIndexAndFilterBlocks(cast.ToBool(cacheIndexAndFilterBlocks))
|
||||||
|
}
|
||||||
|
|
||||||
|
pinL0FilterAndIndexBlocksInCache := appOpts.Get(pinL0FilterAndIndexBlocksInCacheBBTOOptName)
|
||||||
|
if pinL0FilterAndIndexBlocksInCache != nil {
|
||||||
|
bbto.SetPinL0FilterAndIndexBlocksInCache(cast.ToBool(pinL0FilterAndIndexBlocksInCache))
|
||||||
|
}
|
||||||
|
|
||||||
|
formatVersion := appOpts.Get(formatVersionBBTOOptName)
|
||||||
|
if formatVersion != nil {
|
||||||
|
bbto.SetFormatVersion(cast.ToInt(formatVersion))
|
||||||
|
}
|
||||||
|
|
||||||
|
return bbto
|
||||||
|
}
|
||||||
|
|
||||||
|
// newRocksDBWithOptions opens rocksdb with provided database and column family options
|
||||||
|
// newRocksDBWithOptions expects that db has only one column family named default
|
||||||
|
func newRocksDBWithOptions(
|
||||||
|
name string,
|
||||||
|
dir string,
|
||||||
|
dbOpts *grocksdb.Options,
|
||||||
|
cfOpts *grocksdb.Options,
|
||||||
|
enableMetrics bool,
|
||||||
|
reportMetricsIntervalSecs int64,
|
||||||
|
) (*dbm.RocksDB, error) {
|
||||||
|
dbPath := filepath.Join(dir, name+".db")
|
||||||
|
|
||||||
|
// Ensure path exists
|
||||||
|
if err := os.MkdirAll(dbPath, 0755); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create db path: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableStatistics adds overhead so shouldn't be enabled in production
|
||||||
|
if enableMetrics {
|
||||||
|
dbOpts.EnableStatistics()
|
||||||
|
}
|
||||||
|
|
||||||
|
db, _, err := grocksdb.OpenDbColumnFamilies(dbOpts, dbPath, []string{defaultColumnFamilyName}, []*grocksdb.Options{cfOpts})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if enableMetrics {
|
||||||
|
registerMetrics()
|
||||||
|
go reportMetrics(db, time.Second*time.Duration(reportMetricsIntervalSecs))
|
||||||
|
}
|
||||||
|
|
||||||
|
ro := grocksdb.NewDefaultReadOptions()
|
||||||
|
wo := grocksdb.NewDefaultWriteOptions()
|
||||||
|
woSync := grocksdb.NewDefaultWriteOptions()
|
||||||
|
woSync.SetSync(true)
|
||||||
|
return dbm.NewRocksDBWithRawDB(db, ro, wo, woSync), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// newDefaultOptions returns default tm-db options for RocksDB, see for details:
|
||||||
|
// https://github.com/Kava-Labs/tm-db/blob/94ff76d31724965f8883cddebabe91e0d01bc03f/rocksdb.go#L30
|
||||||
|
func newDefaultOptions() *grocksdb.Options {
|
||||||
|
// default rocksdb option, good enough for most cases, including heavy workloads.
|
||||||
|
// 1GB table cache, 512MB write buffer(may use 50% more on heavy workloads).
|
||||||
|
// compression: snappy as default, need to -lsnappy to enable.
|
||||||
|
bbto := defaultBBTO()
|
||||||
|
|
||||||
|
opts := grocksdb.NewDefaultOptions()
|
||||||
|
opts.SetBlockBasedTableFactory(bbto)
|
||||||
|
// SetMaxOpenFiles to 4096 seems to provide a reliable performance boost
|
||||||
|
opts.SetMaxOpenFiles(4096)
|
||||||
|
opts.SetCreateIfMissing(true)
|
||||||
|
opts.IncreaseParallelism(runtime.NumCPU())
|
||||||
|
// 1.5GB maximum memory use for writebuffer.
|
||||||
|
opts.OptimizeLevelStyleCompaction(512 * 1024 * 1024)
|
||||||
|
|
||||||
|
return opts
|
||||||
|
}
|
||||||
|
|
||||||
|
// defaultBBTO returns default tm-db bbto options for RocksDB, see for details:
|
||||||
|
// https://github.com/Kava-Labs/tm-db/blob/94ff76d31724965f8883cddebabe91e0d01bc03f/rocksdb.go#L30
|
||||||
|
func defaultBBTO() *grocksdb.BlockBasedTableOptions {
|
||||||
|
bbto := grocksdb.NewDefaultBlockBasedTableOptions()
|
||||||
|
bbto.SetBlockCache(grocksdb.NewLRUCache(defaultBlockCacheSize))
|
||||||
|
bbto.SetFilterPolicy(grocksdb.NewBloomFilter(10))
|
||||||
|
|
||||||
|
return bbto
|
||||||
|
}
|
||||||
|
|
||||||
|
// reportMetrics periodically requests stats from rocksdb and reports to prometheus
|
||||||
|
// NOTE: should be launched as a goroutine
|
||||||
|
func reportMetrics(db *grocksdb.DB, interval time.Duration) {
|
||||||
|
ticker := time.NewTicker(interval)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
props, stats, err := getPropsAndStats(db)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
rocksdbMetrics.report(props, stats)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPropsAndStats gets statistics from rocksdb
|
||||||
|
func getPropsAndStats(db *grocksdb.DB) (*properties, *stats, error) {
|
||||||
|
propsLoader := newPropsLoader(db)
|
||||||
|
props, err := propsLoader.load()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
statMap, err := parseSerializedStats(props.OptionsStatistics)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
statLoader := newStatLoader(statMap)
|
||||||
|
stats, err := statLoader.load()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return props, stats, nil
|
||||||
|
}
|
357
cmd/kava/opendb/opendb_rocksdb_test.go
Normal file
357
cmd/kava/opendb/opendb_rocksdb_test.go
Normal file
@ -0,0 +1,357 @@
|
|||||||
|
//go:build rocksdb
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/linxGnu/grocksdb"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockAppOptions struct {
|
||||||
|
opts map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockAppOptions(opts map[string]interface{}) *mockAppOptions {
|
||||||
|
return &mockAppOptions{
|
||||||
|
opts: opts,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockAppOptions) Get(key string) interface{} {
|
||||||
|
return m.opts[key]
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOpenRocksdb(t *testing.T) {
|
||||||
|
t.Run("db already exists", func(t *testing.T) {
|
||||||
|
defaultOpts := newDefaultOptions()
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
mockAppOptions *mockAppOptions
|
||||||
|
maxOpenFiles int
|
||||||
|
maxFileOpeningThreads int
|
||||||
|
writeBufferSize uint64
|
||||||
|
numLevels int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "default options",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{}),
|
||||||
|
maxOpenFiles: defaultOpts.GetMaxOpenFiles(),
|
||||||
|
maxFileOpeningThreads: defaultOpts.GetMaxFileOpeningThreads(),
|
||||||
|
writeBufferSize: defaultOpts.GetWriteBufferSize(),
|
||||||
|
numLevels: defaultOpts.GetNumLevels(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "change 2 options",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{
|
||||||
|
maxOpenFilesDBOptName: 999,
|
||||||
|
writeBufferSizeCFOptName: 999_999,
|
||||||
|
}),
|
||||||
|
maxOpenFiles: 999,
|
||||||
|
maxFileOpeningThreads: defaultOpts.GetMaxFileOpeningThreads(),
|
||||||
|
writeBufferSize: 999_999,
|
||||||
|
numLevels: defaultOpts.GetNumLevels(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "change 4 options",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{
|
||||||
|
maxOpenFilesDBOptName: 999,
|
||||||
|
maxFileOpeningThreadsDBOptName: 9,
|
||||||
|
writeBufferSizeCFOptName: 999_999,
|
||||||
|
numLevelsCFOptName: 9,
|
||||||
|
}),
|
||||||
|
maxOpenFiles: 999,
|
||||||
|
maxFileOpeningThreads: 9,
|
||||||
|
writeBufferSize: 999_999,
|
||||||
|
numLevels: 9,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
dir, err := os.MkdirTemp("", "rocksdb")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
err := os.RemoveAll(dir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
db, err := openRocksdb(dir, tc.mockAppOptions)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
|
||||||
|
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
|
||||||
|
require.Equal(t, tc.writeBufferSize, cfOpts.GetWriteBufferSize())
|
||||||
|
require.Equal(t, tc.numLevels, cfOpts.GetNumLevels())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("db doesn't exist yet", func(t *testing.T) {
|
||||||
|
defaultOpts := newDefaultOptions()
|
||||||
|
|
||||||
|
dir, err := os.MkdirTemp("", "rocksdb")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
err := os.RemoveAll(dir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
mockAppOpts := newMockAppOptions(map[string]interface{}{})
|
||||||
|
db, err := openRocksdb(dir, mockAppOpts)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
|
||||||
|
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
|
||||||
|
require.Equal(t, defaultOpts.GetWriteBufferSize(), cfOpts.GetWriteBufferSize())
|
||||||
|
require.Equal(t, defaultOpts.GetNumLevels(), cfOpts.GetNumLevels())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadLatestOptions(t *testing.T) {
|
||||||
|
t.Run("db already exists", func(t *testing.T) {
|
||||||
|
defaultOpts := newDefaultOptions()
|
||||||
|
|
||||||
|
const testCasesNum = 3
|
||||||
|
dbOptsList := make([]*grocksdb.Options, testCasesNum)
|
||||||
|
cfOptsList := make([]*grocksdb.Options, testCasesNum)
|
||||||
|
|
||||||
|
dbOptsList[0] = newDefaultOptions()
|
||||||
|
cfOptsList[0] = newDefaultOptions()
|
||||||
|
|
||||||
|
dbOptsList[1] = newDefaultOptions()
|
||||||
|
dbOptsList[1].SetMaxOpenFiles(999)
|
||||||
|
cfOptsList[1] = newDefaultOptions()
|
||||||
|
cfOptsList[1].SetWriteBufferSize(999_999)
|
||||||
|
|
||||||
|
dbOptsList[2] = newDefaultOptions()
|
||||||
|
dbOptsList[2].SetMaxOpenFiles(999)
|
||||||
|
dbOptsList[2].SetMaxFileOpeningThreads(9)
|
||||||
|
cfOptsList[2] = newDefaultOptions()
|
||||||
|
cfOptsList[2].SetWriteBufferSize(999_999)
|
||||||
|
cfOptsList[2].SetNumLevels(9)
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
dbOpts *grocksdb.Options
|
||||||
|
cfOpts *grocksdb.Options
|
||||||
|
maxOpenFiles int
|
||||||
|
maxFileOpeningThreads int
|
||||||
|
writeBufferSize uint64
|
||||||
|
numLevels int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "default options",
|
||||||
|
dbOpts: dbOptsList[0],
|
||||||
|
cfOpts: cfOptsList[0],
|
||||||
|
maxOpenFiles: defaultOpts.GetMaxOpenFiles(),
|
||||||
|
maxFileOpeningThreads: defaultOpts.GetMaxFileOpeningThreads(),
|
||||||
|
writeBufferSize: defaultOpts.GetWriteBufferSize(),
|
||||||
|
numLevels: defaultOpts.GetNumLevels(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "change 2 options",
|
||||||
|
dbOpts: dbOptsList[1],
|
||||||
|
cfOpts: cfOptsList[1],
|
||||||
|
maxOpenFiles: 999,
|
||||||
|
maxFileOpeningThreads: defaultOpts.GetMaxFileOpeningThreads(),
|
||||||
|
writeBufferSize: 999_999,
|
||||||
|
numLevels: defaultOpts.GetNumLevels(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "change 4 options",
|
||||||
|
dbOpts: dbOptsList[2],
|
||||||
|
cfOpts: cfOptsList[2],
|
||||||
|
maxOpenFiles: 999,
|
||||||
|
maxFileOpeningThreads: 9,
|
||||||
|
writeBufferSize: 999_999,
|
||||||
|
numLevels: 9,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
name := "application"
|
||||||
|
dir, err := os.MkdirTemp("", "rocksdb")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
err := os.RemoveAll(dir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
db, err := newRocksDBWithOptions(name, dir, tc.dbOpts, tc.cfOpts, true, defaultReportMetricsIntervalSecs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
|
||||||
|
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
|
||||||
|
require.Equal(t, tc.writeBufferSize, cfOpts.GetWriteBufferSize())
|
||||||
|
require.Equal(t, tc.numLevels, cfOpts.GetNumLevels())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("db doesn't exist yet", func(t *testing.T) {
|
||||||
|
defaultOpts := newDefaultOptions()
|
||||||
|
|
||||||
|
dir, err := os.MkdirTemp("", "rocksdb")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
err := os.RemoveAll(dir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
dbOpts, cfOpts, err := loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, defaultOpts.GetMaxOpenFiles(), dbOpts.GetMaxOpenFiles())
|
||||||
|
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
|
||||||
|
require.Equal(t, defaultOpts.GetWriteBufferSize(), cfOpts.GetWriteBufferSize())
|
||||||
|
require.Equal(t, defaultOpts.GetNumLevels(), cfOpts.GetNumLevels())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOverrideDBOpts(t *testing.T) {
|
||||||
|
defaultOpts := newDefaultOptions()
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
mockAppOptions *mockAppOptions
|
||||||
|
maxOpenFiles int
|
||||||
|
maxFileOpeningThreads int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "override nothing",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{}),
|
||||||
|
maxOpenFiles: defaultOpts.GetMaxOpenFiles(),
|
||||||
|
maxFileOpeningThreads: defaultOpts.GetMaxFileOpeningThreads(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "override max-open-files",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{
|
||||||
|
maxOpenFilesDBOptName: 999,
|
||||||
|
}),
|
||||||
|
maxOpenFiles: 999,
|
||||||
|
maxFileOpeningThreads: defaultOpts.GetMaxFileOpeningThreads(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "override max-file-opening-threads",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{
|
||||||
|
maxFileOpeningThreadsDBOptName: 9,
|
||||||
|
}),
|
||||||
|
maxOpenFiles: defaultOpts.GetMaxOpenFiles(),
|
||||||
|
maxFileOpeningThreads: 9,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "override max-open-files and max-file-opening-threads",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{
|
||||||
|
maxOpenFilesDBOptName: 999,
|
||||||
|
maxFileOpeningThreadsDBOptName: 9,
|
||||||
|
}),
|
||||||
|
maxOpenFiles: 999,
|
||||||
|
maxFileOpeningThreads: 9,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
dbOpts := newDefaultOptions()
|
||||||
|
dbOpts = overrideDBOpts(dbOpts, tc.mockAppOptions)
|
||||||
|
|
||||||
|
require.Equal(t, tc.maxOpenFiles, dbOpts.GetMaxOpenFiles())
|
||||||
|
require.Equal(t, tc.maxFileOpeningThreads, dbOpts.GetMaxFileOpeningThreads())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOverrideCFOpts(t *testing.T) {
|
||||||
|
defaultOpts := newDefaultOptions()
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
mockAppOptions *mockAppOptions
|
||||||
|
writeBufferSize uint64
|
||||||
|
numLevels int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "override nothing",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{}),
|
||||||
|
writeBufferSize: defaultOpts.GetWriteBufferSize(),
|
||||||
|
numLevels: defaultOpts.GetNumLevels(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "override write-buffer-size",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{
|
||||||
|
writeBufferSizeCFOptName: 999_999,
|
||||||
|
}),
|
||||||
|
writeBufferSize: 999_999,
|
||||||
|
numLevels: defaultOpts.GetNumLevels(),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "override num-levels",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{
|
||||||
|
numLevelsCFOptName: 9,
|
||||||
|
}),
|
||||||
|
writeBufferSize: defaultOpts.GetWriteBufferSize(),
|
||||||
|
numLevels: 9,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "override write-buffer-size and num-levels",
|
||||||
|
mockAppOptions: newMockAppOptions(map[string]interface{}{
|
||||||
|
writeBufferSizeCFOptName: 999_999,
|
||||||
|
numLevelsCFOptName: 9,
|
||||||
|
}),
|
||||||
|
writeBufferSize: 999_999,
|
||||||
|
numLevels: 9,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
cfOpts := newDefaultOptions()
|
||||||
|
cfOpts = overrideCFOpts(cfOpts, tc.mockAppOptions)
|
||||||
|
|
||||||
|
require.Equal(t, tc.writeBufferSize, cfOpts.GetWriteBufferSize())
|
||||||
|
require.Equal(t, tc.numLevels, cfOpts.GetNumLevels())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewRocksDBWithOptions(t *testing.T) {
|
||||||
|
defaultOpts := newDefaultOptions()
|
||||||
|
|
||||||
|
name := "application"
|
||||||
|
dir, err := os.MkdirTemp("", "rocksdb")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
err := os.RemoveAll(dir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
dbOpts := newDefaultOptions()
|
||||||
|
dbOpts.SetMaxOpenFiles(999)
|
||||||
|
cfOpts := newDefaultOptions()
|
||||||
|
cfOpts.SetWriteBufferSize(999_999)
|
||||||
|
|
||||||
|
db, err := newRocksDBWithOptions(name, dir, dbOpts, cfOpts, true, defaultReportMetricsIntervalSecs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
dbOpts, cfOpts, err = loadLatestOptions(filepath.Join(dir, "application.db"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 999, dbOpts.GetMaxOpenFiles())
|
||||||
|
require.Equal(t, defaultOpts.GetMaxFileOpeningThreads(), dbOpts.GetMaxFileOpeningThreads())
|
||||||
|
require.Equal(t, uint64(999_999), cfOpts.GetWriteBufferSize())
|
||||||
|
require.Equal(t, defaultOpts.GetNumLevels(), dbOpts.GetNumLevels())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewDefaultOptions(t *testing.T) {
|
||||||
|
defaultOpts := newDefaultOptions()
|
||||||
|
|
||||||
|
maxOpenFiles := defaultOpts.GetMaxOpenFiles()
|
||||||
|
require.Equal(t, 4096, maxOpenFiles)
|
||||||
|
}
|
87
cmd/kava/opendb/props_loader.go
Normal file
87
cmd/kava/opendb/props_loader.go
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
//go:build rocksdb
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type propsGetter interface {
|
||||||
|
GetProperty(propName string) (value string)
|
||||||
|
GetIntProperty(propName string) (value uint64, success bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
type propsLoader struct {
|
||||||
|
db propsGetter
|
||||||
|
errorMsgs []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPropsLoader(db propsGetter) *propsLoader {
|
||||||
|
return &propsLoader{
|
||||||
|
db: db,
|
||||||
|
errorMsgs: make([]string, 0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *propsLoader) load() (*properties, error) {
|
||||||
|
props := &properties{
|
||||||
|
BaseLevel: l.getIntProperty("rocksdb.base-level"),
|
||||||
|
BlockCacheCapacity: l.getIntProperty("rocksdb.block-cache-capacity"),
|
||||||
|
BlockCachePinnedUsage: l.getIntProperty("rocksdb.block-cache-pinned-usage"),
|
||||||
|
BlockCacheUsage: l.getIntProperty("rocksdb.block-cache-usage"),
|
||||||
|
CurSizeActiveMemTable: l.getIntProperty("rocksdb.cur-size-active-mem-table"),
|
||||||
|
CurSizeAllMemTables: l.getIntProperty("rocksdb.cur-size-all-mem-tables"),
|
||||||
|
EstimateLiveDataSize: l.getIntProperty("rocksdb.estimate-live-data-size"),
|
||||||
|
EstimateNumKeys: l.getIntProperty("rocksdb.estimate-num-keys"),
|
||||||
|
EstimateTableReadersMem: l.getIntProperty("rocksdb.estimate-table-readers-mem"),
|
||||||
|
LiveSSTFilesSize: l.getIntProperty("rocksdb.live-sst-files-size"),
|
||||||
|
SizeAllMemTables: l.getIntProperty("rocksdb.size-all-mem-tables"),
|
||||||
|
OptionsStatistics: l.getProperty("rocksdb.options-statistics"),
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(l.errorMsgs) != 0 {
|
||||||
|
errorMsg := strings.Join(l.errorMsgs, ";")
|
||||||
|
return nil, errors.New(errorMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return props, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *propsLoader) getProperty(propName string) string {
|
||||||
|
value := l.db.GetProperty(propName)
|
||||||
|
if value == "" {
|
||||||
|
l.errorMsgs = append(l.errorMsgs, fmt.Sprintf("property %v is empty", propName))
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *propsLoader) getIntProperty(propName string) uint64 {
|
||||||
|
value, ok := l.db.GetIntProperty(propName)
|
||||||
|
if !ok {
|
||||||
|
l.errorMsgs = append(l.errorMsgs, fmt.Sprintf("can't get %v int property", propName))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
|
type properties struct {
|
||||||
|
BaseLevel uint64
|
||||||
|
BlockCacheCapacity uint64
|
||||||
|
BlockCachePinnedUsage uint64
|
||||||
|
BlockCacheUsage uint64
|
||||||
|
CurSizeActiveMemTable uint64
|
||||||
|
CurSizeAllMemTables uint64
|
||||||
|
EstimateLiveDataSize uint64
|
||||||
|
EstimateNumKeys uint64
|
||||||
|
EstimateTableReadersMem uint64
|
||||||
|
LiveSSTFilesSize uint64
|
||||||
|
SizeAllMemTables uint64
|
||||||
|
OptionsStatistics string
|
||||||
|
}
|
112
cmd/kava/opendb/props_loader_test.go
Normal file
112
cmd/kava/opendb/props_loader_test.go
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
//go:build rocksdb
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type mockPropsGetter struct {
|
||||||
|
props map[string]string
|
||||||
|
intProps map[string]uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockPropsGetter(
|
||||||
|
props map[string]string,
|
||||||
|
intProps map[string]uint64,
|
||||||
|
) *mockPropsGetter {
|
||||||
|
return &mockPropsGetter{
|
||||||
|
props: props,
|
||||||
|
intProps: intProps,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockPropsGetter) GetProperty(propName string) string {
|
||||||
|
return m.props[propName]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mockPropsGetter) GetIntProperty(propName string) (uint64, bool) {
|
||||||
|
prop, ok := m.intProps[propName]
|
||||||
|
return prop, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPropsLoader(t *testing.T) {
|
||||||
|
defaultProps := map[string]string{
|
||||||
|
"rocksdb.options-statistics": "1",
|
||||||
|
}
|
||||||
|
defaultIntProps := map[string]uint64{
|
||||||
|
"rocksdb.base-level": 1,
|
||||||
|
"rocksdb.block-cache-capacity": 2,
|
||||||
|
"rocksdb.block-cache-pinned-usage": 3,
|
||||||
|
"rocksdb.block-cache-usage": 4,
|
||||||
|
"rocksdb.cur-size-active-mem-table": 5,
|
||||||
|
"rocksdb.cur-size-all-mem-tables": 6,
|
||||||
|
"rocksdb.estimate-live-data-size": 7,
|
||||||
|
"rocksdb.estimate-num-keys": 8,
|
||||||
|
"rocksdb.estimate-table-readers-mem": 9,
|
||||||
|
"rocksdb.live-sst-files-size": 10,
|
||||||
|
"rocksdb.size-all-mem-tables": 11,
|
||||||
|
}
|
||||||
|
missingProps := make(map[string]string)
|
||||||
|
missingIntProps := make(map[string]uint64)
|
||||||
|
defaultExpectedProps := properties{
|
||||||
|
BaseLevel: 1,
|
||||||
|
BlockCacheCapacity: 2,
|
||||||
|
BlockCachePinnedUsage: 3,
|
||||||
|
BlockCacheUsage: 4,
|
||||||
|
CurSizeActiveMemTable: 5,
|
||||||
|
CurSizeAllMemTables: 6,
|
||||||
|
EstimateLiveDataSize: 7,
|
||||||
|
EstimateNumKeys: 8,
|
||||||
|
EstimateTableReadersMem: 9,
|
||||||
|
LiveSSTFilesSize: 10,
|
||||||
|
SizeAllMemTables: 11,
|
||||||
|
OptionsStatistics: "1",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
props map[string]string
|
||||||
|
intProps map[string]uint64
|
||||||
|
expectedProps *properties
|
||||||
|
success bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "success case",
|
||||||
|
props: defaultProps,
|
||||||
|
intProps: defaultIntProps,
|
||||||
|
expectedProps: &defaultExpectedProps,
|
||||||
|
success: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing props",
|
||||||
|
props: missingProps,
|
||||||
|
intProps: defaultIntProps,
|
||||||
|
expectedProps: nil,
|
||||||
|
success: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing integer props",
|
||||||
|
props: defaultProps,
|
||||||
|
intProps: missingIntProps,
|
||||||
|
expectedProps: nil,
|
||||||
|
success: false,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
mockPropsGetter := newMockPropsGetter(tc.props, tc.intProps)
|
||||||
|
|
||||||
|
propsLoader := newPropsLoader(mockPropsGetter)
|
||||||
|
actualProps, err := propsLoader.load()
|
||||||
|
if tc.success {
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
require.Error(t, err)
|
||||||
|
}
|
||||||
|
require.Equal(t, tc.expectedProps, actualProps)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
111
cmd/kava/opendb/stat_parser.go
Normal file
111
cmd/kava/opendb/stat_parser.go
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
//go:build rocksdb
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// stat represents one line from rocksdb statistics data, stat may have one or more properties
|
||||||
|
// examples:
|
||||||
|
// - rocksdb.block.cache.miss COUNT : 5
|
||||||
|
// - rocksdb.compaction.times.micros P50 : 21112 P95 : 21112 P99 : 21112 P100 : 21112 COUNT : 1 SUM : 21112
|
||||||
|
// `rocksdb.compaction.times.micros` is name of stat, P50, COUNT, SUM, etc... are props of stat
|
||||||
|
type stat struct {
|
||||||
|
name string
|
||||||
|
props map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseSerializedStats parses serialisedStats into map of stat objects
|
||||||
|
// example of serializedStats:
|
||||||
|
// rocksdb.block.cache.miss COUNT : 5
|
||||||
|
// rocksdb.compaction.times.micros P50 : 21112 P95 : 21112 P99 : 21112 P100 : 21112 COUNT : 1 SUM : 21112
|
||||||
|
func parseSerializedStats(serializedStats string) (map[string]*stat, error) {
|
||||||
|
stats := make(map[string]*stat, 0)
|
||||||
|
|
||||||
|
serializedStatList := strings.Split(serializedStats, "\n")
|
||||||
|
if len(serializedStatList) == 0 {
|
||||||
|
return nil, errors.New("serializedStats is empty")
|
||||||
|
}
|
||||||
|
serializedStatList = serializedStatList[:len(serializedStatList)-1]
|
||||||
|
// iterate over stats line by line
|
||||||
|
for _, serializedStat := range serializedStatList {
|
||||||
|
stat, err := parseSerializedStat(serializedStat)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stats[stat.name] = stat
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseSerializedStat parses serialisedStat into stat object
|
||||||
|
// example of serializedStat:
|
||||||
|
// rocksdb.block.cache.miss COUNT : 5
|
||||||
|
func parseSerializedStat(serializedStat string) (*stat, error) {
|
||||||
|
tokens := strings.Split(serializedStat, " ")
|
||||||
|
tokensNum := len(tokens)
|
||||||
|
if err := validateTokens(tokens); err != nil {
|
||||||
|
return nil, fmt.Errorf("tokens are invalid: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
props := make(map[string]string)
|
||||||
|
for idx := 1; idx < tokensNum; idx += 3 {
|
||||||
|
// never should happen, but double check to avoid unexpected panic
|
||||||
|
if idx+2 >= tokensNum {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
key := tokens[idx]
|
||||||
|
sep := tokens[idx+1]
|
||||||
|
value := tokens[idx+2]
|
||||||
|
|
||||||
|
if err := validateStatProperty(key, value, sep); err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid stat property: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
props[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
return &stat{
|
||||||
|
name: tokens[0],
|
||||||
|
props: props,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// validateTokens validates that tokens contains name + N triples (key, sep, value)
|
||||||
|
func validateTokens(tokens []string) error {
|
||||||
|
tokensNum := len(tokens)
|
||||||
|
if tokensNum < 4 {
|
||||||
|
return fmt.Errorf("invalid number of tokens: %v, tokens: %v", tokensNum, tokens)
|
||||||
|
}
|
||||||
|
if (tokensNum-1)%3 != 0 {
|
||||||
|
return fmt.Errorf("invalid number of tokens: %v, tokens: %v", tokensNum, tokens)
|
||||||
|
}
|
||||||
|
if tokens[0] == "" {
|
||||||
|
return fmt.Errorf("stat name shouldn't be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// validateStatProperty validates that key and value are divided by separator and aren't empty
|
||||||
|
func validateStatProperty(key, value, sep string) error {
|
||||||
|
if key == "" {
|
||||||
|
return fmt.Errorf("key shouldn't be empty")
|
||||||
|
}
|
||||||
|
if sep != ":" {
|
||||||
|
return fmt.Errorf("separator should be :")
|
||||||
|
}
|
||||||
|
if value == "" {
|
||||||
|
return fmt.Errorf("value shouldn't be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
208
cmd/kava/opendb/stat_parser_test.go
Normal file
208
cmd/kava/opendb/stat_parser_test.go
Normal file
@ -0,0 +1,208 @@
|
|||||||
|
//go:build rocksdb
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParseSerializedStats(t *testing.T) {
|
||||||
|
defaultSerializedStats := `rocksdb.block.cache.miss COUNT : 1
|
||||||
|
rocksdb.block.cache.hit COUNT : 2
|
||||||
|
rocksdb.block.cache.add COUNT : 3
|
||||||
|
rocksdb.block.cache.add.failures COUNT : 4
|
||||||
|
rocksdb.compaction.times.micros P50 : 1 P95 : 2 P99 : 3 P100 : 4 COUNT : 5 SUM : 6
|
||||||
|
rocksdb.compaction.times.cpu_micros P50 : 7 P95 : 8 P99 : 9 P100 : 10 COUNT : 11 SUM : 12
|
||||||
|
`
|
||||||
|
defaultExpectedStatMap := map[string]*stat{
|
||||||
|
"rocksdb.block.cache.miss": {
|
||||||
|
name: "rocksdb.block.cache.miss",
|
||||||
|
props: map[string]string{
|
||||||
|
"COUNT": "1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"rocksdb.block.cache.hit": {
|
||||||
|
name: "rocksdb.block.cache.hit",
|
||||||
|
props: map[string]string{
|
||||||
|
"COUNT": "2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"rocksdb.block.cache.add": {
|
||||||
|
name: "rocksdb.block.cache.add",
|
||||||
|
props: map[string]string{
|
||||||
|
"COUNT": "3",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"rocksdb.block.cache.add.failures": {
|
||||||
|
name: "rocksdb.block.cache.add.failures",
|
||||||
|
props: map[string]string{
|
||||||
|
"COUNT": "4",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"rocksdb.compaction.times.micros": {
|
||||||
|
name: "rocksdb.compaction.times.micros",
|
||||||
|
props: map[string]string{
|
||||||
|
"P50": "1",
|
||||||
|
"P95": "2",
|
||||||
|
"P99": "3",
|
||||||
|
"P100": "4",
|
||||||
|
"COUNT": "5",
|
||||||
|
"SUM": "6",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"rocksdb.compaction.times.cpu_micros": {
|
||||||
|
name: "rocksdb.compaction.times.cpu_micros",
|
||||||
|
props: map[string]string{
|
||||||
|
"P50": "7",
|
||||||
|
"P95": "8",
|
||||||
|
"P99": "9",
|
||||||
|
"P100": "10",
|
||||||
|
"COUNT": "11",
|
||||||
|
"SUM": "12",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
serializedStats string
|
||||||
|
expectedStatMap map[string]*stat
|
||||||
|
errMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "success case",
|
||||||
|
serializedStats: defaultSerializedStats,
|
||||||
|
expectedStatMap: defaultExpectedStatMap,
|
||||||
|
errMsg: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing value #1",
|
||||||
|
serializedStats: `rocksdb.block.cache.miss COUNT :
|
||||||
|
`,
|
||||||
|
expectedStatMap: nil,
|
||||||
|
errMsg: "invalid number of tokens",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing value #2",
|
||||||
|
serializedStats: `rocksdb.compaction.times.micros P50 : 1 P95 :
|
||||||
|
`,
|
||||||
|
expectedStatMap: nil,
|
||||||
|
errMsg: "invalid number of tokens",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing stat name",
|
||||||
|
serializedStats: ` COUNT : 1
|
||||||
|
`,
|
||||||
|
expectedStatMap: nil,
|
||||||
|
errMsg: "stat name shouldn't be empty",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "empty stat",
|
||||||
|
serializedStats: ``,
|
||||||
|
expectedStatMap: make(map[string]*stat),
|
||||||
|
errMsg: "",
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
actualStatMap, err := parseSerializedStats(tc.serializedStats)
|
||||||
|
if tc.errMsg == "" {
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Contains(t, err.Error(), tc.errMsg)
|
||||||
|
}
|
||||||
|
require.Equal(t, tc.expectedStatMap, actualStatMap)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateTokens(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
tokens []string
|
||||||
|
errMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "success case",
|
||||||
|
tokens: []string{"name", "key", ":", "value"},
|
||||||
|
errMsg: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing value #1",
|
||||||
|
tokens: []string{"name", "key", ":"},
|
||||||
|
errMsg: "invalid number of tokens",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing value #2",
|
||||||
|
tokens: []string{"name", "key", ":", "value", "key2", ":"},
|
||||||
|
errMsg: "invalid number of tokens",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "empty stat name",
|
||||||
|
tokens: []string{"", "key", ":", "value"},
|
||||||
|
errMsg: "stat name shouldn't be empty",
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
err := validateTokens(tc.tokens)
|
||||||
|
if tc.errMsg == "" {
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Contains(t, err.Error(), tc.errMsg)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateStatProperty(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
key string
|
||||||
|
value string
|
||||||
|
sep string
|
||||||
|
errMsg string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "success case",
|
||||||
|
key: "key",
|
||||||
|
value: "value",
|
||||||
|
sep: ":",
|
||||||
|
errMsg: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing key",
|
||||||
|
key: "",
|
||||||
|
value: "value",
|
||||||
|
sep: ":",
|
||||||
|
errMsg: "key shouldn't be empty",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing value",
|
||||||
|
key: "key",
|
||||||
|
value: "",
|
||||||
|
sep: ":",
|
||||||
|
errMsg: "value shouldn't be empty",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "invalid separator",
|
||||||
|
key: "key",
|
||||||
|
value: "value",
|
||||||
|
sep: "#",
|
||||||
|
errMsg: "separator should be :",
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
err := validateStatProperty(tc.key, tc.value, tc.sep)
|
||||||
|
if tc.errMsg == "" {
|
||||||
|
require.NoError(t, err)
|
||||||
|
} else {
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Contains(t, err.Error(), tc.errMsg)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
263
cmd/kava/opendb/stats_loader.go
Normal file
263
cmd/kava/opendb/stats_loader.go
Normal file
@ -0,0 +1,263 @@
|
|||||||
|
//go:build rocksdb
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
sum = "SUM"
|
||||||
|
count = "COUNT"
|
||||||
|
p50 = "P50"
|
||||||
|
p95 = "P95"
|
||||||
|
p99 = "P99"
|
||||||
|
p100 = "P100"
|
||||||
|
)
|
||||||
|
|
||||||
|
type statLoader struct {
|
||||||
|
// statMap contains map of stat objects returned by parseSerializedStats function
|
||||||
|
// example of stats:
|
||||||
|
// #1: rocksdb.block.cache.miss COUNT : 5
|
||||||
|
// #2: rocksdb.compaction.times.micros P50 : 21112 P95 : 21112 P99 : 21112 P100 : 21112 COUNT : 1 SUM : 21112
|
||||||
|
// #1 case will be cast into int64
|
||||||
|
// #2 case will be cast into float64Histogram
|
||||||
|
statMap map[string]*stat
|
||||||
|
|
||||||
|
// NOTE: some methods accumulate errors instead of returning them, these methods are private and not intended to use outside
|
||||||
|
errors []error
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStatLoader(statMap map[string]*stat) *statLoader {
|
||||||
|
return &statLoader{
|
||||||
|
statMap: statMap,
|
||||||
|
errors: make([]error, 0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type stats struct {
|
||||||
|
NumberKeysWritten int64
|
||||||
|
NumberKeysRead int64
|
||||||
|
NumberKeysUpdated int64
|
||||||
|
|
||||||
|
// total block cache misses
|
||||||
|
// BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS +
|
||||||
|
// BLOCK_CACHE_FILTER_MISS +
|
||||||
|
// BLOCK_CACHE_DATA_MISS;
|
||||||
|
// BLOCK_CACHE_INDEX_MISS: # of times cache miss when accessing index block from block cache.
|
||||||
|
// BLOCK_CACHE_FILTER_MISS: # of times cache miss when accessing filter block from block cache.
|
||||||
|
// BLOCK_CACHE_DATA_MISS: # of times cache miss when accessing data block from block cache.
|
||||||
|
BlockCacheMiss int64
|
||||||
|
|
||||||
|
// total block cache hit
|
||||||
|
// BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT +
|
||||||
|
// BLOCK_CACHE_FILTER_HIT +
|
||||||
|
// BLOCK_CACHE_DATA_HIT;
|
||||||
|
// BLOCK_CACHE_INDEX_HIT: # of times cache hit when accessing index block from block cache.
|
||||||
|
// BLOCK_CACHE_FILTER_HIT: # of times cache hit when accessing filter block from block cache.
|
||||||
|
// BLOCK_CACHE_DATA_HIT: # of times cache hit when accessing data block from block cache.
|
||||||
|
BlockCacheHit int64
|
||||||
|
|
||||||
|
// # of blocks added to block cache.
|
||||||
|
BlockCacheAdd int64
|
||||||
|
// # of failures when adding blocks to block cache.
|
||||||
|
BlockCacheAddFailures int64
|
||||||
|
|
||||||
|
CompactReadBytes int64 // Bytes read during compaction
|
||||||
|
CompactWriteBytes int64 // Bytes written during compaction
|
||||||
|
|
||||||
|
CompactionTimesMicros *float64Histogram
|
||||||
|
CompactionTimesCPUMicros *float64Histogram
|
||||||
|
NumFilesInSingleCompaction *float64Histogram
|
||||||
|
|
||||||
|
// Read amplification statistics.
|
||||||
|
// Read amplification can be calculated using this formula
|
||||||
|
// (READ_AMP_TOTAL_READ_BYTES / READ_AMP_ESTIMATE_USEFUL_BYTES)
|
||||||
|
//
|
||||||
|
// REQUIRES: ReadOptions::read_amp_bytes_per_bit to be enabled
|
||||||
|
// TODO(yevhenii): seems not working?
|
||||||
|
ReadAmpEstimateUsefulBytes int64 // Estimate of total bytes actually used.
|
||||||
|
ReadAmpTotalReadBytes int64 // Total size of loaded data blocks.
|
||||||
|
|
||||||
|
NumberFileOpens int64
|
||||||
|
NumberFileErrors int64
|
||||||
|
|
||||||
|
// # of times bloom filter has avoided file reads, i.e., negatives.
|
||||||
|
BloomFilterUseful int64
|
||||||
|
// # of times bloom FullFilter has not avoided the reads.
|
||||||
|
BloomFilterFullPositive int64
|
||||||
|
// # of times bloom FullFilter has not avoided the reads and data actually
|
||||||
|
// exist.
|
||||||
|
BloomFilterFullTruePositive int64
|
||||||
|
|
||||||
|
// # of memtable hits.
|
||||||
|
MemtableHit int64
|
||||||
|
// # of memtable misses.
|
||||||
|
MemtableMiss int64
|
||||||
|
|
||||||
|
// # of Get() queries served by L0
|
||||||
|
GetHitL0 int64
|
||||||
|
// # of Get() queries served by L1
|
||||||
|
GetHitL1 int64
|
||||||
|
// # of Get() queries served by L2 and up
|
||||||
|
GetHitL2AndUp int64
|
||||||
|
|
||||||
|
// The number of uncompressed bytes issued by DB::Put(), DB::Delete(),
|
||||||
|
// DB::Merge(), and DB::Write().
|
||||||
|
BytesWritten int64
|
||||||
|
// The number of uncompressed bytes read from DB::Get(). It could be
|
||||||
|
// either from memtables, cache, or table files.
|
||||||
|
// For the number of logical bytes read from DB::MultiGet(),
|
||||||
|
// please use NUMBER_MULTIGET_BYTES_READ.
|
||||||
|
BytesRead int64
|
||||||
|
|
||||||
|
// Writer has to wait for compaction or flush to finish.
|
||||||
|
StallMicros int64
|
||||||
|
|
||||||
|
// Last level and non-last level read statistics
|
||||||
|
LastLevelReadBytes int64
|
||||||
|
LastLevelReadCount int64
|
||||||
|
NonLastLevelReadBytes int64
|
||||||
|
NonLastLevelReadCount int64
|
||||||
|
|
||||||
|
DBGetMicros *float64Histogram
|
||||||
|
DBWriteMicros *float64Histogram
|
||||||
|
|
||||||
|
// Value size distribution in each operation
|
||||||
|
BytesPerRead *float64Histogram
|
||||||
|
BytesPerWrite *float64Histogram
|
||||||
|
BytesPerMultiget *float64Histogram
|
||||||
|
|
||||||
|
// Time spent flushing memtable to disk
|
||||||
|
FlushMicros *float64Histogram
|
||||||
|
}
|
||||||
|
|
||||||
|
type float64Histogram struct {
|
||||||
|
Sum float64
|
||||||
|
Count float64
|
||||||
|
P50 float64
|
||||||
|
P95 float64
|
||||||
|
P99 float64
|
||||||
|
P100 float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *statLoader) error() error {
|
||||||
|
if len(l.errors) != 0 {
|
||||||
|
return fmt.Errorf("%v", l.errors)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *statLoader) load() (*stats, error) {
|
||||||
|
stats := &stats{
|
||||||
|
NumberKeysWritten: l.getInt64StatValue("rocksdb.number.keys.written", count),
|
||||||
|
NumberKeysRead: l.getInt64StatValue("rocksdb.number.keys.read", count),
|
||||||
|
NumberKeysUpdated: l.getInt64StatValue("rocksdb.number.keys.updated", count),
|
||||||
|
BlockCacheMiss: l.getInt64StatValue("rocksdb.block.cache.miss", count),
|
||||||
|
BlockCacheHit: l.getInt64StatValue("rocksdb.block.cache.hit", count),
|
||||||
|
BlockCacheAdd: l.getInt64StatValue("rocksdb.block.cache.add", count),
|
||||||
|
BlockCacheAddFailures: l.getInt64StatValue("rocksdb.block.cache.add.failures", count),
|
||||||
|
CompactReadBytes: l.getInt64StatValue("rocksdb.compact.read.bytes", count),
|
||||||
|
CompactWriteBytes: l.getInt64StatValue("rocksdb.compact.write.bytes", count),
|
||||||
|
CompactionTimesMicros: l.getFloat64HistogramStatValue("rocksdb.compaction.times.micros"),
|
||||||
|
CompactionTimesCPUMicros: l.getFloat64HistogramStatValue("rocksdb.compaction.times.cpu_micros"),
|
||||||
|
NumFilesInSingleCompaction: l.getFloat64HistogramStatValue("rocksdb.numfiles.in.singlecompaction"),
|
||||||
|
ReadAmpEstimateUsefulBytes: l.getInt64StatValue("rocksdb.read.amp.estimate.useful.bytes", count),
|
||||||
|
ReadAmpTotalReadBytes: l.getInt64StatValue("rocksdb.read.amp.total.read.bytes", count),
|
||||||
|
NumberFileOpens: l.getInt64StatValue("rocksdb.no.file.opens", count),
|
||||||
|
NumberFileErrors: l.getInt64StatValue("rocksdb.no.file.errors", count),
|
||||||
|
BloomFilterUseful: l.getInt64StatValue("rocksdb.bloom.filter.useful", count),
|
||||||
|
BloomFilterFullPositive: l.getInt64StatValue("rocksdb.bloom.filter.full.positive", count),
|
||||||
|
BloomFilterFullTruePositive: l.getInt64StatValue("rocksdb.bloom.filter.full.true.positive", count),
|
||||||
|
MemtableHit: l.getInt64StatValue("rocksdb.memtable.hit", count),
|
||||||
|
MemtableMiss: l.getInt64StatValue("rocksdb.memtable.miss", count),
|
||||||
|
GetHitL0: l.getInt64StatValue("rocksdb.l0.hit", count),
|
||||||
|
GetHitL1: l.getInt64StatValue("rocksdb.l1.hit", count),
|
||||||
|
GetHitL2AndUp: l.getInt64StatValue("rocksdb.l2andup.hit", count),
|
||||||
|
BytesWritten: l.getInt64StatValue("rocksdb.bytes.written", count),
|
||||||
|
BytesRead: l.getInt64StatValue("rocksdb.bytes.read", count),
|
||||||
|
StallMicros: l.getInt64StatValue("rocksdb.stall.micros", count),
|
||||||
|
LastLevelReadBytes: l.getInt64StatValue("rocksdb.last.level.read.bytes", count),
|
||||||
|
LastLevelReadCount: l.getInt64StatValue("rocksdb.last.level.read.count", count),
|
||||||
|
NonLastLevelReadBytes: l.getInt64StatValue("rocksdb.non.last.level.read.bytes", count),
|
||||||
|
NonLastLevelReadCount: l.getInt64StatValue("rocksdb.non.last.level.read.count", count),
|
||||||
|
DBGetMicros: l.getFloat64HistogramStatValue("rocksdb.db.get.micros"),
|
||||||
|
DBWriteMicros: l.getFloat64HistogramStatValue("rocksdb.db.write.micros"),
|
||||||
|
BytesPerRead: l.getFloat64HistogramStatValue("rocksdb.bytes.per.read"),
|
||||||
|
BytesPerWrite: l.getFloat64HistogramStatValue("rocksdb.bytes.per.write"),
|
||||||
|
BytesPerMultiget: l.getFloat64HistogramStatValue("rocksdb.bytes.per.multiget"),
|
||||||
|
FlushMicros: l.getFloat64HistogramStatValue("rocksdb.db.flush.micros"),
|
||||||
|
}
|
||||||
|
|
||||||
|
err := l.error()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getFloat64HistogramStatValue converts stat object into float64Histogram
|
||||||
|
func (l *statLoader) getFloat64HistogramStatValue(statName string) *float64Histogram {
|
||||||
|
return &float64Histogram{
|
||||||
|
Sum: l.getFloat64StatValue(statName, sum),
|
||||||
|
Count: l.getFloat64StatValue(statName, count),
|
||||||
|
P50: l.getFloat64StatValue(statName, p50),
|
||||||
|
P95: l.getFloat64StatValue(statName, p95),
|
||||||
|
P99: l.getFloat64StatValue(statName, p99),
|
||||||
|
P100: l.getFloat64StatValue(statName, p100),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getInt64StatValue converts property of stat object into int64
|
||||||
|
func (l *statLoader) getInt64StatValue(statName, propName string) int64 {
|
||||||
|
stringVal := l.getStatValue(statName, propName)
|
||||||
|
if stringVal == "" {
|
||||||
|
l.errors = append(l.errors, fmt.Errorf("can't get stat by name: %v", statName))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
intVal, err := strconv.ParseInt(stringVal, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
l.errors = append(l.errors, fmt.Errorf("can't parse int: %v", err))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return intVal
|
||||||
|
}
|
||||||
|
|
||||||
|
// getFloat64StatValue converts property of stat object into float64
|
||||||
|
func (l *statLoader) getFloat64StatValue(statName, propName string) float64 {
|
||||||
|
stringVal := l.getStatValue(statName, propName)
|
||||||
|
if stringVal == "" {
|
||||||
|
l.errors = append(l.errors, fmt.Errorf("can't get stat by name: %v", statName))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
floatVal, err := strconv.ParseFloat(stringVal, 64)
|
||||||
|
if err != nil {
|
||||||
|
l.errors = append(l.errors, fmt.Errorf("can't parse float: %v", err))
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return floatVal
|
||||||
|
}
|
||||||
|
|
||||||
|
// getStatValue gets property of stat object
|
||||||
|
func (l *statLoader) getStatValue(statName, propName string) string {
|
||||||
|
stat, ok := l.statMap[statName]
|
||||||
|
if !ok {
|
||||||
|
l.errors = append(l.errors, fmt.Errorf("stat %v doesn't exist", statName))
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
prop, ok := stat.props[propName]
|
||||||
|
if !ok {
|
||||||
|
l.errors = append(l.errors, fmt.Errorf("stat %v doesn't have %v property", statName, propName))
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
return prop
|
||||||
|
}
|
80
cmd/kava/opendb/stats_loader_test.go
Normal file
80
cmd/kava/opendb/stats_loader_test.go
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
//go:build rocksdb
|
||||||
|
// +build rocksdb
|
||||||
|
|
||||||
|
package opendb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStatsLoader(t *testing.T) {
|
||||||
|
defaultStat := stat{
|
||||||
|
props: map[string]string{
|
||||||
|
"COUNT": "1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
defaultHistogramStat := stat{
|
||||||
|
props: map[string]string{
|
||||||
|
"P50": "1",
|
||||||
|
"P95": "2",
|
||||||
|
"P99": "3",
|
||||||
|
"P100": "4",
|
||||||
|
"COUNT": "5",
|
||||||
|
"SUM": "6",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
defaultStatMap := map[string]*stat{
|
||||||
|
"rocksdb.number.keys.written": &defaultStat,
|
||||||
|
"rocksdb.number.keys.read": &defaultStat,
|
||||||
|
"rocksdb.number.keys.updated": &defaultStat,
|
||||||
|
"rocksdb.block.cache.miss": &defaultStat,
|
||||||
|
"rocksdb.block.cache.hit": &defaultStat,
|
||||||
|
"rocksdb.block.cache.add": &defaultStat,
|
||||||
|
"rocksdb.block.cache.add.failures": &defaultStat,
|
||||||
|
"rocksdb.compact.read.bytes": &defaultStat,
|
||||||
|
"rocksdb.compact.write.bytes": &defaultStat,
|
||||||
|
"rocksdb.compaction.times.micros": &defaultHistogramStat,
|
||||||
|
"rocksdb.compaction.times.cpu_micros": &defaultHistogramStat,
|
||||||
|
"rocksdb.numfiles.in.singlecompaction": &defaultHistogramStat,
|
||||||
|
"rocksdb.read.amp.estimate.useful.bytes": &defaultStat,
|
||||||
|
"rocksdb.read.amp.total.read.bytes": &defaultStat,
|
||||||
|
"rocksdb.no.file.opens": &defaultStat,
|
||||||
|
"rocksdb.no.file.errors": &defaultStat,
|
||||||
|
"rocksdb.bloom.filter.useful": &defaultStat,
|
||||||
|
"rocksdb.bloom.filter.full.positive": &defaultStat,
|
||||||
|
"rocksdb.bloom.filter.full.true.positive": &defaultStat,
|
||||||
|
"rocksdb.memtable.hit": &defaultStat,
|
||||||
|
"rocksdb.memtable.miss": &defaultStat,
|
||||||
|
"rocksdb.l0.hit": &defaultStat,
|
||||||
|
"rocksdb.l1.hit": &defaultStat,
|
||||||
|
"rocksdb.l2andup.hit": &defaultStat,
|
||||||
|
"rocksdb.bytes.written": &defaultStat,
|
||||||
|
"rocksdb.bytes.read": &defaultStat,
|
||||||
|
"rocksdb.stall.micros": &defaultStat,
|
||||||
|
"rocksdb.last.level.read.bytes": &defaultStat,
|
||||||
|
"rocksdb.last.level.read.count": &defaultStat,
|
||||||
|
"rocksdb.non.last.level.read.bytes": &defaultStat,
|
||||||
|
"rocksdb.non.last.level.read.count": &defaultStat,
|
||||||
|
"rocksdb.db.get.micros": &defaultHistogramStat,
|
||||||
|
"rocksdb.db.write.micros": &defaultHistogramStat,
|
||||||
|
"rocksdb.bytes.per.read": &defaultHistogramStat,
|
||||||
|
"rocksdb.bytes.per.write": &defaultHistogramStat,
|
||||||
|
"rocksdb.bytes.per.multiget": &defaultHistogramStat,
|
||||||
|
"rocksdb.db.flush.micros": &defaultHistogramStat,
|
||||||
|
}
|
||||||
|
|
||||||
|
statLoader := newStatLoader(defaultStatMap)
|
||||||
|
stats, err := statLoader.load()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, stats.NumberKeysWritten, int64(1))
|
||||||
|
require.Equal(t, stats.NumberKeysRead, int64(1))
|
||||||
|
require.Equal(t, stats.CompactionTimesMicros.P50, float64(1))
|
||||||
|
require.Equal(t, stats.CompactionTimesMicros.P95, float64(2))
|
||||||
|
require.Equal(t, stats.CompactionTimesMicros.P99, float64(3))
|
||||||
|
require.Equal(t, stats.CompactionTimesMicros.P100, float64(4))
|
||||||
|
require.Equal(t, stats.CompactionTimesMicros.Count, float64(5))
|
||||||
|
require.Equal(t, stats.CompactionTimesMicros.Sum, float64(6))
|
||||||
|
}
|
6
go.mod
6
go.mod
@ -17,12 +17,13 @@ require (
|
|||||||
github.com/golang/protobuf v1.5.3
|
github.com/golang/protobuf v1.5.3
|
||||||
github.com/gorilla/mux v1.8.0
|
github.com/gorilla/mux v1.8.0
|
||||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0
|
github.com/grpc-ecosystem/grpc-gateway v1.16.0
|
||||||
|
github.com/linxGnu/grocksdb v1.8.0
|
||||||
github.com/pelletier/go-toml/v2 v2.0.6
|
github.com/pelletier/go-toml/v2 v2.0.6
|
||||||
github.com/prometheus/client_golang v1.14.0
|
github.com/prometheus/client_golang v1.14.0
|
||||||
github.com/spf13/cast v1.5.0
|
github.com/spf13/cast v1.5.0
|
||||||
github.com/spf13/cobra v1.6.1
|
github.com/spf13/cobra v1.6.1
|
||||||
github.com/spf13/viper v1.15.0
|
github.com/spf13/viper v1.15.0
|
||||||
github.com/stretchr/testify v1.8.2
|
github.com/stretchr/testify v1.8.3
|
||||||
github.com/subosito/gotenv v1.4.2
|
github.com/subosito/gotenv v1.4.2
|
||||||
github.com/tendermint/tendermint v0.34.27
|
github.com/tendermint/tendermint v0.34.27
|
||||||
github.com/tendermint/tm-db v0.6.7
|
github.com/tendermint/tm-db v0.6.7
|
||||||
@ -130,7 +131,6 @@ require (
|
|||||||
github.com/klauspost/compress v1.15.15 // indirect
|
github.com/klauspost/compress v1.15.15 // indirect
|
||||||
github.com/lib/pq v1.10.7 // indirect
|
github.com/lib/pq v1.10.7 // indirect
|
||||||
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
|
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
|
||||||
github.com/linxGnu/grocksdb v1.7.15 // indirect
|
|
||||||
github.com/magiconair/properties v1.8.7 // indirect
|
github.com/magiconair/properties v1.8.7 // indirect
|
||||||
github.com/manifoldco/promptui v0.9.0 // indirect
|
github.com/manifoldco/promptui v0.9.0 // indirect
|
||||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||||
@ -216,5 +216,5 @@ replace (
|
|||||||
// Use cometbft fork of tendermint
|
// Use cometbft fork of tendermint
|
||||||
github.com/tendermint/tendermint => github.com/cometbft/cometbft v0.34.27
|
github.com/tendermint/tendermint => github.com/cometbft/cometbft v0.34.27
|
||||||
// Indirect dependencies still use tendermint/tm-db
|
// Indirect dependencies still use tendermint/tm-db
|
||||||
github.com/tendermint/tm-db => github.com/kava-labs/tm-db v0.6.7-kava.3
|
github.com/tendermint/tm-db => github.com/kava-labs/tm-db v0.6.7-kava.4
|
||||||
)
|
)
|
||||||
|
12
go.sum
12
go.sum
@ -808,8 +808,8 @@ github.com/kava-labs/cosmos-sdk v0.46.11-kava.1 h1:3VRpm4zf/gQgmpRVd1p99/2P8ZecA
|
|||||||
github.com/kava-labs/cosmos-sdk v0.46.11-kava.1/go.mod h1:bG4AkW9bqc8ycrryyKGQEl3YV9BY2wr6HggGq8kvcgM=
|
github.com/kava-labs/cosmos-sdk v0.46.11-kava.1/go.mod h1:bG4AkW9bqc8ycrryyKGQEl3YV9BY2wr6HggGq8kvcgM=
|
||||||
github.com/kava-labs/ethermint v0.21.0-kava-v23-1 h1:5TSyCtPvFdMuSe8p2iMVqXmFBlK3lHyjaT9EqN752aI=
|
github.com/kava-labs/ethermint v0.21.0-kava-v23-1 h1:5TSyCtPvFdMuSe8p2iMVqXmFBlK3lHyjaT9EqN752aI=
|
||||||
github.com/kava-labs/ethermint v0.21.0-kava-v23-1/go.mod h1:rdm6AinxZ4dzPEv/cjH+/AGyTbKufJ3RE7M2MDyklH0=
|
github.com/kava-labs/ethermint v0.21.0-kava-v23-1/go.mod h1:rdm6AinxZ4dzPEv/cjH+/AGyTbKufJ3RE7M2MDyklH0=
|
||||||
github.com/kava-labs/tm-db v0.6.7-kava.3 h1:4vyAh+NyZ1xTjCt0utNT6FJHnsZK1I19xwZeJttdRXQ=
|
github.com/kava-labs/tm-db v0.6.7-kava.4 h1:M2RibOKmbi+k2OhAFry8z9+RJF0CYuDETB7/PrSdoro=
|
||||||
github.com/kava-labs/tm-db v0.6.7-kava.3/go.mod h1:70tpLhNfwCP64nAlq+bU+rOiVfWr3Nnju1D1nhGDGKs=
|
github.com/kava-labs/tm-db v0.6.7-kava.4/go.mod h1:70tpLhNfwCP64nAlq+bU+rOiVfWr3Nnju1D1nhGDGKs=
|
||||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||||
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
|
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
|
||||||
@ -849,8 +849,8 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6
|
|||||||
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
|
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
|
||||||
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
|
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
|
||||||
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
|
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
|
||||||
github.com/linxGnu/grocksdb v1.7.15 h1:AEhP28lkeAybv5UYNYviYISpR6bJejEnKuYbnWAnxx0=
|
github.com/linxGnu/grocksdb v1.8.0 h1:H4L/LhP7GOMf1j17oQAElHgVlbEje2h14A8Tz9cM2BE=
|
||||||
github.com/linxGnu/grocksdb v1.7.15/go.mod h1:pY55D0o+r8yUYLq70QmhdudxYvoDb9F+9puf4m3/W+U=
|
github.com/linxGnu/grocksdb v1.8.0/go.mod h1:09CeBborffXhXdNpEcOeZrLKEnRtrZFEpFdPNI9Zjjg=
|
||||||
github.com/lucasjones/reggen v0.0.0-20180717132126-cdb49ff09d77/go.mod h1:5ELEyG+X8f+meRWHuqUOewBOhvHkl7M76pdGEansxW4=
|
github.com/lucasjones/reggen v0.0.0-20180717132126-cdb49ff09d77/go.mod h1:5ELEyG+X8f+meRWHuqUOewBOhvHkl7M76pdGEansxW4=
|
||||||
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
|
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
|
||||||
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||||
@ -1130,8 +1130,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
|||||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||||
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
|
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
|
||||||
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||||
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
|
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
|
||||||
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
|
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
|
||||||
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
|
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
|
||||||
|
Loading…
Reference in New Issue
Block a user