From ebd7f723c6c300b240509ce3216a3dada45c4d2d Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 14 Mar 2024 23:40:52 -0500 Subject: [PATCH] quiet the messages --- .../consensus/ceremony/broadcast_messaging.go | 74 ++++++++++--------- 1 file changed, 40 insertions(+), 34 deletions(-) diff --git a/node/consensus/ceremony/broadcast_messaging.go b/node/consensus/ceremony/broadcast_messaging.go index bebfa6d..d80e3b7 100644 --- a/node/consensus/ceremony/broadcast_messaging.go +++ b/node/consensus/ceremony/broadcast_messaging.go @@ -30,50 +30,56 @@ func (e *CeremonyDataClockConsensusEngine) runMessageHandler() { continue } - for name := range e.executionEngines { - name := name - go func() error { - messages, err := e.executionEngines[name].ProcessMessage( - msg.Address, - msg, - ) - if err != nil { - e.logger.Debug( - "could not process message for engine", - zap.Error(err), - zap.String("engine_name", name), - ) - return nil - } + e.peerMapMx.RLock() + peer, ok := e.peerMap[string(message.From)] + e.peerMapMx.RUnlock() - for _, appMessage := range messages { - appMsg := &anypb.Any{} - err := proto.Unmarshal(appMessage.Payload, appMsg) + if ok && bytes.Compare(peer.version, config.GetMinimumVersion()) >= 0 { + for name := range e.executionEngines { + name := name + go func() error { + messages, err := e.executionEngines[name].ProcessMessage( + msg.Address, + msg, + ) if err != nil { - e.logger.Error( - "could not unmarshal app message", + e.logger.Debug( + "could not process message for engine", zap.Error(err), zap.String("engine_name", name), ) - continue + return nil } - switch appMsg.TypeUrl { - case protobufs.CeremonyLobbyStateTransitionType: - t := &protobufs.CeremonyLobbyStateTransition{} - err := proto.Unmarshal(appMsg.Value, t) + for _, appMessage := range messages { + appMsg := &anypb.Any{} + err := proto.Unmarshal(appMessage.Payload, appMsg) if err != nil { + e.logger.Error( + "could not unmarshal app message", + zap.Error(err), + zap.String("engine_name", name), + ) continue } - if err := e.handleCeremonyLobbyStateTransition(t); err != nil { - continue + switch appMsg.TypeUrl { + case protobufs.CeremonyLobbyStateTransitionType: + t := &protobufs.CeremonyLobbyStateTransition{} + err := proto.Unmarshal(appMsg.Value, t) + if err != nil { + continue + } + + if err := e.handleCeremonyLobbyStateTransition(t); err != nil { + continue + } } } - } - return nil - }() + return nil + }() + } } any := &anypb.Any{} @@ -84,12 +90,12 @@ func (e *CeremonyDataClockConsensusEngine) runMessageHandler() { go func() { switch any.TypeUrl { case protobufs.ClockFrameType: - e.peerMapMx.RLock() - if peer, ok := e.peerMap[string(message.From)]; !ok || - bytes.Compare(peer.version, config.GetMinimumVersion()) < 0 { + if !ok || bytes.Compare( + peer.version, + config.GetMinimumVersion(), + ) < 0 { return } - e.peerMapMx.RUnlock() if err := e.handleClockFrameData( message.From, msg.Address,