From f0c71b2d40076f7a46e40311b7e820aecf29c53f Mon Sep 17 00:00:00 2001 From: Cassandra Heart <7929478+CassOnMars@users.noreply.github.com> Date: Tue, 12 Mar 2024 20:28:48 -0500 Subject: [PATCH] some various fixes ahead of 1.4.7 (#119) * experimental: switch mutex to RW to see if it alleviates backpressure on peer info * relax mutex req * reject unknown messages * open the floodgates * adjust message handler to use goroutine, i'll probably regret this * switch that back, it was regret * further discovery * log more data * forcibly block channel when unbounded * else * make it configurable so bootstrap peers are the only ones putting up with this. * ok, non-starter, let's try a different route * further tweaking * let the peer info flow uninhibited * final burn off on master * final adjustments --- go-libp2p-blossomsub/bitmask.go | 1 - go-libp2p-blossomsub/blossomsub.go | 29 -- go-libp2p-blossomsub/pubsub.go | 7 +- .../consensus/ceremony/broadcast_messaging.go | 22 +- .../ceremony_data_clock_consensus_engine.go | 6 +- node/consensus/ceremony/consensus_frames.go | 4 +- node/consensus/master/broadcast_messaging.go | 147 ++-------- .../master/master_clock_consensus_engine.go | 97 ++++++- node/consensus/time/data_time_reel.go | 1 - .../application/ceremony_application.go | 7 + node/main.go | 32 --- node/p2p/blossomsub.go | 32 ++- node/store/clock.go | 257 ++++-------------- node/store/data_proof.go | 22 +- 14 files changed, 221 insertions(+), 443 deletions(-) diff --git a/go-libp2p-blossomsub/bitmask.go b/go-libp2p-blossomsub/bitmask.go index 2b8f32e..5ab756b 100644 --- a/go-libp2p-blossomsub/bitmask.go +++ b/go-libp2p-blossomsub/bitmask.go @@ -160,7 +160,6 @@ func (t *Bitmask) Subscribe(opts ...SubOpt) (*Subscription, error) { } if sub.ch == nil { - // apply the default size sub.ch = make(chan *Message, 128) } diff --git a/go-libp2p-blossomsub/blossomsub.go b/go-libp2p-blossomsub/blossomsub.go index 3792156..b251a46 100644 --- a/go-libp2p-blossomsub/blossomsub.go +++ b/go-libp2p-blossomsub/blossomsub.go @@ -5,7 +5,6 @@ import ( "fmt" "math/rand" "sort" - "sync" "time" pb "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" @@ -453,12 +452,6 @@ type BlossomSubRouter struct { protos []protocol.ID feature BlossomSubFeatureTest - fanoutMx sync.Mutex - lastpubMx sync.Mutex - meshMx sync.Mutex - peerhaveMx sync.Mutex - iaskedMx sync.Mutex - mcache *MessageCache tracer *pubsubTracer score *peerScore @@ -664,9 +657,7 @@ func (bs *BlossomSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb } // IHAVE flood protection - bs.peerhaveMx.Lock() bs.peerhave[p]++ - bs.peerhaveMx.Unlock() if bs.peerhave[p] > bs.params.MaxIHaveMessages { log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, bs.peerhave[p]) return nil @@ -718,9 +709,7 @@ func (bs *BlossomSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb // truncate to the messages we are actually asking for and update the iasked counter iwantlst = iwantlst[:iask] - bs.iaskedMx.Lock() bs.iasked[p] += iask - bs.iaskedMx.Unlock() bs.gossipTracer.AddPromise(p, iwantlst) @@ -1055,9 +1044,7 @@ func (bs *BlossomSubRouter) Publish(msg *Message) { if len(peers) > 0 { gmap = peerListToMap(peers) - bs.fanoutMx.Lock() bs.fanout[string(bitmask)] = gmap - bs.fanoutMx.Unlock() } } bs.lastpub[string(bitmask)] = time.Now().UnixNano() @@ -1114,15 +1101,9 @@ func (bs *BlossomSubRouter) Join(bitmask []byte) { } } - bs.meshMx.Lock() bs.mesh[string(bitmask)] = gmap - bs.meshMx.Unlock() - 
bs.fanoutMx.Lock() delete(bs.fanout, string(bitmask)) - bs.fanoutMx.Unlock() - bs.lastpubMx.Lock() delete(bs.lastpub, string(bitmask)) - bs.lastpubMx.Unlock() } else { backoff := bs.backoff[string(bitmask)] peers := bs.getPeers(bitmask, bs.params.D, func(p peer.ID) bool { @@ -1151,9 +1132,7 @@ func (bs *BlossomSubRouter) Leave(bitmask []byte) { log.Debugf("LEAVE %s", bitmask) bs.tracer.Leave(bitmask) - bs.meshMx.Lock() delete(bs.mesh, string(bitmask)) - bs.meshMx.Unlock() for p := range gmap { log.Debugf("LEAVE: Remove mesh link to %s in %s", p, bitmask) @@ -1604,12 +1583,8 @@ func (bs *BlossomSubRouter) heartbeat() { now := time.Now().UnixNano() for bitmask, lastpub := range bs.lastpub { if lastpub+int64(bs.params.FanoutTTL) < now { - bs.fanoutMx.Lock() delete(bs.fanout, bitmask) - bs.fanoutMx.Unlock() - bs.lastpubMx.Lock() delete(bs.lastpub, bitmask) - bs.lastpubMx.Unlock() } } @@ -1657,16 +1632,12 @@ func (bs *BlossomSubRouter) heartbeat() { func (bs *BlossomSubRouter) clearIHaveCounters() { if len(bs.peerhave) > 0 { // throw away the old map and make a new one - bs.peerhaveMx.Lock() bs.peerhave = make(map[peer.ID]int) - bs.peerhaveMx.Unlock() } if len(bs.iasked) > 0 { // throw away the old map and make a new one - bs.iaskedMx.Lock() bs.iasked = make(map[peer.ID]int) - bs.iaskedMx.Unlock() } } diff --git a/go-libp2p-blossomsub/pubsub.go b/go-libp2p-blossomsub/pubsub.go index caa883b..a28c96b 100644 --- a/go-libp2p-blossomsub/pubsub.go +++ b/go-libp2p-blossomsub/pubsub.go @@ -979,7 +979,10 @@ func (p *PubSub) notifySubs(msg *Message) { for f := range subs { select { case f.ch <- msg: - default: + case <-time.After(15 * time.Millisecond): + // it's unreasonable to immediately fall over because a subscriber didn't + // answer, message delivery sometimes lands next nanosecond and dropping + // it when there's room is absurd. p.tracer.UndeliverableMessage(msg) log.Infof("Can't deliver message to subscription for bitmask %x; subscriber too slow", bitmask) } @@ -1308,7 +1311,7 @@ func (p *PubSub) Subscribe(bitmask []byte, opts ...SubOpt) (*Subscription, error } // WithBufferSize is a Subscribe option to customize the size of the subscribe output buffer. -// The default length is 1000 but it can be configured to avoid dropping messages if the consumer is not reading fast +// The default length is 128 but it can be configured to avoid dropping messages if the consumer is not reading fast // enough. 
func WithBufferSize(size int) SubOpt { return func(sub *Subscription) error { diff --git a/node/consensus/ceremony/broadcast_messaging.go b/node/consensus/ceremony/broadcast_messaging.go index 437b01a..66ef533 100644 --- a/node/consensus/ceremony/broadcast_messaging.go +++ b/node/consensus/ceremony/broadcast_messaging.go @@ -96,12 +96,12 @@ func (e *CeremonyDataClockConsensusEngine) handleMessage( switch any.TypeUrl { case protobufs.ClockFrameType: - e.peerMapMx.Lock() + e.peerMapMx.RLock() if peer, ok := e.peerMap[string(message.From)]; !ok || bytes.Compare(peer.version, consensus.GetMinimumVersion()) < 0 { return nil } - e.peerMapMx.Unlock() + e.peerMapMx.RUnlock() if err := e.handleClockFrameData( message.From, msg.Address, @@ -134,13 +134,6 @@ func (e *CeremonyDataClockConsensusEngine) handleCeremonyPeerListAnnounce( } for _, p := range announce.PeerList { - e.peerMapMx.Lock() - if _, ok := e.uncooperativePeersMap[string(p.PeerId)]; ok { - e.peerMapMx.Unlock() - continue - } - e.peerMapMx.Unlock() - if bytes.Equal(p.PeerId, e.pubSub.GetPeerID()) { continue } @@ -197,13 +190,20 @@ func (e *CeremonyDataClockConsensusEngine) handleCeremonyPeerListAnnounce( } } + e.peerMapMx.RLock() + if _, ok := e.uncooperativePeersMap[string(p.PeerId)]; ok { + e.peerMapMx.RUnlock() + continue + } + e.peerMapMx.RUnlock() + multiaddr := e.pubSub.GetMultiaddrOfPeer(p.PeerId) e.pubSub.SetPeerScore(p.PeerId, 10) - e.peerMapMx.Lock() + e.peerMapMx.RLock() existing, ok := e.peerMap[string(p.PeerId)] - e.peerMapMx.Unlock() + e.peerMapMx.RUnlock() if ok { if existing.signature != nil && p.Signature == nil { diff --git a/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go b/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go index 5a8db09..d67fb03 100644 --- a/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go +++ b/node/consensus/ceremony/ceremony_data_clock_consensus_engine.go @@ -98,7 +98,7 @@ type CeremonyDataClockConsensusEngine struct { engineMx sync.Mutex dependencyMapMx sync.Mutex stagedLobbyStateTransitionsMx sync.Mutex - peerMapMx sync.Mutex + peerMapMx sync.RWMutex peerAnnounceMapMx sync.Mutex lastKeyBundleAnnouncementFrame uint64 peerMap map[string]*peerInfo @@ -574,7 +574,7 @@ func ( e *CeremonyDataClockConsensusEngine, ) GetPeerInfo() *protobufs.PeerInfoResponse { resp := &protobufs.PeerInfoResponse{} - e.peerMapMx.Lock() + e.peerMapMx.RLock() for _, v := range e.peerMap { resp.PeerInfo = append(resp.PeerInfo, &protobufs.PeerInfo{ PeerId: v.peerId, @@ -602,6 +602,6 @@ func ( }, ) } - e.peerMapMx.Unlock() + e.peerMapMx.RUnlock() return resp } diff --git a/node/consensus/ceremony/consensus_frames.go b/node/consensus/ceremony/consensus_frames.go index ea06eb2..425a294 100644 --- a/node/consensus/ceremony/consensus_frames.go +++ b/node/consensus/ceremony/consensus_frames.go @@ -159,7 +159,7 @@ func (e *CeremonyDataClockConsensusEngine) GetMostAheadPeer() ( max := frame.FrameNumber var peer []byte = nil - e.peerMapMx.Lock() + e.peerMapMx.RLock() for _, v := range e.peerMap { _, ok := e.uncooperativePeersMap[string(v.peerId)] if v.maxFrame > max && @@ -169,7 +169,7 @@ func (e *CeremonyDataClockConsensusEngine) GetMostAheadPeer() ( max = v.maxFrame } } - e.peerMapMx.Unlock() + e.peerMapMx.RUnlock() if peer == nil { return nil, 0, p2p.ErrNoPeersAvailable diff --git a/node/consensus/master/broadcast_messaging.go b/node/consensus/master/broadcast_messaging.go index fdd1d45..1736b96 100644 --- a/node/consensus/master/broadcast_messaging.go +++ 
b/node/consensus/master/broadcast_messaging.go @@ -2,17 +2,13 @@ package master import ( "bytes" - "context" - "crypto/rand" "encoding/binary" "strings" - "time" "github.com/iden3/go-iden3-crypto/poseidon" "github.com/mr-tron/base58" "github.com/pkg/errors" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" @@ -28,7 +24,6 @@ func (e *MasterClockConsensusEngine) handleMessage(message *pb.Message) error { zap.Binary("signature", message.Signature), ) msg := &protobufs.Message{} - if err := proto.Unmarshal(message.Data, msg); err != nil { return errors.Wrap(err, "handle message") } @@ -38,42 +33,6 @@ func (e *MasterClockConsensusEngine) handleMessage(message *pb.Message) error { return errors.Wrap(err, "handle message") } - eg := errgroup.Group{} - eg.SetLimit(len(e.executionEngines)) - for name := range e.executionEngines { - name := name - eg.Go(func() error { - messages, err := e.executionEngines[name].ProcessMessage( - msg.Address, - msg, - ) - if err != nil { - e.logger.Error( - "could not process message for engine", - zap.Error(err), - zap.String("engine_name", name), - ) - return errors.Wrap(err, "handle message") - } - for _, m := range messages { - m := m - if err := e.publishMessage(m.Address, m); err != nil { - e.logger.Error( - "could not publish message for engine", - zap.Error(err), - zap.String("engine_name", name), - ) - return errors.Wrap(err, "handle message") - } - } - return nil - }) - } - if err := eg.Wait(); err != nil { - e.logger.Error("rejecting invalid message", zap.Error(err)) - return errors.Wrap(err, "execution failed") - } - switch any.TypeUrl { case protobufs.ClockFrameType: if err := e.handleClockFrameData( @@ -82,6 +41,7 @@ func (e *MasterClockConsensusEngine) handleMessage(message *pb.Message) error { ); err != nil { return errors.Wrap(err, "handle message") } + return nil case protobufs.SelfTestReportType: if err := e.handleSelfTestReport( message.From, @@ -89,9 +49,10 @@ func (e *MasterClockConsensusEngine) handleMessage(message *pb.Message) error { ); err != nil { return errors.Wrap(err, "handle message") } + return nil } - return nil + return errors.Wrap(errors.New("invalid message"), "handle message") } func (e *MasterClockConsensusEngine) handleClockFrameData( @@ -131,12 +92,19 @@ func (e *MasterClockConsensusEngine) handleClockFrameData( zap.Int("proof_count", len(frame.AggregateProofs)), ) - if err := e.frameProver.VerifyMasterClockFrame(frame); err != nil { - e.logger.Error("could not verify clock frame", zap.Error(err)) - return errors.Wrap(err, "handle clock frame data") - } - - e.masterTimeReel.Insert(frame) + go func() { + select { + case e.frameValidationCh <- frame: + default: + e.logger.Debug( + "dropped frame due to overwhelmed queue", + zap.Binary("sender", peerID), + zap.Binary("filter", frame.Filter), + zap.Uint64("frame_number", frame.FrameNumber), + zap.Int("proof_count", len(frame.AggregateProofs)), + ) + } + }() return nil } @@ -206,86 +174,9 @@ func (e *MasterClockConsensusEngine) handleSelfTestReport( return nil } - cc, err := e.pubSub.GetDirectChannel(peerID, "validation") - if err != nil { - e.logger.Debug( - "could not connect for validation", - zap.String("peer_id", base58.Encode(peerID)), - zap.Uint32("difficulty", report.Difficulty), - zap.Int64("difficulty_metric", report.DifficultyMetric), - zap.Int64("commit_16_metric", report.Commit_16Metric), - zap.Int64("commit_128_metric", 
report.Commit_128Metric), - zap.Int64("commit_1024_metric", report.Commit_1024Metric), - zap.Int64("commit_65536_metric", report.Commit_65536Metric), - zap.Int64("proof_16_metric", report.Proof_16Metric), - zap.Int64("proof_128_metric", report.Proof_128Metric), - zap.Int64("proof_1024_metric", report.Proof_1024Metric), - zap.Int64("proof_65536_metric", report.Proof_65536Metric), - zap.Uint32("cores", report.Cores), - zap.Uint64("memory", memory), - zap.Uint64("storage", binary.BigEndian.Uint64(report.Storage)), - ) - return errors.Wrap(err, "handle self test report") - } - client := protobufs.NewValidationServiceClient(cc) - verification := make([]byte, 1048576) - rand.Read(verification) - start := time.Now().UnixMilli() - validation, err := client.PerformValidation( - context.Background(), - &protobufs.ValidationMessage{ - Validation: verification, - }, - ) - end := time.Now().UnixMilli() - if err != nil { - cc.Close() - return errors.Wrap(err, "handle self test report") - } - cc.Close() - - if !bytes.Equal(verification, validation.Validation) { - e.logger.Debug( - "provided invalid verification", - zap.String("peer_id", base58.Encode(peerID)), - zap.Uint32("difficulty", report.Difficulty), - zap.Int64("difficulty_metric", report.DifficultyMetric), - zap.Int64("commit_16_metric", report.Commit_16Metric), - zap.Int64("commit_128_metric", report.Commit_128Metric), - zap.Int64("commit_1024_metric", report.Commit_1024Metric), - zap.Int64("commit_65536_metric", report.Commit_65536Metric), - zap.Int64("proof_16_metric", report.Proof_16Metric), - zap.Int64("proof_128_metric", report.Proof_128Metric), - zap.Int64("proof_1024_metric", report.Proof_1024Metric), - zap.Int64("proof_65536_metric", report.Proof_65536Metric), - zap.Uint32("cores", report.Cores), - zap.Uint64("memory", memory), - zap.Uint64("storage", binary.BigEndian.Uint64(report.Storage)), - ) - return nil - } - - if end-start > 2000 { - e.logger.Debug( - "slow bandwidth, scoring out", - zap.String("peer_id", base58.Encode(peerID)), - zap.Uint32("difficulty", report.Difficulty), - zap.Int64("difficulty_metric", report.DifficultyMetric), - zap.Int64("commit_16_metric", report.Commit_16Metric), - zap.Int64("commit_128_metric", report.Commit_128Metric), - zap.Int64("commit_1024_metric", report.Commit_1024Metric), - zap.Int64("commit_65536_metric", report.Commit_65536Metric), - zap.Int64("proof_16_metric", report.Proof_16Metric), - zap.Int64("proof_128_metric", report.Proof_128Metric), - zap.Int64("proof_1024_metric", report.Proof_1024Metric), - zap.Int64("proof_65536_metric", report.Proof_65536Metric), - zap.Uint32("cores", report.Cores), - zap.Uint64("memory", memory), - zap.Uint64("storage", binary.BigEndian.Uint64(report.Storage)), - ) - // tag: dusk – nuke this peer for now - e.pubSub.SetPeerScore(peerID, -1000) - } + go func() { + e.bandwidthTestCh <- peerID + }() return nil } diff --git a/node/consensus/master/master_clock_consensus_engine.go b/node/consensus/master/master_clock_consensus_engine.go index c554029..e6c4082 100644 --- a/node/consensus/master/master_clock_consensus_engine.go +++ b/node/consensus/master/master_clock_consensus_engine.go @@ -1,12 +1,16 @@ package master import ( + "bytes" "context" + "crypto/rand" "encoding/hex" + "io" "math/big" "sync" "time" + "github.com/mr-tron/base58" "github.com/pkg/errors" "go.uber.org/zap" "google.golang.org/grpc" @@ -54,8 +58,10 @@ type MasterClockConsensusEngine struct { clockStore store.ClockStore masterTimeReel *qtime.MasterTimeReel report *protobufs.SelfTestReport - peerMapMx 
sync.Mutex + peerMapMx sync.RWMutex peerMap map[string]*protobufs.SelfTestReport + frameValidationCh chan *protobufs.ClockFrame + bandwidthTestCh chan []byte currentReceivingSyncPeers int currentReceivingSyncPeersMx sync.Mutex } @@ -117,6 +123,8 @@ func NewMasterClockConsensusEngine( masterTimeReel: masterTimeReel, report: report, peerMap: map[string]*protobufs.SelfTestReport{}, + frameValidationCh: make(chan *protobufs.ClockFrame, 10), + bandwidthTestCh: make(chan []byte), } e.peerMap[string(e.pubSub.GetPeerID())] = report @@ -150,6 +158,22 @@ func (e *MasterClockConsensusEngine) Start() <-chan error { e.buildHistoricFrameCache(frame) + go func() { + for { + select { + case newFrame := <-e.frameValidationCh: + if err := e.frameProver.VerifyMasterClockFrame(newFrame); err != nil { + e.logger.Error("could not verify clock frame", zap.Error(err)) + continue + } + + e.masterTimeReel.Insert(newFrame) + case peerId := <-e.bandwidthTestCh: + e.performBandwidthTest(peerId) + } + } + }() + e.logger.Info("subscribing to pubsub messages") e.pubSub.Subscribe(e.filter, e.handleMessage, true) @@ -183,9 +207,10 @@ func (e *MasterClockConsensusEngine) Start() <-chan error { }() go func() { - for { - time.Sleep(30 * time.Second) + // Let it sit until we at least have a few more peers inbound + time.Sleep(30 * time.Second) + for { e.logger.Info("broadcasting self-test info") head, err := e.masterTimeReel.Head() if err != nil { @@ -197,6 +222,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error { if err := e.publishMessage(e.filter, e.report); err != nil { e.logger.Debug("error publishing message", zap.Error(err)) } + time.Sleep(30 * time.Minute) } }() @@ -288,6 +314,71 @@ func (e *MasterClockConsensusEngine) Stop(force bool) <-chan error { return errChan } +func (e *MasterClockConsensusEngine) performBandwidthTest(peerID []byte) { + result := e.pubSub.GetMultiaddrOfPeer(peerID) + if result == "" { + go func() { + e.bandwidthTestCh <- peerID + }() + return + } + + cc, err := e.pubSub.GetDirectChannel(peerID, "validation") + if err != nil { + e.logger.Info( + "could not connect for validation", + zap.String("peer_id", base58.Encode(peerID)), + ) + // tag: dusk – nuke this peer for now + e.pubSub.SetPeerScore(peerID, -1000) + return + } + + client := protobufs.NewValidationServiceClient(cc) + verification := make([]byte, 1048576) + rand.Read(verification) + start := time.Now().UnixMilli() + validation, err := client.PerformValidation( + context.Background(), + &protobufs.ValidationMessage{ + Validation: verification, + }, + ) + end := time.Now().UnixMilli() + if err != nil && err != io.EOF { + cc.Close() + e.logger.Info( + "peer returned error", + zap.String("peer_id", base58.Encode(peerID)), + zap.Error(err), + ) + // tag: dusk – nuke this peer for now + e.pubSub.SetPeerScore(peerID, -1000) + return + } + cc.Close() + + if !bytes.Equal(verification, validation.Validation) { + e.logger.Info( + "provided invalid verification", + zap.String("peer_id", base58.Encode(peerID)), + ) + // tag: dusk – nuke this peer for now + e.pubSub.SetPeerScore(peerID, -1000) + return + } + + if end-start > 2000 { + e.logger.Info( + "slow bandwidth, scoring out", + zap.String("peer_id", base58.Encode(peerID)), + ) + // tag: dusk – nuke this peer for now + e.pubSub.SetPeerScore(peerID, -1000) + return + } +} + func ( e *MasterClockConsensusEngine, ) GetPeerManifests() *protobufs.PeerManifestsResponse { diff --git a/node/consensus/time/data_time_reel.go b/node/consensus/time/data_time_reel.go index 9d365f0..da8fa3d 100644 
--- a/node/consensus/time/data_time_reel.go +++ b/node/consensus/time/data_time_reel.go @@ -127,7 +127,6 @@ func (d *DataTimeReel) Start() error { } if frame == nil { - d.head, d.proverTrie = d.createGenesisFrame() d.totalDistance = big.NewInt(0) d.headDistance = big.NewInt(0) diff --git a/node/execution/intrinsics/ceremony/application/ceremony_application.go b/node/execution/intrinsics/ceremony/application/ceremony_application.go index 34f773f..c76010c 100644 --- a/node/execution/intrinsics/ceremony/application/ceremony_application.go +++ b/node/execution/intrinsics/ceremony/application/ceremony_application.go @@ -571,6 +571,13 @@ func GetOutputsFromClockFrame( return nil, nil, errors.Wrap(err, "get outputs from clock frame") } + // apply a small fixup based on a pre-dusk bug showing up with dusk + // conventions + if frame.FrameNumber == 0 && len(output.Address) == 32 { + output.Address = append(output.Address, output.Output[:48]...) + output.Output = output.Output[48:] + } + lobbyState = &protobufs.CeremonyLobbyState{} if err := proto.Unmarshal(output.Output, lobbyState); err != nil { return nil, nil, errors.Wrap(err, "get outputs from clock frame") diff --git a/node/main.go b/node/main.go index ad1dba0..8999dc0 100644 --- a/node/main.go +++ b/node/main.go @@ -31,7 +31,6 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/config" qcrypto "source.quilibrium.com/quilibrium/monorepo/node/crypto" "source.quilibrium.com/quilibrium/monorepo/node/crypto/kzg" - "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/ceremony/application" "source.quilibrium.com/quilibrium/monorepo/node/rpc" ) @@ -150,7 +149,6 @@ func main() { } clearIfTestData(*configDirectory, nodeConfig) - migrate(*configDirectory, nodeConfig) if *dbConsole { console, err := app.NewDBConsole(nodeConfig) @@ -435,36 +433,6 @@ func repair(configDir string, node *app.Node) { } } -func migrate(configDir string, nodeConfig *config.Config) { - _, err := os.Stat(filepath.Join(configDir, "MIGRATIONS")) - if os.IsNotExist(err) { - fmt.Println("Deduplicating and compressing clock frame data...") - clock, err := app.NewClockStore(nodeConfig) - if err := clock.Deduplicate(application.CEREMONY_ADDRESS); err != nil { - panic(err) - } - - migrationFile, err := os.OpenFile( - filepath.Join(configDir, "MIGRATIONS"), - os.O_CREATE|os.O_RDWR, - fs.FileMode(0600), - ) - if err != nil { - panic(err) - } - - _, err = migrationFile.Write([]byte{0x00, 0x00, 0x01}) - if err != nil { - panic(err) - } - - err = migrationFile.Close() - if err != nil { - panic(err) - } - } -} - func printPeerID(p2pConfig *config.P2PConfig) { peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey) if err != nil { diff --git a/node/p2p/blossomsub.go b/node/p2p/blossomsub.go index e30fbfa..667dea2 100644 --- a/node/p2p/blossomsub.go +++ b/node/p2p/blossomsub.go @@ -35,15 +35,16 @@ import ( ) type BlossomSub struct { - ps *blossomsub.PubSub - ctx context.Context - logger *zap.Logger - peerID peer.ID - bitmaskMap map[string]*blossomsub.Bitmask - h host.Host - signKey crypto.PrivKey - peerScore map[string]int64 - peerScoreMx sync.Mutex + ps *blossomsub.PubSub + ctx context.Context + logger *zap.Logger + peerID peer.ID + bitmaskMap map[string]*blossomsub.Bitmask + h host.Host + signKey crypto.PrivKey + peerScore map[string]int64 + peerScoreMx sync.Mutex + isBootstrapPeer bool } var _ PubSub = (*BlossomSub)(nil) @@ -133,11 +134,12 @@ func NewBlossomSub( } bs := &BlossomSub{ - ctx: ctx, - logger: logger, - bitmaskMap: make(map[string]*blossomsub.Bitmask), - 
signKey: privKey, - peerScore: make(map[string]int64), + ctx: ctx, + logger: logger, + bitmaskMap: make(map[string]*blossomsub.Bitmask), + signKey: privKey, + peerScore: make(map[string]int64), + isBootstrapPeer: isBootstrapPeer, } h, err := libp2p.New(opts...) @@ -267,6 +269,7 @@ func (b *BlossomSub) Subscribe( "begin streaming from bitmask", zap.Binary("bitmask", bitmask), ) + go func() { for { m, err := sub.Next(b.ctx) @@ -276,7 +279,6 @@ func (b *BlossomSub) Subscribe( zap.Error(err), ) } - if err = handler(m.Message); err != nil { b.logger.Debug("message handler returned error", zap.Error(err)) } diff --git a/node/store/clock.go b/node/store/clock.go index 51b1e03..011d425 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -3,7 +3,6 @@ package store import ( "bytes" "encoding/binary" - "fmt" "github.com/cockroachdb/pebble" "github.com/iden3/go-iden3-crypto/poseidon" @@ -62,7 +61,6 @@ type ClockStore interface { parentSelector []byte, truncate bool, ) (*protobufs.ClockFrame, error) - Deduplicate(filter []byte) error GetCompressedDataClockFrames( filter []byte, fromFrameNumber uint64, @@ -186,11 +184,25 @@ func (p *PebbleClockIterator) TruncatedValue() ( value := p.i.Value() frame := &protobufs.ClockFrame{} - if err := proto.Unmarshal(value, frame); err != nil { - return nil, errors.Wrap( - errors.Wrap(err, ErrInvalidData.Error()), - "get candidate clock frame iterator value", - ) + if len(value) == (len(p.i.Key()) + 32) { + frameValue, frameCloser, err := p.db.db.Get(value) + if err != nil { + return nil, errors.Wrap(err, "get truncated clock frame iterator value") + } + if err := proto.Unmarshal(frameValue, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get truncated clock frame iterator value", + ) + } + frameCloser.Close() + } else { + if err := proto.Unmarshal(value, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get truncated clock frame iterator value", + ) + } } return frame, nil @@ -203,14 +215,34 @@ func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) { value := p.i.Value() frame := &protobufs.ClockFrame{} - if err := proto.Unmarshal(value, frame); err != nil { - return nil, errors.Wrap( - errors.Wrap(err, ErrInvalidData.Error()), - "get clock frame iterator value", - ) + genesisFramePreIndex := false + + // We do a bit of a cheap trick here while things are still stuck in the old + // ways: we use the size of the parent index key to determine if it's the new + // format, or the old raw frame + if len(value) == (len(p.i.Key()) + 32) { + frameValue, frameCloser, err := p.db.db.Get(value) + if err != nil { + return nil, errors.Wrap(err, "get clock frame iterator value") + } + if err := proto.Unmarshal(frameValue, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get clock frame iterator value", + ) + } + defer frameCloser.Close() + } else { + if err := proto.Unmarshal(value, frame); err != nil { + return nil, errors.Wrap( + errors.Wrap(err, ErrInvalidData.Error()), + "get clock frame iterator value", + ) + } + genesisFramePreIndex = frame.FrameNumber == 0 } - if err := p.db.fillAggregateProofs(frame, false); err != nil { + if err := p.db.fillAggregateProofs(frame, genesisFramePreIndex); err != nil { return nil, errors.Wrap( errors.Wrap(err, ErrInvalidData.Error()), "get clock frame iterator value", @@ -647,20 +679,8 @@ func (p *PebbleClockStore) saveAggregateProofs( p.db, txn, frame.AggregateProofs[i], - commit, 
func(typeUrl string, data []byte) ([][]byte, error) { - if typeUrl == protobufs.IntrinsicExecutionOutputType { - o := &protobufs.IntrinsicExecutionOutput{} - if err := proto.Unmarshal(data, o); err != nil { - return nil, err - } - leftBits := append([]byte{}, o.Address...) - leftBits = append(leftBits, o.Output...) - rightBits := o.Proof - return [][]byte{leftBits, rightBits}, nil - } - - return [][]byte{data}, nil - }) + commit, + ) if err != nil { if err = txn.Abort(); err != nil { return err @@ -900,185 +920,6 @@ func (p *PebbleClockStore) RangeDataClockFrames( return &PebbleClockIterator{i: iter, db: p}, nil } -// Should only need be run once, before starting -func (p *PebbleClockStore) Deduplicate(filter []byte) error { - from := clockDataParentIndexKey( - filter, - 1, - []byte{ - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - }, - ) - to := clockDataParentIndexKey( - filter, - 20000, - []byte{ - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - }, - ) - - iter, err := p.db.NewIter(from, to) - if err != nil { - return errors.Wrap(err, "deduplicate") - } - - i := 0 - for iter.First(); iter.Valid(); iter.Next() { - value := iter.Value() - frame := &protobufs.ClockFrame{} - if err := proto.Unmarshal(value, frame); err != nil { - return err - } - - if err := p.saveAggregateProofs(nil, frame); err != nil { - return err - } - - frame.AggregateProofs = []*protobufs.InclusionAggregateProof{} - newValue, err := proto.Marshal(frame) - if err != nil { - return err - } - - err = p.db.Set(iter.Key(), newValue) - if err != nil { - return err - } - i++ - if i%100 == 0 { - fmt.Println("Deduplicated 100 parent frames") - } - } - - iter.Close() - if err := p.db.Compact(from, to, true); err != nil { - return err - } - - from = clockDataFrameKey(filter, 1) - to = clockDataFrameKey(filter, 20000) - - iter, err = p.db.NewIter(from, to) - if err != nil { - return errors.Wrap(err, "deduplicate") - } - - i = 0 - for iter.First(); iter.Valid(); iter.Next() { - value := iter.Value() - frame := &protobufs.ClockFrame{} - if err := proto.Unmarshal(value, frame); err != nil { - return err - } - - if err := p.saveAggregateProofs(nil, frame); err != nil { - return err - } - - frame.AggregateProofs = []*protobufs.InclusionAggregateProof{} - newValue, err := proto.Marshal(frame) - if err != nil { - return err - } - - err = p.db.Set(iter.Key(), newValue) - if err != nil { - return err - } - i++ - if i%100 == 0 { - fmt.Println("Deduplicated 100 data frames") - } - } - - iter.Close() - if err := p.db.Compact(from, to, true); err != nil { - return err - } - - from = clockDataCandidateFrameKey( - filter, - 1, - []byte{ - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - }, - []byte{ - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - }, - ) - to = clockDataCandidateFrameKey( - filter, - 20000, - []byte{ - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 
0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - }, - []byte{ - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, - }, - ) - - iter, err = p.db.NewIter(from, to) - if err != nil { - return errors.Wrap(err, "deduplicate") - } - - i = 0 - for iter.First(); iter.Valid(); iter.Next() { - value := iter.Value() - frame := &protobufs.ClockFrame{} - if err := proto.Unmarshal(value, frame); err != nil { - return err - } - - if err := p.saveAggregateProofs(nil, frame); err != nil { - return err - } - - frame.AggregateProofs = []*protobufs.InclusionAggregateProof{} - newValue, err := proto.Marshal(frame) - if err != nil { - return err - } - - err = p.db.Set(iter.Key(), newValue) - if err != nil { - return err - } - - i++ - if i%100 == 0 { - fmt.Println("Deduplicated 100 candidate frames") - } - } - - iter.Close() - if err := p.db.Compact(from, to, true); err != nil { - return err - } - - p.db.Close() - - return nil -} - func (p *PebbleClockStore) GetCompressedDataClockFrames( filter []byte, fromFrameNumber uint64, @@ -1103,8 +944,8 @@ func (p *PebbleClockStore) GetCompressedDataClockFrames( genesisFramePreIndex := false // We do a bit of a cheap trick here while things are still stuck in the old - // ways: we use the size of the parent index key to determine if it's the new - // format, or the old raw frame + // ways: we use the size of the parent index key to determine if it's the + // new format, or the old raw frame if len(value) == (len(filter) + 42) { frameValue, frameCloser, err := p.db.Get(value) if err != nil { diff --git a/node/store/data_proof.go b/node/store/data_proof.go index ad5f1a4..ec3722c 100644 --- a/node/store/data_proof.go +++ b/node/store/data_proof.go @@ -220,7 +220,6 @@ func internalPutAggregateProof( txn Transaction, aggregateProof *protobufs.InclusionAggregateProof, commitment []byte, - inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error), ) error { buf := binary.BigEndian.AppendUint64( nil, @@ -229,9 +228,18 @@ func internalPutAggregateProof( buf = append(buf, aggregateProof.Proof...) for i, inc := range aggregateProof.InclusionCommitments { - segments, err := inclusionSplitter(inc.TypeUrl, inc.Data) - if err != nil { - return errors.Wrap(err, "get aggregate proof") + var segments [][]byte + if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType { + o := &protobufs.IntrinsicExecutionOutput{} + if err := proto.Unmarshal(inc.Data, o); err != nil { + return errors.Wrap(err, "get aggregate proof") + } + leftBits := append([]byte{}, o.Address...) + leftBits = append(leftBits, o.Output...) + rightBits := o.Proof + segments = [][]byte{leftBits, rightBits} + } else { + segments = [][]byte{inc.Data} } urlLength := len(inc.TypeUrl) @@ -244,7 +252,7 @@ func internalPutAggregateProof( for _, segment := range segments { hash := sha3.Sum256(segment) - if err = txn.Set( + if err := txn.Set( dataProofSegmentKey(aggregateProof.Filter, hash[:]), segment, ); err != nil { @@ -253,7 +261,7 @@ func internalPutAggregateProof( encoded = append(encoded, hash[:]...) 
} - if err = txn.Set( + if err := txn.Set( dataProofInclusionKey(aggregateProof.Filter, commitment, uint64(i)), encoded, ); err != nil { @@ -275,13 +283,11 @@ func (p *PebbleDataProofStore) PutAggregateProof( txn Transaction, aggregateProof *protobufs.InclusionAggregateProof, commitment []byte, - inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error), ) error { return internalPutAggregateProof( p.db, txn, aggregateProof, commitment, - inclusionSplitter, ) }