This commit is contained in:
Cassandra Heart 2024-02-16 15:46:54 -06:00 committed by GitHub
parent d499a60937
commit 1b810d624c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 67 additions and 21 deletions

View File

@ -886,11 +886,11 @@ func logoVersion(width int) string {
out += " ####################################### ########\n" out += " ####################################### ########\n"
out += " ############################# ##\n" out += " ############################# ##\n"
out += " \n" out += " \n"
out += " Quilibrium Node - v1.2.6 Dawn\n" out += " Quilibrium Node - v1.2.7 Dawn\n"
out += " \n" out += " \n"
out += " DB Console\n" out += " DB Console\n"
} else { } else {
out = "Quilibrium Node - v1.2.6 Dawn - DB Console\n" out = "Quilibrium Node - v1.2.7 Dawn - DB Console\n"
} }
return out return out
} }

View File

@ -425,6 +425,8 @@ func (e *CeremonyDataClockConsensusEngine) runLoop() {
e.frameProverTrie.FindNearest(e.provingKeyAddress).External.Key, e.frameProverTrie.FindNearest(e.provingKeyAddress).External.Key,
e.provingKeyAddress, e.provingKeyAddress,
) { ) {
e.dataTimeReel.Insert(nextFrame)
if err = e.publishProof(nextFrame); err != nil { if err = e.publishProof(nextFrame); err != nil {
e.logger.Error("could not publish", zap.Error(err)) e.logger.Error("could not publish", zap.Error(err))
e.state = consensus.EngineStateCollecting e.state = consensus.EngineStateCollecting
@ -453,6 +455,8 @@ func (e *CeremonyDataClockConsensusEngine) runLoop() {
} }
if e.frameProverTrie.Contains(e.provingKeyAddress) { if e.frameProverTrie.Contains(e.provingKeyAddress) {
e.dataTimeReel.Insert(nextFrame)
if err = e.publishProof(nextFrame); err != nil { if err = e.publishProof(nextFrame); err != nil {
e.logger.Error("could not publish", zap.Error(err)) e.logger.Error("could not publish", zap.Error(err))
e.state = consensus.EngineStateCollecting e.state = consensus.EngineStateCollecting

View File

@ -346,6 +346,10 @@ func (e *CeremonyDataClockConsensusEngine) collect(
} }
} }
if latest.FrameNumber < currentFramePublished.FrameNumber {
latest = currentFramePublished
}
e.logger.Info( e.logger.Info(
"returning leader frame", "returning leader frame",
zap.Uint64("frame_number", latest.FrameNumber), zap.Uint64("frame_number", latest.FrameNumber),

View File

@ -59,5 +59,5 @@ func GetMinimumVersion() []byte {
} }
func GetVersion() []byte { func GetVersion() []byte {
return []byte{0x01, 0x02, 0x06} return []byte{0x01, 0x02, 0x07}
} }

View File

@ -5,6 +5,7 @@ import (
"math/big" "math/big"
"sync" "sync"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/pkg/errors" "github.com/pkg/errors"
"go.uber.org/zap" "go.uber.org/zap"
"source.quilibrium.com/quilibrium/monorepo/node/config" "source.quilibrium.com/quilibrium/monorepo/node/config"
@ -35,6 +36,7 @@ type pendingFrame struct {
type DataTimeReel struct { type DataTimeReel struct {
rwMutex sync.RWMutex rwMutex sync.RWMutex
running bool
filter []byte filter []byte
engineConfig *config.EngineConfig engineConfig *config.EngineConfig
@ -49,6 +51,7 @@ type DataTimeReel struct {
head *protobufs.ClockFrame head *protobufs.ClockFrame
totalDistance *big.Int totalDistance *big.Int
headDistance *big.Int headDistance *big.Int
lruFrames *lru.Cache[string, string]
proverTrie *tries.RollingFrecencyCritbitTrie proverTrie *tries.RollingFrecencyCritbitTrie
pending map[uint64][]*pendingFrame pending map[uint64][]*pendingFrame
incompleteForks map[uint64][]*pendingFrame incompleteForks map[uint64][]*pendingFrame
@ -88,7 +91,13 @@ func NewDataTimeReel(
panic("frame prover is nil") panic("frame prover is nil")
} }
cache, err := lru.New[string, string](10000)
if err != nil {
panic(err)
}
return &DataTimeReel{ return &DataTimeReel{
running: false,
logger: logger, logger: logger,
filter: filter, filter: filter,
engineConfig: engineConfig, engineConfig: engineConfig,
@ -97,6 +106,7 @@ func NewDataTimeReel(
origin: origin, origin: origin,
initialInclusionProof: initialInclusionProof, initialInclusionProof: initialInclusionProof,
initialProverKeys: initialProverKeys, initialProverKeys: initialProverKeys,
lruFrames: cache,
pending: make(map[uint64][]*pendingFrame), pending: make(map[uint64][]*pendingFrame),
incompleteForks: make(map[uint64][]*pendingFrame), incompleteForks: make(map[uint64][]*pendingFrame),
frames: make(chan *protobufs.ClockFrame), frames: make(chan *protobufs.ClockFrame),
@ -116,9 +126,14 @@ func (d *DataTimeReel) Start() error {
if frame == nil { if frame == nil {
d.head, d.proverTrie = d.createGenesisFrame() d.head, d.proverTrie = d.createGenesisFrame()
d.totalDistance = big.NewInt(0) d.totalDistance = big.NewInt(0)
d.headDistance = big.NewInt(0)
} else { } else {
d.head = frame d.head = frame
if err != nil {
panic(err)
}
d.proverTrie = trie d.proverTrie = trie
d.headDistance, err = d.GetDistance(frame)
d.totalDistance = d.getTotalDistance(frame) d.totalDistance = d.getTotalDistance(frame)
} }
@ -135,6 +150,16 @@ func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) {
// is the next one in sequence, it advances the reel head forward and emits a // is the next one in sequence, it advances the reel head forward and emits a
// new frame on the new frame channel. // new frame on the new frame channel.
func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame) error { func (d *DataTimeReel) Insert(frame *protobufs.ClockFrame) error {
if !d.running {
return nil
}
if d.lruFrames.Contains(string(frame.Output[:64])) {
return nil
}
d.lruFrames.Add(string(frame.Output[:64]), string(frame.ParentSelector))
go func() { go func() {
d.frames <- frame d.frames <- frame
}() }()
@ -214,6 +239,7 @@ func (d *DataTimeReel) createGenesisFrame() (
// Main data consensus loop // Main data consensus loop
func (d *DataTimeReel) runLoop() { func (d *DataTimeReel) runLoop() {
d.running = true
for { for {
select { select {
case frame := <-d.frames: case frame := <-d.frames:
@ -324,9 +350,11 @@ func (d *DataTimeReel) addPending(
d.pending[frame.FrameNumber] = []*pendingFrame{} d.pending[frame.FrameNumber] = []*pendingFrame{}
} }
txn, err := d.clockStore.NewTransaction() // avoid heavy thrashing
if err != nil { for _, frame := range d.pending[frame.FrameNumber] {
panic(err) if frame.parentSelector.Cmp(parent) == 0 {
return
}
} }
if distance.Cmp(unknownDistance) == 0 { if distance.Cmp(unknownDistance) == 0 {
@ -334,6 +362,16 @@ func (d *DataTimeReel) addPending(
distance.Sub(distance, big.NewInt(int64(len(d.pending[frame.FrameNumber])))) distance.Sub(distance, big.NewInt(int64(len(d.pending[frame.FrameNumber]))))
} }
// avoid db thrashing
if existing, err := d.clockStore.GetParentDataClockFrame(
frame.Filter,
frame.FrameNumber,
selector.FillBytes(make([]byte, 32)),
); err != nil && existing == nil {
txn, err := d.clockStore.NewTransaction()
if err != nil {
panic(err)
}
err = d.clockStore.PutCandidateDataClockFrame( err = d.clockStore.PutCandidateDataClockFrame(
parent.FillBytes(make([]byte, 32)), parent.FillBytes(make([]byte, 32)),
distance.FillBytes(make([]byte, 32)), distance.FillBytes(make([]byte, 32)),
@ -345,10 +383,10 @@ func (d *DataTimeReel) addPending(
txn.Abort() txn.Abort()
panic(err) panic(err)
} }
if err = txn.Commit(); err != nil { if err = txn.Commit(); err != nil {
panic(err) panic(err)
} }
}
d.pending[frame.FrameNumber] = append( d.pending[frame.FrameNumber] = append(
d.pending[frame.FrameNumber], d.pending[frame.FrameNumber],

View File

@ -97,7 +97,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect github.com/hashicorp/golang-lru/v2 v2.0.2
github.com/huin/goupnp v1.2.0 // indirect github.com/huin/goupnp v1.2.0 // indirect
github.com/iden3/go-iden3-crypto v0.0.15 github.com/iden3/go-iden3-crypto v0.0.15
github.com/ipfs/boxo v0.8.0 // indirect github.com/ipfs/boxo v0.8.0 // indirect

View File

@ -284,5 +284,5 @@ func printLogo() {
func printVersion() { func printVersion() {
fmt.Println(" ") fmt.Println(" ")
fmt.Println(" Quilibrium Node - v1.2.6 Dawn") fmt.Println(" Quilibrium Node - v1.2.7 Dawn")
} }