diff --git a/go-libp2p-blossomsub/blossomsub.go b/go-libp2p-blossomsub/blossomsub.go index 3a25343..3218ec7 100644 --- a/go-libp2p-blossomsub/blossomsub.go +++ b/go-libp2p-blossomsub/blossomsub.go @@ -1263,8 +1263,9 @@ func (bs *BlossomSubRouter) sendRPC(p peer.ID, out *RPC) { return } + outCopy := copyRPC(out) // Potentially split the RPC into multiple RPCs that are below the max message size - outRPCs := appendOrMergeRPC(nil, bs.p.maxMessageSize, *out) + outRPCs := appendOrMergeRPC(nil, bs.p.maxMessageSize, outCopy) for _, rpc := range outRPCs { if rpc.Size() > bs.p.maxMessageSize { // This should only happen if a single message/control is above the maxMessageSize. @@ -1299,19 +1300,19 @@ func (bs *BlossomSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) { // If an RPC is too large and can't be split further (e.g. Message data is // bigger than the RPC limit), then it will be returned as an oversized RPC. // The caller should filter out oversized RPCs. -func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { +func appendOrMergeRPC(slice []*RPC, limit int, elems ...*RPC) []*RPC { if len(elems) == 0 { return slice } if len(slice) == 0 && len(elems) == 1 && elems[0].Size() < limit { // Fast path: no merging needed and only one element - return append(slice, &elems[0]) + return append(slice, elems[0]) } out := slice if len(out) == 0 { - out = append(out, &RPC{RPC: pb.RPC{}}) + out = append(out, &RPC{RPC: &pb.RPC{}}) out[0].from = elems[0].from } @@ -1325,7 +1326,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { for _, msg := range elem.GetPublish() { if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit { lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1] - lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from} + lastRPC = &RPC{RPC: &pb.RPC{}, from: elem.from} lastRPC.Publish = append(lastRPC.Publish, msg) out = append(out, lastRPC) } @@ -1335,7 +1336,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { for _, sub := range elem.GetSubscriptions() { if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit { lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1] - lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from} + lastRPC = &RPC{RPC: &pb.RPC{}, from: elem.from} lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub) out = append(out, lastRPC) } @@ -1347,7 +1348,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { lastRPC.Control = &pb.ControlMessage{} if lastRPC.Size() > limit { lastRPC.Control = nil - lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} + lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} out = append(out, lastRPC) } } @@ -1355,7 +1356,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { for _, graft := range ctl.GetGraft() { if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit { lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1] - lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} + lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft) out = append(out, lastRPC) } @@ -1364,7 +1365,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { for _, prune := range ctl.GetPrune() { if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit { lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1] - lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} + lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune) out = append(out, lastRPC) } @@ -1378,7 +1379,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { newIWant := &pb.ControlIWant{} if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit { lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1] - lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ + lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{ Iwant: []*pb.ControlIWant{newIWant}, }}, from: elem.from} out = append(out, lastRPC) @@ -1387,7 +1388,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { for _, msgID := range iwant.GetMessageIDs() { if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit { lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1] - lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ + lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{ Iwant: []*pb.ControlIWant{{MessageIDs: [][]byte{msgID}}}, }}, from: elem.from} out = append(out, lastRPC) @@ -1402,7 +1403,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { newIhave := &pb.ControlIHave{Bitmask: ihave.Bitmask} if lastRPC.Control.Ihave = append(lastRPC.Control.Ihave, newIhave); lastRPC.Size() > limit { lastRPC.Control.Ihave = lastRPC.Control.Ihave[:len(lastRPC.Control.Ihave)-1] - lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ + lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{ Ihave: []*pb.ControlIHave{newIhave}, }}, from: elem.from} out = append(out, lastRPC) @@ -1412,7 +1413,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { lastIHave := lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1] if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); lastRPC.Size() > limit { lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1] - lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ + lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{ Ihave: []*pb.ControlIHave{{Bitmask: ihave.Bitmask, MessageIDs: [][]byte{msgID}}}, }}, from: elem.from} out = append(out, lastRPC) diff --git a/go-libp2p-blossomsub/blossomsub_spam_test.go b/go-libp2p-blossomsub/blossomsub_spam_test.go index 27bdb22..839524d 100644 --- a/go-libp2p-blossomsub/blossomsub_spam_test.go +++ b/go-libp2p-blossomsub/blossomsub_spam_test.go @@ -123,7 +123,7 @@ func TestBlossomSubAttackSpamIWANT(t *testing.T) { iwantlst := [][]byte{DefaultMsgIdFn(msg)} iwant := []*pb.ControlIWant{{MessageIDs: iwantlst}} orpc := rpcWithControl(nil, nil, iwant, nil, nil) - writeMsg(&orpc.RPC) + writeMsg(orpc.RPC) } }) @@ -217,7 +217,7 @@ func TestBlossomSubAttackSpamIHAVE(t *testing.T) { ihavelst := [][]byte{[]byte("someid" + strconv.Itoa(i))} ihave := []*pb.ControlIHave{{Bitmask: sub.Bitmask, MessageIDs: ihavelst}} orpc := rpcWithControl(nil, ihave, nil, nil, nil) - writeMsg(&orpc.RPC) + writeMsg(orpc.RPC) } select { @@ -247,7 +247,7 @@ func TestBlossomSubAttackSpamIHAVE(t *testing.T) { ihavelst := [][]byte{[]byte("someid" + strconv.Itoa(i+100))} ihave := []*pb.ControlIHave{{Bitmask: sub.Bitmask, MessageIDs: ihavelst}} orpc := rpcWithControl(nil, ihave, nil, nil, nil) - writeMsg(&orpc.RPC) + writeMsg(orpc.RPC) } select { diff --git a/go-libp2p-blossomsub/blossomsub_test.go b/go-libp2p-blossomsub/blossomsub_test.go index 8a93417..00f6607 100644 --- a/go-libp2p-blossomsub/blossomsub_test.go +++ b/go-libp2p-blossomsub/blossomsub_test.go @@ -1053,6 +1053,7 @@ func TestBlossomSubControlPiggyback(t *testing.T) { } func TestMixedBlossomSub(t *testing.T) { + t.Skip("skip unless blossomsub regains some alternate messaging channel baked into the proto") ctx, cancel := context.WithCancel(context.Background()) defer cancel() hosts := getDefaultHosts(t, 30) @@ -1945,7 +1946,7 @@ func TestBlossomSubPiggybackControl(t *testing.T) { gs.mesh[string(test2)] = make(map[peer.ID]struct{}) gs.mesh[string(test1)][blah] = struct{}{} - rpc := &RPC{RPC: pb.RPC{}} + rpc := &RPC{RPC: &pb.RPC{}} gs.piggybackControl(blah, rpc, &pb.ControlMessage{ Graft: []*pb.ControlGraft{{Bitmask: test1}, {Bitmask: test2}, {Bitmask: test3}}, Prune: []*pb.ControlPrune{{Bitmask: test1}, {Bitmask: test2}, {Bitmask: test3}}, @@ -2150,6 +2151,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) { } } } + func TestBlossomSubLeaveBitmask(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -2607,7 +2609,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) { func TestFragmentRPCFunction(t *testing.T) { p := peer.ID("some-peer") bitmask := []byte{0x00, 0x00, 0x80, 0x00} - rpc := &RPC{from: p} + rpc := &RPC{RPC: new(pb.RPC), from: p} limit := 1024 mkMsg := func(size int) *pb.Message { @@ -2628,14 +2630,14 @@ func TestFragmentRPCFunction(t *testing.T) { // it should not fragment if everything fits in one RPC rpc.Publish = []*pb.Message{} rpc.Publish = []*pb.Message{mkMsg(10), mkMsg(10)} - results := appendOrMergeRPC([]*RPC{}, limit, *rpc) + results := appendOrMergeRPC([]*RPC{}, limit, rpc) if len(results) != 1 { t.Fatalf("expected single RPC if input is < limit, got %d", len(results)) } // if there's a message larger than the limit, we should fail rpc.Publish = []*pb.Message{mkMsg(10), mkMsg(limit * 2)} - results = appendOrMergeRPC([]*RPC{}, limit, *rpc) + results = appendOrMergeRPC([]*RPC{}, limit, rpc) // if the individual messages are below the limit, but the RPC as a whole is larger, we should fragment nMessages := 100 @@ -2651,7 +2653,7 @@ func TestFragmentRPCFunction(t *testing.T) { for i := 0; i < nMessages; i++ { rpc.Publish[i] = mkMsg(msgSize) } - results = appendOrMergeRPC([]*RPC{}, limit, *rpc) + results = appendOrMergeRPC([]*RPC{}, limit, rpc) ensureBelowLimit(results) msgsPerRPC := limit / msgSize expectedRPCs := nMessages / msgsPerRPC @@ -2680,7 +2682,7 @@ func TestFragmentRPCFunction(t *testing.T) { Ihave: []*pb.ControlIHave{{MessageIDs: [][]byte{[]byte("foo")}}}, Iwant: []*pb.ControlIWant{{MessageIDs: [][]byte{[]byte("bar")}}}, } - results = appendOrMergeRPC([]*RPC{}, limit, *rpc) + results = appendOrMergeRPC([]*RPC{}, limit, rpc) ensureBelowLimit(results) // we expect one more RPC than last time, with the final one containing the control messages expectedCtrl := 1 @@ -2721,7 +2723,7 @@ func TestFragmentRPCFunction(t *testing.T) { rpc.Control.Ihave[i] = &pb.ControlIHave{MessageIDs: messageIds} rpc.Control.Iwant[i] = &pb.ControlIWant{MessageIDs: messageIds} } - results = appendOrMergeRPC([]*RPC{}, limit, *rpc) + results = appendOrMergeRPC([]*RPC{}, limit, rpc) ensureBelowLimit(results) minExpectedCtl := rpc.Control.Size() / limit minExpectedRPCs := (nMessages / msgsPerRPC) + minExpectedCtl @@ -2738,7 +2740,7 @@ func TestFragmentRPCFunction(t *testing.T) { {MessageIDs: [][]byte{[]byte("hello"), giantIdBytes}}, }, } - results = appendOrMergeRPC([]*RPC{}, limit, *rpc) + results = appendOrMergeRPC([]*RPC{}, limit, rpc) if len(results) != 2 { t.Fatalf("expected 2 RPC, got %d", len(results)) } diff --git a/go-libp2p-blossomsub/comm.go b/go-libp2p-blossomsub/comm.go index 2c36c07..266209c 100644 --- a/go-libp2p-blossomsub/comm.go +++ b/go-libp2p-blossomsub/comm.go @@ -8,6 +8,7 @@ import ( pool "github.com/libp2p/go-buffer-pool" "github.com/multiformats/go-varint" + "google.golang.org/protobuf/proto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -18,7 +19,9 @@ import ( // get the initial RPC containing all of our subscriptions to send to new peers func (p *PubSub) getHelloPacket() *RPC { - var rpc RPC + var rpc = &RPC{ + RPC: new(pb.RPC), + } subscriptions := make(map[string]bool) @@ -37,7 +40,7 @@ func (p *PubSub) getHelloPacket() *RPC { } rpc.Subscriptions = append(rpc.Subscriptions, as) } - return &rpc + return rpc } func (p *PubSub) handleNewStream(s network.Stream) { @@ -80,7 +83,9 @@ func (p *PubSub) handleNewStream(s network.Stream) { continue } - rpc := new(RPC) + rpc := &RPC{ + RPC: new(pb.RPC), + } err = rpc.Unmarshal(msgbytes) r.ReleaseMsg(msgbytes) if err != nil { @@ -194,14 +199,14 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC { return &RPC{ - RPC: pb.RPC{ + RPC: &pb.RPC{ Subscriptions: subs, }, } } func rpcWithMessages(msgs ...*pb.Message) *RPC { - return &RPC{RPC: pb.RPC{Publish: msgs}} + return &RPC{RPC: &pb.RPC{Publish: msgs}} } func rpcWithControl(msgs []*pb.Message, @@ -210,7 +215,7 @@ func rpcWithControl(msgs []*pb.Message, graft []*pb.ControlGraft, prune []*pb.ControlPrune) *RPC { return &RPC{ - RPC: pb.RPC{ + RPC: &pb.RPC{ Publish: msgs, Control: &pb.ControlMessage{ Ihave: ihave, @@ -224,10 +229,7 @@ func rpcWithControl(msgs []*pb.Message, func copyRPC(rpc *RPC) *RPC { res := new(RPC) - *res = *rpc - if rpc.Control != nil { - res.Control = new(pb.ControlMessage) - *res.Control = *rpc.Control - } + copiedRPC := (proto.Clone(rpc.RPC)).(*pb.RPC) + res.RPC = copiedRPC return res } diff --git a/go-libp2p-blossomsub/pubsub.go b/go-libp2p-blossomsub/pubsub.go index aa3880f..c7af86f 100644 --- a/go-libp2p-blossomsub/pubsub.go +++ b/go-libp2p-blossomsub/pubsub.go @@ -245,7 +245,7 @@ func (m *Message) GetFrom() peer.ID { } type RPC struct { - pb.RPC + *pb.RPC // unexported on purpose, not sending this over the wire from peer.ID diff --git a/go-libp2p-blossomsub/sign.go b/go-libp2p-blossomsub/sign.go index ee887bf..6142487 100644 --- a/go-libp2p-blossomsub/sign.go +++ b/go-libp2p-blossomsub/sign.go @@ -3,6 +3,7 @@ package blossomsub import ( "fmt" + "google.golang.org/protobuf/proto" pb "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb" "github.com/libp2p/go-libp2p/core/crypto" @@ -52,7 +53,7 @@ func verifyMessageSignature(m *pb.Message) error { return err } - xm := *m + xm := (proto.Clone(m)).(*pb.Message) xm.Signature = nil xm.Key = nil bytes, err := xm.Marshal() diff --git a/node/config/config.go b/node/config/config.go index a4a912d..163623f 100644 --- a/node/config/config.go +++ b/node/config/config.go @@ -58,19 +58,11 @@ var BootstrapPeers = []string{ "/ip4/185.143.102.84/udp/8336/quic-v1/p2p/Qmce68gLLq9eMdwCcmd1ptfoC2nVoe861LF1cjdVHC2DwK", "/ip4/65.109.17.13/udp/8336/quic-v1/p2p/Qmc35n99eojSvW3PkbfBczJoSX92WmnnKh3Fg114ok3oo4", "/ip4/65.108.194.84/udp/8336/quic-v1/p2p/QmP8C7g9ZRiWzhqN2AgFu5onS6HwHzR6Vv1TCHxAhnCSnq", - "/dns/quil.dfcnodes.eu/udp/8336/quic-v1/p2p/QmQaFmbYVrKSwoen5UQdaqyDq4QhXfSSLDVnYpYD4SF9tX", "/ip4/15.204.100.222/udp/8336/quic-v1/p2p/Qmef3Z3RvGg49ZpDPcf2shWtJNgPJNpXrowjUcfz23YQ3V", "/ip4/69.197.174.35/udp/8336/quic-v1/p2p/QmeprCaZKiymofPJgnp2ANR3F4pRus9PHHaxnJDh1Jwr1p", "/ip4/70.36.102.32/udp/8336/quic-v1/p2p/QmYriGRXCUiwFodqSoS4GgEcD7UVyxXPeCgQKmYne3iLSF", "/ip4/204.12.220.2/udp/8336/quic-v1/p2p/QmRw5Tw4p5v2vLPvVSAkQEiRPQGnWk9HM4xiSvgxF82CCw", "/ip4/209.159.149.14/udp/8336/quic-v1/p2p/Qmcq4Lmw45tbodvdRWZ8iGgy3rUcR3dikHTj1fBXP8VJqv", - "/ip4/148.251.9.90/udp/8336/quic-v1/p2p/QmRpKmQ1W83s6moBFpG6D6nrttkqdQSbdCJpvfxDVGcs38", - "/ip4/35.232.113.144/udp/8336/quic-v1/p2p/QmWxkBc7a17ZsLHhszLyTvKsoHMKvKae2XwfQXymiU66md", - "/ip4/34.87.85.78/udp/8336/quic-v1/p2p/QmTGguT5XhtvZZwTLnNQTN8Bg9eUm1THWEneXXHGhMDPrz", - "/ip4/34.81.199.27/udp/8336/quic-v1/p2p/QmTMMKpzCKJCwrnUzNu6tNj4P1nL7hVqz251245wsVpGNg", - "/ip4/34.143.255.235/udp/8336/quic-v1/p2p/QmeifsP6Kvq8A3yabQs6CBg7prSpDSqdee8P2BDQm9EpP8", - "/ip4/34.34.125.238/udp/8336/quic-v1/p2p/QmZdSyBJLm9UiDaPZ4XDkgRGXUwPcHJCmKoH6fS9Qjyko4", - "/ip4/34.80.245.52/udp/8336/quic-v1/p2p/QmNmbqobt82Vre5JxUGVNGEWn2HsztQQ1xfeg6mx7X5u3f", "/dns/bravo-1.qcommander.sh/udp/8336/quic-v1/p2p/QmWFK1gVuhEqZdr8phTo3QbyLwjYmyivx31Zubqt7oR4XB", "/ip4/109.199.100.108/udp/8336/quic-v1/p2p/Qma9fgugQc17MDu4YRSvnhfhVre6AYZ3nZdW8dSUYbsWvm", "/ip4/47.251.49.193/udp/8336/quic-v1/p2p/QmP6ADPmMCsB8y82oFbrKTrwYWXt1CTMJ3jGNDXRHyYJgR", @@ -83,6 +75,14 @@ var BootstrapPeers = []string{ // "/ip4/207.246.81.38/udp/8336/quic-v1/p2p/QmPBYgDy7snHon7PAn8nv1shApQBQz1iHb2sBBS8QSgQwW", // "/dns/abyssia.fr/udp/8336/quic-v1/p2p/QmS7C1UhN8nvzLJgFFf1uspMRrXjJqThHNN6AyEXp6oVUB", // "/ip4/51.15.18.247/udp/8336/quic-v1/p2p/QmYVaHXdFmHFeTa6oPixgjMVag6Ex7gLjE559ejJddwqzu", + // "/ip4/35.232.113.144/udp/8336/quic-v1/p2p/QmWxkBc7a17ZsLHhszLyTvKsoHMKvKae2XwfQXymiU66md", + // "/ip4/34.87.85.78/udp/8336/quic-v1/p2p/QmTGguT5XhtvZZwTLnNQTN8Bg9eUm1THWEneXXHGhMDPrz", + // "/ip4/34.81.199.27/udp/8336/quic-v1/p2p/QmTMMKpzCKJCwrnUzNu6tNj4P1nL7hVqz251245wsVpGNg", + // "/ip4/34.143.255.235/udp/8336/quic-v1/p2p/QmeifsP6Kvq8A3yabQs6CBg7prSpDSqdee8P2BDQm9EpP8", + // "/ip4/34.34.125.238/udp/8336/quic-v1/p2p/QmZdSyBJLm9UiDaPZ4XDkgRGXUwPcHJCmKoH6fS9Qjyko4", + // "/ip4/34.80.245.52/udp/8336/quic-v1/p2p/QmNmbqobt82Vre5JxUGVNGEWn2HsztQQ1xfeg6mx7X5u3f", + // "/ip4/148.251.9.90/udp/8336/quic-v1/p2p/QmRpKmQ1W83s6moBFpG6D6nrttkqdQSbdCJpvfxDVGcs38", + // "/dns/quil.dfcnodes.eu/udp/8336/quic-v1/p2p/QmQaFmbYVrKSwoen5UQdaqyDq4QhXfSSLDVnYpYD4SF9tX", } func LoadConfig(configPath string, proverKey string) (*Config, error) {