* experimental priority sync

* get more aggressive about uncooperative ranking

* v1.4.10
This commit is contained in:
Cassandra Heart 2024-03-19 00:57:52 -05:00 committed by GitHub
parent 3001611197
commit 0803d95573
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 59 additions and 181 deletions

View File

@ -27,7 +27,7 @@ const (
var (
BlossomSubD = 6
BlossomSubDlo = 5
BlossomSubDhi = 12
BlossomSubDhi = 24
BlossomSubDscore = 4
BlossomSubDout = 2
BlossomSubHistoryLength = 5
@ -50,7 +50,7 @@ var (
BlossomSubOpportunisticGraftPeers = 2
BlossomSubGraftFloodThreshold = 10 * time.Second
BlossomSubMaxIHaveLength = 5000
BlossomSubMaxIHaveMessages = 10
BlossomSubMaxIHaveMessages = 100
BlossomSubIWantFollowupTime = 3 * time.Second
)

View File

@ -14,7 +14,7 @@ func GetMinimumVersion() []byte {
}
func GetVersion() []byte {
return []byte{0x01, 0x04, 0x09}
return []byte{0x01, 0x04, 0x0A}
}
func GetVersionString() string {

View File

@ -42,7 +42,7 @@ func (e *CeremonyDataClockConsensusEngine) publishProof(
}
peers, max, err := e.GetMostAheadPeer()
if err != nil || len(peers) == 0 || head.FrameNumber+3 > max {
if err != nil || len(peers) == 0 || head.FrameNumber > max {
if err := e.publishMessage(e.filter, frame); err != nil {
return errors.Wrap(
err,

View File

@ -271,16 +271,37 @@ func (e *CeremonyDataClockConsensusEngine) sync(
},
})
if err != nil {
e.peerMapMx.Lock()
if _, ok := e.peerMap[string(peerId)]; ok {
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
delete(e.peerMap, string(peerId))
}
e.peerMapMx.Unlock()
return latest, errors.Wrap(err, "sync")
}
syncMsg, err := s.Recv()
if err != nil {
e.peerMapMx.Lock()
if _, ok := e.peerMap[string(peerId)]; ok {
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
delete(e.peerMap, string(peerId))
}
e.peerMapMx.Unlock()
return latest, errors.Wrap(err, "sync")
}
preflight, ok := syncMsg.
SyncMessage.(*protobufs.CeremonyCompressedSyncResponseMessage_Preflight)
if !ok {
e.peerMapMx.Lock()
if _, ok := e.peerMap[string(peerId)]; ok {
e.uncooperativePeersMap[string(peerId)] = e.peerMap[string(peerId)]
e.uncooperativePeersMap[string(peerId)].timestamp = time.Now().UnixMilli()
delete(e.peerMap, string(peerId))
}
e.peerMapMx.Unlock()
s.CloseSend()
return latest, errors.Wrap(
errors.New("preflight message invalid"),

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/mr-tron/base58"
"github.com/pbnjay/memory"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -35,7 +36,9 @@ func (e *CeremonyDataClockConsensusEngine) NegotiateCompressedSyncFrames(
server protobufs.CeremonyService_NegotiateCompressedSyncFramesServer,
) error {
e.currentReceivingSyncPeersMx.Lock()
if e.currentReceivingSyncPeers > 4 {
if e.currentReceivingSyncPeers > int(
memory.TotalMemory()/uint64(2147483648)-4,
) {
e.currentReceivingSyncPeersMx.Unlock()
e.logger.Debug(
@ -246,20 +249,12 @@ func (e *CeremonyDataClockConsensusEngine) GetCompressedSyncFrames(
request *protobufs.ClockFramesRequest,
server protobufs.CeremonyService_GetCompressedSyncFramesServer,
) error {
e.logger.Info(
e.logger.Debug(
"received clock frame request",
zap.Uint64("from_frame_number", request.FromFrameNumber),
zap.Uint64("to_frame_number", request.ToFrameNumber),
)
e.currentReceivingSyncPeersMx.Lock()
if e.currentReceivingSyncPeers > 4 {
e.currentReceivingSyncPeersMx.Unlock()
e.logger.Info(
"currently processing maximum sync requests, returning",
)
if err := server.SendMsg(
&protobufs.ClockFramesResponse{
Filter: request.Filter,
@ -274,156 +269,6 @@ func (e *CeremonyDataClockConsensusEngine) GetCompressedSyncFrames(
return nil
}
e.currentReceivingSyncPeers++
e.currentReceivingSyncPeersMx.Unlock()
defer func() {
e.currentReceivingSyncPeersMx.Lock()
e.currentReceivingSyncPeers--
e.currentReceivingSyncPeersMx.Unlock()
}()
from := request.FromFrameNumber
parent := request.ParentSelector
frame, _, err := e.clockStore.GetDataClockFrame(
request.Filter,
from,
true,
)
if err != nil {
if !errors.Is(err, store.ErrNotFound) {
e.logger.Error(
"peer asked for frame that returned error",
zap.Uint64("frame_number", request.FromFrameNumber),
)
return errors.Wrap(err, "get compressed sync frames")
} else {
e.logger.Debug(
"peer asked for undiscovered frame",
zap.Uint64("frame_number", request.FromFrameNumber),
)
if err := server.SendMsg(
&protobufs.ClockFramesResponse{
Filter: request.Filter,
FromFrameNumber: 0,
ToFrameNumber: 0,
ClockFrames: []*protobufs.ClockFrame{},
},
); err != nil {
return errors.Wrap(err, "get compressed sync frames")
}
return nil
}
}
if parent != nil {
if !bytes.Equal(frame.ParentSelector, parent) {
e.logger.Debug(
"peer specified out of consensus head, seeking backwards for fork",
)
}
for !bytes.Equal(frame.ParentSelector, parent) {
ours, err := e.clockStore.GetStagedDataClockFrame(
e.filter,
frame.FrameNumber-1,
frame.ParentSelector,
true,
)
if err != nil {
from = 1
e.logger.Debug("peer fully out of sync, rewinding sync head to start")
break
}
theirs, err := e.clockStore.GetStagedDataClockFrame(
e.filter,
frame.FrameNumber-1,
parent,
true,
)
if err != nil {
from = 1
e.logger.Debug("peer fully out of sync, rewinding sync head to start")
break
}
from--
frame = ours
parent = theirs.ParentSelector
}
}
if request.RangeParentSelectors != nil {
for _, selector := range request.RangeParentSelectors {
frame, err := e.clockStore.GetStagedDataClockFrame(
e.filter,
selector.FrameNumber,
selector.ParentSelector,
true,
)
if err == nil && frame != nil {
from = selector.FrameNumber
break
}
}
}
head, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
max := head.FrameNumber
to := request.ToFrameNumber
// We need to slightly rewind, to compensate for unconfirmed frame heads on a
// given branch
if from >= 2 {
from--
}
for {
if to == 0 || to-from > 16 {
if max > from+15 {
to = from + 16
} else {
to = max + 1
}
}
syncMsg, err := e.clockStore.GetCompressedDataClockFrames(
e.filter,
from,
to,
)
if err != nil {
return errors.Wrap(err, "get compressed sync frames")
}
if err := server.SendMsg(syncMsg); err != nil {
return errors.Wrap(err, "get compressed sync frames")
}
if (request.ToFrameNumber == 0 || request.ToFrameNumber > to) && max > to {
from = to + 1
if request.ToFrameNumber > to {
to = request.ToFrameNumber
} else {
to = 0
}
} else {
break
}
}
return nil
}
func (e *CeremonyDataClockConsensusEngine) decompressAndStoreCandidates(
peerId []byte,
syncMsg *protobufs.CeremonyCompressedSync,
@ -432,6 +277,11 @@ func (e *CeremonyDataClockConsensusEngine) decompressAndStoreCandidates(
return nil, ErrNoNewFrames
}
head, err := e.dataTimeReel.Head()
if err != nil {
panic(err)
}
if len(syncMsg.TruncatedClockFrames) < int(
syncMsg.ToFrameNumber-syncMsg.FromFrameNumber+1,
) {
@ -595,7 +445,10 @@ func (e *CeremonyDataClockConsensusEngine) decompressAndStoreCandidates(
p2p.GetBloomFilterIndices(application.CEREMONY_ADDRESS, 65536, 24)...,
),
any,
true,
// We'll tell the time reel to process it (isSync = false) if we're caught
// up beyond the head and frame number is divisible by 100 (limited to
// avoid thrash):
head.FrameNumber > frame.FrameNumber || frame.FrameNumber%100 != 0,
); err != nil {
return nil, errors.Wrap(err, "decompress and store candidates")
}

View File

@ -141,7 +141,7 @@ func NewBlossomSub(
routingDiscovery := routing.NewRoutingDiscovery(kademliaDHT)
util.Advertise(ctx, routingDiscovery, ANNOUNCE)
go discoverPeers(p2pConfig, ctx, logger, h, routingDiscovery)
discoverPeers(p2pConfig, ctx, logger, h, routingDiscovery)
// TODO: turn into an option flag for console logging, this is too noisy for
// default logging behavior
@ -158,19 +158,23 @@ func NewBlossomSub(
}
}
blossomOpts := []blossomsub.Option{
blossomOpts := []blossomsub.Option{}
if isBootstrapPeer {
blossomOpts = append(blossomOpts,
blossomsub.WithPeerExchange(true),
)
}
if tracer != nil {
blossomOpts = append(blossomOpts, blossomsub.WithEventTracer(tracer))
}
blossomOpts = append(blossomOpts, blossomsub.WithPeerScore(
&blossomsub.PeerScoreParams{
SkipAtomicValidation: false,
BitmaskScoreCap: 100,
BitmaskScoreCap: 0,
IPColocationFactorWeight: 0,
IPColocationFactorThreshold: 6,
BehaviourPenaltyWeight: -80,
BehaviourPenaltyWeight: 0,
BehaviourPenaltyThreshold: 100,
BehaviourPenaltyDecay: .5,
DecayInterval: 10 * time.Second,
@ -382,7 +386,7 @@ func initDHT(
time.Sleep(30 * time.Second)
// try to assert some stability, never go below min peers for data
// consensus:
if len(h.Network().Peers()) < 3 {
if len(h.Network().Peers()) < 4 {
logger.Info("reconnecting to peers")
reconnect()
}
@ -574,7 +578,7 @@ func discoverPeers(
go func() {
for {
time.Sleep(30 * time.Second)
time.Sleep(5 * time.Minute)
discover()
}
}()