mirror of
https://source.quilibrium.com/quilibrium/ceremonyclient.git
synced 2025-01-23 14:15:18 +00:00
fix: OOM, send candidates on sync
This commit is contained in:
parent
cbeec8b1ee
commit
4a6c4fa08e
@ -398,6 +398,7 @@ func (e *CeremonyDataClockConsensusEngine) handleProvingKey(
|
||||
"could not unmarshal key bundle announcement",
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
if err := keyBundleAnnouncement.Verify(
|
||||
provingKeyAnnouncement,
|
||||
@ -406,6 +407,7 @@ func (e *CeremonyDataClockConsensusEngine) handleProvingKey(
|
||||
"could not verify key bundle announcement",
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
e.pendingCommits <- e.dependencyMap[string(provingKey)]
|
||||
|
@ -34,6 +34,18 @@ func (e *CeremonyDataClockConsensusEngine) prove(
|
||||
) (*protobufs.ClockFrame, error) {
|
||||
if e.state == consensus.EngineStateProving {
|
||||
if !e.frameProverTrie.Contains(e.provingKeyAddress) {
|
||||
e.stagedKeyCommitsMx.Lock()
|
||||
e.stagedKeyCommits = make(
|
||||
map[curves.PairingPoint]*protobufs.InclusionCommitment,
|
||||
)
|
||||
e.stagedKeyPolynomials = make(
|
||||
map[curves.PairingPoint][]curves.PairingScalar,
|
||||
)
|
||||
e.stagedKeyCommitsMx.Unlock()
|
||||
e.stagedLobbyStateTransitionsMx.Lock()
|
||||
e.stagedLobbyStateTransitions = &protobufs.CeremonyLobbyStateTransition{}
|
||||
e.stagedLobbyStateTransitionsMx.Unlock()
|
||||
|
||||
e.state = consensus.EngineStateCollecting
|
||||
return previousFrame, nil
|
||||
}
|
||||
@ -678,6 +690,10 @@ func (e *CeremonyDataClockConsensusEngine) commitLongestPath(
|
||||
}
|
||||
|
||||
iter.Close()
|
||||
|
||||
if len(nextRunningFrames) == 1 && len(nextRunningFrames[0]) > 32 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if commitReady && len(nextRunningFrames) == 1 {
|
||||
@ -1026,6 +1042,104 @@ func (e *CeremonyDataClockConsensusEngine) reverseOptimisticSync(
|
||||
return latest, nil
|
||||
}
|
||||
|
||||
func (e *CeremonyDataClockConsensusEngine) sync(
|
||||
currentLatest *protobufs.ClockFrame,
|
||||
maxFrame uint64,
|
||||
peerId []byte,
|
||||
) (*protobufs.ClockFrame, error) {
|
||||
latest := currentLatest
|
||||
e.logger.Info("polling peer for new frames", zap.Binary("peer_id", peerId))
|
||||
cc, err := e.pubSub.GetDirectChannel(peerId)
|
||||
if err != nil {
|
||||
e.logger.Error(
|
||||
"could not establish direct channel",
|
||||
zap.Error(err),
|
||||
)
|
||||
e.peerMapMx.Lock()
|
||||
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
|
||||
delete(e.peerMap, string(peerId))
|
||||
e.peerMapMx.Unlock()
|
||||
return latest, errors.Wrap(err, "reverse optimistic sync")
|
||||
}
|
||||
|
||||
client := protobufs.NewCeremonyServiceClient(cc)
|
||||
|
||||
from := latest.FrameNumber
|
||||
if from == 0 {
|
||||
from = 1
|
||||
}
|
||||
|
||||
if maxFrame > from {
|
||||
from = 1
|
||||
s, err := client.GetCompressedSyncFrames(
|
||||
context.Background(),
|
||||
&protobufs.ClockFramesRequest{
|
||||
Filter: e.filter,
|
||||
FromFrameNumber: maxFrame - 16,
|
||||
},
|
||||
grpc.MaxCallRecvMsgSize(400*1024*1024),
|
||||
)
|
||||
if err != nil {
|
||||
e.logger.Error(
|
||||
"received error from peer",
|
||||
zap.Error(err),
|
||||
)
|
||||
e.peerMapMx.Lock()
|
||||
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
|
||||
delete(e.peerMap, string(peerId))
|
||||
e.peerMapMx.Unlock()
|
||||
return latest, errors.Wrap(err, "reverse optimistic sync")
|
||||
}
|
||||
var syncMsg *protobufs.CeremonyCompressedSync
|
||||
for syncMsg, err = s.Recv(); err == nil; syncMsg, err = s.Recv() {
|
||||
e.logger.Info(
|
||||
"received compressed sync frame",
|
||||
zap.Uint64("from", syncMsg.FromFrameNumber),
|
||||
zap.Uint64("to", syncMsg.ToFrameNumber),
|
||||
zap.Int("frames", len(syncMsg.TruncatedClockFrames)),
|
||||
zap.Int("proofs", len(syncMsg.Proofs)),
|
||||
)
|
||||
var next *protobufs.ClockFrame
|
||||
if next, err = e.decompressAndStoreCandidates(
|
||||
syncMsg,
|
||||
e.logger.Info,
|
||||
); err != nil && !errors.Is(err, ErrNoNewFrames) {
|
||||
e.logger.Error(
|
||||
"could not decompress and store candidate",
|
||||
zap.Error(err),
|
||||
)
|
||||
e.peerMapMx.Lock()
|
||||
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
|
||||
delete(e.peerMap, string(peerId))
|
||||
e.peerMapMx.Unlock()
|
||||
|
||||
if err := cc.Close(); err != nil {
|
||||
e.logger.Error("error while closing connection", zap.Error(err))
|
||||
}
|
||||
|
||||
return currentLatest, errors.Wrap(err, "reverse optimistic sync")
|
||||
}
|
||||
if next != nil {
|
||||
latest = next
|
||||
}
|
||||
}
|
||||
if err != nil && err != io.EOF && !errors.Is(err, ErrNoNewFrames) {
|
||||
if err := cc.Close(); err != nil {
|
||||
e.logger.Error("error while closing connection", zap.Error(err))
|
||||
}
|
||||
e.logger.Error("error while receiving sync", zap.Error(err))
|
||||
return latest, errors.Wrap(err, "reverse optimistic sync")
|
||||
}
|
||||
|
||||
e.logger.Info("received new leading frame", zap.Uint64("frame_number", latest.FrameNumber))
|
||||
if err := cc.Close(); err != nil {
|
||||
e.logger.Error("error while closing connection", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return latest, nil
|
||||
}
|
||||
|
||||
func (e *CeremonyDataClockConsensusEngine) collect(
|
||||
currentFramePublished *protobufs.ClockFrame,
|
||||
) (*protobufs.ClockFrame, error) {
|
||||
@ -1049,6 +1163,8 @@ func (e *CeremonyDataClockConsensusEngine) collect(
|
||||
|
||||
e.syncingTarget = peerId
|
||||
latest, err = e.reverseOptimisticSync(latest, maxFrame, peerId)
|
||||
} else if maxFrame > latest.FrameNumber {
|
||||
latest, err = e.sync(latest, maxFrame, peerId)
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
@ -4,6 +4,8 @@ import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sort"
|
||||
|
||||
"github.com/cockroachdb/pebble"
|
||||
"github.com/iden3/go-iden3-crypto/poseidon"
|
||||
@ -1340,6 +1342,143 @@ func (p *PebbleClockStore) GetCompressedDataClockFrames(
|
||||
return nil, errors.Wrap(err, "get compressed data clock frames")
|
||||
}
|
||||
|
||||
if len(syncMessage.TruncatedClockFrames) < int(
|
||||
toFrameNumber-fromFrameNumber+1,
|
||||
) {
|
||||
newFrom := fromFrameNumber
|
||||
if len(syncMessage.TruncatedClockFrames) > 0 {
|
||||
newFrom = syncMessage.TruncatedClockFrames[len(
|
||||
syncMessage.TruncatedClockFrames,
|
||||
)-1].FrameNumber + 1
|
||||
}
|
||||
from := clockDataCandidateFrameKey(
|
||||
filter,
|
||||
newFrom,
|
||||
[]byte{
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
},
|
||||
[]byte{
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
},
|
||||
)
|
||||
to := clockDataCandidateFrameKey(
|
||||
filter,
|
||||
toFrameNumber+1,
|
||||
[]byte{
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
},
|
||||
[]byte{
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
|
||||
},
|
||||
)
|
||||
|
||||
iter := p.db.NewIter(&pebble.IterOptions{
|
||||
LowerBound: from,
|
||||
UpperBound: to,
|
||||
})
|
||||
|
||||
candidates := []*protobufs.ClockFrame{}
|
||||
for iter.First(); iter.Valid(); iter.Next() {
|
||||
value := iter.Value()
|
||||
frame := &protobufs.ClockFrame{}
|
||||
if err := proto.Unmarshal(value, frame); err != nil {
|
||||
return nil, errors.Wrap(err, "get compressed data clock frames")
|
||||
}
|
||||
|
||||
candidates = append(candidates, frame)
|
||||
}
|
||||
|
||||
if err := iter.Close(); err != nil {
|
||||
return nil, errors.Wrap(err, "get compressed data clock frames")
|
||||
}
|
||||
|
||||
sort.Slice(candidates, func(i, j int) bool {
|
||||
return candidates[i].FrameNumber < candidates[j].FrameNumber
|
||||
})
|
||||
|
||||
if len(candidates) > 0 {
|
||||
cursorStart := candidates[0].FrameNumber
|
||||
paths := [][]*protobufs.ClockFrame{}
|
||||
for _, frame := range candidates {
|
||||
frame := frame
|
||||
if frame.FrameNumber == cursorStart {
|
||||
paths = append(paths, []*protobufs.ClockFrame{frame})
|
||||
}
|
||||
if frame.FrameNumber > cursorStart {
|
||||
for i, path := range paths {
|
||||
s, err := path[len(path)-1].GetSelector()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get compressed data clock frames")
|
||||
}
|
||||
parentSelector, _, _, err := frame.GetParentSelectorAndDistance()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get compressed data clock frames")
|
||||
}
|
||||
if s.Cmp(parentSelector) == 0 {
|
||||
paths[i] = append(paths[i], frame)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
sort.Slice(paths, func(i, j int) bool {
|
||||
return len(paths[i]) > len(paths[j])
|
||||
})
|
||||
|
||||
leadingIndex := 0
|
||||
var leadingScore *big.Int
|
||||
length := len(paths[0])
|
||||
for i := 0; i < len(paths); i++ {
|
||||
if len(paths[i]) < length {
|
||||
break
|
||||
}
|
||||
score := new(big.Int)
|
||||
for _, p := range paths[i] {
|
||||
_, distance, _, err := p.GetParentSelectorAndDistance()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "get compressed data clock frames")
|
||||
}
|
||||
score = score.Add(score, distance)
|
||||
}
|
||||
if leadingScore == nil || leadingScore.Cmp(score) > 0 {
|
||||
leadingIndex = i
|
||||
leadingScore = score
|
||||
}
|
||||
}
|
||||
for _, frame := range paths[leadingIndex] {
|
||||
frame := frame
|
||||
syncMessage.TruncatedClockFrames = append(
|
||||
syncMessage.TruncatedClockFrames,
|
||||
frame,
|
||||
)
|
||||
if frame.FrameNumber == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for i := 0; i < len(frame.Input[516:])/74; i++ {
|
||||
aggregateCommit := frame.Input[516+(i*74) : 516+((i+1)*74)]
|
||||
|
||||
if _, ok := proofs[string(aggregateCommit)]; !ok {
|
||||
proofs[string(aggregateCommit)] = &protobufs.InclusionProofsMap{
|
||||
FrameCommit: aggregateCommit,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for k, v := range proofs {
|
||||
k := k
|
||||
v := v
|
||||
|
Loading…
Reference in New Issue
Block a user