fix: don't allow sync.Mutex copies from protobuf to lock up dropRPC

Cassandra Heart 2024-08-10 18:04:23 -05:00
parent 37dda10b8a
commit 139c6747d3
No known key found for this signature in database
GPG Key ID: 6352152859385958
7 changed files with 51 additions and 45 deletions
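Context for the change: with google.golang.org/protobuf, generated types such as pb.RPC embed internal message state that carries a no-copy guard (a zero-length [0]sync.Mutex field), so copying a message by value, as sendRPC did with *out, copies that guard along with the rest of the state. go vet's copylocks check flags exactly this, and a mutex copied while held stays locked forever in the copy, which is how a by-value copy can wedge a later caller (per the title, dropRPC). A minimal stand-alone sketch of the failure class, with a hypothetical Example type standing in for a generated message:

	package main

	import "sync"

	// Generated protobuf messages embed internal state guarded by a
	// zero-length mutex array; copying the struct by value copies the
	// guard too, and vet's copylocks analyzer special-cases this.
	type noCopy [0]sync.Mutex

	type messageState struct {
		noCopy
	}

	// Example stands in for a generated type like pb.RPC.
	type Example struct {
		state messageState
		Data  []byte
	}

	func main() {
		a := Example{Data: []byte("payload")}
		b := a // go vet: "assignment copies lock value to b: ... contains sync.Mutex"
		_ = b
	}

The fix below therefore embeds *pb.RPC by pointer and deep-copies with proto.Clone wherever a real copy is needed.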

View File

@@ -1263,8 +1263,9 @@ func (bs *BlossomSubRouter) sendRPC(p peer.ID, out *RPC) {
 		return
 	}
 
+	outCopy := copyRPC(out)
 	// Potentially split the RPC into multiple RPCs that are below the max message size
-	outRPCs := appendOrMergeRPC(nil, bs.p.maxMessageSize, *out)
+	outRPCs := appendOrMergeRPC(nil, bs.p.maxMessageSize, outCopy)
 	for _, rpc := range outRPCs {
 		if rpc.Size() > bs.p.maxMessageSize {
 			// This should only happen if a single message/control is above the maxMessageSize.
@@ -1299,19 +1300,19 @@ func (bs *BlossomSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) {
 // If an RPC is too large and can't be split further (e.g. Message data is
 // bigger than the RPC limit), then it will be returned as an oversized RPC.
 // The caller should filter out oversized RPCs.
-func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
+func appendOrMergeRPC(slice []*RPC, limit int, elems ...*RPC) []*RPC {
 	if len(elems) == 0 {
 		return slice
 	}
 
 	if len(slice) == 0 && len(elems) == 1 && elems[0].Size() < limit {
 		// Fast path: no merging needed and only one element
-		return append(slice, &elems[0])
+		return append(slice, elems[0])
 	}
 
 	out := slice
 	if len(out) == 0 {
-		out = append(out, &RPC{RPC: pb.RPC{}})
+		out = append(out, &RPC{RPC: &pb.RPC{}})
 		out[0].from = elems[0].from
 	}
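Per the comment above, oversized RPCs come back unsplit and it is the caller's job to drop them. A sketch of the expected usage, mirroring the filter sendRPC applies in the first hunk (the limit value and outbound handling here are assumed for illustration):

	outRPCs := appendOrMergeRPC(nil, maxMessageSize, copyRPC(out))
	for _, rpc := range outRPCs {
		if rpc.Size() > maxMessageSize {
			// a single message/control exceeded the limit; skip it
			continue
		}
		// hand rpc to the outbound queue here
	}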
@@ -1325,7 +1326,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
 		for _, msg := range elem.GetPublish() {
 			if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit {
 				lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1]
-				lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
+				lastRPC = &RPC{RPC: &pb.RPC{}, from: elem.from}
 				lastRPC.Publish = append(lastRPC.Publish, msg)
 				out = append(out, lastRPC)
 			}
@@ -1335,7 +1336,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
 		for _, sub := range elem.GetSubscriptions() {
 			if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit {
 				lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1]
-				lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from}
+				lastRPC = &RPC{RPC: &pb.RPC{}, from: elem.from}
 				lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub)
 				out = append(out, lastRPC)
 			}
@@ -1347,7 +1348,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
 				lastRPC.Control = &pb.ControlMessage{}
 				if lastRPC.Size() > limit {
 					lastRPC.Control = nil
-					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
+					lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
 					out = append(out, lastRPC)
 				}
 			}
@@ -1355,7 +1356,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
 			for _, graft := range ctl.GetGraft() {
 				if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit {
 					lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1]
-					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
+					lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
 					lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft)
 					out = append(out, lastRPC)
 				}
@@ -1364,7 +1365,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
 			for _, prune := range ctl.GetPrune() {
 				if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit {
 					lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1]
-					lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
+					lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from}
 					lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune)
 					out = append(out, lastRPC)
 				}
@@ -1378,7 +1379,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
 					newIWant := &pb.ControlIWant{}
 					if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit {
 						lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1]
-						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+						lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{
 							Iwant: []*pb.ControlIWant{newIWant},
 						}}, from: elem.from}
 						out = append(out, lastRPC)
@@ -1387,7 +1388,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
 				for _, msgID := range iwant.GetMessageIDs() {
 					if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit {
 						lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1]
-						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+						lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{
 							Iwant: []*pb.ControlIWant{{MessageIDs: [][]byte{msgID}}},
 						}}, from: elem.from}
 						out = append(out, lastRPC)
@@ -1402,7 +1403,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
 					newIhave := &pb.ControlIHave{Bitmask: ihave.Bitmask}
 					if lastRPC.Control.Ihave = append(lastRPC.Control.Ihave, newIhave); lastRPC.Size() > limit {
 						lastRPC.Control.Ihave = lastRPC.Control.Ihave[:len(lastRPC.Control.Ihave)-1]
-						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+						lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{
 							Ihave: []*pb.ControlIHave{newIhave},
 						}}, from: elem.from}
 						out = append(out, lastRPC)
@@ -1412,7 +1413,7 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC {
 					lastIHave := lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1]
 					if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); lastRPC.Size() > limit {
 						lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1]
-						lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{
+						lastRPC = &RPC{RPC: &pb.RPC{Control: &pb.ControlMessage{
 							Ihave: []*pb.ControlIHave{{Bitmask: ihave.Bitmask, MessageIDs: [][]byte{msgID}}},
 						}}, from: elem.from}
 						out = append(out, lastRPC)

View File

@@ -123,7 +123,7 @@ func TestBlossomSubAttackSpamIWANT(t *testing.T) {
 				iwantlst := [][]byte{DefaultMsgIdFn(msg)}
 				iwant := []*pb.ControlIWant{{MessageIDs: iwantlst}}
 				orpc := rpcWithControl(nil, nil, iwant, nil, nil)
-				writeMsg(&orpc.RPC)
+				writeMsg(orpc.RPC)
 			}
 		})
@@ -217,7 +217,7 @@ func TestBlossomSubAttackSpamIHAVE(t *testing.T) {
 				ihavelst := [][]byte{[]byte("someid" + strconv.Itoa(i))}
 				ihave := []*pb.ControlIHave{{Bitmask: sub.Bitmask, MessageIDs: ihavelst}}
 				orpc := rpcWithControl(nil, ihave, nil, nil, nil)
-				writeMsg(&orpc.RPC)
+				writeMsg(orpc.RPC)
 			}
 
 			select {
@@ -247,7 +247,7 @@ func TestBlossomSubAttackSpamIHAVE(t *testing.T) {
 				ihavelst := [][]byte{[]byte("someid" + strconv.Itoa(i+100))}
 				ihave := []*pb.ControlIHave{{Bitmask: sub.Bitmask, MessageIDs: ihavelst}}
 				orpc := rpcWithControl(nil, ihave, nil, nil, nil)
-				writeMsg(&orpc.RPC)
+				writeMsg(orpc.RPC)
 			}
 
 			select {

View File

@@ -1053,6 +1053,7 @@ func TestBlossomSubControlPiggyback(t *testing.T) {
 }
 
 func TestMixedBlossomSub(t *testing.T) {
+	t.Skip("skip unless blossomsub regains some alternate messaging channel baked into the proto")
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	hosts := getDefaultHosts(t, 30)
@@ -1945,7 +1946,7 @@ func TestBlossomSubPiggybackControl(t *testing.T) {
 	gs.mesh[string(test2)] = make(map[peer.ID]struct{})
 	gs.mesh[string(test1)][blah] = struct{}{}
 
-	rpc := &RPC{RPC: pb.RPC{}}
+	rpc := &RPC{RPC: &pb.RPC{}}
 	gs.piggybackControl(blah, rpc, &pb.ControlMessage{
 		Graft: []*pb.ControlGraft{{Bitmask: test1}, {Bitmask: test2}, {Bitmask: test3}},
 		Prune: []*pb.ControlPrune{{Bitmask: test1}, {Bitmask: test2}, {Bitmask: test3}},
@@ -2150,6 +2151,7 @@ func TestBlossomSubOpportunisticGrafting(t *testing.T) {
 		}
 	}
 }
+
 func TestBlossomSubLeaveBitmask(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -2607,7 +2609,7 @@ func (iwe *iwantEverything) handleStream(s network.Stream) {
 func TestFragmentRPCFunction(t *testing.T) {
 	p := peer.ID("some-peer")
 	bitmask := []byte{0x00, 0x00, 0x80, 0x00}
-	rpc := &RPC{from: p}
+	rpc := &RPC{RPC: new(pb.RPC), from: p}
 	limit := 1024
 
 	mkMsg := func(size int) *pb.Message {
@@ -2628,14 +2630,14 @@ func TestFragmentRPCFunction(t *testing.T) {
 	// it should not fragment if everything fits in one RPC
 	rpc.Publish = []*pb.Message{}
 	rpc.Publish = []*pb.Message{mkMsg(10), mkMsg(10)}
-	results := appendOrMergeRPC([]*RPC{}, limit, *rpc)
+	results := appendOrMergeRPC([]*RPC{}, limit, rpc)
 	if len(results) != 1 {
 		t.Fatalf("expected single RPC if input is < limit, got %d", len(results))
 	}
 
 	// if there's a message larger than the limit, we should fail
 	rpc.Publish = []*pb.Message{mkMsg(10), mkMsg(limit * 2)}
-	results = appendOrMergeRPC([]*RPC{}, limit, *rpc)
+	results = appendOrMergeRPC([]*RPC{}, limit, rpc)
 
 	// if the individual messages are below the limit, but the RPC as a whole is larger, we should fragment
 	nMessages := 100
@@ -2651,7 +2653,7 @@ func TestFragmentRPCFunction(t *testing.T) {
 	for i := 0; i < nMessages; i++ {
 		rpc.Publish[i] = mkMsg(msgSize)
 	}
-	results = appendOrMergeRPC([]*RPC{}, limit, *rpc)
+	results = appendOrMergeRPC([]*RPC{}, limit, rpc)
 	ensureBelowLimit(results)
 	msgsPerRPC := limit / msgSize
 	expectedRPCs := nMessages / msgsPerRPC
@@ -2680,7 +2682,7 @@ func TestFragmentRPCFunction(t *testing.T) {
 		Ihave: []*pb.ControlIHave{{MessageIDs: [][]byte{[]byte("foo")}}},
 		Iwant: []*pb.ControlIWant{{MessageIDs: [][]byte{[]byte("bar")}}},
 	}
-	results = appendOrMergeRPC([]*RPC{}, limit, *rpc)
+	results = appendOrMergeRPC([]*RPC{}, limit, rpc)
 	ensureBelowLimit(results)
 	// we expect one more RPC than last time, with the final one containing the control messages
 	expectedCtrl := 1
@@ -2721,7 +2723,7 @@ func TestFragmentRPCFunction(t *testing.T) {
 		rpc.Control.Ihave[i] = &pb.ControlIHave{MessageIDs: messageIds}
 		rpc.Control.Iwant[i] = &pb.ControlIWant{MessageIDs: messageIds}
 	}
-	results = appendOrMergeRPC([]*RPC{}, limit, *rpc)
+	results = appendOrMergeRPC([]*RPC{}, limit, rpc)
 	ensureBelowLimit(results)
 	minExpectedCtl := rpc.Control.Size() / limit
 	minExpectedRPCs := (nMessages / msgsPerRPC) + minExpectedCtl
@@ -2738,7 +2740,7 @@ func TestFragmentRPCFunction(t *testing.T) {
 			{MessageIDs: [][]byte{[]byte("hello"), giantIdBytes}},
 		},
 	}
-	results = appendOrMergeRPC([]*RPC{}, limit, *rpc)
+	results = appendOrMergeRPC([]*RPC{}, limit, rpc)
 	if len(results) != 2 {
 		t.Fatalf("expected 2 RPC, got %d", len(results))
 	}

View File

@@ -8,6 +8,7 @@ import (
 	pool "github.com/libp2p/go-buffer-pool"
 	"github.com/multiformats/go-varint"
+	"google.golang.org/protobuf/proto"
 
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
@@ -18,7 +19,9 @@ import (
 
 // get the initial RPC containing all of our subscriptions to send to new peers
 func (p *PubSub) getHelloPacket() *RPC {
-	var rpc RPC
+	var rpc = &RPC{
+		RPC: new(pb.RPC),
+	}
 
 	subscriptions := make(map[string]bool)
@@ -37,7 +40,7 @@ func (p *PubSub) getHelloPacket() *RPC {
 		}
 		rpc.Subscriptions = append(rpc.Subscriptions, as)
 	}
-	return &rpc
+	return rpc
 }
 
 func (p *PubSub) handleNewStream(s network.Stream) {
@@ -80,7 +83,9 @@ func (p *PubSub) handleNewStream(s network.Stream) {
 			continue
 		}
 
-		rpc := new(RPC)
+		rpc := &RPC{
+			RPC: new(pb.RPC),
+		}
 		err = rpc.Unmarshal(msgbytes)
 		r.ReleaseMsg(msgbytes)
 		if err != nil {
@@ -194,14 +199,14 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou
 
 func rpcWithSubs(subs ...*pb.RPC_SubOpts) *RPC {
 	return &RPC{
-		RPC: pb.RPC{
+		RPC: &pb.RPC{
 			Subscriptions: subs,
 		},
 	}
 }
 
 func rpcWithMessages(msgs ...*pb.Message) *RPC {
-	return &RPC{RPC: pb.RPC{Publish: msgs}}
+	return &RPC{RPC: &pb.RPC{Publish: msgs}}
 }
 
 func rpcWithControl(msgs []*pb.Message,
@@ -210,7 +215,7 @@ func rpcWithControl(msgs []*pb.Message,
 	graft []*pb.ControlGraft,
 	prune []*pb.ControlPrune) *RPC {
 	return &RPC{
-		RPC: pb.RPC{
+		RPC: &pb.RPC{
 			Publish: msgs,
 			Control: &pb.ControlMessage{
 				Ihave: ihave,
@@ -224,10 +229,7 @@ func rpcWithControl(msgs []*pb.Message,
 func copyRPC(rpc *RPC) *RPC {
 	res := new(RPC)
 	*res = *rpc
-	if rpc.Control != nil {
-		res.Control = new(pb.ControlMessage)
-		*res.Control = *rpc.Control
-	}
+	copiedRPC := (proto.Clone(rpc.RPC)).(*pb.RPC)
+	res.RPC = copiedRPC
 	return res
 }
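proto.Clone returns the deep copy as a proto.Message, hence the assertion back to *pb.RPC. Unlike the old shallow copy, which only duplicated the Control field, nothing is shared with the source afterwards. A small illustration (assuming the generated pb package):

	orig := &pb.RPC{Publish: []*pb.Message{{Data: []byte("hello")}}}
	dup := proto.Clone(orig).(*pb.RPC)
	dup.Publish[0].Data = []byte("changed") // mutates only the clone
	// orig.Publish[0].Data is still "hello"; no slices or nested
	// messages are shared between orig and dup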

View File

@@ -245,7 +245,7 @@ func (m *Message) GetFrom() peer.ID {
 }
 
 type RPC struct {
-	pb.RPC
+	*pb.RPC
 	// unexported on purpose, not sending this over the wire
 	from peer.ID
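Embedding *pb.RPC by pointer means the zero value of RPC now carries a nil message, which is why the other hunks add RPC: new(pb.RPC) at every construction site. Generated getters tolerate a nil receiver, but direct field access does not; a sketch:

	var rpc RPC                // zero value: embedded *pb.RPC is nil
	_ = rpc.GetSubscriptions() // fine: generated getters are nil-safe
	rpc.Subscriptions = nil    // panics: write through a nil *pb.RPC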

View File

@@ -3,6 +3,7 @@ package blossomsub
 import (
 	"fmt"
 
+	"google.golang.org/protobuf/proto"
 	pb "source.quilibrium.com/quilibrium/monorepo/go-libp2p-blossomsub/pb"
 
 	"github.com/libp2p/go-libp2p/core/crypto"
@@ -52,7 +53,7 @@ func verifyMessageSignature(m *pb.Message) error {
 		return err
 	}
 
-	xm := *m
+	xm := (proto.Clone(m)).(*pb.Message)
 	xm.Signature = nil
 	xm.Key = nil
 	bytes, err := xm.Marshal()
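Cloning before clearing Signature and Key also means the caller's message is never mutated, so concurrent verifications of a shared *pb.Message stay race-free. The pattern, written out as a hypothetical helper that returns the exact bytes the signature covers:

	// signedBytes re-marshals the message with the signature fields
	// unset, which is what the signature was originally computed over.
	func signedBytes(m *pb.Message) ([]byte, error) {
		xm := proto.Clone(m).(*pb.Message)
		xm.Signature = nil
		xm.Key = nil
		return xm.Marshal()
	}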

View File

@@ -58,19 +58,11 @@ var BootstrapPeers = []string{
 	"/ip4/185.143.102.84/udp/8336/quic-v1/p2p/Qmce68gLLq9eMdwCcmd1ptfoC2nVoe861LF1cjdVHC2DwK",
 	"/ip4/65.109.17.13/udp/8336/quic-v1/p2p/Qmc35n99eojSvW3PkbfBczJoSX92WmnnKh3Fg114ok3oo4",
 	"/ip4/65.108.194.84/udp/8336/quic-v1/p2p/QmP8C7g9ZRiWzhqN2AgFu5onS6HwHzR6Vv1TCHxAhnCSnq",
-	"/dns/quil.dfcnodes.eu/udp/8336/quic-v1/p2p/QmQaFmbYVrKSwoen5UQdaqyDq4QhXfSSLDVnYpYD4SF9tX",
 	"/ip4/15.204.100.222/udp/8336/quic-v1/p2p/Qmef3Z3RvGg49ZpDPcf2shWtJNgPJNpXrowjUcfz23YQ3V",
 	"/ip4/69.197.174.35/udp/8336/quic-v1/p2p/QmeprCaZKiymofPJgnp2ANR3F4pRus9PHHaxnJDh1Jwr1p",
 	"/ip4/70.36.102.32/udp/8336/quic-v1/p2p/QmYriGRXCUiwFodqSoS4GgEcD7UVyxXPeCgQKmYne3iLSF",
 	"/ip4/204.12.220.2/udp/8336/quic-v1/p2p/QmRw5Tw4p5v2vLPvVSAkQEiRPQGnWk9HM4xiSvgxF82CCw",
 	"/ip4/209.159.149.14/udp/8336/quic-v1/p2p/Qmcq4Lmw45tbodvdRWZ8iGgy3rUcR3dikHTj1fBXP8VJqv",
-	"/ip4/148.251.9.90/udp/8336/quic-v1/p2p/QmRpKmQ1W83s6moBFpG6D6nrttkqdQSbdCJpvfxDVGcs38",
-	"/ip4/35.232.113.144/udp/8336/quic-v1/p2p/QmWxkBc7a17ZsLHhszLyTvKsoHMKvKae2XwfQXymiU66md",
-	"/ip4/34.87.85.78/udp/8336/quic-v1/p2p/QmTGguT5XhtvZZwTLnNQTN8Bg9eUm1THWEneXXHGhMDPrz",
-	"/ip4/34.81.199.27/udp/8336/quic-v1/p2p/QmTMMKpzCKJCwrnUzNu6tNj4P1nL7hVqz251245wsVpGNg",
-	"/ip4/34.143.255.235/udp/8336/quic-v1/p2p/QmeifsP6Kvq8A3yabQs6CBg7prSpDSqdee8P2BDQm9EpP8",
-	"/ip4/34.34.125.238/udp/8336/quic-v1/p2p/QmZdSyBJLm9UiDaPZ4XDkgRGXUwPcHJCmKoH6fS9Qjyko4",
-	"/ip4/34.80.245.52/udp/8336/quic-v1/p2p/QmNmbqobt82Vre5JxUGVNGEWn2HsztQQ1xfeg6mx7X5u3f",
 	"/dns/bravo-1.qcommander.sh/udp/8336/quic-v1/p2p/QmWFK1gVuhEqZdr8phTo3QbyLwjYmyivx31Zubqt7oR4XB",
 	"/ip4/109.199.100.108/udp/8336/quic-v1/p2p/Qma9fgugQc17MDu4YRSvnhfhVre6AYZ3nZdW8dSUYbsWvm",
 	"/ip4/47.251.49.193/udp/8336/quic-v1/p2p/QmP6ADPmMCsB8y82oFbrKTrwYWXt1CTMJ3jGNDXRHyYJgR",
@@ -83,6 +75,14 @@ var BootstrapPeers = []string{
 	// "/ip4/207.246.81.38/udp/8336/quic-v1/p2p/QmPBYgDy7snHon7PAn8nv1shApQBQz1iHb2sBBS8QSgQwW",
 	// "/dns/abyssia.fr/udp/8336/quic-v1/p2p/QmS7C1UhN8nvzLJgFFf1uspMRrXjJqThHNN6AyEXp6oVUB",
 	// "/ip4/51.15.18.247/udp/8336/quic-v1/p2p/QmYVaHXdFmHFeTa6oPixgjMVag6Ex7gLjE559ejJddwqzu",
+	// "/ip4/35.232.113.144/udp/8336/quic-v1/p2p/QmWxkBc7a17ZsLHhszLyTvKsoHMKvKae2XwfQXymiU66md",
+	// "/ip4/34.87.85.78/udp/8336/quic-v1/p2p/QmTGguT5XhtvZZwTLnNQTN8Bg9eUm1THWEneXXHGhMDPrz",
+	// "/ip4/34.81.199.27/udp/8336/quic-v1/p2p/QmTMMKpzCKJCwrnUzNu6tNj4P1nL7hVqz251245wsVpGNg",
+	// "/ip4/34.143.255.235/udp/8336/quic-v1/p2p/QmeifsP6Kvq8A3yabQs6CBg7prSpDSqdee8P2BDQm9EpP8",
+	// "/ip4/34.34.125.238/udp/8336/quic-v1/p2p/QmZdSyBJLm9UiDaPZ4XDkgRGXUwPcHJCmKoH6fS9Qjyko4",
+	// "/ip4/34.80.245.52/udp/8336/quic-v1/p2p/QmNmbqobt82Vre5JxUGVNGEWn2HsztQQ1xfeg6mx7X5u3f",
+	// "/ip4/148.251.9.90/udp/8336/quic-v1/p2p/QmRpKmQ1W83s6moBFpG6D6nrttkqdQSbdCJpvfxDVGcs38",
+	// "/dns/quil.dfcnodes.eu/udp/8336/quic-v1/p2p/QmQaFmbYVrKSwoen5UQdaqyDq4QhXfSSLDVnYpYD4SF9tX",
 }
 
 func LoadConfig(configPath string, proverKey string) (*Config, error) {