From 4a6c4fa08ec99c3bca1dc687f97db88764382060 Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 26 Oct 2023 15:54:49 -0500 Subject: [PATCH] fix: OOM, send candidates on sync --- .../consensus/ceremony/broadcast_messaging.go | 2 + node/consensus/ceremony/consensus_frames.go | 116 +++++++++++++++ node/store/clock.go | 139 ++++++++++++++++++ 3 files changed, 257 insertions(+) diff --git a/node/consensus/ceremony/broadcast_messaging.go b/node/consensus/ceremony/broadcast_messaging.go index 29eb262..d4bf0c5 100644 --- a/node/consensus/ceremony/broadcast_messaging.go +++ b/node/consensus/ceremony/broadcast_messaging.go @@ -398,6 +398,7 @@ func (e *CeremonyDataClockConsensusEngine) handleProvingKey( "could not unmarshal key bundle announcement", zap.Error(err), ) + return } if err := keyBundleAnnouncement.Verify( provingKeyAnnouncement, @@ -406,6 +407,7 @@ func (e *CeremonyDataClockConsensusEngine) handleProvingKey( "could not verify key bundle announcement", zap.Error(err), ) + return } e.pendingCommits <- e.dependencyMap[string(provingKey)] diff --git a/node/consensus/ceremony/consensus_frames.go b/node/consensus/ceremony/consensus_frames.go index 082a0a6..14e6bbc 100644 --- a/node/consensus/ceremony/consensus_frames.go +++ b/node/consensus/ceremony/consensus_frames.go @@ -34,6 +34,18 @@ func (e *CeremonyDataClockConsensusEngine) prove( ) (*protobufs.ClockFrame, error) { if e.state == consensus.EngineStateProving { if !e.frameProverTrie.Contains(e.provingKeyAddress) { + e.stagedKeyCommitsMx.Lock() + e.stagedKeyCommits = make( + map[curves.PairingPoint]*protobufs.InclusionCommitment, + ) + e.stagedKeyPolynomials = make( + map[curves.PairingPoint][]curves.PairingScalar, + ) + e.stagedKeyCommitsMx.Unlock() + e.stagedLobbyStateTransitionsMx.Lock() + e.stagedLobbyStateTransitions = &protobufs.CeremonyLobbyStateTransition{} + e.stagedLobbyStateTransitionsMx.Unlock() + e.state = consensus.EngineStateCollecting return previousFrame, nil } @@ -678,6 +690,10 @@ func (e *CeremonyDataClockConsensusEngine) commitLongestPath( } iter.Close() + + if len(nextRunningFrames) == 1 && len(nextRunningFrames[0]) > 32 { + break + } } if commitReady && len(nextRunningFrames) == 1 { @@ -1026,6 +1042,104 @@ func (e *CeremonyDataClockConsensusEngine) reverseOptimisticSync( return latest, nil } +func (e *CeremonyDataClockConsensusEngine) sync( + currentLatest *protobufs.ClockFrame, + maxFrame uint64, + peerId []byte, +) (*protobufs.ClockFrame, error) { + latest := currentLatest + e.logger.Info("polling peer for new frames", zap.Binary("peer_id", peerId)) + cc, err := e.pubSub.GetDirectChannel(peerId) + if err != nil { + e.logger.Error( + "could not establish direct channel", + zap.Error(err), + ) + e.peerMapMx.Lock() + e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)] + delete(e.peerMap, string(peerId)) + e.peerMapMx.Unlock() + return latest, errors.Wrap(err, "reverse optimistic sync") + } + + client := protobufs.NewCeremonyServiceClient(cc) + + from := latest.FrameNumber + if from == 0 { + from = 1 + } + + if maxFrame > from { + from = 1 + s, err := client.GetCompressedSyncFrames( + context.Background(), + &protobufs.ClockFramesRequest{ + Filter: e.filter, + FromFrameNumber: maxFrame - 16, + }, + grpc.MaxCallRecvMsgSize(400*1024*1024), + ) + if err != nil { + e.logger.Error( + "received error from peer", + zap.Error(err), + ) + e.peerMapMx.Lock() + e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)] + delete(e.peerMap, string(peerId)) + e.peerMapMx.Unlock() + return latest, 
errors.Wrap(err, "reverse optimistic sync") + } + var syncMsg *protobufs.CeremonyCompressedSync + for syncMsg, err = s.Recv(); err == nil; syncMsg, err = s.Recv() { + e.logger.Info( + "received compressed sync frame", + zap.Uint64("from", syncMsg.FromFrameNumber), + zap.Uint64("to", syncMsg.ToFrameNumber), + zap.Int("frames", len(syncMsg.TruncatedClockFrames)), + zap.Int("proofs", len(syncMsg.Proofs)), + ) + var next *protobufs.ClockFrame + if next, err = e.decompressAndStoreCandidates( + syncMsg, + e.logger.Info, + ); err != nil && !errors.Is(err, ErrNoNewFrames) { + e.logger.Error( + "could not decompress and store candidate", + zap.Error(err), + ) + e.peerMapMx.Lock() + e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)] + delete(e.peerMap, string(peerId)) + e.peerMapMx.Unlock() + + if err := cc.Close(); err != nil { + e.logger.Error("error while closing connection", zap.Error(err)) + } + + return currentLatest, errors.Wrap(err, "reverse optimistic sync") + } + if next != nil { + latest = next + } + } + if err != nil && err != io.EOF && !errors.Is(err, ErrNoNewFrames) { + if err := cc.Close(); err != nil { + e.logger.Error("error while closing connection", zap.Error(err)) + } + e.logger.Error("error while receiving sync", zap.Error(err)) + return latest, errors.Wrap(err, "reverse optimistic sync") + } + + e.logger.Info("received new leading frame", zap.Uint64("frame_number", latest.FrameNumber)) + if err := cc.Close(); err != nil { + e.logger.Error("error while closing connection", zap.Error(err)) + } + } + + return latest, nil +} + func (e *CeremonyDataClockConsensusEngine) collect( currentFramePublished *protobufs.ClockFrame, ) (*protobufs.ClockFrame, error) { @@ -1049,6 +1163,8 @@ func (e *CeremonyDataClockConsensusEngine) collect( e.syncingTarget = peerId latest, err = e.reverseOptimisticSync(latest, maxFrame, peerId) + } else if maxFrame > latest.FrameNumber { + latest, err = e.sync(latest, maxFrame, peerId) } go func() { diff --git a/node/store/clock.go b/node/store/clock.go index 6ec668f..a1e0345 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/binary" "fmt" + "math/big" + "sort" "github.com/cockroachdb/pebble" "github.com/iden3/go-iden3-crypto/poseidon" @@ -1340,6 +1342,143 @@ func (p *PebbleClockStore) GetCompressedDataClockFrames( return nil, errors.Wrap(err, "get compressed data clock frames") } + if len(syncMessage.TruncatedClockFrames) < int( + toFrameNumber-fromFrameNumber+1, + ) { + newFrom := fromFrameNumber + if len(syncMessage.TruncatedClockFrames) > 0 { + newFrom = syncMessage.TruncatedClockFrames[len( + syncMessage.TruncatedClockFrames, + )-1].FrameNumber + 1 + } + from := clockDataCandidateFrameKey( + filter, + newFrom, + []byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + []byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + }, + ) + to := clockDataCandidateFrameKey( + filter, + toFrameNumber+1, + []byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }, + []byte{ + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 
0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, + }, + ) + + iter := p.db.NewIter(&pebble.IterOptions{ + LowerBound: from, + UpperBound: to, + }) + + candidates := []*protobufs.ClockFrame{} + for iter.First(); iter.Valid(); iter.Next() { + value := iter.Value() + frame := &protobufs.ClockFrame{} + if err := proto.Unmarshal(value, frame); err != nil { + return nil, errors.Wrap(err, "get compressed data clock frames") + } + + candidates = append(candidates, frame) + } + + if err := iter.Close(); err != nil { + return nil, errors.Wrap(err, "get compressed data clock frames") + } + + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].FrameNumber < candidates[j].FrameNumber + }) + + if len(candidates) > 0 { + cursorStart := candidates[0].FrameNumber + paths := [][]*protobufs.ClockFrame{} + for _, frame := range candidates { + frame := frame + if frame.FrameNumber == cursorStart { + paths = append(paths, []*protobufs.ClockFrame{frame}) + } + if frame.FrameNumber > cursorStart { + for i, path := range paths { + s, err := path[len(path)-1].GetSelector() + if err != nil { + return nil, errors.Wrap(err, "get compressed data clock frames") + } + parentSelector, _, _, err := frame.GetParentSelectorAndDistance() + if err != nil { + return nil, errors.Wrap(err, "get compressed data clock frames") + } + if s.Cmp(parentSelector) == 0 { + paths[i] = append(paths[i], frame) + } + } + } + } + sort.Slice(paths, func(i, j int) bool { + return len(paths[i]) > len(paths[j]) + }) + + leadingIndex := 0 + var leadingScore *big.Int + length := len(paths[0]) + for i := 0; i < len(paths); i++ { + if len(paths[i]) < length { + break + } + score := new(big.Int) + for _, p := range paths[i] { + _, distance, _, err := p.GetParentSelectorAndDistance() + if err != nil { + return nil, errors.Wrap(err, "get compressed data clock frames") + } + score = score.Add(score, distance) + } + if leadingScore == nil || leadingScore.Cmp(score) > 0 { + leadingIndex = i + leadingScore = score + } + } + for _, frame := range paths[leadingIndex] { + frame := frame + syncMessage.TruncatedClockFrames = append( + syncMessage.TruncatedClockFrames, + frame, + ) + if frame.FrameNumber == 0 { + continue + } + + for i := 0; i < len(frame.Input[516:])/74; i++ { + aggregateCommit := frame.Input[516+(i*74) : 516+((i+1)*74)] + + if _, ok := proofs[string(aggregateCommit)]; !ok { + proofs[string(aggregateCommit)] = &protobufs.InclusionProofsMap{ + FrameCommit: aggregateCommit, + } + } + } + } + } + } + for k, v := range proofs { k := k v := v
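The clock.go hunk above backfills candidates when the stored truncated frames fall short of the requested range: candidate frames are grouped into parent-linked paths, the longest path wins, and ties are broken by the smallest cumulative prover distance. Below is a minimal standalone sketch of that selection rule, not part of the patch; the candidate type, selectLeadingPath, and the sample values are illustrative assumptions standing in for *protobufs.ClockFrame and its GetSelector / GetParentSelectorAndDistance accessors.

// Illustrative sketch (not part of the patch): the candidate-selection rule
// used when backfilling TruncatedClockFrames from stored candidate frames.
// The candidate type below is a hypothetical stand-in for
// *protobufs.ClockFrame and its selector/distance accessors.
package main

import (
	"fmt"
	"math/big"
	"sort"
)

type candidate struct {
	frameNumber    uint64
	selector       *big.Int // selector this frame exposes to its children
	parentSelector *big.Int // selector of the frame it extends
	distance       *big.Int // prover distance carried by the frame
}

// selectLeadingPath groups candidates into parent-linked paths starting at
// the lowest frame number, keeps only the longest paths, and among those
// returns the one with the smallest cumulative distance.
func selectLeadingPath(candidates []candidate) []candidate {
	if len(candidates) == 0 {
		return nil
	}
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].frameNumber < candidates[j].frameNumber
	})
	cursorStart := candidates[0].frameNumber
	paths := [][]candidate{}
	for _, c := range candidates {
		if c.frameNumber == cursorStart {
			// Every candidate at the starting frame number seeds a path.
			paths = append(paths, []candidate{c})
			continue
		}
		for i, path := range paths {
			tip := path[len(path)-1]
			// Extend a path only if this frame's parent selector matches
			// the selector of the path's current tip.
			if tip.selector.Cmp(c.parentSelector) == 0 {
				paths[i] = append(paths[i], c)
			}
		}
	}
	// Longest paths first.
	sort.Slice(paths, func(i, j int) bool {
		return len(paths[i]) > len(paths[j])
	})
	leadingIndex := 0
	var leadingScore *big.Int
	longest := len(paths[0])
	for i := 0; i < len(paths); i++ {
		if len(paths[i]) < longest {
			break
		}
		score := new(big.Int)
		for _, c := range paths[i] {
			score.Add(score, c.distance)
		}
		// Lower cumulative distance wins among equally long paths.
		if leadingScore == nil || leadingScore.Cmp(score) > 0 {
			leadingIndex = i
			leadingScore = score
		}
	}
	return paths[leadingIndex]
}

func main() {
	// Two forks at frame 10, each extended once at frame 11; both paths have
	// length 2, so the path with the lower total distance (5+3) is chosen.
	a1 := candidate{10, big.NewInt(1), big.NewInt(0), big.NewInt(5)}
	a2 := candidate{10, big.NewInt(2), big.NewInt(0), big.NewInt(4)}
	b1 := candidate{11, big.NewInt(3), big.NewInt(1), big.NewInt(3)}
	b2 := candidate{11, big.NewInt(4), big.NewInt(2), big.NewInt(9)}
	for _, f := range selectLeadingPath([]candidate{a1, a2, b1, b2}) {
		fmt.Println(f.frameNumber, f.selector)
	}
}

Run as-is, the example prints the frames of the first path (10 1, then 11 3), since both paths have equal length and the first has the lower cumulative distance, 8 versus 13.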