mirror of https://source.quilibrium.com/quilibrium/ceremonyclient.git
synced 2024-12-24 23:55:18 +00:00
v1.1.6 - fix: [PROTO-60],[PROTO-61],[PROTO-62] (#25)
* [PROTO-61] - fix: Use DHT for source of truth on peer info
* [PROTO-60] - fix: nil uncooperative peer state panic
* [PROTO-62] - fix: stored frame corruption due to old bug
* v1.1.6 - bump version
This commit is contained in:
parent bde6a921ca
commit f2ad38da05
@@ -225,11 +225,7 @@ func (e *CeremonyDataClockConsensusEngine) handleCeremonyPeerListAnnounce(
 			}
 		}

-		multiaddr := p.Multiaddr
-		if bytes.Equal(p.PeerId, peerID) || p.Multiaddr == "" {
-			// we have to fetch self-reported peer info
-			multiaddr = e.pubSub.GetMultiaddrOfPeer(peerID)
-		}
+		multiaddr := e.pubSub.GetMultiaddrOfPeer(peerID)

 		e.pubSub.SetPeerScore(p.PeerId, 10)
 		existing, ok := e.peerMap[string(p.PeerId)]
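The hunk above is the [PROTO-61] change: instead of trusting the multiaddr a peer announces for another peer and only falling back to the local view, the handler now always resolves the address from the node's own DHT view via pubSub.GetMultiaddrOfPeer. A minimal, self-contained sketch of that policy, assuming hypothetical dhtLookup and resolvePeerAddr helpers in place of the real pubsub API:

package main

import "fmt"

// dhtLookup stands in for pubSub.GetMultiaddrOfPeer: the node's own view of
// the peer, used as the source of truth.
func dhtLookup(peerID string) string {
	table := map[string]string{"peerA": "/ip4/10.0.0.1/tcp/8336"}
	return table[peerID]
}

// resolvePeerAddr ignores the announced multiaddr and asks the DHT instead.
func resolvePeerAddr(peerID, announced string) string {
	_ = announced // self-reported value is no longer treated as authoritative
	return dhtLookup(peerID)
}

func main() {
	fmt.Println(resolvePeerAddr("peerA", "/ip4/203.0.113.9/tcp/9999"))
}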
@@ -240,6 +240,8 @@ func (e *CeremonyDataClockConsensusEngine) Start(
 		}
 	}()

+	latestFrame = e.performSanityCheck(latestFrame)
+
 	e.state = consensus.EngineStateCollecting

 	for i := int64(0); i < e.pendingCommitWorkers; i++ {
@@ -286,6 +288,9 @@ func (e *CeremonyDataClockConsensusEngine) Start(
 				})
 			}
 			for _, v := range e.uncooperativePeersMap {
+				if v == nil {
+					continue
+				}
 				if v.timestamp <= time.Now().UnixMilli()-UNCOOPERATIVE_PEER_INFO_TTL {
 					deletes = append(deletes, v)
 				}
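[PROTO-60]: the cleanup pass over uncooperativePeersMap now skips nil entries before reading v.timestamp. A small standalone sketch of the guarded scan, with an illustrative peerState type and TTL constant standing in for the engine's own types:

package main

import (
	"fmt"
	"time"
)

type peerState struct{ timestamp int64 }

// illustrative TTL in milliseconds, standing in for UNCOOPERATIVE_PEER_INFO_TTL
const uncooperativePeerInfoTTL int64 = 60_000

func main() {
	peers := map[string]*peerState{
		"a": {timestamp: time.Now().UnixMilli()},
		"b": nil, // without the nil guard, v.timestamp here would panic
	}
	var deletes []*peerState
	for _, v := range peers {
		if v == nil {
			continue
		}
		if v.timestamp <= time.Now().UnixMilli()-uncooperativePeerInfoTTL {
			deletes = append(deletes, v)
		}
	}
	fmt.Println("expired entries:", len(deletes))
}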
@@ -308,6 +313,17 @@ func (e *CeremonyDataClockConsensusEngine) Start(
 			peerCount := e.pubSub.GetNetworkPeersCount()
 			if peerCount >= e.minimumPeersRequired {
 				e.logger.Info("selecting leader")
+				if e.frame > latest.FrameNumber && e.frame-latest.FrameNumber > 16 &&
+					e.syncingTarget == nil {
+					e.logger.Info("rewinding sync head due to large delta")
+					latest, _, err = e.clockStore.GetDataClockFrame(
+						e.filter,
+						0,
+					)
+					if err != nil {
+						panic(err)
+					}
+				}
 				latest, err = e.commitLongestPath(latest)
 				if err != nil {
 					e.logger.Error("could not collect longest path", zap.Error(err))
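The added block rewinds the sync head when the engine's in-memory frame number has drifted more than 16 frames ahead of the latest stored frame and no sync target is set. A toy sketch of just that decision, using a hypothetical shouldRewind helper:

package main

import "fmt"

// shouldRewind mirrors the guard added in Start (names are illustrative):
// rewind when the in-memory frame is more than 16 frames ahead of the latest
// stored frame and no sync is currently in flight.
func shouldRewind(engineFrame, latestStored uint64, syncInFlight bool) bool {
	return engineFrame > latestStored && engineFrame-latestStored > 16 && !syncInFlight
}

func main() {
	fmt.Println(shouldRewind(120, 100, false)) // true: restart from frame 0
	fmt.Println(shouldRewind(105, 100, false)) // false: delta is small enough
}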
@@ -393,6 +409,127 @@ func (e *CeremonyDataClockConsensusEngine) Stop(force bool) <-chan error {
 	return errChan
 }

+func (e *CeremonyDataClockConsensusEngine) performSanityCheck(
+	frame *protobufs.ClockFrame,
+) *protobufs.ClockFrame {
+	e.logger.Info("performing sanity check")
+	start := uint64(0)
+	idx := start
+	end := frame.FrameNumber + 1
+	var prior *protobufs.ClockFrame
+	for start < end {
+		tail := end
+		if start+16 < tail {
+			tail = start + 16
+		}
+		iter, err := e.clockStore.RangeDataClockFrames(
+			e.filter,
+			start,
+			tail,
+		)
+		if err != nil {
+			panic(err)
+		}
+
+		for iter.First(); iter.Valid(); iter.Next() {
+			v, err := iter.Value()
+			if err != nil {
+				panic(err)
+			}
+
+			if v.FrameNumber != idx {
+				e.logger.Warn(
+					"discontinuity found, attempting to fix",
+					zap.Uint64("expected_frame_number", idx),
+					zap.Uint64("found_frame_number", v.FrameNumber),
+				)
+
+				disc := v
+				for disc.FrameNumber-idx > 0 {
+					frames, err := e.clockStore.GetCandidateDataClockFrames(
+						e.filter,
+						disc.FrameNumber-1,
+					)
+					if err != nil {
+						panic(err)
+					}
+
+					found := false
+					for _, candidate := range frames {
+						selector, err := candidate.GetSelector()
+						if err != nil {
+							panic(err)
+						}
+
+						parentSelector, _, _, err := disc.GetParentSelectorAndDistance()
+						if err != nil {
+							panic(err)
+						}
+
+						if selector.Cmp(parentSelector) == 0 {
+							found = true
+							_, priorTrie, err := e.clockStore.GetDataClockFrame(
+								e.filter,
+								prior.FrameNumber,
+							)
+							if err != nil {
+								panic(err)
+							}
+
+							txn, err := e.clockStore.NewTransaction()
+							if err != nil {
+								panic(err)
+							}
+
+							err = e.clockStore.PutDataClockFrame(
+								candidate,
+								priorTrie,
+								txn,
+								true,
+							)
+							if err != nil {
+								panic(err)
+							}
+
+							if err = txn.Commit(); err != nil {
+								panic(err)
+							}
+
+							disc = candidate
+						}
+					}
+
+					if !found {
+						e.logger.Error(
+							"could not resolve discontinuity, rewinding consensus head",
+						)
+
+						if err = iter.Close(); err != nil {
+							panic(err)
+						}
+
+						return prior
+					}
+				}
+
+				idx = v.FrameNumber
+			} else {
+				prior = v
+			}
+
+			idx++
+		}
+
+		if err = iter.Close(); err != nil {
+			panic(err)
+		}
+
+		start += 16
+	}
+
+	return frame
+}
+
 func (e *CeremonyDataClockConsensusEngine) GetDifficulty() uint32 {
 	return e.difficulty
 }
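[PROTO-62]: the new performSanityCheck walks the stored frames in windows of 16, checks the frame numbers for gaps, tries to repair a gap from candidate frames whose selector matches the parent selector of the frame after it, and rewinds to the last intact frame if no candidate fits. A heavily reduced sketch of only the gap detection, over a plain slice instead of the clock store:

package main

import "fmt"

// findDiscontinuity reports the first gap in a sequence of stored frame
// numbers, mirroring the scan in performSanityCheck (illustrative only: the
// real code walks a store iterator 16 frames at a time and repairs gaps from
// candidate frames whose selector matches the next frame's parent selector).
func findDiscontinuity(frameNumbers []uint64) (expected uint64, found bool) {
	idx := uint64(0)
	for _, n := range frameNumbers {
		if n != idx {
			return idx, true // expected idx, found n
		}
		idx++
	}
	return 0, false
}

func main() {
	fmt.Println(findDiscontinuity([]uint64{0, 1, 2, 5, 6})) // 3 true
	fmt.Println(findDiscontinuity([]uint64{0, 1, 2, 3}))    // 0 false
}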
@@ -587,6 +587,7 @@ func (
 		frame,
 		e.frameProverTrie,
 		txn,
+		false,
 	); err != nil {
 		panic(err)
 	}
@@ -729,6 +730,7 @@ func (e *CeremonyDataClockConsensusEngine) commitLongestPath(
 				s,
 				e.frameProverTrie,
 				txn,
+				false,
 			); err != nil {
 				e.logger.Error(
 					"could not commit candidate",
@@ -890,8 +892,10 @@ func (e *CeremonyDataClockConsensusEngine) reverseOptimisticSync(
 			zap.Error(err),
 		)
 		e.peerMapMx.Lock()
-		e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-		delete(e.peerMap, string(peerId))
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			delete(e.peerMap, string(peerId))
+		}
 		e.peerMapMx.Unlock()
 		e.syncingTarget = nil
 		return latest, errors.Wrap(err, "reverse optimistic sync")
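This hunk and the analogous ones that follow apply the same guard at each sync failure path: a peer is moved into uncooperativePeersMap only if it actually exists in peerMap, so a missing entry can no longer insert a nil value for the cleanup loop to trip over. A minimal sketch of the pattern, with an illustrative peerInfo type and markUncooperative helper:

package main

import "fmt"

type peerInfo struct{ multiaddr string }

// markUncooperative copies a peer's entry only if it exists; the previous
// unconditional copy could store a nil *peerInfo for unknown peers, which the
// cleanup loop later dereferenced (names here are illustrative).
func markUncooperative(peers, uncooperative map[string]*peerInfo, id string) {
	if info, ok := peers[id]; ok {
		uncooperative[id] = info
		delete(peers, id)
	}
}

func main() {
	peers := map[string]*peerInfo{"a": {multiaddr: "/ip4/10.0.0.1/tcp/8336"}}
	uncooperative := map[string]*peerInfo{}
	markUncooperative(peers, uncooperative, "a")
	markUncooperative(peers, uncooperative, "missing") // no nil entry is inserted
	fmt.Println(len(peers), len(uncooperative))        // 0 1
}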
@@ -924,8 +928,10 @@ func (e *CeremonyDataClockConsensusEngine) reverseOptimisticSync(
 			zap.Error(err),
 		)
 		e.peerMapMx.Lock()
-		e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-		delete(e.peerMap, string(peerId))
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			delete(e.peerMap, string(peerId))
+		}
 		e.peerMapMx.Unlock()
 		e.syncingTarget = nil
 		return latest, errors.Wrap(err, "reverse optimistic sync")
@@ -950,8 +956,10 @@ func (e *CeremonyDataClockConsensusEngine) reverseOptimisticSync(
 			zap.Error(err),
 		)
 		e.peerMapMx.Lock()
-		e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-		delete(e.peerMap, string(peerId))
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			delete(e.peerMap, string(peerId))
+		}
 		e.peerMapMx.Unlock()

 		if err := cc.Close(); err != nil {
@@ -995,8 +1003,10 @@ func (e *CeremonyDataClockConsensusEngine) reverseOptimisticSync(
 			zap.Error(err),
 		)
 		e.peerMapMx.Lock()
-		e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-		delete(e.peerMap, string(peerId))
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			delete(e.peerMap, string(peerId))
+		}
 		e.peerMapMx.Unlock()
 		e.syncingStatus = SyncStatusFailed

@@ -1067,8 +1077,10 @@ func (e *CeremonyDataClockConsensusEngine) sync(
 			zap.Error(err),
 		)
 		e.peerMapMx.Lock()
-		e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-		delete(e.peerMap, string(peerId))
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			delete(e.peerMap, string(peerId))
+		}
 		e.peerMapMx.Unlock()
 		return latest, errors.Wrap(err, "reverse optimistic sync")
 	}
@@ -1095,8 +1107,10 @@ func (e *CeremonyDataClockConsensusEngine) sync(
 			zap.Error(err),
 		)
 		e.peerMapMx.Lock()
-		e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-		delete(e.peerMap, string(peerId))
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			delete(e.peerMap, string(peerId))
+		}
 		e.peerMapMx.Unlock()
 		return latest, errors.Wrap(err, "reverse optimistic sync")
 	}
@@ -1120,8 +1134,10 @@ func (e *CeremonyDataClockConsensusEngine) sync(
 			zap.Error(err),
 		)
 		e.peerMapMx.Lock()
-		e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-		delete(e.peerMap, string(peerId))
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			delete(e.peerMap, string(peerId))
+		}
 		e.peerMapMx.Unlock()

 		if err := cc.Close(); err != nil {
@@ -212,8 +212,10 @@ func (e *CeremonyDataClockConsensusEngine) decompressAndStoreCandidates(
 		syncMsg.ToFrameNumber-syncMsg.FromFrameNumber+1,
 	) {
 		e.peerMapMx.Lock()
-		e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
-		delete(e.peerMap, string(peerId))
+		if _, ok := e.peerMap[string(peerId)]; ok {
+			e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
+			delete(e.peerMap, string(peerId))
+		}
 		e.peerMapMx.Unlock()
 		return nil, errors.New("invalid continuity for compressed sync response")
 	}
@@ -60,5 +60,5 @@ func GetMinimumVersion() []byte {
 }

 func GetVersion() []byte {
-	return []byte{0x01, 0x01, 0x05}
+	return []byte{0x01, 0x01, 0x06}
 }
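GetVersion now reports v1.1.6 as the byte triple {0x01, 0x01, 0x06}. A tiny sketch of how such a triple maps to the dotted version string, using an illustrative formatVersion helper that is not part of the repository:

package main

import "fmt"

// formatVersion renders the three bytes returned by GetVersion as the dotted
// string shown by printVersion (illustrative helper).
func formatVersion(v []byte) string {
	return fmt.Sprintf("v%d.%d.%d", v[0], v[1], v[2])
}

func main() {
	fmt.Println(formatVersion([]byte{0x01, 0x01, 0x06})) // v1.1.6
}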
@@ -231,5 +231,5 @@ func printLogo() {

 func printVersion() {
 	fmt.Println(" ")
-	fmt.Println(" Quilibrium Node - v1.1.5 – Dawn")
+	fmt.Println(" Quilibrium Node - v1.1.6 – Dawn")
 }
@@ -48,6 +48,7 @@ type ClockStore interface {
 		frame *protobufs.ClockFrame,
 		proverTrie *tries.RollingFrecencyCritbitTrie,
 		txn Transaction,
+		backfill bool,
 	) error
 	PutCandidateDataClockFrame(
 		parentSelector []byte,
@@ -189,6 +190,10 @@ func (p *PebbleClockIterator) Next() bool {
 	return p.i.Next()
 }

+func (p *PebbleClockIterator) Prev() bool {
+	return p.i.Prev()
+}
+
 func (p *PebbleClockIterator) Valid() bool {
 	return p.i.Valid()
 }
@@ -909,6 +914,7 @@ func (p *PebbleClockStore) PutDataClockFrame(
 	frame *protobufs.ClockFrame,
 	proverTrie *tries.RollingFrecencyCritbitTrie,
 	txn Transaction,
+	backfill bool,
 ) error {
 	if frame.FrameNumber != 0 {
 		if err := p.saveAggregateProofs(nil, frame); err != nil {
@@ -977,11 +983,13 @@ func (p *PebbleClockStore) PutDataClockFrame(
 		closer.Close()
 	}

-	if err = txn.Set(
-		clockDataLatestIndex(frame.Filter),
-		frameNumberBytes,
-	); err != nil {
-		return errors.Wrap(err, "put data clock frame")
+	if !backfill {
+		if err = txn.Set(
+			clockDataLatestIndex(frame.Filter),
+			frameNumberBytes,
+		); err != nil {
+			return errors.Wrap(err, "put data clock frame")
+		}
 	}

 	return nil
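The backfill flag added to PutDataClockFrame (and to the ClockStore interface earlier in the diff) keeps a repair write of an older frame from overwriting the latest-frame index with a stale frame number; performSanityCheck passes true when it backfills. A toy sketch of that behavior under assumed names:

package main

import "fmt"

// toyStore is a stand-in for PebbleClockStore: putFrame only advances the
// "latest frame" index when the write is not a backfill, matching the new
// backfill parameter on PutDataClockFrame (illustrative reconstruction).
type toyStore struct{ latest uint64 }

func (s *toyStore) putFrame(frameNumber uint64, backfill bool) {
	// the frame itself would be written here in the real store
	if !backfill {
		s.latest = frameNumber
	}
}

func main() {
	s := &toyStore{latest: 100}
	s.putFrame(42, true) // sanity-check repair of an old gap: latest stays at 100
	s.putFrame(101, false)
	fmt.Println(s.latest) // 101
}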