some various fixes ahead of 1.4.7 (#119)

* experimental: switch mutex to RW to see if it alleviates backpressure on peer info

* relax mutex req

* reject unknown messages

* open the floodgates

* adjust message handler to use goroutine, i'll probably regret this

* switch that back, it was regret

* further discovery

* log more data

* forcibly block channel when unbounded

* else

* make it configurable so bootstrap peers are the only ones putting up with this.

* ok, non-starter, let's try a different route

* further tweaking

* let the peer info flow uninhibited

* final burn off on master

* final adjustments
This commit is contained in:
Cassandra Heart 2024-03-12 20:28:48 -05:00 committed by GitHub
parent 2e029fa4bf
commit f0c71b2d40
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 221 additions and 443 deletions

View File

@ -160,7 +160,6 @@ func (t *Bitmask) Subscribe(opts ...SubOpt) (*Subscription, error) {
}
if sub.ch == nil {
// apply the default size
sub.ch = make(chan *Message, 128)
}

View File

@ -5,7 +5,6 @@ import (
"fmt"
"math/rand"
"sort"
"sync"
"time"
pb "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
@ -453,12 +452,6 @@ type BlossomSubRouter struct {
protos []protocol.ID
feature BlossomSubFeatureTest
fanoutMx sync.Mutex
lastpubMx sync.Mutex
meshMx sync.Mutex
peerhaveMx sync.Mutex
iaskedMx sync.Mutex
mcache *MessageCache
tracer *pubsubTracer
score *peerScore
@ -664,9 +657,7 @@ func (bs *BlossomSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb
}
// IHAVE flood protection
bs.peerhaveMx.Lock()
bs.peerhave[p]++
bs.peerhaveMx.Unlock()
if bs.peerhave[p] > bs.params.MaxIHaveMessages {
log.Debugf("IHAVE: peer %s has advertised too many times (%d) within this heartbeat interval; ignoring", p, bs.peerhave[p])
return nil
@ -718,9 +709,7 @@ func (bs *BlossomSubRouter) handleIHave(p peer.ID, ctl *pb.ControlMessage) []*pb
// truncate to the messages we are actually asking for and update the iasked counter
iwantlst = iwantlst[:iask]
bs.iaskedMx.Lock()
bs.iasked[p] += iask
bs.iaskedMx.Unlock()
bs.gossipTracer.AddPromise(p, iwantlst)
@ -1055,9 +1044,7 @@ func (bs *BlossomSubRouter) Publish(msg *Message) {
if len(peers) > 0 {
gmap = peerListToMap(peers)
bs.fanoutMx.Lock()
bs.fanout[string(bitmask)] = gmap
bs.fanoutMx.Unlock()
}
}
bs.lastpub[string(bitmask)] = time.Now().UnixNano()
@ -1114,15 +1101,9 @@ func (bs *BlossomSubRouter) Join(bitmask []byte) {
}
}
bs.meshMx.Lock()
bs.mesh[string(bitmask)] = gmap
bs.meshMx.Unlock()
bs.fanoutMx.Lock()
delete(bs.fanout, string(bitmask))
bs.fanoutMx.Unlock()
bs.lastpubMx.Lock()
delete(bs.lastpub, string(bitmask))
bs.lastpubMx.Unlock()
} else {
backoff := bs.backoff[string(bitmask)]
peers := bs.getPeers(bitmask, bs.params.D, func(p peer.ID) bool {
@ -1151,9 +1132,7 @@ func (bs *BlossomSubRouter) Leave(bitmask []byte) {
log.Debugf("LEAVE %s", bitmask)
bs.tracer.Leave(bitmask)
bs.meshMx.Lock()
delete(bs.mesh, string(bitmask))
bs.meshMx.Unlock()
for p := range gmap {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, bitmask)
@ -1604,12 +1583,8 @@ func (bs *BlossomSubRouter) heartbeat() {
now := time.Now().UnixNano()
for bitmask, lastpub := range bs.lastpub {
if lastpub+int64(bs.params.FanoutTTL) < now {
bs.fanoutMx.Lock()
delete(bs.fanout, bitmask)
bs.fanoutMx.Unlock()
bs.lastpubMx.Lock()
delete(bs.lastpub, bitmask)
bs.lastpubMx.Unlock()
}
}
@ -1657,16 +1632,12 @@ func (bs *BlossomSubRouter) heartbeat() {
func (bs *BlossomSubRouter) clearIHaveCounters() {
if len(bs.peerhave) > 0 {
// throw away the old map and make a new one
bs.peerhaveMx.Lock()
bs.peerhave = make(map[peer.ID]int)
bs.peerhaveMx.Unlock()
}
if len(bs.iasked) > 0 {
// throw away the old map and make a new one
bs.iaskedMx.Lock()
bs.iasked = make(map[peer.ID]int)
bs.iaskedMx.Unlock()
}
}

View File

@ -979,7 +979,10 @@ func (p *PubSub) notifySubs(msg *Message) {
for f := range subs {
select {
case f.ch <- msg:
default:
case <-time.After(15 * time.Millisecond):
// it's unreasonable to immediately fall over because a subscriber didn't
// answer, message delivery sometimes lands next nanosecond and dropping
// it when there's room is absurd.
p.tracer.UndeliverableMessage(msg)
log.Infof("Can't deliver message to subscription for bitmask %x; subscriber too slow", bitmask)
}
@ -1308,7 +1311,7 @@ func (p *PubSub) Subscribe(bitmask []byte, opts ...SubOpt) (*Subscription, error
}
// WithBufferSize is a Subscribe option to customize the size of the subscribe output buffer.
// The default length is 1000 but it can be configured to avoid dropping messages if the consumer is not reading fast
// The default length is 128 but it can be configured to avoid dropping messages if the consumer is not reading fast
// enough.
func WithBufferSize(size int) SubOpt {
return func(sub *Subscription) error {

View File

@ -96,12 +96,12 @@ func (e *CeremonyDataClockConsensusEngine) handleMessage(
switch any.TypeUrl {
case protobufs.ClockFrameType:
e.peerMapMx.Lock()
e.peerMapMx.RLock()
if peer, ok := e.peerMap[string(message.From)]; !ok ||
bytes.Compare(peer.version, consensus.GetMinimumVersion()) < 0 {
return nil
}
e.peerMapMx.Unlock()
e.peerMapMx.RUnlock()
if err := e.handleClockFrameData(
message.From,
msg.Address,
@ -134,13 +134,6 @@ func (e *CeremonyDataClockConsensusEngine) handleCeremonyPeerListAnnounce(
}
for _, p := range announce.PeerList {
e.peerMapMx.Lock()
if _, ok := e.uncooperativePeersMap[string(p.PeerId)]; ok {
e.peerMapMx.Unlock()
continue
}
e.peerMapMx.Unlock()
if bytes.Equal(p.PeerId, e.pubSub.GetPeerID()) {
continue
}
@ -197,13 +190,20 @@ func (e *CeremonyDataClockConsensusEngine) handleCeremonyPeerListAnnounce(
}
}
e.peerMapMx.RLock()
if _, ok := e.uncooperativePeersMap[string(p.PeerId)]; ok {
e.peerMapMx.RUnlock()
continue
}
e.peerMapMx.RUnlock()
multiaddr := e.pubSub.GetMultiaddrOfPeer(p.PeerId)
e.pubSub.SetPeerScore(p.PeerId, 10)
e.peerMapMx.Lock()
e.peerMapMx.RLock()
existing, ok := e.peerMap[string(p.PeerId)]
e.peerMapMx.Unlock()
e.peerMapMx.RUnlock()
if ok {
if existing.signature != nil && p.Signature == nil {

View File

@ -98,7 +98,7 @@ type CeremonyDataClockConsensusEngine struct {
engineMx sync.Mutex
dependencyMapMx sync.Mutex
stagedLobbyStateTransitionsMx sync.Mutex
peerMapMx sync.Mutex
peerMapMx sync.RWMutex
peerAnnounceMapMx sync.Mutex
lastKeyBundleAnnouncementFrame uint64
peerMap map[string]*peerInfo
@ -574,7 +574,7 @@ func (
e *CeremonyDataClockConsensusEngine,
) GetPeerInfo() *protobufs.PeerInfoResponse {
resp := &protobufs.PeerInfoResponse{}
e.peerMapMx.Lock()
e.peerMapMx.RLock()
for _, v := range e.peerMap {
resp.PeerInfo = append(resp.PeerInfo, &protobufs.PeerInfo{
PeerId: v.peerId,
@ -602,6 +602,6 @@ func (
},
)
}
e.peerMapMx.Unlock()
e.peerMapMx.RUnlock()
return resp
}

View File

@ -159,7 +159,7 @@ func (e *CeremonyDataClockConsensusEngine) GetMostAheadPeer() (
max := frame.FrameNumber
var peer []byte = nil
e.peerMapMx.Lock()
e.peerMapMx.RLock()
for _, v := range e.peerMap {
_, ok := e.uncooperativePeersMap[string(v.peerId)]
if v.maxFrame > max &&
@ -169,7 +169,7 @@ func (e *CeremonyDataClockConsensusEngine) GetMostAheadPeer() (
max = v.maxFrame
}
}
e.peerMapMx.Unlock()
e.peerMapMx.RUnlock()
if peer == nil {
return nil, 0, p2p.ErrNoPeersAvailable

View File

@ -2,17 +2,13 @@ package master
import (
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"strings"
"time"
"github.com/iden3/go-iden3-crypto/poseidon"
"github.com/mr-tron/base58"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
@ -28,7 +24,6 @@ func (e *MasterClockConsensusEngine) handleMessage(message *pb.Message) error {
zap.Binary("signature", message.Signature),
)
msg := &protobufs.Message{}
if err := proto.Unmarshal(message.Data, msg); err != nil {
return errors.Wrap(err, "handle message")
}
@ -38,42 +33,6 @@ func (e *MasterClockConsensusEngine) handleMessage(message *pb.Message) error {
return errors.Wrap(err, "handle message")
}
eg := errgroup.Group{}
eg.SetLimit(len(e.executionEngines))
for name := range e.executionEngines {
name := name
eg.Go(func() error {
messages, err := e.executionEngines[name].ProcessMessage(
msg.Address,
msg,
)
if err != nil {
e.logger.Error(
"could not process message for engine",
zap.Error(err),
zap.String("engine_name", name),
)
return errors.Wrap(err, "handle message")
}
for _, m := range messages {
m := m
if err := e.publishMessage(m.Address, m); err != nil {
e.logger.Error(
"could not publish message for engine",
zap.Error(err),
zap.String("engine_name", name),
)
return errors.Wrap(err, "handle message")
}
}
return nil
})
}
if err := eg.Wait(); err != nil {
e.logger.Error("rejecting invalid message", zap.Error(err))
return errors.Wrap(err, "execution failed")
}
switch any.TypeUrl {
case protobufs.ClockFrameType:
if err := e.handleClockFrameData(
@ -82,6 +41,7 @@ func (e *MasterClockConsensusEngine) handleMessage(message *pb.Message) error {
); err != nil {
return errors.Wrap(err, "handle message")
}
return nil
case protobufs.SelfTestReportType:
if err := e.handleSelfTestReport(
message.From,
@ -89,9 +49,10 @@ func (e *MasterClockConsensusEngine) handleMessage(message *pb.Message) error {
); err != nil {
return errors.Wrap(err, "handle message")
}
return nil
}
return nil
return errors.Wrap(errors.New("invalid message"), "handle message")
}
func (e *MasterClockConsensusEngine) handleClockFrameData(
@ -131,12 +92,19 @@ func (e *MasterClockConsensusEngine) handleClockFrameData(
zap.Int("proof_count", len(frame.AggregateProofs)),
)
if err := e.frameProver.VerifyMasterClockFrame(frame); err != nil {
e.logger.Error("could not verify clock frame", zap.Error(err))
return errors.Wrap(err, "handle clock frame data")
}
e.masterTimeReel.Insert(frame)
go func() {
select {
case e.frameValidationCh <- frame:
default:
e.logger.Debug(
"dropped frame due to overwhelmed queue",
zap.Binary("sender", peerID),
zap.Binary("filter", frame.Filter),
zap.Uint64("frame_number", frame.FrameNumber),
zap.Int("proof_count", len(frame.AggregateProofs)),
)
}
}()
return nil
}
@ -206,86 +174,9 @@ func (e *MasterClockConsensusEngine) handleSelfTestReport(
return nil
}
cc, err := e.pubSub.GetDirectChannel(peerID, "validation")
if err != nil {
e.logger.Debug(
"could not connect for validation",
zap.String("peer_id", base58.Encode(peerID)),
zap.Uint32("difficulty", report.Difficulty),
zap.Int64("difficulty_metric", report.DifficultyMetric),
zap.Int64("commit_16_metric", report.Commit_16Metric),
zap.Int64("commit_128_metric", report.Commit_128Metric),
zap.Int64("commit_1024_metric", report.Commit_1024Metric),
zap.Int64("commit_65536_metric", report.Commit_65536Metric),
zap.Int64("proof_16_metric", report.Proof_16Metric),
zap.Int64("proof_128_metric", report.Proof_128Metric),
zap.Int64("proof_1024_metric", report.Proof_1024Metric),
zap.Int64("proof_65536_metric", report.Proof_65536Metric),
zap.Uint32("cores", report.Cores),
zap.Uint64("memory", memory),
zap.Uint64("storage", binary.BigEndian.Uint64(report.Storage)),
)
return errors.Wrap(err, "handle self test report")
}
client := protobufs.NewValidationServiceClient(cc)
verification := make([]byte, 1048576)
rand.Read(verification)
start := time.Now().UnixMilli()
validation, err := client.PerformValidation(
context.Background(),
&protobufs.ValidationMessage{
Validation: verification,
},
)
end := time.Now().UnixMilli()
if err != nil {
cc.Close()
return errors.Wrap(err, "handle self test report")
}
cc.Close()
if !bytes.Equal(verification, validation.Validation) {
e.logger.Debug(
"provided invalid verification",
zap.String("peer_id", base58.Encode(peerID)),
zap.Uint32("difficulty", report.Difficulty),
zap.Int64("difficulty_metric", report.DifficultyMetric),
zap.Int64("commit_16_metric", report.Commit_16Metric),
zap.Int64("commit_128_metric", report.Commit_128Metric),
zap.Int64("commit_1024_metric", report.Commit_1024Metric),
zap.Int64("commit_65536_metric", report.Commit_65536Metric),
zap.Int64("proof_16_metric", report.Proof_16Metric),
zap.Int64("proof_128_metric", report.Proof_128Metric),
zap.Int64("proof_1024_metric", report.Proof_1024Metric),
zap.Int64("proof_65536_metric", report.Proof_65536Metric),
zap.Uint32("cores", report.Cores),
zap.Uint64("memory", memory),
zap.Uint64("storage", binary.BigEndian.Uint64(report.Storage)),
)
return nil
}
if end-start > 2000 {
e.logger.Debug(
"slow bandwidth, scoring out",
zap.String("peer_id", base58.Encode(peerID)),
zap.Uint32("difficulty", report.Difficulty),
zap.Int64("difficulty_metric", report.DifficultyMetric),
zap.Int64("commit_16_metric", report.Commit_16Metric),
zap.Int64("commit_128_metric", report.Commit_128Metric),
zap.Int64("commit_1024_metric", report.Commit_1024Metric),
zap.Int64("commit_65536_metric", report.Commit_65536Metric),
zap.Int64("proof_16_metric", report.Proof_16Metric),
zap.Int64("proof_128_metric", report.Proof_128Metric),
zap.Int64("proof_1024_metric", report.Proof_1024Metric),
zap.Int64("proof_65536_metric", report.Proof_65536Metric),
zap.Uint32("cores", report.Cores),
zap.Uint64("memory", memory),
zap.Uint64("storage", binary.BigEndian.Uint64(report.Storage)),
)
// tag: dusk nuke this peer for now
e.pubSub.SetPeerScore(peerID, -1000)
}
go func() {
e.bandwidthTestCh <- peerID
}()
return nil
}

View File

@ -1,12 +1,16 @@
package master
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"io"
"math/big"
"sync"
"time"
"github.com/mr-tron/base58"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -54,8 +58,10 @@ type MasterClockConsensusEngine struct {
clockStore store.ClockStore
masterTimeReel *qtime.MasterTimeReel
report *protobufs.SelfTestReport
peerMapMx sync.Mutex
peerMapMx sync.RWMutex
peerMap map[string]*protobufs.SelfTestReport
frameValidationCh chan *protobufs.ClockFrame
bandwidthTestCh chan []byte
currentReceivingSyncPeers int
currentReceivingSyncPeersMx sync.Mutex
}
@ -117,6 +123,8 @@ func NewMasterClockConsensusEngine(
masterTimeReel: masterTimeReel,
report: report,
peerMap: map[string]*protobufs.SelfTestReport{},
frameValidationCh: make(chan *protobufs.ClockFrame, 10),
bandwidthTestCh: make(chan []byte),
}
e.peerMap[string(e.pubSub.GetPeerID())] = report
@ -150,6 +158,22 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
e.buildHistoricFrameCache(frame)
go func() {
for {
select {
case newFrame := <-e.frameValidationCh:
if err := e.frameProver.VerifyMasterClockFrame(newFrame); err != nil {
e.logger.Error("could not verify clock frame", zap.Error(err))
continue
}
e.masterTimeReel.Insert(newFrame)
case peerId := <-e.bandwidthTestCh:
e.performBandwidthTest(peerId)
}
}
}()
e.logger.Info("subscribing to pubsub messages")
e.pubSub.Subscribe(e.filter, e.handleMessage, true)
@ -183,9 +207,10 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
}()
go func() {
for {
time.Sleep(30 * time.Second)
// Let it sit until we at least have a few more peers inbound
time.Sleep(30 * time.Second)
for {
e.logger.Info("broadcasting self-test info")
head, err := e.masterTimeReel.Head()
if err != nil {
@ -197,6 +222,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
if err := e.publishMessage(e.filter, e.report); err != nil {
e.logger.Debug("error publishing message", zap.Error(err))
}
time.Sleep(30 * time.Minute)
}
}()
@ -288,6 +314,71 @@ func (e *MasterClockConsensusEngine) Stop(force bool) <-chan error {
return errChan
}
func (e *MasterClockConsensusEngine) performBandwidthTest(peerID []byte) {
result := e.pubSub.GetMultiaddrOfPeer(peerID)
if result == "" {
go func() {
e.bandwidthTestCh <- peerID
}()
return
}
cc, err := e.pubSub.GetDirectChannel(peerID, "validation")
if err != nil {
e.logger.Info(
"could not connect for validation",
zap.String("peer_id", base58.Encode(peerID)),
)
// tag: dusk nuke this peer for now
e.pubSub.SetPeerScore(peerID, -1000)
return
}
client := protobufs.NewValidationServiceClient(cc)
verification := make([]byte, 1048576)
rand.Read(verification)
start := time.Now().UnixMilli()
validation, err := client.PerformValidation(
context.Background(),
&protobufs.ValidationMessage{
Validation: verification,
},
)
end := time.Now().UnixMilli()
if err != nil && err != io.EOF {
cc.Close()
e.logger.Info(
"peer returned error",
zap.String("peer_id", base58.Encode(peerID)),
zap.Error(err),
)
// tag: dusk nuke this peer for now
e.pubSub.SetPeerScore(peerID, -1000)
return
}
cc.Close()
if !bytes.Equal(verification, validation.Validation) {
e.logger.Info(
"provided invalid verification",
zap.String("peer_id", base58.Encode(peerID)),
)
// tag: dusk nuke this peer for now
e.pubSub.SetPeerScore(peerID, -1000)
return
}
if end-start > 2000 {
e.logger.Info(
"slow bandwidth, scoring out",
zap.String("peer_id", base58.Encode(peerID)),
)
// tag: dusk nuke this peer for now
e.pubSub.SetPeerScore(peerID, -1000)
return
}
}
func (
e *MasterClockConsensusEngine,
) GetPeerManifests() *protobufs.PeerManifestsResponse {

View File

@ -127,7 +127,6 @@ func (d *DataTimeReel) Start() error {
}
if frame == nil {
d.head, d.proverTrie = d.createGenesisFrame()
d.totalDistance = big.NewInt(0)
d.headDistance = big.NewInt(0)

View File

@ -571,6 +571,13 @@ func GetOutputsFromClockFrame(
return nil, nil, errors.Wrap(err, "get outputs from clock frame")
}
// apply a small fixup based on a pre-dusk bug showing up with dusk
// conventions
if frame.FrameNumber == 0 && len(output.Address) == 32 {
output.Address = append(output.Address, output.Output[:48]...)
output.Output = output.Output[48:]
}
lobbyState = &protobufs.CeremonyLobbyState{}
if err := proto.Unmarshal(output.Output, lobbyState); err != nil {
return nil, nil, errors.Wrap(err, "get outputs from clock frame")

View File

@ -31,7 +31,6 @@ import (
"source.quilibrium.com/quilibrium/monorepo/node/config"
qcrypto "source.quilibrium.com/quilibrium/monorepo/node/crypto"
"source.quilibrium.com/quilibrium/monorepo/node/crypto/kzg"
"source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/ceremony/application"
"source.quilibrium.com/quilibrium/monorepo/node/rpc"
)
@ -150,7 +149,6 @@ func main() {
}
clearIfTestData(*configDirectory, nodeConfig)
migrate(*configDirectory, nodeConfig)
if *dbConsole {
console, err := app.NewDBConsole(nodeConfig)
@ -435,36 +433,6 @@ func repair(configDir string, node *app.Node) {
}
}
func migrate(configDir string, nodeConfig *config.Config) {
_, err := os.Stat(filepath.Join(configDir, "MIGRATIONS"))
if os.IsNotExist(err) {
fmt.Println("Deduplicating and compressing clock frame data...")
clock, err := app.NewClockStore(nodeConfig)
if err := clock.Deduplicate(application.CEREMONY_ADDRESS); err != nil {
panic(err)
}
migrationFile, err := os.OpenFile(
filepath.Join(configDir, "MIGRATIONS"),
os.O_CREATE|os.O_RDWR,
fs.FileMode(0600),
)
if err != nil {
panic(err)
}
_, err = migrationFile.Write([]byte{0x00, 0x00, 0x01})
if err != nil {
panic(err)
}
err = migrationFile.Close()
if err != nil {
panic(err)
}
}
}
func printPeerID(p2pConfig *config.P2PConfig) {
peerPrivKey, err := hex.DecodeString(p2pConfig.PeerPrivKey)
if err != nil {

View File

@ -35,15 +35,16 @@ import (
)
type BlossomSub struct {
ps *blossomsub.PubSub
ctx context.Context
logger *zap.Logger
peerID peer.ID
bitmaskMap map[string]*blossomsub.Bitmask
h host.Host
signKey crypto.PrivKey
peerScore map[string]int64
peerScoreMx sync.Mutex
ps *blossomsub.PubSub
ctx context.Context
logger *zap.Logger
peerID peer.ID
bitmaskMap map[string]*blossomsub.Bitmask
h host.Host
signKey crypto.PrivKey
peerScore map[string]int64
peerScoreMx sync.Mutex
isBootstrapPeer bool
}
var _ PubSub = (*BlossomSub)(nil)
@ -133,11 +134,12 @@ func NewBlossomSub(
}
bs := &BlossomSub{
ctx: ctx,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
signKey: privKey,
peerScore: make(map[string]int64),
ctx: ctx,
logger: logger,
bitmaskMap: make(map[string]*blossomsub.Bitmask),
signKey: privKey,
peerScore: make(map[string]int64),
isBootstrapPeer: isBootstrapPeer,
}
h, err := libp2p.New(opts...)
@ -267,6 +269,7 @@ func (b *BlossomSub) Subscribe(
"begin streaming from bitmask",
zap.Binary("bitmask", bitmask),
)
go func() {
for {
m, err := sub.Next(b.ctx)
@ -276,7 +279,6 @@ func (b *BlossomSub) Subscribe(
zap.Error(err),
)
}
if err = handler(m.Message); err != nil {
b.logger.Debug("message handler returned error", zap.Error(err))
}

View File

@ -3,7 +3,6 @@ package store
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/cockroachdb/pebble"
"github.com/iden3/go-iden3-crypto/poseidon"
@ -62,7 +61,6 @@ type ClockStore interface {
parentSelector []byte,
truncate bool,
) (*protobufs.ClockFrame, error)
Deduplicate(filter []byte) error
GetCompressedDataClockFrames(
filter []byte,
fromFrameNumber uint64,
@ -186,11 +184,25 @@ func (p *PebbleClockIterator) TruncatedValue() (
value := p.i.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get candidate clock frame iterator value",
)
if len(value) == (len(p.i.Key()) + 32) {
frameValue, frameCloser, err := p.db.db.Get(value)
if err != nil {
return nil, errors.Wrap(err, "get truncated clock frame iterator value")
}
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get truncated clock frame iterator value",
)
}
frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get truncated clock frame iterator value",
)
}
}
return frame, nil
@ -203,14 +215,34 @@ func (p *PebbleClockIterator) Value() (*protobufs.ClockFrame, error) {
value := p.i.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
genesisFramePreIndex := false
// We do a bit of a cheap trick here while things are still stuck in the old
// ways: we use the size of the parent index key to determine if it's the new
// format, or the old raw frame
if len(value) == (len(p.i.Key()) + 32) {
frameValue, frameCloser, err := p.db.db.Get(value)
if err != nil {
return nil, errors.Wrap(err, "get clock frame iterator value")
}
if err := proto.Unmarshal(frameValue, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
defer frameCloser.Close()
} else {
if err := proto.Unmarshal(value, frame); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
)
}
genesisFramePreIndex = frame.FrameNumber == 0
}
if err := p.db.fillAggregateProofs(frame, false); err != nil {
if err := p.db.fillAggregateProofs(frame, genesisFramePreIndex); err != nil {
return nil, errors.Wrap(
errors.Wrap(err, ErrInvalidData.Error()),
"get clock frame iterator value",
@ -647,20 +679,8 @@ func (p *PebbleClockStore) saveAggregateProofs(
p.db,
txn,
frame.AggregateProofs[i],
commit, func(typeUrl string, data []byte) ([][]byte, error) {
if typeUrl == protobufs.IntrinsicExecutionOutputType {
o := &protobufs.IntrinsicExecutionOutput{}
if err := proto.Unmarshal(data, o); err != nil {
return nil, err
}
leftBits := append([]byte{}, o.Address...)
leftBits = append(leftBits, o.Output...)
rightBits := o.Proof
return [][]byte{leftBits, rightBits}, nil
}
return [][]byte{data}, nil
})
commit,
)
if err != nil {
if err = txn.Abort(); err != nil {
return err
@ -900,185 +920,6 @@ func (p *PebbleClockStore) RangeDataClockFrames(
return &PebbleClockIterator{i: iter, db: p}, nil
}
// Should only need be run once, before starting
func (p *PebbleClockStore) Deduplicate(filter []byte) error {
from := clockDataParentIndexKey(
filter,
1,
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
)
to := clockDataParentIndexKey(
filter,
20000,
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
)
iter, err := p.db.NewIter(from, to)
if err != nil {
return errors.Wrap(err, "deduplicate")
}
i := 0
for iter.First(); iter.Valid(); iter.Next() {
value := iter.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return err
}
if err := p.saveAggregateProofs(nil, frame); err != nil {
return err
}
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
newValue, err := proto.Marshal(frame)
if err != nil {
return err
}
err = p.db.Set(iter.Key(), newValue)
if err != nil {
return err
}
i++
if i%100 == 0 {
fmt.Println("Deduplicated 100 parent frames")
}
}
iter.Close()
if err := p.db.Compact(from, to, true); err != nil {
return err
}
from = clockDataFrameKey(filter, 1)
to = clockDataFrameKey(filter, 20000)
iter, err = p.db.NewIter(from, to)
if err != nil {
return errors.Wrap(err, "deduplicate")
}
i = 0
for iter.First(); iter.Valid(); iter.Next() {
value := iter.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return err
}
if err := p.saveAggregateProofs(nil, frame); err != nil {
return err
}
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
newValue, err := proto.Marshal(frame)
if err != nil {
return err
}
err = p.db.Set(iter.Key(), newValue)
if err != nil {
return err
}
i++
if i%100 == 0 {
fmt.Println("Deduplicated 100 data frames")
}
}
iter.Close()
if err := p.db.Compact(from, to, true); err != nil {
return err
}
from = clockDataCandidateFrameKey(
filter,
1,
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
[]byte{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
},
)
to = clockDataCandidateFrameKey(
filter,
20000,
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
[]byte{
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
},
)
iter, err = p.db.NewIter(from, to)
if err != nil {
return errors.Wrap(err, "deduplicate")
}
i = 0
for iter.First(); iter.Valid(); iter.Next() {
value := iter.Value()
frame := &protobufs.ClockFrame{}
if err := proto.Unmarshal(value, frame); err != nil {
return err
}
if err := p.saveAggregateProofs(nil, frame); err != nil {
return err
}
frame.AggregateProofs = []*protobufs.InclusionAggregateProof{}
newValue, err := proto.Marshal(frame)
if err != nil {
return err
}
err = p.db.Set(iter.Key(), newValue)
if err != nil {
return err
}
i++
if i%100 == 0 {
fmt.Println("Deduplicated 100 candidate frames")
}
}
iter.Close()
if err := p.db.Compact(from, to, true); err != nil {
return err
}
p.db.Close()
return nil
}
func (p *PebbleClockStore) GetCompressedDataClockFrames(
filter []byte,
fromFrameNumber uint64,
@ -1103,8 +944,8 @@ func (p *PebbleClockStore) GetCompressedDataClockFrames(
genesisFramePreIndex := false
// We do a bit of a cheap trick here while things are still stuck in the old
// ways: we use the size of the parent index key to determine if it's the new
// format, or the old raw frame
// ways: we use the size of the parent index key to determine if it's the
// new format, or the old raw frame
if len(value) == (len(filter) + 42) {
frameValue, frameCloser, err := p.db.Get(value)
if err != nil {

View File

@ -220,7 +220,6 @@ func internalPutAggregateProof(
txn Transaction,
aggregateProof *protobufs.InclusionAggregateProof,
commitment []byte,
inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error),
) error {
buf := binary.BigEndian.AppendUint64(
nil,
@ -229,9 +228,18 @@ func internalPutAggregateProof(
buf = append(buf, aggregateProof.Proof...)
for i, inc := range aggregateProof.InclusionCommitments {
segments, err := inclusionSplitter(inc.TypeUrl, inc.Data)
if err != nil {
return errors.Wrap(err, "get aggregate proof")
var segments [][]byte
if inc.TypeUrl == protobufs.IntrinsicExecutionOutputType {
o := &protobufs.IntrinsicExecutionOutput{}
if err := proto.Unmarshal(inc.Data, o); err != nil {
return errors.Wrap(err, "get aggregate proof")
}
leftBits := append([]byte{}, o.Address...)
leftBits = append(leftBits, o.Output...)
rightBits := o.Proof
segments = [][]byte{leftBits, rightBits}
} else {
segments = [][]byte{inc.Data}
}
urlLength := len(inc.TypeUrl)
@ -244,7 +252,7 @@ func internalPutAggregateProof(
for _, segment := range segments {
hash := sha3.Sum256(segment)
if err = txn.Set(
if err := txn.Set(
dataProofSegmentKey(aggregateProof.Filter, hash[:]),
segment,
); err != nil {
@ -253,7 +261,7 @@ func internalPutAggregateProof(
encoded = append(encoded, hash[:]...)
}
if err = txn.Set(
if err := txn.Set(
dataProofInclusionKey(aggregateProof.Filter, commitment, uint64(i)),
encoded,
); err != nil {
@ -275,13 +283,11 @@ func (p *PebbleDataProofStore) PutAggregateProof(
txn Transaction,
aggregateProof *protobufs.InclusionAggregateProof,
commitment []byte,
inclusionSplitter func(typeUrl string, data []byte) ([][]byte, error),
) error {
return internalPutAggregateProof(
p.db,
txn,
aggregateProof,
commitment,
inclusionSplitter,
)
}