primitive storage + data clock store

This commit is contained in:
Cassandra Heart 2023-09-09 18:45:47 -05:00
parent c6d02ad1e7
commit b7b5bc0e41
No known key found for this signature in database
GPG Key ID: 6352152859385958
12 changed files with 1678 additions and 12 deletions

112
node/app/db_console.go Normal file
View File

@ -0,0 +1,112 @@
package app
import (
"bufio"
"fmt"
"os"
"strings"
"source.quilibrium.com/quilibrium/monorepo/node/store"
)
type DBConsole struct {
clockStore store.ClockStore
}
func newDBConsole(
clockStore store.ClockStore,
) (*DBConsole, error) {
return &DBConsole{
clockStore,
}, nil
}
// Runs the DB console, this is meant for simple debugging, not production use.
func (c *DBConsole) Run() {
for {
fmt.Printf("db> ")
reader := bufio.NewReader(os.Stdin)
s, err := reader.ReadString('\n')
if err != nil {
panic(err)
}
cmd := strings.Trim(s, "\n")
switch cmd {
case "quit":
return
case "show frames":
earliestFrame, err := c.clockStore.GetEarliestMasterClockFrame([]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
})
if err != nil {
panic(err)
}
latestFrame, err := c.clockStore.GetLatestMasterClockFrame([]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
})
if err != nil {
panic(err)
}
fmt.Printf(
"earliest: %d, latest: %d\n",
earliestFrame.FrameNumber,
latestFrame.FrameNumber,
)
fmt.Printf(
"Genesis Frame:\n\tVDF Proof: %x\n",
earliestFrame.Input[:516],
)
iter, err := c.clockStore.RangeMasterClockFrames(
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
earliestFrame.FrameNumber,
latestFrame.FrameNumber,
)
if err != nil {
panic(err)
}
for iter.First(); iter.Valid(); iter.Next() {
value, err := iter.Value()
if err != nil {
panic(err)
}
selector, err := value.GetSelector()
if err != nil {
panic(err)
}
fmt.Printf(
"Frame %d (Selector: %x):\n\tParent: %x\n\tVDF Proof: %x\n\n",
value.FrameNumber,
selector.Bytes(),
value.ParentSelector,
value.Input[:516],
)
}
if err := iter.Close(); err != nil {
panic(err)
}
default:
fmt.Printf("unknown command %s\n", cmd)
}
}
}

View File

@ -12,6 +12,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/execution/nop" "source.quilibrium.com/quilibrium/monorepo/node/execution/nop"
"source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/store"
) )
func logger() *zap.Logger { func logger() *zap.Logger {
@ -33,6 +34,13 @@ var keyManagerSet = wire.NewSet(
wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)), wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)),
) )
var storeSet = wire.NewSet(
wire.FieldsOf(new(*config.Config), "DB"),
store.NewPebbleDB,
store.NewPebbleClockStore,
wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)),
)
var pubSubSet = wire.NewSet( var pubSubSet = wire.NewSet(
wire.FieldsOf(new(*config.Config), "P2P"), wire.FieldsOf(new(*config.Config), "P2P"),
p2p.NewBlossomSub, p2p.NewBlossomSub,
@ -46,9 +54,24 @@ var engineSet = wire.NewSet(
var consensusSet = wire.NewSet( var consensusSet = wire.NewSet(
wire.FieldsOf(new(*config.Config), "Engine"), wire.FieldsOf(new(*config.Config), "Engine"),
master.NewMasterClockConsensusEngine, master.NewMasterClockConsensusEngine,
wire.Bind(new(consensus.ConsensusEngine), new(*master.MasterClockConsensusEngine)), wire.Bind(
new(consensus.ConsensusEngine),
new(*master.MasterClockConsensusEngine),
),
) )
func NewNode(*config.Config) (*Node, error) { func NewNode(*config.Config) (*Node, error) {
panic(wire.Build(loggerSet, keyManagerSet, pubSubSet, engineSet, consensusSet, newNode)) panic(wire.Build(
loggerSet,
keyManagerSet,
storeSet,
pubSubSet,
engineSet,
consensusSet,
newNode,
))
}
func NewDBConsole(*config.Config) (*DBConsole, error) {
panic(wire.Build(loggerSet, storeSet, newDBConsole))
} }

View File

@ -15,6 +15,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/execution/nop" "source.quilibrium.com/quilibrium/monorepo/node/execution/nop"
"source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/store"
) )
// Injectors from wire.go: // Injectors from wire.go:
@ -23,11 +24,14 @@ func NewNode(configConfig *config.Config) (*Node, error) {
zapLogger := logger() zapLogger := logger()
nopExecutionEngine := nop.NewNopExecutionEngine(zapLogger) nopExecutionEngine := nop.NewNopExecutionEngine(zapLogger)
engineConfig := configConfig.Engine engineConfig := configConfig.Engine
dbConfig := configConfig.DB
db := store.NewPebbleDB(dbConfig)
pebbleClockStore := store.NewPebbleClockStore(db, zapLogger)
keyConfig := configConfig.Key keyConfig := configConfig.Key
fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger)
p2PConfig := configConfig.P2P p2PConfig := configConfig.P2P
blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger)
masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, fileKeyManager, blossomSub) masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub)
node, err := newNode(nopExecutionEngine, masterClockConsensusEngine) node, err := newNode(nopExecutionEngine, masterClockConsensusEngine)
if err != nil { if err != nil {
return nil, err return nil, err
@ -35,6 +39,18 @@ func NewNode(configConfig *config.Config) (*Node, error) {
return node, nil return node, nil
} }
func NewDBConsole(configConfig *config.Config) (*DBConsole, error) {
dbConfig := configConfig.DB
db := store.NewPebbleDB(dbConfig)
zapLogger := logger()
pebbleClockStore := store.NewPebbleClockStore(db, zapLogger)
dbConsole, err := newDBConsole(pebbleClockStore)
if err != nil {
return nil, err
}
return dbConsole, nil
}
// wire.go: // wire.go:
func logger() *zap.Logger { func logger() *zap.Logger {
@ -52,8 +68,14 @@ var loggerSet = wire.NewSet(
var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager))) var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)))
var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, store.NewPebbleClockStore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)))
var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub))) var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub)))
var engineSet = wire.NewSet(nop.NewNopExecutionEngine) var engineSet = wire.NewSet(nop.NewNopExecutionEngine)
var consensusSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Engine"), master.NewMasterClockConsensusEngine, wire.Bind(new(consensus.ConsensusEngine), new(*master.MasterClockConsensusEngine))) var consensusSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Engine"), master.NewMasterClockConsensusEngine, wire.Bind(
new(consensus.ConsensusEngine),
new(*master.MasterClockConsensusEngine),
),
)

View File

@ -238,8 +238,33 @@ func (
} }
} }
txn, err := e.clockStore.NewTransaction()
if err != nil {
e.logger.Error("error while creating transaction", zap.Error(err))
return nil, errors.Wrap(err, "confirm latest frame")
}
for _, frame := range committedSet {
if err = e.clockStore.PutMasterClockFrame(frame, txn); err != nil {
e.logger.Error("error while committing frame", zap.Error(err))
return nil, errors.Wrap(err, "confirm latest frame")
}
}
if err = txn.Commit(); err != nil {
e.logger.Error("error while committing transaction", zap.Error(err))
return nil, errors.Wrap(err, "confirm latest frame")
}
e.logger.Info("stored frames", zap.Int("frame_count", len(committedSet)))
e.historicFramesMx.Lock() e.historicFramesMx.Lock()
e.historicFrames = append(e.historicFrames, committedSet...) e.historicFrames = append(e.historicFrames, committedSet...)
if len(e.historicFrames) > 256 {
e.historicFrames = e.historicFrames[len(e.historicFrames)-256:]
}
e.historicFramesMx.Unlock() e.historicFramesMx.Unlock()
e.setFrame(prev) e.setFrame(prev)

View File

@ -13,6 +13,7 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/keys"
"source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/node/p2p"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs" "source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/store"
) )
type SyncStatusType int type SyncStatusType int
@ -42,9 +43,9 @@ type MasterClockConsensusEngine struct {
engineMx sync.Mutex engineMx sync.Mutex
seenFramesMx sync.Mutex seenFramesMx sync.Mutex
historicFramesMx sync.Mutex historicFramesMx sync.Mutex
// for DHT testing, we're only using in memory stores
seenFrames []*protobufs.ClockFrame seenFrames []*protobufs.ClockFrame
historicFrames []*protobufs.ClockFrame historicFrames []*protobufs.ClockFrame
clockStore store.ClockStore
} }
var _ consensus.ConsensusEngine = (*MasterClockConsensusEngine)(nil) var _ consensus.ConsensusEngine = (*MasterClockConsensusEngine)(nil)
@ -52,6 +53,7 @@ var _ consensus.ConsensusEngine = (*MasterClockConsensusEngine)(nil)
func NewMasterClockConsensusEngine( func NewMasterClockConsensusEngine(
engineConfig *config.EngineConfig, engineConfig *config.EngineConfig,
logger *zap.Logger, logger *zap.Logger,
clockStore store.ClockStore,
keyManager keys.KeyManager, keyManager keys.KeyManager,
pubSub p2p.PubSub, pubSub p2p.PubSub,
) *MasterClockConsensusEngine { ) *MasterClockConsensusEngine {
@ -88,6 +90,7 @@ func NewMasterClockConsensusEngine(
input: seed, input: seed,
lastFrameReceivedAt: time.Time{}, lastFrameReceivedAt: time.Time{},
syncingStatus: SyncStatusNotSyncing, syncingStatus: SyncStatusNotSyncing,
clockStore: clockStore,
} }
if e.filter, err = hex.DecodeString(engineConfig.Filter); err != nil { if e.filter, err = hex.DecodeString(engineConfig.Filter); err != nil {
@ -107,8 +110,59 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
e.state = consensus.EngineStateLoading e.state = consensus.EngineStateLoading
e.logger.Info("syncing last seen state") e.logger.Info("syncing last seen state")
latestFrame := e.createGenesisFrame() latestFrame, err := e.clockStore.GetLatestMasterClockFrame(e.filter)
e.historicFrames = []*protobufs.ClockFrame{latestFrame} if err != nil && errors.Is(err, store.ErrNotFound) {
latestFrame = e.createGenesisFrame()
txn, err := e.clockStore.NewTransaction()
if err != nil {
panic(err)
}
if err = e.clockStore.PutMasterClockFrame(latestFrame, txn); err != nil {
panic(err)
}
if err = txn.Commit(); err != nil {
panic(err)
}
} else if err != nil {
panic(err)
} else {
e.setFrame(latestFrame)
}
e.historicFrames = []*protobufs.ClockFrame{}
if latestFrame.FrameNumber != 0 {
min := uint64(0)
if latestFrame.FrameNumber-255 > min {
min = latestFrame.FrameNumber - 255
}
iter, err := e.clockStore.RangeMasterClockFrames(
e.filter,
min,
latestFrame.FrameNumber-1,
)
if err != nil {
panic(err)
}
for iter.First(); iter.Valid(); iter.Next() {
frame, err := iter.Value()
if err != nil {
panic(err)
}
e.historicFrames = append(e.historicFrames, frame)
}
if err = iter.Close(); err != nil {
panic(err)
}
}
e.historicFrames = append(e.historicFrames, latestFrame)
e.logger.Info("subscribing to pubsub messages") e.logger.Info("subscribing to pubsub messages")
e.pubSub.Subscribe(e.filter, e.handleMessage, true) e.pubSub.Subscribe(e.filter, e.handleMessage, true)

View File

@ -225,11 +225,35 @@ func (e *MasterClockConsensusEngine) handleClockFramesRequest(
zap.Uint64("total_frames", uint64(to-from+1)), zap.Uint64("total_frames", uint64(to-from+1)),
) )
iter, err := e.clockStore.RangeMasterClockFrames(
request.Filter,
from,
to,
)
if err != nil {
return errors.Wrap(err, "handle clock frame request")
}
response := []*protobufs.ClockFrame{}
for iter.First(); iter.Valid(); iter.Next() {
frame, err := iter.Value()
if err != nil {
return errors.Wrap(err, "handle clock frame request")
}
response = append(response, frame)
}
if err = iter.Close(); err != nil {
return errors.Wrap(err, "handle clock frame request")
}
if err := e.publishMessage(channel, &protobufs.ClockFramesResponse{ if err := e.publishMessage(channel, &protobufs.ClockFramesResponse{
Filter: request.Filter, Filter: request.Filter,
FromFrameNumber: request.FromFrameNumber, FromFrameNumber: request.FromFrameNumber,
ToFrameNumber: to, ToFrameNumber: to,
ClockFrames: e.historicFrames[from:to], ClockFrames: response,
}); err != nil { }); err != nil {
return errors.Wrap(err, "handle clock frame request") return errors.Wrap(err, "handle clock frame request")
} }

View File

@ -16,8 +16,21 @@ import (
) )
var ( var (
configDirectory = flag.String("config", "./.config/", "the configuration directory") configDirectory = flag.String(
importPrivKey = flag.String("import-priv-key", "", "creates a new config using a specific key from the phase one ceremony") "config",
"./.config/",
"the configuration directory",
)
importPrivKey = flag.String(
"import-priv-key",
"",
"creates a new config using a specific key from the phase one ceremony",
)
dbConsole = flag.Bool(
"db-console",
false,
"starts the node in database console mode",
)
) )
func main() { func main() {
@ -45,6 +58,16 @@ func main() {
panic(err) panic(err)
} }
if *dbConsole {
console, err := app.NewDBConsole(nodeConfig)
if err != nil {
panic(err)
}
console.Run()
return
}
node, err := app.NewNode(nodeConfig) node, err := app.NewNode(nodeConfig)
if err != nil { if err != nil {
panic(err) panic(err)

778
node/store/clock.go Normal file
View File

@ -0,0 +1,778 @@
package store
import (
"encoding/binary"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
"source.quilibrium.com/quilibrium/monorepo/node/tries"
)
type ClockStore interface {
NewTransaction() (Transaction, error)
GetLatestMasterClockFrame(filter []byte) (*protobufs.ClockFrame, error)
GetEarliestMasterClockFrame(filter []byte) (*protobufs.ClockFrame, error)
GetMasterClockFrame(
filter []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error)
RangeMasterClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (*PebbleMasterClockIterator, error)
PutMasterClockFrame(frame *protobufs.ClockFrame, txn Transaction) error
GetLatestDataClockFrame(
filter []byte,
proverTrie *tries.RollingFrecencyCritbitTrie,
) (*protobufs.ClockFrame, error)
GetEarliestDataClockFrame(filter []byte) (*protobufs.ClockFrame, error)
GetDataClockFrame(
filter []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error)
RangeDataClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (*PebbleClockIterator, error)
PutDataClockFrame(
frame *protobufs.ClockFrame,
proverTrie *tries.RollingFrecencyCritbitTrie,
txn Transaction,
) error
PutCandidateDataClockFrame(
parentSelector []byte,
distance []byte,
selector []byte,
frame *protobufs.ClockFrame,
txn Transaction,
) error
GetParentDataClockFrame(
filter []byte,
frameNumber uint64,
parentSelector []byte,
) (*protobufs.ClockFrame, error)
RangeCandidateDataClockFrames(
filter []byte,
parent []byte,
frameNumber uint64,
) (*PebbleCandidateClockIterator, error)
GetLeadingCandidateDataClockFrame(
filter []byte,
parent []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error)
}
type PebbleClockStore struct {
db *pebble.DB
logger *zap.Logger
}
var _ ClockStore = (*PebbleClockStore)(nil)
type PebbleMasterClockIterator struct {
i *pebble.Iterator
}
type PebbleClockIterator struct {
i *pebble.Iterator
}
type PebbleCandidateClockIterator struct {
i *pebble.Iterator
}
var _ Iterator[*protobufs.ClockFrame] = (*PebbleMasterClockIterator)(nil)
var _ Iterator[*protobufs.ClockFrame] = (*PebbleClockIterator)(nil)
var _ Iterator[*protobufs.ClockFrame] = (*PebbleCandidateClockIterator)(nil)
func (p *PebbleMasterClockIterator) First() bool {
return p.i.First()
}
func (p *PebbleMasterClockIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleMasterClockIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleMasterClockIterator) Value() (*protobufs.ClockFrame, error) {
if !p.i.Valid() {
return nil, ErrNotFound
}
key := p.i.Key()
value := p.i.Value()
frame := &protobufs.ClockFrame{}
frameNumber, filter, err := extractFrameNumberAndFilterFromMasterFrameKey(key)
if err != nil {
return nil, errors.Wrap(err, "get master clock frame iterator value")
}
frame.FrameNumber = frameNumber
frame.Filter = filter
if len(value) < 521 {
return nil, errors.Wrap(
ErrInvalidData,
"get master clock frame iterator value",
)
}
frame.Difficulty = binary.BigEndian.Uint32(value[:4])
frame.Input = value[4 : len(value)-516]
frame.Output = value[len(value)-516:]
return frame, nil
}
func (p *PebbleMasterClockIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing master clock iterator")
}
func (p *PebbleClockIterator) First() bool {
return p.i.First()
}
func (p *PebbleClockIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleClockIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) {
if !p.i.Valid() {
return nil, ErrNotFound
}
value := p.i.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
return frame, nil
}
func (p *PebbleClockIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing clock frame iterator")
}
func (p *PebbleCandidateClockIterator) First() bool {
return p.i.First()
}
func (p *PebbleCandidateClockIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleCandidateClockIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleCandidateClockIterator) Value() (*protobufs.ClockFrame, error) {
if !p.i.Valid() {
return nil, ErrNotFound
}
value := p.i.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get candidate clock frame iterator value",
)
}
return frame, nil
}
func (p *PebbleCandidateClockIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing candidate clock frame iterator")
}
func NewPebbleClockStore(db *pebble.DB, logger *zap.Logger) *PebbleClockStore {
return &PebbleClockStore{
db,
logger,
}
}
const CLOCK_FRAME = 0x00
const CLOCK_MASTER_FRAME_DATA = 0x00
const CLOCK_DATA_FRAME_DATA = 0x01
const CLOCK_DATA_FRAME_CANDIDATE_DATA = 0x02
const CLOCK_DATA_FRAME_FRECENCY_DATA = 0x03
const CLOCK_MASTER_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_MASTER_FRAME_DATA
const CLOCK_MASTER_FRAME_INDEX_LATEST = 0x20 | CLOCK_MASTER_FRAME_DATA
const CLOCK_MASTER_FRAME_INDEX_PARENT = 0x30 | CLOCK_MASTER_FRAME_DATA
const CLOCK_DATA_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_DATA_FRAME_DATA
const CLOCK_DATA_FRAME_INDEX_LATEST = 0x20 | CLOCK_DATA_FRAME_DATA
const CLOCK_DATA_FRAME_INDEX_PARENT = 0x30 | CLOCK_DATA_FRAME_DATA
//
// DB Keys
//
// Keys are structured as:
// <core type><sub type | index>[<non-index increment>]<segment>
// Increment necessarily must be full width elsewise the frame number would
// easily produce conflicts if filters are stepped by byte:
// 0x01 || 0xffff == 0x01ff || 0xff
//
// Master frames are serialized as output data only, Data frames are raw
// protobufs for fast disk-to-network output.
func clockFrameKey(filter []byte, frameNumber uint64, frameType byte) []byte {
key := []byte{CLOCK_FRAME, frameType}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, filter...)
return key
}
func clockMasterFrameKey(filter []byte, frameNumber uint64) []byte {
return clockFrameKey(filter, frameNumber, CLOCK_MASTER_FRAME_DATA)
}
func extractFrameNumberAndFilterFromMasterFrameKey(
key []byte,
) (uint64, []byte, error) {
if len(key) < 11 {
return 0, nil, errors.Wrap(
ErrInvalidData,
"extract frame number and filter from master frame key",
)
}
return binary.BigEndian.Uint64(key[2:10]), key[10:], nil
}
func clockDataFrameKey(
filter []byte,
frameNumber uint64,
) []byte {
return clockFrameKey(filter, frameNumber, CLOCK_DATA_FRAME_DATA)
}
func clockLatestIndex(filter []byte, frameType byte) []byte {
key := []byte{CLOCK_FRAME, frameType}
key = append(key, filter...)
return key
}
func clockMasterLatestIndex(filter []byte) []byte {
return clockLatestIndex(filter, CLOCK_MASTER_FRAME_INDEX_LATEST)
}
func clockDataLatestIndex(filter []byte) []byte {
return clockLatestIndex(filter, CLOCK_DATA_FRAME_INDEX_LATEST)
}
func clockEarliestIndex(filter []byte, frameType byte) []byte {
key := []byte{CLOCK_FRAME, frameType}
key = append(key, filter...)
return key
}
func clockMasterEarliestIndex(filter []byte) []byte {
return clockEarliestIndex(filter, CLOCK_MASTER_FRAME_INDEX_EARLIEST)
}
func clockDataEarliestIndex(filter []byte) []byte {
return clockEarliestIndex(filter, CLOCK_DATA_FRAME_INDEX_EARLIEST)
}
func clockParentIndexKey(
filter []byte,
frameNumber uint64,
selector []byte,
frameType byte,
) []byte {
key := []byte{CLOCK_FRAME, frameType}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, filter...)
key = append(key, rightAlign(selector, 32)...)
return key
}
func clockDataParentIndexKey(
filter []byte,
frameNumber uint64,
selector []byte,
) []byte {
return clockParentIndexKey(
filter,
frameNumber,
selector,
CLOCK_DATA_FRAME_INDEX_PARENT,
)
}
func clockDataCandidateFrameKey(
filter []byte,
frameNumber uint64,
parent []byte,
distance []byte,
) []byte {
key := []byte{CLOCK_FRAME, CLOCK_DATA_FRAME_CANDIDATE_DATA}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, filter...)
key = append(key, rightAlign(parent, 32)...)
key = append(key, rightAlign(distance, 32)...)
return key
}
func clockProverTrieKey(filter []byte, frameNumber uint64) []byte {
key := []byte{CLOCK_FRAME, CLOCK_DATA_FRAME_FRECENCY_DATA}
key = binary.BigEndian.AppendUint64(key, frameNumber)
key = append(key, filter...)
return key
}
func (p *PebbleClockStore) NewTransaction() (Transaction, error) {
return &PebbleTransaction{
b: p.db.NewBatch(),
}, nil
}
// GetEarliestMasterClockFrame implements ClockStore.
func (p *PebbleClockStore) GetEarliestMasterClockFrame(
filter []byte,
) (*protobufs.ClockFrame, error) {
idxValue, closer, err := p.db.Get(clockMasterEarliestIndex(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get earliest master clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, err := p.GetMasterClockFrame(filter, frameNumber)
if err != nil {
return nil, errors.Wrap(err, "get earliest master clock frame")
}
return frame, nil
}
// GetLatestMasterClockFrame implements ClockStore.
func (p *PebbleClockStore) GetLatestMasterClockFrame(
filter []byte,
) (*protobufs.ClockFrame, error) {
idxValue, closer, err := p.db.Get(clockMasterLatestIndex(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get latest master clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, err := p.GetMasterClockFrame(filter, frameNumber)
if err != nil {
return nil, errors.Wrap(err, "get latest master clock frame")
}
return frame, nil
}
// GetMasterClockFrame implements ClockStore.
func (p *PebbleClockStore) GetMasterClockFrame(
filter []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error) {
value, closer, err := p.db.Get(clockMasterFrameKey(filter, frameNumber))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get master clock frame")
}
defer closer.Close()
frame := &protobufs.ClockFrame{}
frame.FrameNumber = frameNumber
frame.Filter = filter
frame.Difficulty = binary.BigEndian.Uint32(value[:4])
frame.Input = value[4 : len(value)-516]
frame.Output = value[len(value)-516:]
return frame, nil
}
// RangeMasterClockFrames implements ClockStore.
func (p *PebbleClockStore) RangeMasterClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (*PebbleMasterClockIterator, error) {
if startFrameNumber > endFrameNumber {
temp := endFrameNumber
endFrameNumber = startFrameNumber
startFrameNumber = temp
}
iter := p.db.NewIter(&pebble.IterOptions{
LowerBound: clockMasterFrameKey(filter, startFrameNumber),
UpperBound: clockMasterFrameKey(filter, endFrameNumber),
})
return &PebbleMasterClockIterator{i: iter}, nil
}
// PutMasterClockFrame implements ClockStore.
func (p *PebbleClockStore) PutMasterClockFrame(
frame *protobufs.ClockFrame,
txn Transaction,
) error {
data := binary.BigEndian.AppendUint32([]byte{}, frame.Difficulty)
data = append(data, frame.Input...)
data = append(data, frame.Output...)
frameNumberBytes := make([]byte, 8)
binary.BigEndian.PutUint64(frameNumberBytes, frame.FrameNumber)
if err := txn.Set(
clockMasterFrameKey(frame.Filter, frame.FrameNumber),
data,
); err != nil {
return errors.Wrap(err, "put master clock frame")
}
_, closer, err := p.db.Get(clockMasterEarliestIndex(frame.Filter))
if err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
return errors.Wrap(err, "put master clock frame")
}
if err = txn.Set(
clockMasterEarliestIndex(frame.Filter),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "put master clock frame")
}
}
if err == nil && closer != nil {
closer.Close()
}
if err = txn.Set(
clockMasterLatestIndex(frame.Filter),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "put master clock frame")
}
return nil
}
// GetDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetDataClockFrame(
filter []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error) {
value, closer, err := p.db.Get(clockDataFrameKey(filter, frameNumber))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get data clock frame")
}
defer closer.Close()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get data clock frame",
)
}
return frame, nil
}
// GetEarliestDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetEarliestDataClockFrame(
filter []byte,
) (*protobufs.ClockFrame, error) {
idxValue, closer, err := p.db.Get(clockDataEarliestIndex(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get earliest data clock frame")
}
defer closer.Close()
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, err := p.GetDataClockFrame(filter, frameNumber)
if err != nil {
return nil, errors.Wrap(err, "get earliest data clock frame")
}
return frame, nil
}
// GetLatestDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetLatestDataClockFrame(
filter []byte,
proverTrie *tries.RollingFrecencyCritbitTrie,
) (*protobufs.ClockFrame, error) {
idxValue, closer, err := p.db.Get(clockDataLatestIndex(filter))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get latest data clock frame")
}
frameNumber := binary.BigEndian.Uint64(idxValue)
frame, err := p.GetDataClockFrame(filter, frameNumber)
if err != nil {
return nil, errors.Wrap(err, "get latest data clock frame")
}
closer.Close()
if proverTrie != nil {
trieData, closer, err := p.db.Get(clockProverTrieKey(filter, frameNumber))
if err != nil {
return nil, errors.Wrap(err, "get latest data clock frame")
}
defer closer.Close()
if err := proverTrie.Deserialize(trieData); err != nil {
return nil, errors.Wrap(err, "get latest data clock frame")
}
}
return frame, nil
}
// GetLeadingCandidateDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetLeadingCandidateDataClockFrame(
filter []byte,
parent []byte,
frameNumber uint64,
) (*protobufs.ClockFrame, error) {
iter, err := p.RangeCandidateDataClockFrames(filter, parent, frameNumber)
if err != nil {
return nil, errors.Wrap(err, "get leading candidate data clock frame")
}
if !iter.First() {
return nil, ErrNotFound
}
defer iter.Close()
frame, err := iter.Value()
return frame, errors.Wrap(err, "get leading candidate data clock frame")
}
// GetParentDataClockFrame implements ClockStore.
func (p *PebbleClockStore) GetParentDataClockFrame(
filter []byte,
frameNumber uint64,
parentSelector []byte,
) (*protobufs.ClockFrame, error) {
data, closer, err := p.db.Get(
clockDataParentIndexKey(filter, frameNumber, parentSelector),
)
if err != nil {
return nil, errors.Wrap(err, "get parent data clock frame")
}
parent := &protobufs.ClockFrame{}
if err := proto.Unmarshal(data, parent); err != nil {
return nil, errors.Wrap(err, "get parent data clock frame")
}
if closer != nil {
closer.Close()
}
return parent, nil
}
// PutCandidateDataClockFrame implements ClockStore.
func (p *PebbleClockStore) PutCandidateDataClockFrame(
parentSelector []byte,
distance []byte,
selector []byte,
frame *protobufs.ClockFrame,
txn Transaction,
) error {
data, err := proto.Marshal(frame)
if err != nil {
return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"put candidate data clock frame",
)
}
if err = txn.Set(
clockDataCandidateFrameKey(
frame.Filter,
frame.FrameNumber,
frame.ParentSelector,
distance,
),
data,
); err != nil {
return errors.Wrap(err, "put candidate data clock frame")
}
if err = txn.Set(
clockDataParentIndexKey(
frame.Filter,
frame.FrameNumber,
selector,
),
data,
); err != nil {
return errors.Wrap(err, "put candidate data clock frame")
}
return nil
}
// PutDataClockFrame implements ClockStore.
func (p *PebbleClockStore) PutDataClockFrame(
frame *protobufs.ClockFrame,
proverTrie *tries.RollingFrecencyCritbitTrie,
txn Transaction,
) error {
data, err := proto.Marshal(frame)
if err != nil {
return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"put data clock frame",
)
}
frameNumberBytes := make([]byte, 8)
binary.BigEndian.PutUint64(frameNumberBytes, frame.FrameNumber)
if err = txn.Set(
clockDataFrameKey(frame.Filter, frame.FrameNumber),
data,
); err != nil {
return errors.Wrap(err, "put data clock frame")
}
proverData, err := proverTrie.Serialize()
if err != nil {
return errors.Wrap(err, "put data clock frame")
}
if err = txn.Set(
clockProverTrieKey(frame.Filter, frame.FrameNumber),
proverData,
); err != nil {
return errors.Wrap(err, "put data clock frame")
}
_, closer, err := p.db.Get(clockDataEarliestIndex(frame.Filter))
if err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
return errors.Wrap(err, "put data clock frame")
}
if err = txn.Set(
clockDataEarliestIndex(frame.Filter),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "put data clock frame")
}
}
if err == nil && closer != nil {
closer.Close()
}
if err = txn.Set(
clockDataLatestIndex(frame.Filter),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "put data clock frame")
}
return nil
}
// RangeCandidateDataClockFrames implements ClockStore.
// Distance is 32-byte aligned, so we just use a 0x00 * 32 -> 0xff * 32 range
func (p *PebbleClockStore) RangeCandidateDataClockFrames(
filter []byte,
parent []byte,
frameNumber uint64,
) (*PebbleCandidateClockIterator, error) {
iter := p.db.NewIter(&pebble.IterOptions{
LowerBound: clockDataCandidateFrameKey(
filter,
frameNumber,
parent,
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
),
UpperBound: clockDataCandidateFrameKey(
filter,
frameNumber,
parent,
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
),
})
return &PebbleCandidateClockIterator{i: iter}, nil
}
// RangeDataClockFrames implements ClockStore.
func (p *PebbleClockStore) RangeDataClockFrames(
filter []byte,
startFrameNumber uint64,
endFrameNumber uint64,
) (*PebbleClockIterator, error) {
if startFrameNumber > endFrameNumber {
temp := endFrameNumber
endFrameNumber = startFrameNumber
startFrameNumber = temp
}
iter := p.db.NewIter(&pebble.IterOptions{
LowerBound: clockDataFrameKey(filter, startFrameNumber),
UpperBound: clockDataFrameKey(filter, endFrameNumber),
})
return &PebbleClockIterator{i: iter}, nil
}

8
node/store/errors.go Normal file
View File

@ -0,0 +1,8 @@
package store
import "errors"
var (
ErrNotFound = errors.New("item not found")
ErrInvalidData = errors.New("invalid data")
)

11
node/store/iterator.go Normal file
View File

@ -0,0 +1,11 @@
package store
import "google.golang.org/protobuf/proto"
type Iterator[T proto.Message] interface {
First() bool
Next() bool
Valid() bool
Value() (T, error)
Close() error
}

531
node/store/key.go Normal file
View File

@ -0,0 +1,531 @@
package store
import (
"encoding/binary"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"source.quilibrium.com/quilibrium/monorepo/node/protobufs"
)
type KeyStore interface {
NewTransaction() (Transaction, error)
StageProvingKey(provingKey *protobufs.ProvingKeyAnnouncement) error
IncludeProvingKey(
inclusionCommitment *protobufs.InclusionCommitment,
txn Transaction,
) error
GetStagedProvingKey(
provingKey []byte,
) (*protobufs.ProvingKeyAnnouncement, error)
GetProvingKey(provingKey []byte) (*protobufs.InclusionCommitment, error)
GetKeyBundle(
provingKey []byte,
frameNumber uint64,
) (*protobufs.InclusionCommitment, error)
GetLatestKeyBundle(provingKey []byte) (*protobufs.InclusionCommitment, error)
PutKeyBundle(
provingKey []byte,
keyBundleCommitment *protobufs.InclusionCommitment,
txn Transaction,
) error
RangeProvingKeys() (*PebbleProvingKeyIterator, error)
RangeStagedProvingKeys() (*PebbleStagedProvingKeyIterator, error)
RangeKeyBundleKeys(provingKey []byte) (*PebbleKeyBundleIterator, error)
}
type PebbleKeyStore struct {
db *pebble.DB
logger *zap.Logger
}
type PebbleProvingKeyIterator struct {
i *pebble.Iterator
}
type PebbleStagedProvingKeyIterator struct {
i *pebble.Iterator
}
type PebbleKeyBundleIterator struct {
i *pebble.Iterator
}
var pki = (*PebbleProvingKeyIterator)(nil)
var spki = (*PebbleStagedProvingKeyIterator)(nil)
var kbi = (*PebbleKeyBundleIterator)(nil)
var _ Iterator[*protobufs.InclusionCommitment] = pki
var _ Iterator[*protobufs.ProvingKeyAnnouncement] = spki
var _ Iterator[*protobufs.InclusionCommitment] = kbi
var _ KeyStore = (*PebbleKeyStore)(nil)
func (p *PebbleProvingKeyIterator) First() bool {
return p.i.First()
}
func (p *PebbleProvingKeyIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleProvingKeyIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleProvingKeyIterator) Value() (
*protobufs.InclusionCommitment,
error,
) {
if !p.i.Valid() {
return nil, ErrNotFound
}
value := p.i.Value()
frame := &protobufs.InclusionCommitment{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get proving key iterator value",
)
}
return frame, nil
}
func (p *PebbleProvingKeyIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing iterator")
}
func (p *PebbleStagedProvingKeyIterator) First() bool {
return p.i.First()
}
func (p *PebbleStagedProvingKeyIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleStagedProvingKeyIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleStagedProvingKeyIterator) Value() (
*protobufs.ProvingKeyAnnouncement,
error,
) {
if !p.i.Valid() {
return nil, ErrNotFound
}
value := p.i.Value()
frame := &protobufs.ProvingKeyAnnouncement{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get staged proving key iterator value",
)
}
return frame, nil
}
func (p *PebbleStagedProvingKeyIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing iterator")
}
func (p *PebbleKeyBundleIterator) First() bool {
return p.i.First()
}
func (p *PebbleKeyBundleIterator) Next() bool {
return p.i.Next()
}
func (p *PebbleKeyBundleIterator) Valid() bool {
return p.i.Valid()
}
func (p *PebbleKeyBundleIterator) Value() (
*protobufs.InclusionCommitment,
error,
) {
if !p.i.Valid() {
return nil, ErrNotFound
}
value := p.i.Value()
frame := &protobufs.InclusionCommitment{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get key bundle iterator value",
)
}
return frame, nil
}
func (p *PebbleKeyBundleIterator) Close() error {
return errors.Wrap(p.i.Close(), "closing iterator")
}
func NewPebbleKeyStore(db *pebble.DB, logger *zap.Logger) *PebbleKeyStore {
return &PebbleKeyStore{
db,
logger,
}
}
const (
PROVING_KEY = 0x01
PROVING_KEY_STAGED = 0x02
KEY_BUNDLE = 0x03
KEY_DATA = 0x00
KEY_BUNDLE_INDEX_EARLIEST = 0x10
KEY_BUNDLE_INDEX_LATEST = 0x20
)
func provingKeyKey(provingKey []byte) []byte {
key := []byte{PROVING_KEY, KEY_DATA}
key = append(key, provingKey...)
return key
}
func stagedProvingKeyKey(provingKey []byte) []byte {
key := []byte{PROVING_KEY_STAGED, KEY_DATA}
key = append(key, provingKey...)
return key
}
func keyBundleKey(provingKey []byte, frameNumber uint64) []byte {
key := []byte{KEY_BUNDLE, KEY_DATA}
key = append(key, provingKey...)
key = binary.BigEndian.AppendUint64(key, frameNumber)
return key
}
func keyBundleLatestKey(provingKey []byte) []byte {
key := []byte{KEY_BUNDLE, KEY_BUNDLE_INDEX_LATEST}
key = append(key, provingKey...)
return key
}
func keyBundleEarliestKey(provingKey []byte) []byte {
key := []byte{KEY_BUNDLE, KEY_BUNDLE_INDEX_EARLIEST}
key = append(key, provingKey...)
return key
}
func (p *PebbleKeyStore) NewTransaction() (Transaction, error) {
return &PebbleTransaction{
b: p.db.NewBatch(),
}, nil
}
// Stages a proving key for later inclusion on proof of meaningful work.
// Does not verify, upstream callers must verify.
func (p *PebbleKeyStore) StageProvingKey(
provingKey *protobufs.ProvingKeyAnnouncement,
) error {
data, err := proto.Marshal(provingKey)
if err != nil {
return errors.Wrap(err, "stage proving key")
}
err = p.db.Set(
stagedProvingKeyKey(provingKey.PublicKey()),
data,
&pebble.WriteOptions{
Sync: true,
},
)
if err != nil {
return errors.Wrap(err, "stage proving key")
}
return nil
}
// Includes a proving key with an inclusion commitment. If a proving key is
// staged, promotes it by including it in the primary key store and deletes the
// staged key.
func (p *PebbleKeyStore) IncludeProvingKey(
inclusionCommitment *protobufs.InclusionCommitment,
txn Transaction,
) error {
provingKey := &protobufs.ProvingKeyAnnouncement{}
if err := proto.Unmarshal(inclusionCommitment.Data, provingKey); err != nil {
return errors.Wrap(err, "include proving key")
}
if err := provingKey.Verify(); err != nil {
return errors.Wrap(err, "include proving key")
}
data, err := proto.Marshal(inclusionCommitment)
if err != nil {
return errors.Wrap(err, "include proving key")
}
txn.Set(
provingKeyKey(provingKey.PublicKey()),
data,
)
staged, closer, err := p.db.Get(stagedProvingKeyKey(provingKey.PublicKey()))
if err != nil && !errors.Is(err, ErrNotFound) {
return errors.Wrap(err, "include proving key")
}
if staged != nil {
if err := txn.Delete(
stagedProvingKeyKey(provingKey.PublicKey()),
); err != nil {
return errors.Wrap(err, "include proving key")
}
}
if err := closer.Close(); err != nil {
return errors.Wrap(err, "include proving key")
}
return nil
}
func (p *PebbleKeyStore) GetStagedProvingKey(
provingKey []byte,
) (*protobufs.ProvingKeyAnnouncement, error) {
data, closer, err := p.db.Get(stagedProvingKeyKey(provingKey))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get staged proving key")
}
stagedKey := &protobufs.ProvingKeyAnnouncement{}
if err = proto.Unmarshal(data, stagedKey); err != nil {
return nil, errors.Wrap(err, "get staged proving key")
}
if err := closer.Close(); err != nil {
return nil, errors.Wrap(err, "get staged proving key")
}
return stagedKey, nil
}
// Returns the latest key bundle for a given proving key.
func (p *PebbleKeyStore) GetLatestKeyBundle(
provingKey []byte,
) (*protobufs.InclusionCommitment, error) {
value, closer, err := p.db.Get(keyBundleLatestKey(provingKey))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get latest key bundle")
}
frameNumber := binary.BigEndian.Uint64(value)
if err := closer.Close(); err != nil {
return nil, errors.Wrap(err, "get latest key bundle")
}
value, closer, err = p.db.Get(keyBundleKey(provingKey, frameNumber))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get latest key bundle")
}
defer closer.Close()
announcement := &protobufs.InclusionCommitment{}
if err := proto.Unmarshal(value, announcement); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get latest key bundle",
)
}
return announcement, nil
}
// Retrieves the specific key bundle included at a given frame number.
func (p *PebbleKeyStore) GetKeyBundle(
provingKey []byte,
frameNumber uint64,
) (*protobufs.InclusionCommitment, error) {
value, closer, err := p.db.Get(keyBundleKey(provingKey, frameNumber))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get key bundle")
}
defer closer.Close()
announcement := &protobufs.InclusionCommitment{}
if err := proto.Unmarshal(value, announcement); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get key bundle",
)
}
return announcement, nil
}
// Retrieves an included proving key, returns ErrNotFound if not present.
func (p *PebbleKeyStore) GetProvingKey(
provingKey []byte,
) (*protobufs.InclusionCommitment, error) {
value, closer, err := p.db.Get(provingKeyKey(provingKey))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return nil, ErrNotFound
}
return nil, errors.Wrap(err, "get proving key")
}
defer closer.Close()
announcement := &protobufs.InclusionCommitment{}
if err := proto.Unmarshal(value, announcement); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get proving key",
)
}
return announcement, nil
}
// Inserts a key bundle with inclusion commitment. Does not verify, upstream
// callers must perform the verification.
func (p *PebbleKeyStore) PutKeyBundle(
provingKey []byte,
keyBundle *protobufs.InclusionCommitment,
txn Transaction,
) error {
data, err := proto.Marshal(keyBundle)
if err != nil {
return errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"put key bundle",
)
}
frameNumberBytes := make([]byte, 8)
binary.BigEndian.PutUint64(frameNumberBytes, keyBundle.FrameNumber)
if err = txn.Set(
keyBundleKey(provingKey, keyBundle.FrameNumber),
data,
); err != nil {
return errors.Wrap(err, "put key bundle")
}
_, closer, err := p.db.Get(keyBundleEarliestKey(provingKey))
if err != nil {
if !errors.Is(err, pebble.ErrNotFound) {
return errors.Wrap(err, "put key bundle")
}
if err = txn.Set(
keyBundleEarliestKey(provingKey),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "put key bundle")
}
}
if err == nil && closer != nil {
closer.Close()
}
if err = txn.Set(
keyBundleLatestKey(provingKey),
frameNumberBytes,
); err != nil {
return errors.Wrap(err, "put key bundle")
}
return nil
}
func (p *PebbleKeyStore) RangeProvingKeys() (*PebbleProvingKeyIterator, error) {
iter := p.db.NewIter(&pebble.IterOptions{
LowerBound: provingKeyKey([]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00,
}),
UpperBound: provingKeyKey([]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff,
}),
})
return &PebbleProvingKeyIterator{i: iter}, nil
}
func (p *PebbleKeyStore) RangeStagedProvingKeys() (
*PebbleStagedProvingKeyIterator,
error,
) {
iter := p.db.NewIter(&pebble.IterOptions{
LowerBound: stagedProvingKeyKey([]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00,
}),
UpperBound: stagedProvingKeyKey([]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff,
}),
})
return &PebbleStagedProvingKeyIterator{i: iter}, nil
}
func (p *PebbleKeyStore) RangeKeyBundleKeys(provingKey []byte) (
*PebbleKeyBundleIterator,
error,
) {
iter := p.db.NewIter(&pebble.IterOptions{
LowerBound: keyBundleKey(provingKey, 0),
UpperBound: keyBundleKey(provingKey, 0xffffffffffffffff),
})
return &PebbleKeyBundleIterator{i: iter}, nil
}

55
node/store/pebble.go Normal file
View File

@ -0,0 +1,55 @@
package store
import (
"github.com/cockroachdb/pebble"
"source.quilibrium.com/quilibrium/monorepo/node/config"
)
func NewPebbleDB(config *config.DBConfig) *pebble.DB {
db, err := pebble.Open(config.Path, &pebble.Options{})
if err != nil {
panic(err)
}
return db
}
type Transaction interface {
Set(key []byte, value []byte) error
Commit() error
Delete(key []byte) error
}
type PebbleTransaction struct {
b *pebble.Batch
}
func (t *PebbleTransaction) Set(key []byte, value []byte) error {
return t.b.Set(key, value, &pebble.WriteOptions{Sync: true})
}
func (t *PebbleTransaction) Commit() error {
return t.b.Commit(&pebble.WriteOptions{Sync: true})
}
func (t *PebbleTransaction) Delete(key []byte) error {
return t.b.Delete(key, &pebble.WriteOptions{Sync: true})
}
var _ Transaction = (*PebbleTransaction)(nil)
func rightAlign(data []byte, size int) []byte {
l := len(data)
if l == size {
return data
}
if l > size {
return data[l-size:]
}
pad := make([]byte, size)
copy(pad[size-l:], data)
return pad
}