From cc1e304119085157f0214a0184d6cb3857551db0 Mon Sep 17 00:00:00 2001 From: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com> Date: Thu, 14 Mar 2024 02:18:14 -0500 Subject: [PATCH] v1.4.7 (#125) * move to a message processor channel model for ceremony * switch to goroutine * readjust * cut down volume * keep distance data asserted * bring it back so more bootstrap nodes can exist * bump the version, it's go time --- go-libp2p-blossomsub/pubsub.go | 10 +- node/config/version.go | 2 +- .../consensus/ceremony/broadcast_messaging.go | 188 +++++++++--------- .../ceremony_data_clock_consensus_engine.go | 7 + node/consensus/ceremony/consensus_frames.go | 9 +- node/consensus/time/data_time_reel.go | 39 +++- node/main.go | 8 + node/p2p/blossomsub.go | 12 -- node/store/clock.go | 61 ++++++ 9 files changed, 224 insertions(+), 112 deletions(-) diff --git a/go-libp2p-blossomsub/pubsub.go b/go-libp2p-blossomsub/pubsub.go index a28c96b..c21bcf9 100644 --- a/go-libp2p-blossomsub/pubsub.go +++ b/go-libp2p-blossomsub/pubsub.go @@ -260,18 +260,18 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option peerFilter: DefaultPeerFilter, disc: &discover{}, maxMessageSize: DefaultMaxMessageSize, - peerOutboundQueueSize: 128, + peerOutboundQueueSize: 32, signID: h.ID(), signKey: nil, signPolicy: StrictSign, - incoming: make(chan *RPC, 128), + incoming: make(chan *RPC, 32), newPeers: make(chan struct{}, 1), newPeersPend: make(map[peer.ID]struct{}), newPeerStream: make(chan network.Stream), newPeerError: make(chan peer.ID), peerDead: make(chan struct{}, 1), peerDeadPend: make(map[peer.ID]struct{}), - deadPeerBackoff: newBackoff(ctx, 128, BackoffCleanupInterval, MaxBackoffAttempts), + deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval, MaxBackoffAttempts), cancelCh: make(chan *Subscription), getPeers: make(chan *listPeerReq), addSub: make(chan *addSubReq), @@ -280,7 +280,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option addBitmask: make(chan *addBitmaskReq), rmBitmask: make(chan *rmBitmaskReq), getBitmasks: make(chan *bitmaskReq), - sendMsg: make(chan *Message, 128), + sendMsg: make(chan *Message, 32), addVal: make(chan *addValReq), rmVal: make(chan *rmValReq), eval: make(chan func()), @@ -979,7 +979,7 @@ func (p *PubSub) notifySubs(msg *Message) { for f := range subs { select { case f.ch <- msg: - case <-time.After(15 * time.Millisecond): + case <-time.After(5 * time.Millisecond): // it's unreasonable to immediately fall over because a subscriber didn't // answer, message delivery sometimes lands next nanosecond and dropping // it when there's room is absurd. diff --git a/node/config/version.go b/node/config/version.go index b4bcacb..103159d 100644 --- a/node/config/version.go +++ b/node/config/version.go @@ -14,7 +14,7 @@ func GetMinimumVersion() []byte { } func GetVersion() []byte { - return []byte{0x01, 0x04, 0x06} + return []byte{0x01, 0x04, 0x07} } func GetVersionString() string { diff --git a/node/consensus/ceremony/broadcast_messaging.go b/node/consensus/ceremony/broadcast_messaging.go index 66a6941..ce215c6 100644 --- a/node/consensus/ceremony/broadcast_messaging.go +++ b/node/consensus/ceremony/broadcast_messaging.go @@ -3,16 +3,16 @@ package ceremony import ( "bytes" "encoding/binary" - "source.quilibrium.com/quilibrium/monorepo/node/config" "strings" "time" + "source.quilibrium.com/quilibrium/monorepo/node/config" + "github.com/iden3/go-iden3-crypto/poseidon" pcrypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" @@ -20,6 +20,98 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/protobufs" ) +func (e *CeremonyDataClockConsensusEngine) runMessageHandler() { + for { + select { + case message := <-e.messageProcessorCh: + msg := &protobufs.Message{} + + if err := proto.Unmarshal(message.Data, msg); err != nil { + continue + } + + for name := range e.executionEngines { + name := name + go func() error { + messages, err := e.executionEngines[name].ProcessMessage( + msg.Address, + msg, + ) + if err != nil { + e.logger.Debug( + "could not process message for engine", + zap.Error(err), + zap.String("engine_name", name), + ) + return nil + } + + for _, appMessage := range messages { + appMsg := &anypb.Any{} + err := proto.Unmarshal(appMessage.Payload, appMsg) + if err != nil { + e.logger.Error( + "could not unmarshal app message", + zap.Error(err), + zap.String("engine_name", name), + ) + continue + } + + switch appMsg.TypeUrl { + case protobufs.CeremonyLobbyStateTransitionType: + t := &protobufs.CeremonyLobbyStateTransition{} + err := proto.Unmarshal(appMsg.Value, t) + if err != nil { + continue + } + + if err := e.handleCeremonyLobbyStateTransition(t); err != nil { + continue + } + } + } + + return nil + }() + } + + any := &anypb.Any{} + if err := proto.Unmarshal(msg.Payload, any); err != nil { + continue + } + + go func() { + switch any.TypeUrl { + case protobufs.ClockFrameType: + e.peerMapMx.RLock() + if peer, ok := e.peerMap[string(message.From)]; !ok || + bytes.Compare(peer.version, config.GetMinimumVersion()) < 0 { + return + } + e.peerMapMx.RUnlock() + if err := e.handleClockFrameData( + message.From, + msg.Address, + any, + false, + ); err != nil { + return + } + case protobufs.CeremonyPeerListAnnounceType: + if err := e.handleCeremonyPeerListAnnounce( + message.From, + msg.Address, + any, + ); err != nil { + return + } + } + }() + } + } +} + func (e *CeremonyDataClockConsensusEngine) handleMessage( message *pb.Message, ) error { @@ -29,96 +121,10 @@ func (e *CeremonyDataClockConsensusEngine) handleMessage( zap.Binary("from", message.From), zap.Binary("signature", message.Signature), ) - msg := &protobufs.Message{} - if err := proto.Unmarshal(message.Data, msg); err != nil { - return errors.Wrap(err, "handle message") - } - - eg := errgroup.Group{} - eg.SetLimit(len(e.executionEngines)) - - for name := range e.executionEngines { - name := name - eg.Go(func() error { - messages, err := e.executionEngines[name].ProcessMessage( - msg.Address, - msg, - ) - if err != nil { - e.logger.Debug( - "could not process message for engine", - zap.Error(err), - zap.String("engine_name", name), - ) - return errors.Wrap(err, "handle message") - } - - for _, appMessage := range messages { - appMsg := &anypb.Any{} - err := proto.Unmarshal(appMessage.Payload, appMsg) - if err != nil { - e.logger.Error( - "could not unmarshal app message", - zap.Error(err), - zap.String("engine_name", name), - ) - return errors.Wrap(err, "handle message") - } - - switch appMsg.TypeUrl { - case protobufs.CeremonyLobbyStateTransitionType: - t := &protobufs.CeremonyLobbyStateTransition{} - err := proto.Unmarshal(appMsg.Value, t) - if err != nil { - return errors.Wrap(err, "handle message") - } - - if err := e.handleCeremonyLobbyStateTransition(t); err != nil { - return errors.Wrap(err, "handle message") - } - } - } - - return nil - }) - } - - if err := eg.Wait(); err != nil { - e.logger.Debug("rejecting invalid message", zap.Error(err)) - return nil - } - - any := &anypb.Any{} - if err := proto.Unmarshal(msg.Payload, any); err != nil { - return errors.Wrap(err, "handle message") - } - - switch any.TypeUrl { - case protobufs.ClockFrameType: - e.peerMapMx.RLock() - if peer, ok := e.peerMap[string(message.From)]; !ok || - bytes.Compare(peer.version, config.GetMinimumVersion()) < 0 { - return nil - } - e.peerMapMx.RUnlock() - if err := e.handleClockFrameData( - message.From, - msg.Address, - any, - false, - ); err != nil { - return errors.Wrap(err, "handle message") - } - case protobufs.CeremonyPeerListAnnounceType: - if err := e.handleCeremonyPeerListAnnounce( - message.From, - msg.Address, - any, - ); err != nil { - return errors.Wrap(err, "handle message") - } - } + go func() { + e.messageProcessorCh <- message + }() return nil } diff --git a/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go b/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go index 7471878..0d91952 100644 --- a/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go +++ b/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go @@ -16,6 +16,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/protobuf/types/known/anypb" + "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" "source.quilibrium.com/quilibrium/monorepo/nekryptology/pkg/core/curves" "source.quilibrium.com/quilibrium/monorepo/node/config" "source.quilibrium.com/quilibrium/monorepo/node/consensus" @@ -103,6 +104,7 @@ type CeremonyDataClockConsensusEngine struct { lastKeyBundleAnnouncementFrame uint64 peerMap map[string]*peerInfo uncooperativePeersMap map[string]*peerInfo + messageProcessorCh chan *pb.Message } var _ consensus.DataConsensusEngine = (*CeremonyDataClockConsensusEngine)(nil) @@ -233,6 +235,7 @@ func NewCeremonyDataClockConsensusEngine( masterTimeReel: masterTimeReel, dataTimeReel: dataTimeReel, statsClient: statsClient, + messageProcessorCh: make(chan *pb.Message, 128), } logger.Info("constructing consensus engine") @@ -270,6 +273,10 @@ func (e *CeremonyDataClockConsensusEngine) Start() <-chan error { panic(err) } + go func() { + e.runLoop() + }() + e.logger.Info("subscribing to pubsub messages") e.pubSub.Subscribe(e.filter, e.handleMessage, true) diff --git a/node/consensus/ceremony/consensus_frames.go b/node/consensus/ceremony/consensus_frames.go index befd26d..9193660 100644 --- a/node/consensus/ceremony/consensus_frames.go +++ b/node/consensus/ceremony/consensus_frames.go @@ -4,9 +4,11 @@ import ( "bytes" "context" "io" - "source.quilibrium.com/quilibrium/monorepo/node/config" + "math/big" "time" + "source.quilibrium.com/quilibrium/monorepo/node/config" + "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" @@ -165,7 +167,10 @@ func (e *CeremonyDataClockConsensusEngine) GetMostAheadPeer() ( _, ok := e.uncooperativePeersMap[string(v.peerId)] if v.maxFrame > max && v.timestamp > config.GetMinimumVersionCutoff().UnixMilli() && - bytes.Compare(v.version, config.GetMinimumVersion()) >= 0 && !ok { + bytes.Compare(v.version, config.GetMinimumVersion()) >= 0 && + new(big.Int).SetBytes( + v.totalDistance, + ).Cmp(e.dataTimeReel.GetTotalDistance()) < 0 && !ok { peer = v.peerId max = v.maxFrame } diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index da8fa3d..869c6ff 100644 --- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -615,6 +615,14 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) { d.head = frame d.totalDistance.Add(d.totalDistance, distance) + + d.clockStore.SetTotalDistance( + d.filter, + frame.FrameNumber, + selector.FillBytes(make([]byte, 32)), + d.totalDistance, + ) + d.headDistance = distance go func() { d.newFrameCh <- frame @@ -623,7 +631,21 @@ func (d *DataTimeReel) setHead(frame *protobufs.ClockFrame, distance *big.Int) { // tag: dusk – store the distance with the frame func (d *DataTimeReel) getTotalDistance(frame *protobufs.ClockFrame) *big.Int { - total, err := d.GetDistance(frame) + selector, err := frame.GetSelector() + if err != nil { + panic(err) + } + + total, err := d.clockStore.GetTotalDistance( + d.filter, + frame.FrameNumber, + selector.FillBytes(make([]byte, 32)), + ) + if err == nil && total != nil { + return total + } + + total, err = d.GetDistance(frame) if err != nil { panic(err) } @@ -643,6 +665,13 @@ func (d *DataTimeReel) getTotalDistance(frame *protobufs.ClockFrame) *big.Int { total.Add(total, distance) } + d.clockStore.SetTotalDistance( + d.filter, + frame.FrameNumber, + selector.FillBytes(make([]byte, 32)), + total, + ) + return total } @@ -883,6 +912,14 @@ func (d *DataTimeReel) forkChoice( "set total distance", zap.String("total_distance", d.totalDistance.Text(16)), ) + + d.clockStore.SetTotalDistance( + d.filter, + frame.FrameNumber, + selector.FillBytes(make([]byte, 32)), + d.totalDistance, + ) + go func() { d.newFrameCh <- frame }() diff --git a/node/main.go b/node/main.go index 8452675..ef1309e 100644 --- a/node/main.go +++ b/node/main.go @@ -209,6 +209,14 @@ func RunSelfTestIfNeeded( cores := runtime.GOMAXPROCS(0) memory := memory.TotalMemory() + d, err := os.Stat(filepath.Join(configDir, "store")) + if d == nil { + err := os.Mkdir(filepath.Join(configDir, "store"), 0755) + if err != nil { + panic(err) + } + } + f, err := os.Stat(filepath.Join(configDir, "SELF_TEST")) if f != nil { diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index 667dea2..f8dfa83 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -22,7 +22,6 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/discovery/util" - "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/mr-tron/base58" "github.com/pkg/errors" "go.uber.org/zap" @@ -107,17 +106,6 @@ func NewBlossomSub( } } - if isBootstrapPeer { - mgr, err := connmgr.NewConnManager(512, 8192) - if err != nil { - panic(err) - } - - opts = append(opts, - libp2p.ConnectionManager(mgr), - ) - } - var privKey crypto.PrivKey if p2pConfig.PeerPrivKey != "" { peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey) diff --git a/node/store/clock.go b/node/store/clock.go index 011d425..6902068 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -3,6 +3,7 @@ package store import ( "bytes" "encoding/binary" + "math/big" "github.com/cockroachdb/pebble" "github.com/iden3/go-iden3-crypto/poseidon" @@ -76,6 +77,17 @@ type ClockStore interface { masterFilter []byte, dataFilter []byte, ) error + GetTotalDistance( + filter []byte, + frameNumber uint64, + selector []byte, + ) (*big.Int, error) + SetTotalDistance( + filter []byte, + frameNumber uint64, + selector []byte, + totalDistance *big.Int, + ) error } type PebbleClockStore struct { @@ -268,6 +280,7 @@ const CLOCK_MASTER_FRAME_DATA = 0x00 const CLOCK_DATA_FRAME_DATA = 0x01 const CLOCK_DATA_FRAME_CANDIDATE_DATA = 0x02 const CLOCK_DATA_FRAME_FRECENCY_DATA = 0x03 +const CLOCK_DATA_FRAME_DISTANCE_DATA = 0x04 const CLOCK_MASTER_FRAME_INDEX_EARLIEST = 0x10 | CLOCK_MASTER_FRAME_DATA const CLOCK_MASTER_FRAME_INDEX_LATEST = 0x20 | CLOCK_MASTER_FRAME_DATA const CLOCK_MASTER_FRAME_INDEX_PARENT = 0x30 | CLOCK_MASTER_FRAME_DATA @@ -402,6 +415,18 @@ func clockProverTrieKey(filter []byte, frameNumber uint64) []byte { return key } +func clockDataTotalDistanceKey( + filter []byte, + frameNumber uint64, + selector []byte, +) []byte { + key := []byte{CLOCK_FRAME, CLOCK_DATA_FRAME_DISTANCE_DATA} + key = binary.BigEndian.AppendUint64(key, frameNumber) + key = append(key, filter...) + key = append(key, rightAlign(selector, 32)...) + return key +} + func (p *PebbleClockStore) NewTransaction() (Transaction, error) { return p.db.NewBatch(), nil } @@ -1287,3 +1312,39 @@ func (p *PebbleClockStore) Compact( return nil } + +func (p *PebbleClockStore) GetTotalDistance( + filter []byte, + frameNumber uint64, + selector []byte, +) (*big.Int, error) { + value, closer, err := p.db.Get( + clockDataTotalDistanceKey(filter, frameNumber, selector), + ) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, ErrNotFound + } + + return nil, errors.Wrap(err, "get total distance") + } + + defer closer.Close() + dist := new(big.Int).SetBytes(value) + + return dist, nil +} + +func (p *PebbleClockStore) SetTotalDistance( + filter []byte, + frameNumber uint64, + selector []byte, + totalDistance *big.Int, +) error { + err := p.db.Set( + clockDataTotalDistanceKey(filter, frameNumber, selector), + totalDistance.Bytes(), + ) + + return errors.Wrap(err, "set total distance") +}