diff --git a/node/app/db_console.go b/node/app/db_console.go new file mode 100644 index 0000000..52e546e --- /dev/null +++ b/node/app/db_console.go @@ -0,0 +1,112 @@ +package app + +import ( + "bufio" + "fmt" + "os" + "strings" + + "source.quilibrium.com/quilibrium/monorepo/node/store" +) + +type DBConsole struct { + clockStore store.ClockStore +} + +func newDBConsole( + clockStore store.ClockStore, +) (*DBConsole, error) { + return &DBConsole{ + clockStore, + }, nil +} + +// Runs the DB console, this is meant for simple debugging, not production use. +func (c *DBConsole) Run() { + for { + fmt.Printf("db> ") + + reader := bufio.NewReader(os.Stdin) + s, err := reader.ReadString('\n') + if err != nil { + panic(err) + } + + cmd := strings.Trim(s, "\n") + switch cmd { + case "quit": + return + case "show frames": + earliestFrame, err := c.clockStore.GetEarliestMasterClockFrame([]byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }) + if err != nil { + panic(err) + } + + latestFrame, err := c.clockStore.GetLatestMasterClockFrame([]byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }) + if err != nil { + panic(err) + } + + fmt.Printf( + "earliest: %d, latest: %d\n", + earliestFrame.FrameNumber, + latestFrame.FrameNumber, + ) + + fmt.Printf( + "Genesis Frame:\n\tVDF Proof: %x\n", + earliestFrame.Input[:516], + ) + + iter, err := c.clockStore.RangeMasterClockFrames( + []byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }, + earliestFrame.FrameNumber, + latestFrame.FrameNumber, + ) + if err != nil { + panic(err) + } + + for iter.First(); iter.Valid(); iter.Next() { + value, err := iter.Value() + if err != nil { + panic(err) + } + + selector, err := value.GetSelector() + if err != nil { + panic(err) + } + + fmt.Printf( + "Frame %d (Selector: %x):\n\tParent: %x\n\tVDF Proof: %x\n\n", + value.FrameNumber, + selector.Bytes(), + value.ParentSelector, + value.Input[:516], + ) + } + + if err := iter.Close(); err != nil { + panic(err) + } + default: + fmt.Printf("unknown command %s\n", cmd) + } + } +} diff --git a/node/app/wire.go b/node/app/wire.go index e3c4111..dbe798f 100644 --- a/node/app/wire.go +++ b/node/app/wire.go @@ -12,6 +12,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/execution/nop" "source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/p2p" + "source.quilibrium.com/quilibrium/monorepo/node/store" ) func logger() *zap.Logger { @@ -33,6 +34,13 @@ var keyManagerSet = wire.NewSet( wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager)), ) +var storeSet = wire.NewSet( + wire.FieldsOf(new(*config.Config), "DB"), + store.NewPebbleDB, + store.NewPebbleClockStore, + wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore)), +) + var pubSubSet = wire.NewSet( wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewBlossomSub, @@ -46,9 +54,24 @@ var engineSet = wire.NewSet( var consensusSet = wire.NewSet( wire.FieldsOf(new(*config.Config), "Engine"), master.NewMasterClockConsensusEngine, - wire.Bind(new(consensus.ConsensusEngine), new(*master.MasterClockConsensusEngine)), + wire.Bind( + new(consensus.ConsensusEngine), + new(*master.MasterClockConsensusEngine), + ), ) func NewNode(*config.Config) (*Node, error) { - panic(wire.Build(loggerSet, keyManagerSet, pubSubSet, engineSet, consensusSet, newNode)) + panic(wire.Build( + loggerSet, + keyManagerSet, + storeSet, + pubSubSet, + engineSet, + consensusSet, + newNode, + )) +} + +func NewDBConsole(*config.Config) (*DBConsole, error) { + panic(wire.Build(loggerSet, storeSet, newDBConsole)) } diff --git a/node/app/wire_gen.go b/node/app/wire_gen.go index 132203b..22da60c 100644 --- a/node/app/wire_gen.go +++ b/node/app/wire_gen.go @@ -15,6 +15,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/execution/nop" "source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/p2p" + "source.quilibrium.com/quilibrium/monorepo/node/store" ) // Injectors from wire.go: @@ -23,11 +24,14 @@ func NewNode(configConfig *config.Config) (*Node, error) { zapLogger := logger() nopExecutionEngine := nop.NewNopExecutionEngine(zapLogger) engineConfig := configConfig.Engine + dbConfig := configConfig.DB + db := store.NewPebbleDB(dbConfig) + pebbleClockStore := store.NewPebbleClockStore(db, zapLogger) keyConfig := configConfig.Key fileKeyManager := keys.NewFileKeyManager(keyConfig, zapLogger) p2PConfig := configConfig.P2P blossomSub := p2p.NewBlossomSub(p2PConfig, zapLogger) - masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, fileKeyManager, blossomSub) + masterClockConsensusEngine := master.NewMasterClockConsensusEngine(engineConfig, zapLogger, pebbleClockStore, fileKeyManager, blossomSub) node, err := newNode(nopExecutionEngine, masterClockConsensusEngine) if err != nil { return nil, err @@ -35,6 +39,18 @@ func NewNode(configConfig *config.Config) (*Node, error) { return node, nil } +func NewDBConsole(configConfig *config.Config) (*DBConsole, error) { + dbConfig := configConfig.DB + db := store.NewPebbleDB(dbConfig) + zapLogger := logger() + pebbleClockStore := store.NewPebbleClockStore(db, zapLogger) + dbConsole, err := newDBConsole(pebbleClockStore) + if err != nil { + return nil, err + } + return dbConsole, nil +} + // wire.go: func logger() *zap.Logger { @@ -52,8 +68,14 @@ var loggerSet = wire.NewSet( var keyManagerSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Key"), keys.NewFileKeyManager, wire.Bind(new(keys.KeyManager), new(*keys.FileKeyManager))) +var storeSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "DB"), store.NewPebbleDB, store.NewPebbleClockStore, wire.Bind(new(store.ClockStore), new(*store.PebbleClockStore))) + var pubSubSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "P2P"), p2p.NewBlossomSub, wire.Bind(new(p2p.PubSub), new(*p2p.BlossomSub))) var engineSet = wire.NewSet(nop.NewNopExecutionEngine) -var consensusSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Engine"), master.NewMasterClockConsensusEngine, wire.Bind(new(consensus.ConsensusEngine), new(*master.MasterClockConsensusEngine))) +var consensusSet = wire.NewSet(wire.FieldsOf(new(*config.Config), "Engine"), master.NewMasterClockConsensusEngine, wire.Bind( + new(consensus.ConsensusEngine), + new(*master.MasterClockConsensusEngine), +), +) diff --git a/node/consensus/master/consensus_frames.go b/node/consensus/master/consensus_frames.go index 765584f..9fa47dc 100644 --- a/node/consensus/master/consensus_frames.go +++ b/node/consensus/master/consensus_frames.go @@ -238,8 +238,33 @@ func ( } } + txn, err := e.clockStore.NewTransaction() + if err != nil { + e.logger.Error("error while creating transaction", zap.Error(err)) + return nil, errors.Wrap(err, "confirm latest frame") + } + + for _, frame := range committedSet { + if err = e.clockStore.PutMasterClockFrame(frame, txn); err != nil { + e.logger.Error("error while committing frame", zap.Error(err)) + return nil, errors.Wrap(err, "confirm latest frame") + } + } + + if err = txn.Commit(); err != nil { + e.logger.Error("error while committing transaction", zap.Error(err)) + return nil, errors.Wrap(err, "confirm latest frame") + } + + e.logger.Info("stored frames", zap.Int("frame_count", len(committedSet))) + e.historicFramesMx.Lock() + e.historicFrames = append(e.historicFrames, committedSet...) + if len(e.historicFrames) > 256 { + e.historicFrames = e.historicFrames[len(e.historicFrames)-256:] + } + e.historicFramesMx.Unlock() e.setFrame(prev) diff --git a/node/consensus/master/master_clock_consensus_engine.go b/node/consensus/master/master_clock_consensus_engine.go index 022710a..5d46fda 100644 --- a/node/consensus/master/master_clock_consensus_engine.go +++ b/node/consensus/master/master_clock_consensus_engine.go @@ -13,6 +13,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" + "source.quilibrium.com/quilibrium/monorepo/node/store" ) type SyncStatusType int @@ -42,9 +43,9 @@ type MasterClockConsensusEngine struct { engineMx sync.Mutex seenFramesMx sync.Mutex historicFramesMx sync.Mutex - // for DHT testing, we're only using in memory stores - seenFrames []*protobufs.ClockFrame - historicFrames []*protobufs.ClockFrame + seenFrames []*protobufs.ClockFrame + historicFrames []*protobufs.ClockFrame + clockStore store.ClockStore } var _ consensus.ConsensusEngine = (*MasterClockConsensusEngine)(nil) @@ -52,6 +53,7 @@ var _ consensus.ConsensusEngine = (*MasterClockConsensusEngine)(nil) func NewMasterClockConsensusEngine( engineConfig *config.EngineConfig, logger *zap.Logger, + clockStore store.ClockStore, keyManager keys.KeyManager, pubSub p2p.PubSub, ) *MasterClockConsensusEngine { @@ -88,6 +90,7 @@ func NewMasterClockConsensusEngine( input: seed, lastFrameReceivedAt: time.Time{}, syncingStatus: SyncStatusNotSyncing, + clockStore: clockStore, } if e.filter, err = hex.DecodeString(engineConfig.Filter); err != nil { @@ -107,8 +110,59 @@ func (e *MasterClockConsensusEngine) Start() <-chan error { e.state = consensus.EngineStateLoading e.logger.Info("syncing last seen state") - latestFrame := e.createGenesisFrame() - e.historicFrames = []*protobufs.ClockFrame{latestFrame} + latestFrame, err := e.clockStore.GetLatestMasterClockFrame(e.filter) + if err != nil && errors.Is(err, store.ErrNotFound) { + latestFrame = e.createGenesisFrame() + txn, err := e.clockStore.NewTransaction() + if err != nil { + panic(err) + } + + if err = e.clockStore.PutMasterClockFrame(latestFrame, txn); err != nil { + panic(err) + } + + if err = txn.Commit(); err != nil { + panic(err) + } + } else if err != nil { + panic(err) + } else { + e.setFrame(latestFrame) + } + + e.historicFrames = []*protobufs.ClockFrame{} + + if latestFrame.FrameNumber != 0 { + min := uint64(0) + if latestFrame.FrameNumber-255 > min { + min = latestFrame.FrameNumber - 255 + } + + iter, err := e.clockStore.RangeMasterClockFrames( + e.filter, + min, + latestFrame.FrameNumber-1, + ) + if err != nil { + panic(err) + } + + for iter.First(); iter.Valid(); iter.Next() { + frame, err := iter.Value() + if err != nil { + panic(err) + } + + e.historicFrames = append(e.historicFrames, frame) + } + + if err = iter.Close(); err != nil { + panic(err) + } + } + + e.historicFrames = append(e.historicFrames, latestFrame) e.logger.Info("subscribing to pubsub messages") e.pubSub.Subscribe(e.filter, e.handleMessage, true) diff --git a/node/consensus/master/peer_messaging.go b/node/consensus/master/peer_messaging.go index 660d3da..ecd9dc4 100644 --- a/node/consensus/master/peer_messaging.go +++ b/node/consensus/master/peer_messaging.go @@ -225,11 +225,35 @@ func (e *MasterClockConsensusEngine) handleClockFramesRequest( zap.Uint64("total_frames", uint64(to-from+1)), ) + iter, err := e.clockStore.RangeMasterClockFrames( + request.Filter, + from, + to, + ) + if err != nil { + return errors.Wrap(err, "handle clock frame request") + } + + response := []*protobufs.ClockFrame{} + + for iter.First(); iter.Valid(); iter.Next() { + frame, err := iter.Value() + if err != nil { + return errors.Wrap(err, "handle clock frame request") + } + + response = append(response, frame) + } + + if err = iter.Close(); err != nil { + return errors.Wrap(err, "handle clock frame request") + } + if err := e.publishMessage(channel, &protobufs.ClockFramesResponse{ Filter: request.Filter, FromFrameNumber: request.FromFrameNumber, ToFrameNumber: to, - ClockFrames: e.historicFrames[from:to], + ClockFrames: response, }); err != nil { return errors.Wrap(err, "handle clock frame request") } diff --git a/node/main.go b/node/main.go index 07d3e38..fea4e21 100644 --- a/node/main.go +++ b/node/main.go @@ -16,8 +16,21 @@ import ( ) var ( - configDirectory = flag.String("config", "./.config/", "the configuration directory") - importPrivKey = flag.String("import-priv-key", "", "creates a new config using a specific key from the phase one ceremony") + configDirectory = flag.String( + "config", + "./.config/", + "the configuration directory", + ) + importPrivKey = flag.String( + "import-priv-key", + "", + "creates a new config using a specific key from the phase one ceremony", + ) + dbConsole = flag.Bool( + "db-console", + false, + "starts the node in database console mode", + ) ) func main() { @@ -45,6 +58,16 @@ func main() { panic(err) } + if *dbConsole { + console, err := app.NewDBConsole(nodeConfig) + if err != nil { + panic(err) + } + + console.Run() + return + } + node, err := app.NewNode(nodeConfig) if err != nil { panic(err) diff --git a/node/store/clock.go b/node/store/clock.go new file mode 100644 index 0000000..1f10c3d --- /dev/null +++ b/node/store/clock.go @@ -0,0 +1,778 @@ +package store + +import ( + "encoding/binary" + + "github.com/cockroachdb/pebble" + "github.com/pkg/errors" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + "source.quilibrium.com/quilibrium/monorepo/node/protobufs" + "source.quilibrium.com/quilibrium/monorepo/node/tries" +) + +type ClockStore interface { + NewTransaction() (Transaction, error) + GetLatestMasterClockFrame(filter []byte) (*protobufs.ClockFrame, error) + GetEarliestMasterClockFrame(filter []byte) (*protobufs.ClockFrame, error) + GetMasterClockFrame( + filter []byte, + frameNumber uint64, + ) (*protobufs.ClockFrame, error) + RangeMasterClockFrames( + filter []byte, + startFrameNumber uint64, + endFrameNumber uint64, + ) (*PebbleMasterClockIterator, error) + PutMasterClockFrame(frame *protobufs.ClockFrame, txn Transaction) error + GetLatestDataClockFrame( + filter []byte, + proverTrie *tries.RollingFrecencyCritbitTrie, + ) (*protobufs.ClockFrame, error) + GetEarliestDataClockFrame(filter []byte) (*protobufs.ClockFrame, error) + GetDataClockFrame( + filter []byte, + frameNumber uint64, + ) (*protobufs.ClockFrame, error) + RangeDataClockFrames( + filter []byte, + startFrameNumber uint64, + endFrameNumber uint64, + ) (*PebbleClockIterator, error) + PutDataClockFrame( + frame *protobufs.ClockFrame, + proverTrie *tries.RollingFrecencyCritbitTrie, + txn Transaction, + ) error + PutCandidateDataClockFrame( + parentSelector []byte, + distance []byte, + selector []byte, + frame *protobufs.ClockFrame, + txn Transaction, + ) error + GetParentDataClockFrame( + filter []byte, + frameNumber uint64, + parentSelector []byte, + ) (*protobufs.ClockFrame, error) + RangeCandidateDataClockFrames( + filter []byte, + parent []byte, + frameNumber uint64, + ) (*PebbleCandidateClockIterator, error) + GetLeadingCandidateDataClockFrame( + filter []byte, + parent []byte, + frameNumber uint64, + ) (*protobufs.ClockFrame, error) +} + +type PebbleClockStore struct { + db *pebble.DB + logger *zap.Logger +} + +var _ ClockStore = (*PebbleClockStore)(nil) + +type PebbleMasterClockIterator struct { + i *pebble.Iterator +} + +type PebbleClockIterator struct { + i *pebble.Iterator +} + +type PebbleCandidateClockIterator struct { + i *pebble.Iterator +} + +var _ Iterator[*protobufs.ClockFrame] = (*PebbleMasterClockIterator)(nil) +var _ Iterator[*protobufs.ClockFrame] = (*PebbleClockIterator)(nil) +var _ Iterator[*protobufs.ClockFrame] = (*PebbleCandidateClockIterator)(nil) + +func (p *PebbleMasterClockIterator) First() bool { + return p.i.First() +} + +func (p *PebbleMasterClockIterator) Next() bool { + return p.i.Next() +} + +func (p *PebbleMasterClockIterator) Valid() bool { + return p.i.Valid() +} + +func (p *PebbleMasterClockIterator) Value() (*protobufs.ClockFrame, error) { + if !p.i.Valid() { + return nil, ErrNotFound + } + + key := p.i.Key() + value := p.i.Value() + frame := &protobufs.ClockFrame{} + + frameNumber, filter, err := extractFrameNumberAndFilterFromMasterFrameKey(key) + if err != nil { + return nil, errors.Wrap(err, "get master clock frame iterator value") + } + + frame.FrameNumber = frameNumber + frame.Filter = filter + + if len(value) < 521 { + return nil, errors.Wrap( + ErrInvalidData, + "get master clock frame iterator value", + ) + } + + frame.Difficulty = binary.BigEndian.Uint32(value[:4]) + frame.Input = value[4 : len(value)-516] + frame.Output = value[len(value)-516:] + + return frame, nil +} + +func (p *PebbleMasterClockIterator) Close() error { + return errors.Wrap(p.i.Close(), "closing master clock iterator") +} + +func (p *PebbleClockIterator) First() bool { + return p.i.First() +} + +func (p *PebbleClockIterator) Next() bool { + return p.i.Next() +} + +func (p *PebbleClockIterator) Valid() bool { + return p.i.Valid() +} + +func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) { + if !p.i.Valid() { + return nil, ErrNotFound + } + + value := p.i.Value() + frame := &protobufs.ClockFrame{} + if err := proto.Unmarshal(value, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get clock frame iterator value", + ) + } + + return frame, nil +} + +func (p *PebbleClockIterator) Close() error { + return errors.Wrap(p.i.Close(), "closing clock frame iterator") +} + +func (p *PebbleCandidateClockIterator) First() bool { + return p.i.First() +} + +func (p *PebbleCandidateClockIterator) Next() bool { + return p.i.Next() +} + +func (p *PebbleCandidateClockIterator) Valid() bool { + return p.i.Valid() +} + +func (p *PebbleCandidateClockIterator) Value() (*protobufs.ClockFrame, error) { + if !p.i.Valid() { + return nil, ErrNotFound + } + + value := p.i.Value() + frame := &protobufs.ClockFrame{} + if err := proto.Unmarshal(value, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get candidate clock frame iterator value", + ) + } + + return frame, nil +} + +func (p *PebbleCandidateClockIterator) Close() error { + return errors.Wrap(p.i.Close(), "closing candidate clock frame iterator") +} + +func NewPebbleClockStore(db *pebble.DB, logger *zap.Logger) *PebbleClockStore { + return &PebbleClockStore{ + db, + logger, + } +} + +const CLOCK_FRAME = 0x00 +const CLOCK_MASTER_FRAME_DATA = 0x00 +const CLOCK_DATA_FRAME_DATA = 0x01 +const CLOCK_DATA_FRAME_CANDIDATE_DATA = 0x02 +const CLOCK_DATA_FRAME_FRECENCY_DATA = 0x03 +const CLOCK_MASTER_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_MASTER_FRAME_DATA +const CLOCK_MASTER_FRAME_INDEX_LATEST = 0x20 | CLOCK_MASTER_FRAME_DATA +const CLOCK_MASTER_FRAME_INDEX_PARENT = 0x30 | CLOCK_MASTER_FRAME_DATA +const CLOCK_DATA_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_DATA_FRAME_DATA +const CLOCK_DATA_FRAME_INDEX_LATEST = 0x20 | CLOCK_DATA_FRAME_DATA +const CLOCK_DATA_FRAME_INDEX_PARENT = 0x30 | CLOCK_DATA_FRAME_DATA + +// +// DB Keys +// +// Keys are structured as: +// [] +// Increment necessarily must be full width – elsewise the frame number would +// easily produce conflicts if filters are stepped by byte: +// 0x01 || 0xffff == 0x01ff || 0xff +// +// Master frames are serialized as output data only, Data frames are raw +// protobufs for fast disk-to-network output. + +func clockFrameKey(filter []byte, frameNumber uint64, frameType byte) []byte { + key := []byte{CLOCK_FRAME, frameType} + key = binary.BigEndian.AppendUint64(key, frameNumber) + key = append(key, filter...) + return key +} + +func clockMasterFrameKey(filter []byte, frameNumber uint64) []byte { + return clockFrameKey(filter, frameNumber, CLOCK_MASTER_FRAME_DATA) +} + +func extractFrameNumberAndFilterFromMasterFrameKey( + key []byte, +) (uint64, []byte, error) { + if len(key) < 11 { + return 0, nil, errors.Wrap( + ErrInvalidData, + "extract frame number and filter from master frame key", + ) + } + + return binary.BigEndian.Uint64(key[2:10]), key[10:], nil +} + +func clockDataFrameKey( + filter []byte, + frameNumber uint64, +) []byte { + return clockFrameKey(filter, frameNumber, CLOCK_DATA_FRAME_DATA) +} + +func clockLatestIndex(filter []byte, frameType byte) []byte { + key := []byte{CLOCK_FRAME, frameType} + key = append(key, filter...) + return key +} + +func clockMasterLatestIndex(filter []byte) []byte { + return clockLatestIndex(filter, CLOCK_MASTER_FRAME_INDEX_LATEST) +} + +func clockDataLatestIndex(filter []byte) []byte { + return clockLatestIndex(filter, CLOCK_DATA_FRAME_INDEX_LATEST) +} + +func clockEarliestIndex(filter []byte, frameType byte) []byte { + key := []byte{CLOCK_FRAME, frameType} + key = append(key, filter...) + return key +} + +func clockMasterEarliestIndex(filter []byte) []byte { + return clockEarliestIndex(filter, CLOCK_MASTER_FRAME_INDEX_EARLIEST) +} + +func clockDataEarliestIndex(filter []byte) []byte { + return clockEarliestIndex(filter, CLOCK_DATA_FRAME_INDEX_EARLIEST) +} + +func clockParentIndexKey( + filter []byte, + frameNumber uint64, + selector []byte, + frameType byte, +) []byte { + key := []byte{CLOCK_FRAME, frameType} + key = binary.BigEndian.AppendUint64(key, frameNumber) + key = append(key, filter...) + key = append(key, rightAlign(selector, 32)...) + return key +} + +func clockDataParentIndexKey( + filter []byte, + frameNumber uint64, + selector []byte, +) []byte { + return clockParentIndexKey( + filter, + frameNumber, + selector, + CLOCK_DATA_FRAME_INDEX_PARENT, + ) +} + +func clockDataCandidateFrameKey( + filter []byte, + frameNumber uint64, + parent []byte, + distance []byte, +) []byte { + key := []byte{CLOCK_FRAME, CLOCK_DATA_FRAME_CANDIDATE_DATA} + key = binary.BigEndian.AppendUint64(key, frameNumber) + key = append(key, filter...) + key = append(key, rightAlign(parent, 32)...) + key = append(key, rightAlign(distance, 32)...) + return key +} + +func clockProverTrieKey(filter []byte, frameNumber uint64) []byte { + key := []byte{CLOCK_FRAME, CLOCK_DATA_FRAME_FRECENCY_DATA} + key = binary.BigEndian.AppendUint64(key, frameNumber) + key = append(key, filter...) + return key +} + +func (p *PebbleClockStore) NewTransaction() (Transaction, error) { + return &PebbleTransaction{ + b: p.db.NewBatch(), + }, nil +} + +// GetEarliestMasterClockFrame implements ClockStore. +func (p *PebbleClockStore) GetEarliestMasterClockFrame( + filter []byte, +) (*protobufs.ClockFrame, error) { + idxValue, closer, err := p.db.Get(clockMasterEarliestIndex(filter)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get earliest master clock frame") + } + + defer closer.Close() + frameNumber := binary.BigEndian.Uint64(idxValue) + frame, err := p.GetMasterClockFrame(filter, frameNumber) + if err != nil { + return nil, errors.Wrap(err, "get earliest master clock frame") + } + + return frame, nil +} + +// GetLatestMasterClockFrame implements ClockStore. +func (p *PebbleClockStore) GetLatestMasterClockFrame( + filter []byte, +) (*protobufs.ClockFrame, error) { + idxValue, closer, err := p.db.Get(clockMasterLatestIndex(filter)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get latest master clock frame") + } + + defer closer.Close() + frameNumber := binary.BigEndian.Uint64(idxValue) + frame, err := p.GetMasterClockFrame(filter, frameNumber) + if err != nil { + return nil, errors.Wrap(err, "get latest master clock frame") + } + + return frame, nil +} + +// GetMasterClockFrame implements ClockStore. +func (p *PebbleClockStore) GetMasterClockFrame( + filter []byte, + frameNumber uint64, +) (*protobufs.ClockFrame, error) { + value, closer, err := p.db.Get(clockMasterFrameKey(filter, frameNumber)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get master clock frame") + } + + defer closer.Close() + frame := &protobufs.ClockFrame{} + frame.FrameNumber = frameNumber + frame.Filter = filter + frame.Difficulty = binary.BigEndian.Uint32(value[:4]) + frame.Input = value[4 : len(value)-516] + frame.Output = value[len(value)-516:] + + return frame, nil +} + +// RangeMasterClockFrames implements ClockStore. +func (p *PebbleClockStore) RangeMasterClockFrames( + filter []byte, + startFrameNumber uint64, + endFrameNumber uint64, +) (*PebbleMasterClockIterator, error) { + if startFrameNumber > endFrameNumber { + temp := endFrameNumber + endFrameNumber = startFrameNumber + startFrameNumber = temp + } + + iter := p.db.NewIter(&pebble.IterOptions{ + LowerBound: clockMasterFrameKey(filter, startFrameNumber), + UpperBound: clockMasterFrameKey(filter, endFrameNumber), + }) + + return &PebbleMasterClockIterator{i: iter}, nil +} + +// PutMasterClockFrame implements ClockStore. +func (p *PebbleClockStore) PutMasterClockFrame( + frame *protobufs.ClockFrame, + txn Transaction, +) error { + data := binary.BigEndian.AppendUint32([]byte{}, frame.Difficulty) + data = append(data, frame.Input...) + data = append(data, frame.Output...) + + frameNumberBytes := make([]byte, 8) + binary.BigEndian.PutUint64(frameNumberBytes, frame.FrameNumber) + + if err := txn.Set( + clockMasterFrameKey(frame.Filter, frame.FrameNumber), + data, + ); err != nil { + return errors.Wrap(err, "put master clock frame") + } + + _, closer, err := p.db.Get(clockMasterEarliestIndex(frame.Filter)) + if err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + return errors.Wrap(err, "put master clock frame") + } + + if err = txn.Set( + clockMasterEarliestIndex(frame.Filter), + frameNumberBytes, + ); err != nil { + return errors.Wrap(err, "put master clock frame") + } + } + + if err == nil && closer != nil { + closer.Close() + } + + if err = txn.Set( + clockMasterLatestIndex(frame.Filter), + frameNumberBytes, + ); err != nil { + return errors.Wrap(err, "put master clock frame") + } + + return nil +} + +// GetDataClockFrame implements ClockStore. +func (p *PebbleClockStore) GetDataClockFrame( + filter []byte, + frameNumber uint64, +) (*protobufs.ClockFrame, error) { + value, closer, err := p.db.Get(clockDataFrameKey(filter, frameNumber)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get data clock frame") + } + + defer closer.Close() + frame := &protobufs.ClockFrame{} + if err := proto.Unmarshal(value, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get data clock frame", + ) + } + + return frame, nil +} + +// GetEarliestDataClockFrame implements ClockStore. +func (p *PebbleClockStore) GetEarliestDataClockFrame( + filter []byte, +) (*protobufs.ClockFrame, error) { + idxValue, closer, err := p.db.Get(clockDataEarliestIndex(filter)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get earliest data clock frame") + } + + defer closer.Close() + frameNumber := binary.BigEndian.Uint64(idxValue) + frame, err := p.GetDataClockFrame(filter, frameNumber) + if err != nil { + return nil, errors.Wrap(err, "get earliest data clock frame") + } + + return frame, nil +} + +// GetLatestDataClockFrame implements ClockStore. +func (p *PebbleClockStore) GetLatestDataClockFrame( + filter []byte, + proverTrie *tries.RollingFrecencyCritbitTrie, +) (*protobufs.ClockFrame, error) { + idxValue, closer, err := p.db.Get(clockDataLatestIndex(filter)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get latest data clock frame") + } + + frameNumber := binary.BigEndian.Uint64(idxValue) + frame, err := p.GetDataClockFrame(filter, frameNumber) + if err != nil { + return nil, errors.Wrap(err, "get latest data clock frame") + } + + closer.Close() + if proverTrie != nil { + trieData, closer, err := p.db.Get(clockProverTrieKey(filter, frameNumber)) + if err != nil { + return nil, errors.Wrap(err, "get latest data clock frame") + } + + defer closer.Close() + + if err := proverTrie.Deserialize(trieData); err != nil { + return nil, errors.Wrap(err, "get latest data clock frame") + } + } + + return frame, nil +} + +// GetLeadingCandidateDataClockFrame implements ClockStore. +func (p *PebbleClockStore) GetLeadingCandidateDataClockFrame( + filter []byte, + parent []byte, + frameNumber uint64, +) (*protobufs.ClockFrame, error) { + iter, err := p.RangeCandidateDataClockFrames(filter, parent, frameNumber) + if err != nil { + return nil, errors.Wrap(err, "get leading candidate data clock frame") + } + + if !iter.First() { + return nil, ErrNotFound + } + + defer iter.Close() + frame, err := iter.Value() + return frame, errors.Wrap(err, "get leading candidate data clock frame") +} + +// GetParentDataClockFrame implements ClockStore. +func (p *PebbleClockStore) GetParentDataClockFrame( + filter []byte, + frameNumber uint64, + parentSelector []byte, +) (*protobufs.ClockFrame, error) { + data, closer, err := p.db.Get( + clockDataParentIndexKey(filter, frameNumber, parentSelector), + ) + if err != nil { + return nil, errors.Wrap(err, "get parent data clock frame") + } + + parent := &protobufs.ClockFrame{} + if err := proto.Unmarshal(data, parent); err != nil { + return nil, errors.Wrap(err, "get parent data clock frame") + } + + if closer != nil { + closer.Close() + } + + return parent, nil +} + +// PutCandidateDataClockFrame implements ClockStore. +func (p *PebbleClockStore) PutCandidateDataClockFrame( + parentSelector []byte, + distance []byte, + selector []byte, + frame *protobufs.ClockFrame, + txn Transaction, +) error { + data, err := proto.Marshal(frame) + if err != nil { + return errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "put candidate data clock frame", + ) + } + + if err = txn.Set( + clockDataCandidateFrameKey( + frame.Filter, + frame.FrameNumber, + frame.ParentSelector, + distance, + ), + data, + ); err != nil { + return errors.Wrap(err, "put candidate data clock frame") + } + + if err = txn.Set( + clockDataParentIndexKey( + frame.Filter, + frame.FrameNumber, + selector, + ), + data, + ); err != nil { + return errors.Wrap(err, "put candidate data clock frame") + } + + return nil +} + +// PutDataClockFrame implements ClockStore. +func (p *PebbleClockStore) PutDataClockFrame( + frame *protobufs.ClockFrame, + proverTrie *tries.RollingFrecencyCritbitTrie, + txn Transaction, +) error { + data, err := proto.Marshal(frame) + if err != nil { + return errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "put data clock frame", + ) + } + + frameNumberBytes := make([]byte, 8) + binary.BigEndian.PutUint64(frameNumberBytes, frame.FrameNumber) + + if err = txn.Set( + clockDataFrameKey(frame.Filter, frame.FrameNumber), + data, + ); err != nil { + return errors.Wrap(err, "put data clock frame") + } + + proverData, err := proverTrie.Serialize() + if err != nil { + return errors.Wrap(err, "put data clock frame") + } + + if err = txn.Set( + clockProverTrieKey(frame.Filter, frame.FrameNumber), + proverData, + ); err != nil { + return errors.Wrap(err, "put data clock frame") + } + + _, closer, err := p.db.Get(clockDataEarliestIndex(frame.Filter)) + if err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + return errors.Wrap(err, "put data clock frame") + } + + if err = txn.Set( + clockDataEarliestIndex(frame.Filter), + frameNumberBytes, + ); err != nil { + return errors.Wrap(err, "put data clock frame") + } + } + + if err == nil && closer != nil { + closer.Close() + } + + if err = txn.Set( + clockDataLatestIndex(frame.Filter), + frameNumberBytes, + ); err != nil { + return errors.Wrap(err, "put data clock frame") + } + + return nil +} + +// RangeCandidateDataClockFrames implements ClockStore. +// Distance is 32-byte aligned, so we just use a 0x00 * 32 -> 0xff * 32 range +func (p *PebbleClockStore) RangeCandidateDataClockFrames( + filter []byte, + parent []byte, + frameNumber uint64, +) (*PebbleCandidateClockIterator, error) { + iter := p.db.NewIter(&pebble.IterOptions{ + LowerBound: clockDataCandidateFrameKey( + filter, + frameNumber, + parent, + []byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + ), + UpperBound: clockDataCandidateFrameKey( + filter, + frameNumber, + parent, + []byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }, + ), + }) + + return &PebbleCandidateClockIterator{i: iter}, nil +} + +// RangeDataClockFrames implements ClockStore. +func (p *PebbleClockStore) RangeDataClockFrames( + filter []byte, + startFrameNumber uint64, + endFrameNumber uint64, +) (*PebbleClockIterator, error) { + if startFrameNumber > endFrameNumber { + temp := endFrameNumber + endFrameNumber = startFrameNumber + startFrameNumber = temp + } + + iter := p.db.NewIter(&pebble.IterOptions{ + LowerBound: clockDataFrameKey(filter, startFrameNumber), + UpperBound: clockDataFrameKey(filter, endFrameNumber), + }) + + return &PebbleClockIterator{i: iter}, nil +} diff --git a/node/store/errors.go b/node/store/errors.go new file mode 100644 index 0000000..b894433 --- /dev/null +++ b/node/store/errors.go @@ -0,0 +1,8 @@ +package store + +import "errors" + +var ( + ErrNotFound = errors.New("item not found") + ErrInvalidData = errors.New("invalid data") +) diff --git a/node/store/iterator.go b/node/store/iterator.go new file mode 100644 index 0000000..d239ee1 --- /dev/null +++ b/node/store/iterator.go @@ -0,0 +1,11 @@ +package store + +import "google.golang.org/protobuf/proto" + +type Iterator[T proto.Message] interface { + First() bool + Next() bool + Valid() bool + Value() (T, error) + Close() error +} diff --git a/node/store/key.go b/node/store/key.go new file mode 100644 index 0000000..371d015 --- /dev/null +++ b/node/store/key.go @@ -0,0 +1,531 @@ +package store + +import ( + "encoding/binary" + + "github.com/cockroachdb/pebble" + "github.com/pkg/errors" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + "source.quilibrium.com/quilibrium/monorepo/node/protobufs" +) + +type KeyStore interface { + NewTransaction() (Transaction, error) + StageProvingKey(provingKey *protobufs.ProvingKeyAnnouncement) error + IncludeProvingKey( + inclusionCommitment *protobufs.InclusionCommitment, + txn Transaction, + ) error + GetStagedProvingKey( + provingKey []byte, + ) (*protobufs.ProvingKeyAnnouncement, error) + GetProvingKey(provingKey []byte) (*protobufs.InclusionCommitment, error) + GetKeyBundle( + provingKey []byte, + frameNumber uint64, + ) (*protobufs.InclusionCommitment, error) + GetLatestKeyBundle(provingKey []byte) (*protobufs.InclusionCommitment, error) + PutKeyBundle( + provingKey []byte, + keyBundleCommitment *protobufs.InclusionCommitment, + txn Transaction, + ) error + RangeProvingKeys() (*PebbleProvingKeyIterator, error) + RangeStagedProvingKeys() (*PebbleStagedProvingKeyIterator, error) + RangeKeyBundleKeys(provingKey []byte) (*PebbleKeyBundleIterator, error) +} + +type PebbleKeyStore struct { + db *pebble.DB + logger *zap.Logger +} + +type PebbleProvingKeyIterator struct { + i *pebble.Iterator +} + +type PebbleStagedProvingKeyIterator struct { + i *pebble.Iterator +} + +type PebbleKeyBundleIterator struct { + i *pebble.Iterator +} + +var pki = (*PebbleProvingKeyIterator)(nil) +var spki = (*PebbleStagedProvingKeyIterator)(nil) +var kbi = (*PebbleKeyBundleIterator)(nil) +var _ Iterator[*protobufs.InclusionCommitment] = pki +var _ Iterator[*protobufs.ProvingKeyAnnouncement] = spki +var _ Iterator[*protobufs.InclusionCommitment] = kbi +var _ KeyStore = (*PebbleKeyStore)(nil) + +func (p *PebbleProvingKeyIterator) First() bool { + return p.i.First() +} + +func (p *PebbleProvingKeyIterator) Next() bool { + return p.i.Next() +} + +func (p *PebbleProvingKeyIterator) Valid() bool { + return p.i.Valid() +} + +func (p *PebbleProvingKeyIterator) Value() ( + *protobufs.InclusionCommitment, + error, +) { + if !p.i.Valid() { + return nil, ErrNotFound + } + + value := p.i.Value() + frame := &protobufs.InclusionCommitment{} + if err := proto.Unmarshal(value, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get proving key iterator value", + ) + } + + return frame, nil +} + +func (p *PebbleProvingKeyIterator) Close() error { + return errors.Wrap(p.i.Close(), "closing iterator") +} + +func (p *PebbleStagedProvingKeyIterator) First() bool { + return p.i.First() +} + +func (p *PebbleStagedProvingKeyIterator) Next() bool { + return p.i.Next() +} + +func (p *PebbleStagedProvingKeyIterator) Valid() bool { + return p.i.Valid() +} + +func (p *PebbleStagedProvingKeyIterator) Value() ( + *protobufs.ProvingKeyAnnouncement, + error, +) { + if !p.i.Valid() { + return nil, ErrNotFound + } + + value := p.i.Value() + frame := &protobufs.ProvingKeyAnnouncement{} + if err := proto.Unmarshal(value, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get staged proving key iterator value", + ) + } + + return frame, nil +} + +func (p *PebbleStagedProvingKeyIterator) Close() error { + return errors.Wrap(p.i.Close(), "closing iterator") +} + +func (p *PebbleKeyBundleIterator) First() bool { + return p.i.First() +} + +func (p *PebbleKeyBundleIterator) Next() bool { + return p.i.Next() +} + +func (p *PebbleKeyBundleIterator) Valid() bool { + return p.i.Valid() +} + +func (p *PebbleKeyBundleIterator) Value() ( + *protobufs.InclusionCommitment, + error, +) { + if !p.i.Valid() { + return nil, ErrNotFound + } + + value := p.i.Value() + frame := &protobufs.InclusionCommitment{} + if err := proto.Unmarshal(value, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get key bundle iterator value", + ) + } + + return frame, nil +} + +func (p *PebbleKeyBundleIterator) Close() error { + return errors.Wrap(p.i.Close(), "closing iterator") +} + +func NewPebbleKeyStore(db *pebble.DB, logger *zap.Logger) *PebbleKeyStore { + return &PebbleKeyStore{ + db, + logger, + } +} + +const ( + PROVING_KEY = 0x01 + PROVING_KEY_STAGED = 0x02 + KEY_BUNDLE = 0x03 + KEY_DATA = 0x00 + KEY_BUNDLE_INDEX_EARLIEST = 0x10 + KEY_BUNDLE_INDEX_LATEST = 0x20 +) + +func provingKeyKey(provingKey []byte) []byte { + key := []byte{PROVING_KEY, KEY_DATA} + key = append(key, provingKey...) + return key +} + +func stagedProvingKeyKey(provingKey []byte) []byte { + key := []byte{PROVING_KEY_STAGED, KEY_DATA} + key = append(key, provingKey...) + return key +} + +func keyBundleKey(provingKey []byte, frameNumber uint64) []byte { + key := []byte{KEY_BUNDLE, KEY_DATA} + key = append(key, provingKey...) + key = binary.BigEndian.AppendUint64(key, frameNumber) + return key +} + +func keyBundleLatestKey(provingKey []byte) []byte { + key := []byte{KEY_BUNDLE, KEY_BUNDLE_INDEX_LATEST} + key = append(key, provingKey...) + return key +} + +func keyBundleEarliestKey(provingKey []byte) []byte { + key := []byte{KEY_BUNDLE, KEY_BUNDLE_INDEX_EARLIEST} + key = append(key, provingKey...) + return key +} + +func (p *PebbleKeyStore) NewTransaction() (Transaction, error) { + return &PebbleTransaction{ + b: p.db.NewBatch(), + }, nil +} + +// Stages a proving key for later inclusion on proof of meaningful work. +// Does not verify, upstream callers must verify. +func (p *PebbleKeyStore) StageProvingKey( + provingKey *protobufs.ProvingKeyAnnouncement, +) error { + data, err := proto.Marshal(provingKey) + if err != nil { + return errors.Wrap(err, "stage proving key") + } + + err = p.db.Set( + stagedProvingKeyKey(provingKey.PublicKey()), + data, + &pebble.WriteOptions{ + Sync: true, + }, + ) + if err != nil { + return errors.Wrap(err, "stage proving key") + } + + return nil +} + +// Includes a proving key with an inclusion commitment. If a proving key is +// staged, promotes it by including it in the primary key store and deletes the +// staged key. +func (p *PebbleKeyStore) IncludeProvingKey( + inclusionCommitment *protobufs.InclusionCommitment, + txn Transaction, +) error { + provingKey := &protobufs.ProvingKeyAnnouncement{} + if err := proto.Unmarshal(inclusionCommitment.Data, provingKey); err != nil { + return errors.Wrap(err, "include proving key") + } + + if err := provingKey.Verify(); err != nil { + return errors.Wrap(err, "include proving key") + } + + data, err := proto.Marshal(inclusionCommitment) + if err != nil { + return errors.Wrap(err, "include proving key") + } + + txn.Set( + provingKeyKey(provingKey.PublicKey()), + data, + ) + + staged, closer, err := p.db.Get(stagedProvingKeyKey(provingKey.PublicKey())) + if err != nil && !errors.Is(err, ErrNotFound) { + return errors.Wrap(err, "include proving key") + } + + if staged != nil { + if err := txn.Delete( + stagedProvingKeyKey(provingKey.PublicKey()), + ); err != nil { + return errors.Wrap(err, "include proving key") + } + } + if err := closer.Close(); err != nil { + return errors.Wrap(err, "include proving key") + } + + return nil +} + +func (p *PebbleKeyStore) GetStagedProvingKey( + provingKey []byte, +) (*protobufs.ProvingKeyAnnouncement, error) { + data, closer, err := p.db.Get(stagedProvingKeyKey(provingKey)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get staged proving key") + } + + stagedKey := &protobufs.ProvingKeyAnnouncement{} + if err = proto.Unmarshal(data, stagedKey); err != nil { + return nil, errors.Wrap(err, "get staged proving key") + } + + if err := closer.Close(); err != nil { + return nil, errors.Wrap(err, "get staged proving key") + } + + return stagedKey, nil +} + +// Returns the latest key bundle for a given proving key. +func (p *PebbleKeyStore) GetLatestKeyBundle( + provingKey []byte, +) (*protobufs.InclusionCommitment, error) { + value, closer, err := p.db.Get(keyBundleLatestKey(provingKey)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get latest key bundle") + } + frameNumber := binary.BigEndian.Uint64(value) + + if err := closer.Close(); err != nil { + return nil, errors.Wrap(err, "get latest key bundle") + } + + value, closer, err = p.db.Get(keyBundleKey(provingKey, frameNumber)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get latest key bundle") + } + + defer closer.Close() + + announcement := &protobufs.InclusionCommitment{} + if err := proto.Unmarshal(value, announcement); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get latest key bundle", + ) + } + + return announcement, nil +} + +// Retrieves the specific key bundle included at a given frame number. +func (p *PebbleKeyStore) GetKeyBundle( + provingKey []byte, + frameNumber uint64, +) (*protobufs.InclusionCommitment, error) { + value, closer, err := p.db.Get(keyBundleKey(provingKey, frameNumber)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get key bundle") + } + + defer closer.Close() + + announcement := &protobufs.InclusionCommitment{} + if err := proto.Unmarshal(value, announcement); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get key bundle", + ) + } + + return announcement, nil +} + +// Retrieves an included proving key, returns ErrNotFound if not present. +func (p *PebbleKeyStore) GetProvingKey( + provingKey []byte, +) (*protobufs.InclusionCommitment, error) { + value, closer, err := p.db.Get(provingKeyKey(provingKey)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get proving key") + } + + defer closer.Close() + + announcement := &protobufs.InclusionCommitment{} + if err := proto.Unmarshal(value, announcement); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get proving key", + ) + } + + return announcement, nil +} + +// Inserts a key bundle with inclusion commitment. Does not verify, upstream +// callers must perform the verification. +func (p *PebbleKeyStore) PutKeyBundle( + provingKey []byte, + keyBundle *protobufs.InclusionCommitment, + txn Transaction, +) error { + data, err := proto.Marshal(keyBundle) + if err != nil { + return errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "put key bundle", + ) + } + + frameNumberBytes := make([]byte, 8) + binary.BigEndian.PutUint64(frameNumberBytes, keyBundle.FrameNumber) + + if err = txn.Set( + keyBundleKey(provingKey, keyBundle.FrameNumber), + data, + ); err != nil { + return errors.Wrap(err, "put key bundle") + } + + _, closer, err := p.db.Get(keyBundleEarliestKey(provingKey)) + if err != nil { + if !errors.Is(err, pebble.ErrNotFound) { + return errors.Wrap(err, "put key bundle") + } + + if err = txn.Set( + keyBundleEarliestKey(provingKey), + frameNumberBytes, + ); err != nil { + return errors.Wrap(err, "put key bundle") + } + } + + if err == nil && closer != nil { + closer.Close() + } + + if err = txn.Set( + keyBundleLatestKey(provingKey), + frameNumberBytes, + ); err != nil { + return errors.Wrap(err, "put key bundle") + } + + return nil +} + +func (p *PebbleKeyStore) RangeProvingKeys() (*PebbleProvingKeyIterator, error) { + iter := p.db.NewIter(&pebble.IterOptions{ + LowerBound: provingKeyKey([]byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, + }), + UpperBound: provingKeyKey([]byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, + }), + }) + + return &PebbleProvingKeyIterator{i: iter}, nil +} + +func (p *PebbleKeyStore) RangeStagedProvingKeys() ( + *PebbleStagedProvingKeyIterator, + error, +) { + iter := p.db.NewIter(&pebble.IterOptions{ + LowerBound: stagedProvingKeyKey([]byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, + }), + UpperBound: stagedProvingKeyKey([]byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, + }), + }) + + return &PebbleStagedProvingKeyIterator{i: iter}, nil +} + +func (p *PebbleKeyStore) RangeKeyBundleKeys(provingKey []byte) ( + *PebbleKeyBundleIterator, + error, +) { + iter := p.db.NewIter(&pebble.IterOptions{ + LowerBound: keyBundleKey(provingKey, 0), + UpperBound: keyBundleKey(provingKey, 0xffffffffffffffff), + }) + + return &PebbleKeyBundleIterator{i: iter}, nil +} diff --git a/node/store/pebble.go b/node/store/pebble.go new file mode 100644 index 0000000..b3001fd --- /dev/null +++ b/node/store/pebble.go @@ -0,0 +1,55 @@ +package store + +import ( + "github.com/cockroachdb/pebble" + "source.quilibrium.com/quilibrium/monorepo/node/config" +) + +func NewPebbleDB(config *config.DBConfig) *pebble.DB { + db, err := pebble.Open(config.Path, &pebble.Options{}) + if err != nil { + panic(err) + } + + return db +} + +type Transaction interface { + Set(key []byte, value []byte) error + Commit() error + Delete(key []byte) error +} + +type PebbleTransaction struct { + b *pebble.Batch +} + +func (t *PebbleTransaction) Set(key []byte, value []byte) error { + return t.b.Set(key, value, &pebble.WriteOptions{Sync: true}) +} + +func (t *PebbleTransaction) Commit() error { + return t.b.Commit(&pebble.WriteOptions{Sync: true}) +} + +func (t *PebbleTransaction) Delete(key []byte) error { + return t.b.Delete(key, &pebble.WriteOptions{Sync: true}) +} + +var _ Transaction = (*PebbleTransaction)(nil) + +func rightAlign(data []byte, size int) []byte { + l := len(data) + + if l == size { + return data + } + + if l > size { + return data[l-size:] + } + + pad := make([]byte, size) + copy(pad[size-l:], data) + return pad +}