diff --git a/node/consensus/ceremony/broadcast_messaging.go b/node/consensus/ceremony/broadcast_messaging.go index d80e3b7..d1988d3 100644 --- a/node/consensus/ceremony/broadcast_messaging.go +++ b/node/consensus/ceremony/broadcast_messaging.go @@ -1,123 +1,17 @@ package ceremony import ( - "bytes" - "encoding/binary" "strings" - "time" - - "source.quilibrium.com/quilibrium/monorepo/node/config" "github.com/iden3/go-iden3-crypto/poseidon" - pcrypto "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" - "source.quilibrium.com/quilibrium/monorepo/node/keys" "source.quilibrium.com/quilibrium/monorepo/node/protobufs" ) -func (e *CeremonyDataClockConsensusEngine) runMessageHandler() { - for { - select { - case message := <-e.messageProcessorCh: - msg := &protobufs.Message{} - - if err := proto.Unmarshal(message.Data, msg); err != nil { - continue - } - - e.peerMapMx.RLock() - peer, ok := e.peerMap[string(message.From)] - e.peerMapMx.RUnlock() - - if ok && bytes.Compare(peer.version, config.GetMinimumVersion()) >= 0 { - for name := range e.executionEngines { - name := name - go func() error { - messages, err := e.executionEngines[name].ProcessMessage( - msg.Address, - msg, - ) - if err != nil { - e.logger.Debug( - "could not process message for engine", - zap.Error(err), - zap.String("engine_name", name), - ) - return nil - } - - for _, appMessage := range messages { - appMsg := &anypb.Any{} - err := proto.Unmarshal(appMessage.Payload, appMsg) - if err != nil { - e.logger.Error( - "could not unmarshal app message", - zap.Error(err), - zap.String("engine_name", name), - ) - continue - } - - switch appMsg.TypeUrl { - case protobufs.CeremonyLobbyStateTransitionType: - t := &protobufs.CeremonyLobbyStateTransition{} - err := proto.Unmarshal(appMsg.Value, t) - if err != nil { - continue - } - - if err := e.handleCeremonyLobbyStateTransition(t); err != nil { - continue - } - } - } - - return nil - }() - } - } - - any := &anypb.Any{} - if err := proto.Unmarshal(msg.Payload, any); err != nil { - continue - } - - go func() { - switch any.TypeUrl { - case protobufs.ClockFrameType: - if !ok || bytes.Compare( - peer.version, - config.GetMinimumVersion(), - ) < 0 { - return - } - if err := e.handleClockFrameData( - message.From, - msg.Address, - any, - false, - ); err != nil { - return - } - case protobufs.CeremonyPeerListAnnounceType: - if err := e.handleCeremonyPeerListAnnounce( - message.From, - msg.Address, - any, - ); err != nil { - return - } - } - }() - } - } -} - func (e *CeremonyDataClockConsensusEngine) handleMessage( message *pb.Message, ) error { @@ -135,223 +29,6 @@ func (e *CeremonyDataClockConsensusEngine) handleMessage( return nil } -func (e *CeremonyDataClockConsensusEngine) handleCeremonyPeerListAnnounce( - peerID []byte, - address []byte, - any *anypb.Any, -) error { - announce := &protobufs.CeremonyPeerListAnnounce{} - if err := any.UnmarshalTo(announce); err != nil { - return errors.Wrap(err, "handle ceremony peer list announce") - } - - for _, p := range announce.PeerList { - if bytes.Equal(p.PeerId, e.pubSub.GetPeerID()) { - continue - } - - if !bytes.Equal(p.PeerId, peerID) { - continue - } - - if p.PublicKey == nil || p.Signature == nil || p.Version == nil { - continue - } - - if p.PublicKey != nil && p.Signature != nil && p.Version != nil { - key, err := pcrypto.UnmarshalEd448PublicKey(p.PublicKey) - if err != nil { - e.logger.Warn( - "peer announcement contained invalid pubkey", - zap.Binary("public_key", p.PublicKey), - ) - continue - } - - if !(peer.ID(p.PeerId)).MatchesPublicKey(key) { - e.logger.Warn( - "peer announcement peer id does not match pubkey", - zap.Binary("peer_id", p.PeerId), - zap.Binary("public_key", p.PublicKey), - ) - continue - } - - msg := binary.BigEndian.AppendUint64([]byte{}, p.MaxFrame) - msg = append(msg, p.Version...) - msg = binary.BigEndian.AppendUint64(msg, uint64(p.Timestamp)) - b, err := key.Verify(msg, p.Signature) - if err != nil || !b { - e.logger.Warn( - "peer provided invalid signature", - zap.Binary("msg", msg), - zap.Binary("public_key", p.PublicKey), - zap.Binary("signature", p.Signature), - ) - continue - } - - if bytes.Compare(p.Version, config.GetMinimumVersion()) < 0 && - p.Timestamp > config.GetMinimumVersionCutoff().UnixMilli() { - e.logger.Debug( - "peer provided outdated version, penalizing app score", - zap.Binary("peer_id", p.PeerId), - ) - e.pubSub.SetPeerScore(p.PeerId, -10000) - continue - } - } - - e.peerMapMx.RLock() - if _, ok := e.uncooperativePeersMap[string(p.PeerId)]; ok { - e.peerMapMx.RUnlock() - continue - } - e.peerMapMx.RUnlock() - - multiaddr := e.pubSub.GetMultiaddrOfPeer(p.PeerId) - - e.pubSub.SetPeerScore(p.PeerId, 10) - - e.peerMapMx.RLock() - existing, ok := e.peerMap[string(p.PeerId)] - e.peerMapMx.RUnlock() - - if ok { - if existing.signature != nil && p.Signature == nil { - continue - } - - if existing.publicKey != nil && p.PublicKey == nil { - continue - } - - if existing.version != nil && p.Version == nil { - continue - } - - if existing.timestamp > p.Timestamp { - continue - } - } - - e.peerMapMx.Lock() - e.peerMap[string(p.PeerId)] = &peerInfo{ - peerId: p.PeerId, - multiaddr: multiaddr, - maxFrame: p.MaxFrame, - direct: bytes.Equal(p.PeerId, peerID), - lastSeen: time.Now().Unix(), - timestamp: p.Timestamp, - version: p.Version, - signature: p.Signature, - publicKey: p.PublicKey, - totalDistance: p.TotalDistance, - } - e.peerMapMx.Unlock() - } - - return nil -} - -func (e *CeremonyDataClockConsensusEngine) handleCeremonyLobbyStateTransition( - transition *protobufs.CeremonyLobbyStateTransition, -) error { - if len(transition.TransitionInputs) != len(transition.TypeUrls) { - return errors.Wrap( - errors.New("invalid state transition"), - "handle ceremony lobby state transition", - ) - } - - e.stagedLobbyStateTransitionsMx.Lock() - if e.stagedLobbyStateTransitions == nil { - e.stagedLobbyStateTransitions = &protobufs.CeremonyLobbyStateTransition{} - } - - found := false - for _, ti := range e.stagedLobbyStateTransitions.TransitionInputs { - for _, nti := range transition.TransitionInputs { - if bytes.Equal(ti, nti) { - found = true - } - } - } - - if !found { - for i := range transition.TransitionInputs { - e.stagedLobbyStateTransitions.TypeUrls = append( - e.stagedLobbyStateTransitions.TypeUrls, - transition.TypeUrls[i], - ) - e.stagedLobbyStateTransitions.TransitionInputs = append( - e.stagedLobbyStateTransitions.TransitionInputs, - transition.TransitionInputs[i], - ) - } - } - e.stagedLobbyStateTransitionsMx.Unlock() - return nil -} - -func (e *CeremonyDataClockConsensusEngine) handleClockFrameData( - peerID []byte, - address []byte, - any *anypb.Any, - isSync bool, -) error { - frame := &protobufs.ClockFrame{} - if err := any.UnmarshalTo(frame); err != nil { - return errors.Wrap(err, "handle clock frame data") - } - - addr, err := poseidon.HashBytes( - frame.GetPublicKeySignatureEd448().PublicKey.KeyValue, - ) - if err != nil { - return errors.Wrap(err, "handle clock frame data") - } - - prover := e.frameProverTrie.FindNearest(addr.Bytes()) - if !bytes.Equal(prover.External.Key, addr.Bytes()) { - e.logger.Info( - "prover not in trie at frame, address may be in fork", - zap.Binary("address", address), - zap.Binary("filter", frame.Filter), - zap.Uint64("frame_number", frame.FrameNumber), - ) - return nil - } - - e.logger.Info( - "got clock frame", - zap.Binary("address", address), - zap.Binary("filter", frame.Filter), - zap.Uint64("frame_number", frame.FrameNumber), - zap.Int("proof_count", len(frame.AggregateProofs)), - ) - - if err := e.frameProver.VerifyDataClockFrame(frame); err != nil { - e.logger.Error("could not verify clock frame", zap.Error(err)) - return errors.Wrap(err, "handle clock frame data") - } - - if err := e.inclusionProver.VerifyFrame(frame); err != nil { - e.logger.Error("could not verify clock frame", zap.Error(err)) - return errors.Wrap(err, "handle clock frame data") - } - - e.logger.Info( - "clock frame was valid", - zap.Binary("address", address), - zap.Binary("filter", frame.Filter), - zap.Uint64("frame_number", frame.FrameNumber), - ) - - e.dataTimeReel.Insert(frame) - return nil -} - func (e *CeremonyDataClockConsensusEngine) publishProof( frame *protobufs.ClockFrame, ) error { @@ -414,37 +91,3 @@ func (e *CeremonyDataClockConsensusEngine) publishMessage( } return e.pubSub.PublishToBitmask(filter, data) } - -func (e *CeremonyDataClockConsensusEngine) createCommunicationKeys() error { - _, err := e.keyManager.GetAgreementKey("q-ratchet-idk") - if err != nil { - if errors.Is(err, keys.KeyNotFoundErr) { - _, err = e.keyManager.CreateAgreementKey( - "q-ratchet-idk", - keys.KeyTypeX448, - ) - if err != nil { - return errors.Wrap(err, "announce key bundle") - } - } else { - return errors.Wrap(err, "announce key bundle") - } - } - - _, err = e.keyManager.GetAgreementKey("q-ratchet-spk") - if err != nil { - if errors.Is(err, keys.KeyNotFoundErr) { - _, err = e.keyManager.CreateAgreementKey( - "q-ratchet-spk", - keys.KeyTypeX448, - ) - if err != nil { - return errors.Wrap(err, "announce key bundle") - } - } else { - return errors.Wrap(err, "announce key bundle") - } - } - - return nil -} diff --git a/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go b/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go index 0d91952..2632472 100644 --- a/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go +++ b/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go @@ -274,7 +274,7 @@ func (e *CeremonyDataClockConsensusEngine) Start() <-chan error { } go func() { - e.runLoop() + e.runMessageHandler() }() e.logger.Info("subscribing to pubsub messages") @@ -302,7 +302,7 @@ func (e *CeremonyDataClockConsensusEngine) Start() <-chan error { thresholdBeforeConfirming := 4 for { - time.Sleep(30 * time.Second) + time.Sleep(120 * time.Second) list := &protobufs.CeremonyPeerListAnnounce{ PeerList: []*protobufs.CeremonyPeer{}, @@ -612,3 +612,37 @@ func ( e.peerMapMx.RUnlock() return resp } + +func (e *CeremonyDataClockConsensusEngine) createCommunicationKeys() error { + _, err := e.keyManager.GetAgreementKey("q-ratchet-idk") + if err != nil { + if errors.Is(err, keys.KeyNotFoundErr) { + _, err = e.keyManager.CreateAgreementKey( + "q-ratchet-idk", + keys.KeyTypeX448, + ) + if err != nil { + return errors.Wrap(err, "announce key bundle") + } + } else { + return errors.Wrap(err, "announce key bundle") + } + } + + _, err = e.keyManager.GetAgreementKey("q-ratchet-spk") + if err != nil { + if errors.Is(err, keys.KeyNotFoundErr) { + _, err = e.keyManager.CreateAgreementKey( + "q-ratchet-spk", + keys.KeyTypeX448, + ) + if err != nil { + return errors.Wrap(err, "announce key bundle") + } + } else { + return errors.Wrap(err, "announce key bundle") + } + } + + return nil +} diff --git a/node/consensus/ceremony/message_handler.go b/node/consensus/ceremony/message_handler.go new file mode 100644 index 0000000..880148e --- /dev/null +++ b/node/consensus/ceremony/message_handler.go @@ -0,0 +1,332 @@ +package ceremony + +import ( + "bytes" + "encoding/binary" + "time" + + "github.com/iden3/go-iden3-crypto/poseidon" + pcrypto "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "source.quilibrium.com/quilibrium/monorepo/node/config" + "source.quilibrium.com/quilibrium/monorepo/node/protobufs" +) + +func (e *CeremonyDataClockConsensusEngine) runMessageHandler() { + for { + select { + case message := <-e.messageProcessorCh: + msg := &protobufs.Message{} + + if err := proto.Unmarshal(message.Data, msg); err != nil { + continue + } + + e.peerMapMx.RLock() + peer, ok := e.peerMap[string(message.From)] + e.peerMapMx.RUnlock() + + if ok && bytes.Compare(peer.version, config.GetMinimumVersion()) >= 0 { + for name := range e.executionEngines { + name := name + go func() error { + messages, err := e.executionEngines[name].ProcessMessage( + msg.Address, + msg, + ) + if err != nil { + e.logger.Debug( + "could not process message for engine", + zap.Error(err), + zap.String("engine_name", name), + ) + return nil + } + + for _, appMessage := range messages { + appMsg := &anypb.Any{} + err := proto.Unmarshal(appMessage.Payload, appMsg) + if err != nil { + e.logger.Error( + "could not unmarshal app message", + zap.Error(err), + zap.String("engine_name", name), + ) + continue + } + + switch appMsg.TypeUrl { + case protobufs.CeremonyLobbyStateTransitionType: + t := &protobufs.CeremonyLobbyStateTransition{} + err := proto.Unmarshal(appMsg.Value, t) + if err != nil { + continue + } + + if err := e.handleCeremonyLobbyStateTransition(t); err != nil { + continue + } + } + } + + return nil + }() + } + } + + any := &anypb.Any{} + if err := proto.Unmarshal(msg.Payload, any); err != nil { + continue + } + + go func() { + switch any.TypeUrl { + case protobufs.ClockFrameType: + if !ok || bytes.Compare( + peer.version, + config.GetMinimumVersion(), + ) < 0 { + return + } + if err := e.handleClockFrameData( + message.From, + msg.Address, + any, + false, + ); err != nil { + return + } + case protobufs.CeremonyPeerListAnnounceType: + if err := e.handleCeremonyPeerListAnnounce( + message.From, + msg.Address, + any, + ); err != nil { + return + } + } + }() + } + } +} + +func (e *CeremonyDataClockConsensusEngine) handleCeremonyPeerListAnnounce( + peerID []byte, + address []byte, + any *anypb.Any, +) error { + announce := &protobufs.CeremonyPeerListAnnounce{} + if err := any.UnmarshalTo(announce); err != nil { + return errors.Wrap(err, "handle ceremony peer list announce") + } + + for _, p := range announce.PeerList { + if bytes.Equal(p.PeerId, e.pubSub.GetPeerID()) { + continue + } + + if !bytes.Equal(p.PeerId, peerID) { + continue + } + + if p.PublicKey == nil || p.Signature == nil || p.Version == nil { + continue + } + + if p.PublicKey != nil && p.Signature != nil && p.Version != nil { + key, err := pcrypto.UnmarshalEd448PublicKey(p.PublicKey) + if err != nil { + e.logger.Warn( + "peer announcement contained invalid pubkey", + zap.Binary("public_key", p.PublicKey), + ) + continue + } + + if !(peer.ID(p.PeerId)).MatchesPublicKey(key) { + e.logger.Warn( + "peer announcement peer id does not match pubkey", + zap.Binary("peer_id", p.PeerId), + zap.Binary("public_key", p.PublicKey), + ) + continue + } + + msg := binary.BigEndian.AppendUint64([]byte{}, p.MaxFrame) + msg = append(msg, p.Version...) + msg = binary.BigEndian.AppendUint64(msg, uint64(p.Timestamp)) + b, err := key.Verify(msg, p.Signature) + if err != nil || !b { + e.logger.Warn( + "peer provided invalid signature", + zap.Binary("msg", msg), + zap.Binary("public_key", p.PublicKey), + zap.Binary("signature", p.Signature), + ) + continue + } + + if bytes.Compare(p.Version, config.GetMinimumVersion()) < 0 && + p.Timestamp > config.GetMinimumVersionCutoff().UnixMilli() { + e.logger.Debug( + "peer provided outdated version, penalizing app score", + zap.Binary("peer_id", p.PeerId), + ) + e.pubSub.SetPeerScore(p.PeerId, -10000) + continue + } + } + + e.peerMapMx.RLock() + if _, ok := e.uncooperativePeersMap[string(p.PeerId)]; ok { + e.peerMapMx.RUnlock() + continue + } + e.peerMapMx.RUnlock() + + multiaddr := e.pubSub.GetMultiaddrOfPeer(p.PeerId) + + e.pubSub.SetPeerScore(p.PeerId, 10) + + e.peerMapMx.RLock() + existing, ok := e.peerMap[string(p.PeerId)] + e.peerMapMx.RUnlock() + + if ok { + if existing.signature != nil && p.Signature == nil { + continue + } + + if existing.publicKey != nil && p.PublicKey == nil { + continue + } + + if existing.version != nil && p.Version == nil { + continue + } + + if existing.timestamp > p.Timestamp { + continue + } + } + + e.peerMapMx.Lock() + e.peerMap[string(p.PeerId)] = &peerInfo{ + peerId: p.PeerId, + multiaddr: multiaddr, + maxFrame: p.MaxFrame, + direct: bytes.Equal(p.PeerId, peerID), + lastSeen: time.Now().Unix(), + timestamp: p.Timestamp, + version: p.Version, + signature: p.Signature, + publicKey: p.PublicKey, + totalDistance: p.TotalDistance, + } + e.peerMapMx.Unlock() + } + + return nil +} + +func (e *CeremonyDataClockConsensusEngine) handleCeremonyLobbyStateTransition( + transition *protobufs.CeremonyLobbyStateTransition, +) error { + if len(transition.TransitionInputs) != len(transition.TypeUrls) { + return errors.Wrap( + errors.New("invalid state transition"), + "handle ceremony lobby state transition", + ) + } + + e.stagedLobbyStateTransitionsMx.Lock() + if e.stagedLobbyStateTransitions == nil { + e.stagedLobbyStateTransitions = &protobufs.CeremonyLobbyStateTransition{} + } + + found := false + for _, ti := range e.stagedLobbyStateTransitions.TransitionInputs { + for _, nti := range transition.TransitionInputs { + if bytes.Equal(ti, nti) { + found = true + } + } + } + + if !found { + for i := range transition.TransitionInputs { + e.stagedLobbyStateTransitions.TypeUrls = append( + e.stagedLobbyStateTransitions.TypeUrls, + transition.TypeUrls[i], + ) + e.stagedLobbyStateTransitions.TransitionInputs = append( + e.stagedLobbyStateTransitions.TransitionInputs, + transition.TransitionInputs[i], + ) + } + } + e.stagedLobbyStateTransitionsMx.Unlock() + return nil +} + +func (e *CeremonyDataClockConsensusEngine) handleClockFrameData( + peerID []byte, + address []byte, + any *anypb.Any, + isSync bool, +) error { + frame := &protobufs.ClockFrame{} + if err := any.UnmarshalTo(frame); err != nil { + return errors.Wrap(err, "handle clock frame data") + } + + addr, err := poseidon.HashBytes( + frame.GetPublicKeySignatureEd448().PublicKey.KeyValue, + ) + if err != nil { + return errors.Wrap(err, "handle clock frame data") + } + + prover := e.frameProverTrie.FindNearest(addr.Bytes()) + if !bytes.Equal(prover.External.Key, addr.Bytes()) { + e.logger.Info( + "prover not in trie at frame, address may be in fork", + zap.Binary("address", address), + zap.Binary("filter", frame.Filter), + zap.Uint64("frame_number", frame.FrameNumber), + ) + return nil + } + + e.logger.Info( + "got clock frame", + zap.Binary("address", address), + zap.Binary("filter", frame.Filter), + zap.Uint64("frame_number", frame.FrameNumber), + zap.Int("proof_count", len(frame.AggregateProofs)), + ) + + if err := e.frameProver.VerifyDataClockFrame(frame); err != nil { + e.logger.Error("could not verify clock frame", zap.Error(err)) + return errors.Wrap(err, "handle clock frame data") + } + + if err := e.inclusionProver.VerifyFrame(frame); err != nil { + e.logger.Error("could not verify clock frame", zap.Error(err)) + return errors.Wrap(err, "handle clock frame data") + } + + e.logger.Info( + "clock frame was valid", + zap.Binary("address", address), + zap.Binary("filter", frame.Filter), + zap.Uint64("frame_number", frame.FrameNumber), + ) + + e.dataTimeReel.Insert(frame) + return nil +}