mirror of
				https://source.quilibrium.com/quilibrium/ceremonyclient.git
				synced 2025-11-04 06:17:27 +00:00 
			
		
		
		
	tighten up the defers on mutexes, it's hitting hard locks
This commit is contained in:
		
							parent
							
								
									9d0cf0bc68
								
							
						
					
					
						commit
						1e60549f89
					
				@ -51,7 +51,6 @@ func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Dur
 | 
			
		||||
 | 
			
		||||
func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
 | 
			
		||||
	b.mu.Lock()
 | 
			
		||||
	defer b.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	h, ok := b.info[id]
 | 
			
		||||
	switch {
 | 
			
		||||
@ -62,6 +61,7 @@ func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
 | 
			
		||||
			attempts: 0,
 | 
			
		||||
		}
 | 
			
		||||
	case h.attempts >= b.maxAttempts:
 | 
			
		||||
		b.mu.Unlock()
 | 
			
		||||
		return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id)
 | 
			
		||||
 | 
			
		||||
	case h.duration < MinBackoffDelay:
 | 
			
		||||
@ -78,27 +78,29 @@ func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) {
 | 
			
		||||
	h.attempts += 1
 | 
			
		||||
	h.lastTried = time.Now()
 | 
			
		||||
	b.info[id] = h
 | 
			
		||||
	b.mu.Unlock()
 | 
			
		||||
	return h.duration, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *backoff) cleanup() {
 | 
			
		||||
	b.mu.Lock()
 | 
			
		||||
	defer b.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	for id, h := range b.info {
 | 
			
		||||
		if time.Since(h.lastTried) > TimeToLive {
 | 
			
		||||
			delete(b.info, id)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b.mu.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *backoff) cleanupLoop(ctx context.Context) {
 | 
			
		||||
	ticker := time.NewTicker(b.ci)
 | 
			
		||||
	defer ticker.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			ticker.Stop()
 | 
			
		||||
			return // pubsub shutting down
 | 
			
		||||
		case <-ticker.C:
 | 
			
		||||
			b.cleanup()
 | 
			
		||||
 | 
			
		||||
@ -48,9 +48,9 @@ func (t *Bitmask) SetScoreParams(p *BitmaskScoreParams) error {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	t.mux.Lock()
 | 
			
		||||
	defer t.mux.Unlock()
 | 
			
		||||
 | 
			
		||||
	if t.closed {
 | 
			
		||||
		t.mux.Unlock()
 | 
			
		||||
		return ErrBitmaskClosed
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -74,9 +74,11 @@ func (t *Bitmask) SetScoreParams(p *BitmaskScoreParams) error {
 | 
			
		||||
	select {
 | 
			
		||||
	case t.p.eval <- update:
 | 
			
		||||
		err = <-result
 | 
			
		||||
		t.mux.Unlock()
 | 
			
		||||
		return err
 | 
			
		||||
 | 
			
		||||
	case <-t.p.ctx.Done():
 | 
			
		||||
		t.mux.Unlock()
 | 
			
		||||
		return t.p.ctx.Err()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -85,8 +87,8 @@ func (t *Bitmask) SetScoreParams(p *BitmaskScoreParams) error {
 | 
			
		||||
// Multiple event handlers may be created and will operate independently of each other
 | 
			
		||||
func (t *Bitmask) EventHandler(opts ...BitmaskEventHandlerOpt) (*BitmaskEventHandler, error) {
 | 
			
		||||
	t.mux.RLock()
 | 
			
		||||
	defer t.mux.RUnlock()
 | 
			
		||||
	if t.closed {
 | 
			
		||||
		t.mux.RUnlock()
 | 
			
		||||
		return nil, ErrBitmaskClosed
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -101,6 +103,7 @@ func (t *Bitmask) EventHandler(opts ...BitmaskEventHandlerOpt) (*BitmaskEventHan
 | 
			
		||||
	for _, opt := range opts {
 | 
			
		||||
		err := opt(h)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.mux.RUnlock()
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -120,21 +123,23 @@ func (t *Bitmask) EventHandler(opts ...BitmaskEventHandlerOpt) (*BitmaskEventHan
 | 
			
		||||
		done <- struct{}{}
 | 
			
		||||
	}:
 | 
			
		||||
	case <-t.p.ctx.Done():
 | 
			
		||||
		t.mux.RUnlock()
 | 
			
		||||
		return nil, t.p.ctx.Err()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	<-done
 | 
			
		||||
 | 
			
		||||
	t.mux.RUnlock()
 | 
			
		||||
	return h, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *Bitmask) sendNotification(evt PeerEvent) {
 | 
			
		||||
	t.evtHandlerMux.RLock()
 | 
			
		||||
	defer t.evtHandlerMux.RUnlock()
 | 
			
		||||
 | 
			
		||||
	for h := range t.evtHandlers {
 | 
			
		||||
		h.sendNotification(evt)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	t.evtHandlerMux.RUnlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Subscribe returns a new Subscription for the bitmask.
 | 
			
		||||
@ -142,8 +147,9 @@ func (t *Bitmask) sendNotification(evt PeerEvent) {
 | 
			
		||||
// before the subscription is processed by the pubsub main loop and propagated to our peers.
 | 
			
		||||
func (t *Bitmask) Subscribe(opts ...SubOpt) (*Subscription, error) {
 | 
			
		||||
	t.mux.RLock()
 | 
			
		||||
	defer t.mux.RUnlock()
 | 
			
		||||
 | 
			
		||||
	if t.closed {
 | 
			
		||||
		t.mux.RUnlock()
 | 
			
		||||
		return nil, ErrBitmaskClosed
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -155,6 +161,7 @@ func (t *Bitmask) Subscribe(opts ...SubOpt) (*Subscription, error) {
 | 
			
		||||
	for _, opt := range opts {
 | 
			
		||||
		err := opt(sub)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.mux.RUnlock()
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -173,10 +180,13 @@ func (t *Bitmask) Subscribe(opts ...SubOpt) (*Subscription, error) {
 | 
			
		||||
		resp: out,
 | 
			
		||||
	}:
 | 
			
		||||
	case <-t.p.ctx.Done():
 | 
			
		||||
		t.mux.RUnlock()
 | 
			
		||||
		return nil, t.p.ctx.Err()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return <-out, nil
 | 
			
		||||
	subOut := <-out
 | 
			
		||||
	t.mux.RUnlock()
 | 
			
		||||
	return subOut, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Relay enables message relaying for the bitmask and returns a reference
 | 
			
		||||
@ -184,8 +194,9 @@ func (t *Bitmask) Subscribe(opts ...SubOpt) (*Subscription, error) {
 | 
			
		||||
// To completely disable the relay, all references must be cancelled.
 | 
			
		||||
func (t *Bitmask) Relay() (RelayCancelFunc, error) {
 | 
			
		||||
	t.mux.RLock()
 | 
			
		||||
	defer t.mux.RUnlock()
 | 
			
		||||
 | 
			
		||||
	if t.closed {
 | 
			
		||||
		t.mux.RUnlock()
 | 
			
		||||
		return nil, ErrBitmaskClosed
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -199,10 +210,13 @@ func (t *Bitmask) Relay() (RelayCancelFunc, error) {
 | 
			
		||||
		resp:    out,
 | 
			
		||||
	}:
 | 
			
		||||
	case <-t.p.ctx.Done():
 | 
			
		||||
		t.mux.RUnlock()
 | 
			
		||||
		return nil, t.p.ctx.Err()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return <-out, nil
 | 
			
		||||
	cancelFunc := <-out
 | 
			
		||||
	t.mux.RUnlock()
 | 
			
		||||
	return cancelFunc, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// RouterReady is a function that decides if a router is ready to publish
 | 
			
		||||
@ -222,8 +236,9 @@ type PubOpt func(pub *PublishOptions) error
 | 
			
		||||
// Publish publishes data to bitmask.
 | 
			
		||||
func (t *Bitmask) Publish(ctx context.Context, bitmask []byte, data []byte, opts ...PubOpt) error {
 | 
			
		||||
	t.mux.RLock()
 | 
			
		||||
	defer t.mux.RUnlock()
 | 
			
		||||
 | 
			
		||||
	if t.closed {
 | 
			
		||||
		t.mux.RUnlock()
 | 
			
		||||
		return ErrBitmaskClosed
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -234,6 +249,7 @@ func (t *Bitmask) Publish(ctx context.Context, bitmask []byte, data []byte, opts
 | 
			
		||||
	for _, opt := range opts {
 | 
			
		||||
		err := opt(pub)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.mux.RUnlock()
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -241,9 +257,11 @@ func (t *Bitmask) Publish(ctx context.Context, bitmask []byte, data []byte, opts
 | 
			
		||||
	if pub.customKey != nil && !pub.local {
 | 
			
		||||
		key, pid = pub.customKey()
 | 
			
		||||
		if key == nil {
 | 
			
		||||
			t.mux.RUnlock()
 | 
			
		||||
			return ErrNilSignKey
 | 
			
		||||
		}
 | 
			
		||||
		if len(pid) == 0 {
 | 
			
		||||
			t.mux.RUnlock()
 | 
			
		||||
			return ErrEmptyPeerID
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -262,6 +280,7 @@ func (t *Bitmask) Publish(ctx context.Context, bitmask []byte, data []byte, opts
 | 
			
		||||
		m.From = []byte(pid)
 | 
			
		||||
		err := signMessage(pid, key, m)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.mux.RUnlock()
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -286,28 +305,43 @@ func (t *Bitmask) Publish(ctx context.Context, bitmask []byte, data []byte, opts
 | 
			
		||||
					res <- done
 | 
			
		||||
				}:
 | 
			
		||||
					if <-res {
 | 
			
		||||
						if ticker != nil {
 | 
			
		||||
							ticker.Stop()
 | 
			
		||||
						}
 | 
			
		||||
						break readyLoop
 | 
			
		||||
					}
 | 
			
		||||
				case <-t.p.ctx.Done():
 | 
			
		||||
					if ticker != nil {
 | 
			
		||||
						ticker.Stop()
 | 
			
		||||
					}
 | 
			
		||||
					t.mux.RUnlock()
 | 
			
		||||
					return t.p.ctx.Err()
 | 
			
		||||
				case <-ctx.Done():
 | 
			
		||||
					if ticker != nil {
 | 
			
		||||
						ticker.Stop()
 | 
			
		||||
					}
 | 
			
		||||
					t.mux.RUnlock()
 | 
			
		||||
					return ctx.Err()
 | 
			
		||||
				}
 | 
			
		||||
				if ticker == nil {
 | 
			
		||||
					ticker = time.NewTicker(200 * time.Millisecond)
 | 
			
		||||
					defer ticker.Stop()
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				select {
 | 
			
		||||
				case <-ticker.C:
 | 
			
		||||
				case <-ctx.Done():
 | 
			
		||||
					ticker.Stop()
 | 
			
		||||
					t.mux.RUnlock()
 | 
			
		||||
					return fmt.Errorf("router is not ready: %w", ctx.Err())
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return t.p.val.PushLocal(&Message{m, nil, t.p.host.ID(), nil, pub.local})
 | 
			
		||||
	err := t.p.val.PushLocal(&Message{m, nil, t.p.host.ID(), nil, pub.local})
 | 
			
		||||
 | 
			
		||||
	t.mux.RUnlock()
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WithReadiness returns a publishing option for only publishing when the router is ready.
 | 
			
		||||
@ -347,8 +381,9 @@ func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
 | 
			
		||||
// Does not error if the bitmask is already closed.
 | 
			
		||||
func (t *Bitmask) Close() error {
 | 
			
		||||
	t.mux.Lock()
 | 
			
		||||
	defer t.mux.Unlock()
 | 
			
		||||
 | 
			
		||||
	if t.closed {
 | 
			
		||||
		t.mux.Unlock()
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -357,6 +392,7 @@ func (t *Bitmask) Close() error {
 | 
			
		||||
	select {
 | 
			
		||||
	case t.p.rmBitmask <- req:
 | 
			
		||||
	case <-t.p.ctx.Done():
 | 
			
		||||
		t.mux.Unlock()
 | 
			
		||||
		return t.p.ctx.Err()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -366,18 +402,22 @@ func (t *Bitmask) Close() error {
 | 
			
		||||
		t.closed = true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	t.mux.Unlock()
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ListPeers returns a list of peers we are connected to in the given bitmask.
 | 
			
		||||
func (t *Bitmask) ListPeers() []peer.ID {
 | 
			
		||||
	t.mux.RLock()
 | 
			
		||||
	defer t.mux.RUnlock()
 | 
			
		||||
 | 
			
		||||
	if t.closed {
 | 
			
		||||
		t.mux.RUnlock()
 | 
			
		||||
		return []peer.ID{}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return t.p.ListPeers(t.bitmask)
 | 
			
		||||
	l := t.p.ListPeers(t.bitmask)
 | 
			
		||||
	t.mux.RUnlock()
 | 
			
		||||
	return l
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type EventType int
 | 
			
		||||
 | 
			
		||||
@ -554,7 +554,6 @@ func (bs *BlossomSubRouter) manageAddrBook() {
 | 
			
		||||
		log.Errorf("failed to subscribe to peer identification events: %v", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	defer sub.Close()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
@ -566,6 +565,7 @@ func (bs *BlossomSubRouter) manageAddrBook() {
 | 
			
		||||
					log.Warnf("failed to close addr book: %v", errClose)
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
			sub.Close()
 | 
			
		||||
			return
 | 
			
		||||
		case ev := <-sub.Out():
 | 
			
		||||
			switch ev := ev.(type) {
 | 
			
		||||
@ -1435,7 +1435,6 @@ func (bs *BlossomSubRouter) heartbeatTimer() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ticker := time.NewTicker(bs.params.HeartbeatInterval)
 | 
			
		||||
	defer ticker.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
@ -1443,9 +1442,11 @@ func (bs *BlossomSubRouter) heartbeatTimer() {
 | 
			
		||||
			select {
 | 
			
		||||
			case bs.p.eval <- bs.heartbeat:
 | 
			
		||||
			case <-bs.p.ctx.Done():
 | 
			
		||||
				ticker.Stop()
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		case <-bs.p.ctx.Done():
 | 
			
		||||
			ticker.Stop()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -1453,14 +1454,6 @@ func (bs *BlossomSubRouter) heartbeatTimer() {
 | 
			
		||||
 | 
			
		||||
func (bs *BlossomSubRouter) heartbeat() {
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if bs.params.SlowHeartbeatWarning > 0 {
 | 
			
		||||
			slowWarning := time.Duration(bs.params.SlowHeartbeatWarning * float64(bs.params.HeartbeatInterval))
 | 
			
		||||
			if dt := time.Since(start); dt > slowWarning {
 | 
			
		||||
				log.Warnw("slow heartbeat", "took", dt)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	bs.heartbeatTicks++
 | 
			
		||||
 | 
			
		||||
@ -1666,6 +1659,13 @@ func (bs *BlossomSubRouter) heartbeat() {
 | 
			
		||||
		// 2nd arg are mesh peers excluded from gossip. We already push
 | 
			
		||||
		// messages to them, so its redundant to gossip IHAVEs.
 | 
			
		||||
		bs.emitGossip(bitmask, peers)
 | 
			
		||||
 | 
			
		||||
		if bs.params.SlowHeartbeatWarning > 0 {
 | 
			
		||||
			slowWarning := time.Duration(bs.params.SlowHeartbeatWarning * float64(bs.params.HeartbeatInterval))
 | 
			
		||||
			if dt := time.Since(start); dt > slowWarning {
 | 
			
		||||
				log.Warnw("slow heartbeat", "took", dt)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// expire fanout for bitmasks we haven't published to in a while
 | 
			
		||||
 | 
			
		||||
@ -55,14 +55,6 @@ func (p *PubSub) handleNewStream(s network.Stream) {
 | 
			
		||||
	p.inboundStreams[peer] = s
 | 
			
		||||
	p.inboundStreamsMx.Unlock()
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		p.inboundStreamsMx.Lock()
 | 
			
		||||
		if p.inboundStreams[peer] == s {
 | 
			
		||||
			delete(p.inboundStreams, peer)
 | 
			
		||||
		}
 | 
			
		||||
		p.inboundStreamsMx.Unlock()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	r := msgio.NewVarintReaderSize(s, p.maxMessageSize)
 | 
			
		||||
	for {
 | 
			
		||||
		msgbytes, err := r.ReadMsg()
 | 
			
		||||
@ -77,6 +69,11 @@ func (p *PubSub) handleNewStream(s network.Stream) {
 | 
			
		||||
				s.Close()
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			p.inboundStreamsMx.Lock()
 | 
			
		||||
			if p.inboundStreams[peer] == s {
 | 
			
		||||
				delete(p.inboundStreams, peer)
 | 
			
		||||
			}
 | 
			
		||||
			p.inboundStreamsMx.Unlock()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		if len(msgbytes) == 0 {
 | 
			
		||||
@ -91,6 +88,11 @@ func (p *PubSub) handleNewStream(s network.Stream) {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			s.Reset()
 | 
			
		||||
			log.Warnf("bogus rpc from %s: %s", s.Conn().RemotePeer(), err)
 | 
			
		||||
			p.inboundStreamsMx.Lock()
 | 
			
		||||
			if p.inboundStreams[peer] == s {
 | 
			
		||||
				delete(p.inboundStreams, peer)
 | 
			
		||||
			}
 | 
			
		||||
			p.inboundStreamsMx.Unlock()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@ -100,6 +102,11 @@ func (p *PubSub) handleNewStream(s network.Stream) {
 | 
			
		||||
		case <-p.ctx.Done():
 | 
			
		||||
			// Close is useless because the other side isn't reading.
 | 
			
		||||
			s.Reset()
 | 
			
		||||
			p.inboundStreamsMx.Lock()
 | 
			
		||||
			if p.inboundStreams[peer] == s {
 | 
			
		||||
				delete(p.inboundStreams, peer)
 | 
			
		||||
			}
 | 
			
		||||
			p.inboundStreamsMx.Unlock()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -161,37 +168,38 @@ func (p *PubSub) handlePeerDead(s network.Stream) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
 | 
			
		||||
	writeRpc := func(rpc *RPC) error {
 | 
			
		||||
		size := uint64(rpc.Size())
 | 
			
		||||
 | 
			
		||||
		buf := pool.Get(varint.UvarintSize(size) + int(size))
 | 
			
		||||
		defer pool.Put(buf)
 | 
			
		||||
 | 
			
		||||
		n := binary.PutUvarint(buf, size)
 | 
			
		||||
		_, err := rpc.MarshalTo(buf[n:])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		_, err = s.Write(buf)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	defer s.Close()
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case rpc, ok := <-outgoing:
 | 
			
		||||
			if !ok {
 | 
			
		||||
				s.Close()
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			err := writeRpc(rpc)
 | 
			
		||||
			size := uint64(rpc.Size())
 | 
			
		||||
 | 
			
		||||
			buf := pool.Get(varint.UvarintSize(size) + int(size))
 | 
			
		||||
 | 
			
		||||
			n := binary.PutUvarint(buf, size)
 | 
			
		||||
			_, err := rpc.MarshalTo(buf[n:])
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				s.Reset()
 | 
			
		||||
				log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
 | 
			
		||||
				s.Close()
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			_, err = s.Write(buf)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				s.Reset()
 | 
			
		||||
				log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)
 | 
			
		||||
				s.Close()
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			pool.Put(buf)
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			s.Close()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -120,7 +120,6 @@ func (d *discover) pollTimer() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ticker := time.NewTicker(DiscoveryPollInterval)
 | 
			
		||||
	defer ticker.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
@ -128,9 +127,11 @@ func (d *discover) pollTimer() {
 | 
			
		||||
			select {
 | 
			
		||||
			case d.p.eval <- d.requestDiscovery:
 | 
			
		||||
			case <-d.p.ctx.Done():
 | 
			
		||||
				ticker.Stop()
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		case <-d.p.ctx.Done():
 | 
			
		||||
			ticker.Stop()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -197,7 +198,6 @@ func (d *discover) Advertise(bitmask []byte) {
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		t := time.NewTimer(next)
 | 
			
		||||
		defer t.Stop()
 | 
			
		||||
 | 
			
		||||
		for advertisingCtx.Err() == nil {
 | 
			
		||||
			select {
 | 
			
		||||
@ -211,9 +211,11 @@ func (d *discover) Advertise(bitmask []byte) {
 | 
			
		||||
				}
 | 
			
		||||
				t.Reset(next)
 | 
			
		||||
			case <-advertisingCtx.Done():
 | 
			
		||||
				t.Stop()
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		t.Stop()
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -248,7 +250,6 @@ func (d *discover) Bootstrap(ctx context.Context, bitmask []byte, ready RouterRe
 | 
			
		||||
	if !t.Stop() {
 | 
			
		||||
		<-t.C
 | 
			
		||||
	}
 | 
			
		||||
	defer t.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		// Check if ready for publishing
 | 
			
		||||
@ -259,11 +260,14 @@ func (d *discover) Bootstrap(ctx context.Context, bitmask []byte, ready RouterRe
 | 
			
		||||
			bootstrapped <- done
 | 
			
		||||
		}:
 | 
			
		||||
			if <-bootstrapped {
 | 
			
		||||
				t.Stop()
 | 
			
		||||
				return true
 | 
			
		||||
			}
 | 
			
		||||
		case <-d.p.ctx.Done():
 | 
			
		||||
			t.Stop()
 | 
			
		||||
			return false
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			t.Stop()
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@ -272,16 +276,20 @@ func (d *discover) Bootstrap(ctx context.Context, bitmask []byte, ready RouterRe
 | 
			
		||||
		select {
 | 
			
		||||
		case d.discoverQ <- disc:
 | 
			
		||||
		case <-d.p.ctx.Done():
 | 
			
		||||
			t.Stop()
 | 
			
		||||
			return false
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			t.Stop()
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		select {
 | 
			
		||||
		case <-disc.done:
 | 
			
		||||
		case <-d.p.ctx.Done():
 | 
			
		||||
			t.Stop()
 | 
			
		||||
			return false
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			t.Stop()
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@ -289,8 +297,10 @@ func (d *discover) Bootstrap(ctx context.Context, bitmask []byte, ready RouterRe
 | 
			
		||||
		select {
 | 
			
		||||
		case <-t.C:
 | 
			
		||||
		case <-d.p.ctx.Done():
 | 
			
		||||
			t.Stop()
 | 
			
		||||
			return false
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			t.Stop()
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -298,15 +308,16 @@ func (d *discover) Bootstrap(ctx context.Context, bitmask []byte, ready RouterRe
 | 
			
		||||
 | 
			
		||||
func (d *discover) handleDiscovery(ctx context.Context, bitmask []byte, opts []discovery.Option) {
 | 
			
		||||
	discoverCtx, cancel := context.WithTimeout(ctx, time.Second*10)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	peerCh, err := d.discovery.FindPeers(discoverCtx, string(bitmask), opts...)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Debugf("error finding peers for bitmask %s: %v", bitmask, err)
 | 
			
		||||
		cancel()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	d.connector.Connect(ctx, peerCh)
 | 
			
		||||
	cancel()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type discoverReq struct {
 | 
			
		||||
 | 
			
		||||
@ -54,7 +54,6 @@ func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs [][]byte) {
 | 
			
		||||
	mid := msgIDs[idx]
 | 
			
		||||
 | 
			
		||||
	gt.Lock()
 | 
			
		||||
	defer gt.Unlock()
 | 
			
		||||
 | 
			
		||||
	promises, ok := gt.promises[string(mid)]
 | 
			
		||||
	if !ok {
 | 
			
		||||
@ -72,6 +71,8 @@ func (gt *gossipTracer) AddPromise(p peer.ID, msgIDs [][]byte) {
 | 
			
		||||
		}
 | 
			
		||||
		peerPromises[string(mid)] = struct{}{}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	gt.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// returns the number of broken promises for each peer who didn't follow up
 | 
			
		||||
@ -82,7 +83,6 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	gt.Lock()
 | 
			
		||||
	defer gt.Unlock()
 | 
			
		||||
 | 
			
		||||
	var res map[peer.ID]int
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
@ -111,6 +111,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	gt.Unlock()
 | 
			
		||||
	return res
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -120,10 +121,10 @@ func (gt *gossipTracer) fulfillPromise(msg *Message) {
 | 
			
		||||
	mid := gt.idGen.ID(msg)
 | 
			
		||||
 | 
			
		||||
	gt.Lock()
 | 
			
		||||
	defer gt.Unlock()
 | 
			
		||||
 | 
			
		||||
	promises, ok := gt.promises[string(mid)]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		gt.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	delete(gt.promises, string(mid))
 | 
			
		||||
@ -138,6 +139,8 @@ func (gt *gossipTracer) fulfillPromise(msg *Message) {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	gt.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (gt *gossipTracer) DeliverMessage(msg *Message) {
 | 
			
		||||
@ -181,10 +184,10 @@ func (gt *gossipTracer) UndeliverableMessage(msg *Message)    {}
 | 
			
		||||
 | 
			
		||||
func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
 | 
			
		||||
	gt.Lock()
 | 
			
		||||
	defer gt.Unlock()
 | 
			
		||||
 | 
			
		||||
	peerPromises, ok := gt.peerPromises[p]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		gt.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -197,4 +200,5 @@ func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	delete(gt.peerPromises, p)
 | 
			
		||||
	gt.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -204,13 +204,12 @@ func newPeerGater(ctx context.Context, host host.Host, params *PeerGaterParams)
 | 
			
		||||
func (pg *peerGater) background(ctx context.Context) {
 | 
			
		||||
	tick := time.NewTicker(pg.params.DecayInterval)
 | 
			
		||||
 | 
			
		||||
	defer tick.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-tick.C:
 | 
			
		||||
			pg.decayStats()
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			tick.Stop()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -218,7 +217,6 @@ func (pg *peerGater) background(ctx context.Context) {
 | 
			
		||||
 | 
			
		||||
func (pg *peerGater) decayStats() {
 | 
			
		||||
	pg.Lock()
 | 
			
		||||
	defer pg.Unlock()
 | 
			
		||||
 | 
			
		||||
	pg.validate *= pg.params.GlobalDecay
 | 
			
		||||
	if pg.validate < pg.params.DecayToZero {
 | 
			
		||||
@ -256,6 +254,8 @@ func (pg *peerGater) decayStats() {
 | 
			
		||||
			delete(pg.ipStats, ip)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pg.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pg *peerGater) getPeerStats(p peer.ID) *peerGaterStats {
 | 
			
		||||
@ -323,21 +323,23 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pg.Lock()
 | 
			
		||||
	defer pg.Unlock()
 | 
			
		||||
 | 
			
		||||
	// check the quiet period; if the validation queue has not throttled for more than the Quiet
 | 
			
		||||
	// interval, we turn off the circuit breaker and accept.
 | 
			
		||||
	if time.Since(pg.lastThrottle) > pg.params.Quiet {
 | 
			
		||||
		pg.Unlock()
 | 
			
		||||
		return AcceptAll
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// no throttle events -- or they have decayed; accept.
 | 
			
		||||
	if pg.throttle == 0 {
 | 
			
		||||
		pg.Unlock()
 | 
			
		||||
		return AcceptAll
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// check the throttle/validate ration; if it is below threshold we accept.
 | 
			
		||||
	if pg.validate != 0 && pg.throttle/pg.validate < pg.params.Threshold {
 | 
			
		||||
		pg.Unlock()
 | 
			
		||||
		return AcceptAll
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -346,6 +348,7 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
 | 
			
		||||
	// compute the goodput of the peer; the denominator is the weighted mix of message counters
 | 
			
		||||
	total := st.deliver + pg.params.DuplicateWeight*st.duplicate + pg.params.IgnoreWeight*st.ignore + pg.params.RejectWeight*st.reject
 | 
			
		||||
	if total == 0 {
 | 
			
		||||
		pg.Unlock()
 | 
			
		||||
		return AcceptAll
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -355,10 +358,12 @@ func (pg *peerGater) AcceptFrom(p peer.ID) AcceptStatus {
 | 
			
		||||
	// accepted; this is not a sinkhole/blacklist.
 | 
			
		||||
	threshold := (1 + st.deliver) / (1 + total)
 | 
			
		||||
	if rand.Float64() < threshold {
 | 
			
		||||
		pg.Unlock()
 | 
			
		||||
		return AcceptAll
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.Debugf("throttling peer %s with threshold %f", p, threshold)
 | 
			
		||||
	pg.Unlock()
 | 
			
		||||
	return AcceptControl
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -368,21 +373,21 @@ var _ RawTracer = (*peerGater)(nil)
 | 
			
		||||
// tracer interface
 | 
			
		||||
func (pg *peerGater) AddPeer(p peer.ID, proto protocol.ID) {
 | 
			
		||||
	pg.Lock()
 | 
			
		||||
	defer pg.Unlock()
 | 
			
		||||
 | 
			
		||||
	st := pg.getPeerStats(p)
 | 
			
		||||
	st.connected++
 | 
			
		||||
	pg.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pg *peerGater) RemovePeer(p peer.ID) {
 | 
			
		||||
	pg.Lock()
 | 
			
		||||
	defer pg.Unlock()
 | 
			
		||||
 | 
			
		||||
	st := pg.getPeerStats(p)
 | 
			
		||||
	st.connected--
 | 
			
		||||
	st.expire = time.Now().Add(pg.params.RetainStats)
 | 
			
		||||
 | 
			
		||||
	delete(pg.peerStats, p)
 | 
			
		||||
	pg.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pg *peerGater) Join(bitmask []byte)             {}
 | 
			
		||||
@ -392,14 +397,13 @@ func (pg *peerGater) Prune(p peer.ID, bitmask []byte) {}
 | 
			
		||||
 | 
			
		||||
func (pg *peerGater) ValidateMessage(msg *Message) {
 | 
			
		||||
	pg.Lock()
 | 
			
		||||
	defer pg.Unlock()
 | 
			
		||||
 | 
			
		||||
	pg.validate++
 | 
			
		||||
	pg.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pg *peerGater) DeliverMessage(msg *Message) {
 | 
			
		||||
	pg.Lock()
 | 
			
		||||
	defer pg.Unlock()
 | 
			
		||||
 | 
			
		||||
	st := pg.getPeerStats(msg.ReceivedFrom)
 | 
			
		||||
 | 
			
		||||
@ -411,11 +415,11 @@ func (pg *peerGater) DeliverMessage(msg *Message) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	st.deliver += weight
 | 
			
		||||
	pg.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pg *peerGater) RejectMessage(msg *Message, reason string) {
 | 
			
		||||
	pg.Lock()
 | 
			
		||||
	defer pg.Unlock()
 | 
			
		||||
 | 
			
		||||
	switch reason {
 | 
			
		||||
	case RejectValidationQueueFull:
 | 
			
		||||
@ -432,14 +436,15 @@ func (pg *peerGater) RejectMessage(msg *Message, reason string) {
 | 
			
		||||
		st := pg.getPeerStats(msg.ReceivedFrom)
 | 
			
		||||
		st.reject++
 | 
			
		||||
	}
 | 
			
		||||
	pg.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pg *peerGater) DuplicateMessage(msg *Message) {
 | 
			
		||||
	pg.Lock()
 | 
			
		||||
	defer pg.Unlock()
 | 
			
		||||
 | 
			
		||||
	st := pg.getPeerStats(msg.ReceivedFrom)
 | 
			
		||||
	st.duplicate++
 | 
			
		||||
	pg.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pg *peerGater) ThrottlePeer(p peer.ID) {}
 | 
			
		||||
 | 
			
		||||
@ -20,7 +20,6 @@ func (ps *PubSub) watchForNewPeers(ctx context.Context) {
 | 
			
		||||
		log.Errorf("failed to subscribe to peer identification events: %v", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	defer sub.Close()
 | 
			
		||||
 | 
			
		||||
	ps.newPeersPrioLk.RLock()
 | 
			
		||||
	ps.newPeersMx.Lock()
 | 
			
		||||
@ -68,6 +67,7 @@ func (ps *PubSub) watchForNewPeers(ctx context.Context) {
 | 
			
		||||
		var ev any
 | 
			
		||||
		select {
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			sub.Close()
 | 
			
		||||
			return
 | 
			
		||||
		case ev = <-sub.Out():
 | 
			
		||||
		}
 | 
			
		||||
@ -95,7 +95,7 @@ func (ps *PubSub) watchForNewPeers(ctx context.Context) {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	sub.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *PubSub) notifyNewPeer(peer peer.ID) {
 | 
			
		||||
 | 
			
		||||
@ -200,12 +200,12 @@ func newPeerScore(params *PeerScoreParams) *peerScore {
 | 
			
		||||
// Note: assumes that the bitmask score parameters have already been validated
 | 
			
		||||
func (ps *peerScore) SetBitmaskScoreParams(bitmask []byte, p *BitmaskScoreParams) error {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	old, exist := ps.params.Bitmasks[string(bitmask)]
 | 
			
		||||
	ps.params.Bitmasks[string(bitmask)] = p
 | 
			
		||||
 | 
			
		||||
	if !exist {
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -218,6 +218,7 @@ func (ps *peerScore) SetBitmaskScoreParams(bitmask []byte, p *BitmaskScoreParams
 | 
			
		||||
		recap = true
 | 
			
		||||
	}
 | 
			
		||||
	if !recap {
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -236,7 +237,7 @@ func (ps *peerScore) SetBitmaskScoreParams(bitmask []byte, p *BitmaskScoreParams
 | 
			
		||||
			tstats.meshMessageDeliveries = p.MeshMessageDeliveriesCap
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -257,9 +258,10 @@ func (ps *peerScore) Score(p peer.ID) float64 {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	return ps.score(p)
 | 
			
		||||
	score := ps.score(p)
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
	return score
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) score(p peer.ID) float64 {
 | 
			
		||||
@ -394,14 +396,15 @@ func (ps *peerScore) AddPenalty(p peer.ID, count int) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	pstats, ok := ps.peerStats[p]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pstats.behaviourPenalty += float64(count)
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// periodic maintenance
 | 
			
		||||
@ -503,7 +506,6 @@ func (ps *peerScore) inspectScoresExtended() {
 | 
			
		||||
// once their expiry has elapsed.
 | 
			
		||||
func (ps *peerScore) refreshScores() {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	now := time.Now()
 | 
			
		||||
	for p, pstats := range ps.peerStats {
 | 
			
		||||
@ -562,12 +564,13 @@ func (ps *peerScore) refreshScores() {
 | 
			
		||||
			pstats.behaviourPenalty = 0
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// refreshIPs refreshes IPs we know of peers we're tracking.
 | 
			
		||||
func (ps *peerScore) refreshIPs() {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	// peer IPs may change, so we periodically refresh them
 | 
			
		||||
	//
 | 
			
		||||
@ -582,19 +585,20 @@ func (ps *peerScore) refreshIPs() {
 | 
			
		||||
			pstats.ips = ips
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) gcDeliveryRecords() {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	ps.deliveries.gc()
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// tracer interface
 | 
			
		||||
func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	pstats, ok := ps.peerStats[p]
 | 
			
		||||
	if !ok {
 | 
			
		||||
@ -606,14 +610,15 @@ func (ps *peerScore) AddPeer(p peer.ID, proto protocol.ID) {
 | 
			
		||||
	ips := ps.getIPs(p)
 | 
			
		||||
	ps.setIPs(p, ips, pstats.ips)
 | 
			
		||||
	pstats.ips = ips
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) RemovePeer(p peer.ID) {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	pstats, ok := ps.peerStats[p]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -622,6 +627,7 @@ func (ps *peerScore) RemovePeer(p peer.ID) {
 | 
			
		||||
	if ps.score(p) > 0 {
 | 
			
		||||
		ps.removeIPs(p, pstats.ips)
 | 
			
		||||
		delete(ps.peerStats, p)
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -641,6 +647,7 @@ func (ps *peerScore) RemovePeer(p peer.ID) {
 | 
			
		||||
 | 
			
		||||
	pstats.connected = false
 | 
			
		||||
	pstats.expire = time.Now().Add(ps.params.RetainScore)
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) Join(bitmask []byte)  {}
 | 
			
		||||
@ -648,15 +655,16 @@ func (ps *peerScore) Leave(bitmask []byte) {}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) Graft(p peer.ID, bitmask []byte) {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	pstats, ok := ps.peerStats[p]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tstats, ok := pstats.getBitmaskStats(bitmask, ps.params)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -664,19 +672,21 @@ func (ps *peerScore) Graft(p peer.ID, bitmask []byte) {
 | 
			
		||||
	tstats.graftTime = time.Now()
 | 
			
		||||
	tstats.meshTime = 0
 | 
			
		||||
	tstats.meshMessageDeliveriesActive = false
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) Prune(p peer.ID, bitmask []byte) {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	pstats, ok := ps.peerStats[p]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tstats, ok := pstats.getBitmaskStats(bitmask, ps.params)
 | 
			
		||||
	if !ok {
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -688,20 +698,20 @@ func (ps *peerScore) Prune(p peer.ID, bitmask []byte) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tstats.inMesh = false
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) ValidateMessage(msg *Message) {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	// the pubsub subsystem is beginning validation; create a record to track time in
 | 
			
		||||
	// the validation pipeline with an accurate firstSeen time.
 | 
			
		||||
	_ = ps.deliveries.getRecord(ps.idGen.ID(msg))
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) DeliverMessage(msg *Message) {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	ps.markFirstMessageDelivery(msg.ReceivedFrom, msg)
 | 
			
		||||
 | 
			
		||||
@ -710,6 +720,7 @@ func (ps *peerScore) DeliverMessage(msg *Message) {
 | 
			
		||||
	// defensive check that this is the first delivery trace -- delivery status should be unknown
 | 
			
		||||
	if drec.status != deliveryUnknown {
 | 
			
		||||
		log.Debugf("unexpected delivery trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Since(drec.firstSeen), drec.status)
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -723,11 +734,11 @@ func (ps *peerScore) DeliverMessage(msg *Message) {
 | 
			
		||||
			ps.markDuplicateMessageDelivery(p, msg, time.Time{})
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) RejectMessage(msg *Message, reason string) {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	switch reason {
 | 
			
		||||
	// we don't track those messages, but we penalize the peer as they are clearly invalid
 | 
			
		||||
@ -741,18 +752,21 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case RejectSelfOrigin:
 | 
			
		||||
		ps.markInvalidMessageDelivery(msg.ReceivedFrom, msg)
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
 | 
			
		||||
		// we ignore those messages, so do nothing.
 | 
			
		||||
	case RejectBlacklstedPeer:
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case RejectBlacklistedSource:
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
 | 
			
		||||
	case RejectValidationQueueFull:
 | 
			
		||||
		// the message was rejected before it entered the validation pipeline;
 | 
			
		||||
		// we don't know if this message has a valid signature, and thus we also don't know if
 | 
			
		||||
		// it has a valid message ID; all we can do is ignore it.
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -761,6 +775,7 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
 | 
			
		||||
	// defensive check that this is the first rejection trace -- delivery status should be unknown
 | 
			
		||||
	if drec.status != deliveryUnknown {
 | 
			
		||||
		log.Debugf("unexpected rejection trace: message from %s was first seen %s ago and has delivery status %d", msg.ReceivedFrom, time.Since(drec.firstSeen), drec.status)
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -771,12 +786,14 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
 | 
			
		||||
		drec.status = deliveryThrottled
 | 
			
		||||
		// release the delivery time tracking map to free some memory early
 | 
			
		||||
		drec.peers = nil
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	case RejectValidationIgnored:
 | 
			
		||||
		// we were explicitly instructed by the validator to ignore the message but not penalize
 | 
			
		||||
		// the peer
 | 
			
		||||
		drec.status = deliveryIgnored
 | 
			
		||||
		drec.peers = nil
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -790,17 +807,18 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) {
 | 
			
		||||
 | 
			
		||||
	// release the delivery time tracking map to free some memory early
 | 
			
		||||
	drec.peers = nil
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) DuplicateMessage(msg *Message) {
 | 
			
		||||
	ps.Lock()
 | 
			
		||||
	defer ps.Unlock()
 | 
			
		||||
 | 
			
		||||
	drec := ps.deliveries.getRecord(ps.idGen.ID(msg))
 | 
			
		||||
 | 
			
		||||
	_, ok := drec.peers[msg.ReceivedFrom]
 | 
			
		||||
	if ok {
 | 
			
		||||
		// we have already seen this duplicate!
 | 
			
		||||
		ps.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -824,6 +842,7 @@ func (ps *peerScore) DuplicateMessage(msg *Message) {
 | 
			
		||||
	case deliveryIgnored:
 | 
			
		||||
		// the message was ignored; do nothing
 | 
			
		||||
	}
 | 
			
		||||
	ps.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ps *peerScore) ThrottlePeer(p peer.ID) {}
 | 
			
		||||
 | 
			
		||||
@ -111,7 +111,7 @@ func (t *tagTracer) addDeliveryTag(bitmask []byte) {
 | 
			
		||||
 | 
			
		||||
	name := "pubsub-deliveries:" + string(bitmask)
 | 
			
		||||
	t.Lock()
 | 
			
		||||
	defer t.Unlock()
 | 
			
		||||
 | 
			
		||||
	tag, err := t.decayer.RegisterDecayingTag(
 | 
			
		||||
		name,
 | 
			
		||||
		BlossomSubConnTagDecayInterval,
 | 
			
		||||
@ -120,16 +120,19 @@ func (t *tagTracer) addDeliveryTag(bitmask []byte) {
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Warnf("unable to create decaying delivery tag: %s", err)
 | 
			
		||||
		t.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	t.decaying[string(bitmask)] = tag
 | 
			
		||||
	t.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tagTracer) removeDeliveryTag(bitmask []byte) {
 | 
			
		||||
	t.Lock()
 | 
			
		||||
	defer t.Unlock()
 | 
			
		||||
 | 
			
		||||
	tag, ok := t.decaying[string(bitmask)]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		t.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	err := tag.Close()
 | 
			
		||||
@ -137,17 +140,20 @@ func (t *tagTracer) removeDeliveryTag(bitmask []byte) {
 | 
			
		||||
		log.Warnf("error closing decaying connmgr tag: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
	delete(t.decaying, string(bitmask))
 | 
			
		||||
	t.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tagTracer) bumpDeliveryTag(p peer.ID, bitmask []byte) error {
 | 
			
		||||
	t.RLock()
 | 
			
		||||
	defer t.RUnlock()
 | 
			
		||||
 | 
			
		||||
	tag, ok := t.decaying[string(bitmask)]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		t.RUnlock()
 | 
			
		||||
		return fmt.Errorf("no decaying tag registered for bitmask %s", bitmask)
 | 
			
		||||
	}
 | 
			
		||||
	return tag.Bump(p, BlossomSubConnTagBumpMessageDelivery)
 | 
			
		||||
	err := tag.Bump(p, BlossomSubConnTagBumpMessageDelivery)
 | 
			
		||||
	t.RUnlock()
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
 | 
			
		||||
@ -161,15 +167,17 @@ func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) {
 | 
			
		||||
// nearFirstPeers returns the peers who delivered the message while it was still validating
 | 
			
		||||
func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID {
 | 
			
		||||
	t.Lock()
 | 
			
		||||
	defer t.Unlock()
 | 
			
		||||
 | 
			
		||||
	peersMap, ok := t.nearFirst[string(t.idGen.ID(msg))]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		t.Unlock()
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	peers := make([]peer.ID, 0, len(peersMap))
 | 
			
		||||
	for p := range peersMap {
 | 
			
		||||
		peers = append(peers, p)
 | 
			
		||||
	}
 | 
			
		||||
	t.Unlock()
 | 
			
		||||
	return peers
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -212,31 +220,32 @@ func (t *tagTracer) Prune(p peer.ID, bitmask []byte) {
 | 
			
		||||
 | 
			
		||||
func (t *tagTracer) ValidateMessage(msg *Message) {
 | 
			
		||||
	t.Lock()
 | 
			
		||||
	defer t.Unlock()
 | 
			
		||||
 | 
			
		||||
	// create map to start tracking the peers who deliver while we're validating
 | 
			
		||||
	id := t.idGen.ID(msg)
 | 
			
		||||
	if _, exists := t.nearFirst[string(id)]; exists {
 | 
			
		||||
		t.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	t.nearFirst[string(id)] = make(map[peer.ID]struct{})
 | 
			
		||||
	t.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tagTracer) DuplicateMessage(msg *Message) {
 | 
			
		||||
	t.Lock()
 | 
			
		||||
	defer t.Unlock()
 | 
			
		||||
 | 
			
		||||
	id := t.idGen.ID(msg)
 | 
			
		||||
	peers, ok := t.nearFirst[string(id)]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		t.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	peers[msg.ReceivedFrom] = struct{}{}
 | 
			
		||||
	t.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tagTracer) RejectMessage(msg *Message, reason string) {
 | 
			
		||||
	t.Lock()
 | 
			
		||||
	defer t.Unlock()
 | 
			
		||||
 | 
			
		||||
	// We want to delete the near-first delivery tracking for messages that have passed through
 | 
			
		||||
	// the validation pipeline. Other rejection reasons (missing signature, etc) skip the validation
 | 
			
		||||
@ -249,6 +258,7 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) {
 | 
			
		||||
	case RejectValidationFailed:
 | 
			
		||||
		delete(t.nearFirst, string(t.idGen.ID(msg)))
 | 
			
		||||
	}
 | 
			
		||||
	t.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *tagTracer) RemovePeer(peer.ID)                {}
 | 
			
		||||
 | 
			
		||||
@ -36,21 +36,22 @@ func (tc *FirstSeenCache) Done() {
 | 
			
		||||
 | 
			
		||||
func (tc *FirstSeenCache) Has(s string) bool {
 | 
			
		||||
	tc.lk.RLock()
 | 
			
		||||
	defer tc.lk.RUnlock()
 | 
			
		||||
 | 
			
		||||
	_, ok := tc.m[s]
 | 
			
		||||
	tc.lk.RUnlock()
 | 
			
		||||
	return ok
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (tc *FirstSeenCache) Add(s string) bool {
 | 
			
		||||
	tc.lk.Lock()
 | 
			
		||||
	defer tc.lk.Unlock()
 | 
			
		||||
 | 
			
		||||
	_, ok := tc.m[s]
 | 
			
		||||
	if ok {
 | 
			
		||||
		tc.lk.Unlock()
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tc.m[s] = time.Now().Add(tc.ttl)
 | 
			
		||||
	tc.lk.Unlock()
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -37,22 +37,20 @@ func (tc *LastSeenCache) Done() {
 | 
			
		||||
 | 
			
		||||
func (tc *LastSeenCache) Add(s string) bool {
 | 
			
		||||
	tc.lk.Lock()
 | 
			
		||||
	defer tc.lk.Unlock()
 | 
			
		||||
 | 
			
		||||
	_, ok := tc.m[s]
 | 
			
		||||
	tc.m[s] = time.Now().Add(tc.ttl)
 | 
			
		||||
 | 
			
		||||
	tc.lk.Unlock()
 | 
			
		||||
	return !ok
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (tc *LastSeenCache) Has(s string) bool {
 | 
			
		||||
	tc.lk.Lock()
 | 
			
		||||
	defer tc.lk.Unlock()
 | 
			
		||||
 | 
			
		||||
	_, ok := tc.m[s]
 | 
			
		||||
	if ok {
 | 
			
		||||
		tc.m[s] = time.Now().Add(tc.ttl)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tc.lk.Unlock()
 | 
			
		||||
	return ok
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -10,7 +10,6 @@ var backgroundSweepInterval = time.Minute
 | 
			
		||||
 | 
			
		||||
func background(ctx context.Context, lk sync.Locker, m map[string]time.Time) {
 | 
			
		||||
	ticker := time.NewTicker(backgroundSweepInterval)
 | 
			
		||||
	defer ticker.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
@ -18,6 +17,7 @@ func background(ctx context.Context, lk sync.Locker, m map[string]time.Time) {
 | 
			
		||||
			sweep(lk, m, now)
 | 
			
		||||
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
			ticker.Stop()
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@ -25,11 +25,12 @@ func background(ctx context.Context, lk sync.Locker, m map[string]time.Time) {
 | 
			
		||||
 | 
			
		||||
func sweep(lk sync.Locker, m map[string]time.Time, now time.Time) {
 | 
			
		||||
	lk.Lock()
 | 
			
		||||
	defer lk.Unlock()
 | 
			
		||||
 | 
			
		||||
	for k, expiry := range m {
 | 
			
		||||
		if expiry.Before(now) {
 | 
			
		||||
			delete(m, k)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	lk.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -148,7 +148,6 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// disable for now
 | 
			
		||||
	// now := time.Now().UnixNano()
 | 
			
		||||
	// evt := &pb.TraceEvent{
 | 
			
		||||
	// 	Type:      pb.TraceEvent_DUPLICATE_MESSAGE.Enum(),
 | 
			
		||||
@ -179,19 +178,19 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	now := time.Now().UnixNano()
 | 
			
		||||
	evt := &pb.TraceEvent{
 | 
			
		||||
		Type:      pb.TraceEvent_DELIVER_MESSAGE.Enum(),
 | 
			
		||||
		PeerID:    []byte(t.pid),
 | 
			
		||||
		Timestamp: &now,
 | 
			
		||||
		DeliverMessage: &pb.TraceEvent_DeliverMessage{
 | 
			
		||||
			MessageID:    []byte(t.idGen.ID(msg)),
 | 
			
		||||
			Bitmask:      msg.Bitmask,
 | 
			
		||||
			ReceivedFrom: []byte(msg.ReceivedFrom),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	// now := time.Now().UnixNano()
 | 
			
		||||
	// evt := &pb.TraceEvent{
 | 
			
		||||
	// 	Type:      pb.TraceEvent_DELIVER_MESSAGE.Enum(),
 | 
			
		||||
	// 	PeerID:    []byte(t.pid),
 | 
			
		||||
	// 	Timestamp: &now,
 | 
			
		||||
	// 	DeliverMessage: &pb.TraceEvent_DeliverMessage{
 | 
			
		||||
	// 		MessageID:    []byte(t.idGen.ID(msg)),
 | 
			
		||||
	// 		Bitmask:      msg.Bitmask,
 | 
			
		||||
	// 		ReceivedFrom: []byte(msg.ReceivedFrom),
 | 
			
		||||
	// 	},
 | 
			
		||||
	// }
 | 
			
		||||
 | 
			
		||||
	t.tracer.Trace(evt)
 | 
			
		||||
	// t.tracer.Trace(evt)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *pubsubTracer) AddPeer(p peer.ID, proto protocol.ID) {
 | 
			
		||||
@ -261,7 +260,6 @@ func (t *pubsubTracer) RecvRPC(rpc *RPC) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// disable for now
 | 
			
		||||
	// now := time.Now().UnixNano()
 | 
			
		||||
	// evt := &pb.TraceEvent{
 | 
			
		||||
	// 	Type:      pb.TraceEvent_RECV_RPC.Enum(),
 | 
			
		||||
@ -289,7 +287,6 @@ func (t *pubsubTracer) SendRPC(rpc *RPC, p peer.ID) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// disable for now
 | 
			
		||||
	// now := time.Now().UnixNano()
 | 
			
		||||
	// evt := &pb.TraceEvent{
 | 
			
		||||
	// 	Type:      pb.TraceEvent_SEND_RPC.Enum(),
 | 
			
		||||
@ -317,7 +314,6 @@ func (t *pubsubTracer) DropRPC(rpc *RPC, p peer.ID) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// disable for now
 | 
			
		||||
	// now := time.Now().UnixNano()
 | 
			
		||||
	// evt := &pb.TraceEvent{
 | 
			
		||||
	// 	Type:      pb.TraceEvent_DROP_RPC.Enum(),
 | 
			
		||||
@ -345,19 +341,19 @@ func (t *pubsubTracer) UndeliverableMessage(msg *Message) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	now := time.Now().UnixNano()
 | 
			
		||||
	evt := &pb.TraceEvent{
 | 
			
		||||
		Type:      pb.TraceEvent_UNDELIVERABLE_MESSAGE.Enum(),
 | 
			
		||||
		PeerID:    []byte(t.pid),
 | 
			
		||||
		Timestamp: &now,
 | 
			
		||||
		UndeliverableMessage: &pb.TraceEvent_UndeliverableMessage{
 | 
			
		||||
			MessageID:    []byte(t.idGen.ID(msg)),
 | 
			
		||||
			Bitmask:      msg.Bitmask,
 | 
			
		||||
			ReceivedFrom: []byte(msg.ReceivedFrom),
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	// now := time.Now().UnixNano()
 | 
			
		||||
	// evt := &pb.TraceEvent{
 | 
			
		||||
	// 	Type:      pb.TraceEvent_UNDELIVERABLE_MESSAGE.Enum(),
 | 
			
		||||
	// 	PeerID:    []byte(t.pid),
 | 
			
		||||
	// 	Timestamp: &now,
 | 
			
		||||
	// 	UndeliverableMessage: &pb.TraceEvent_UndeliverableMessage{
 | 
			
		||||
	// 		MessageID:    []byte(t.idGen.ID(msg)),
 | 
			
		||||
	// 		Bitmask:      msg.Bitmask,
 | 
			
		||||
	// 		ReceivedFrom: []byte(msg.ReceivedFrom),
 | 
			
		||||
	// 	},
 | 
			
		||||
	// }
 | 
			
		||||
 | 
			
		||||
	t.tracer.Trace(evt)
 | 
			
		||||
	// t.tracer.Trace(evt)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta {
 | 
			
		||||
 | 
			
		||||
@ -48,9 +48,9 @@ type basicTracer struct {
 | 
			
		||||
 | 
			
		||||
func (t *basicTracer) Trace(evt *pb.TraceEvent) {
 | 
			
		||||
	t.mx.Lock()
 | 
			
		||||
	defer t.mx.Unlock()
 | 
			
		||||
 | 
			
		||||
	if t.closed {
 | 
			
		||||
		t.mx.Unlock()
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -59,6 +59,7 @@ func (t *basicTracer) Trace(evt *pb.TraceEvent) {
 | 
			
		||||
	} else {
 | 
			
		||||
		t.buf = append(t.buf, evt)
 | 
			
		||||
	}
 | 
			
		||||
	t.mx.Unlock()
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case t.ch <- struct{}{}:
 | 
			
		||||
@ -68,11 +69,11 @@ func (t *basicTracer) Trace(evt *pb.TraceEvent) {
 | 
			
		||||
 | 
			
		||||
func (t *basicTracer) Close() {
 | 
			
		||||
	t.mx.Lock()
 | 
			
		||||
	defer t.mx.Unlock()
 | 
			
		||||
	if !t.closed {
 | 
			
		||||
		t.closed = true
 | 
			
		||||
		close(t.ch)
 | 
			
		||||
	}
 | 
			
		||||
	t.mx.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// JSONTracer is a tracer that writes events to a file, encoded in ndjson.
 | 
			
		||||
 | 
			
		||||
@ -146,17 +146,18 @@ func (v *validation) AddValidator(req *addValReq) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	v.mx.Lock()
 | 
			
		||||
	defer v.mx.Unlock()
 | 
			
		||||
 | 
			
		||||
	bitmask := val.bitmask
 | 
			
		||||
 | 
			
		||||
	_, ok := v.bitmaskVals[string(bitmask)]
 | 
			
		||||
	if ok {
 | 
			
		||||
		v.mx.Unlock()
 | 
			
		||||
		req.resp <- fmt.Errorf("duplicate validator for bitmask %s", bitmask)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	v.bitmaskVals[string(bitmask)] = val
 | 
			
		||||
	v.mx.Unlock()
 | 
			
		||||
	req.resp <- nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -213,15 +214,16 @@ func (v *validation) makeValidator(req *addValReq) (*validatorImpl, error) {
 | 
			
		||||
// RemoveValidator removes an existing validator
 | 
			
		||||
func (v *validation) RemoveValidator(req *rmValReq) {
 | 
			
		||||
	v.mx.Lock()
 | 
			
		||||
	defer v.mx.Unlock()
 | 
			
		||||
 | 
			
		||||
	bitmask := req.bitmask
 | 
			
		||||
 | 
			
		||||
	_, ok := v.bitmaskVals[string(bitmask)]
 | 
			
		||||
	if ok {
 | 
			
		||||
		delete(v.bitmaskVals, string(bitmask))
 | 
			
		||||
		v.mx.Unlock()
 | 
			
		||||
		req.resp <- nil
 | 
			
		||||
	} else {
 | 
			
		||||
		v.mx.Unlock()
 | 
			
		||||
		req.resp <- fmt.Errorf("no validator for bitmask %s", bitmask)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@ -262,7 +264,6 @@ func (v *validation) Push(src peer.ID, msg *Message) bool {
 | 
			
		||||
// getValidators returns all validators that apply to a given message
 | 
			
		||||
func (v *validation) getValidators(msg *Message) []*validatorImpl {
 | 
			
		||||
	v.mx.Lock()
 | 
			
		||||
	defer v.mx.Unlock()
 | 
			
		||||
 | 
			
		||||
	var vals []*validatorImpl
 | 
			
		||||
	vals = append(vals, v.defaultVals...)
 | 
			
		||||
@ -271,10 +272,13 @@ func (v *validation) getValidators(msg *Message) []*validatorImpl {
 | 
			
		||||
 | 
			
		||||
	val, ok := v.bitmaskVals[string(bitmask)]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		v.mx.Unlock()
 | 
			
		||||
		return vals
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return append(vals, val)
 | 
			
		||||
	impls := append(vals, val)
 | 
			
		||||
	v.mx.Unlock()
 | 
			
		||||
	return impls
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// validateWorker is an active goroutine performing inline validation
 | 
			
		||||
@ -413,7 +417,6 @@ func (v *validation) validateBitmask(vals []*validatorImpl, src peer.ID, msg *Me
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx, cancel := context.WithCancel(v.p.ctx)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	rch := make(chan ValidationResult, len(vals))
 | 
			
		||||
	rcount := 0
 | 
			
		||||
@ -433,6 +436,7 @@ func (v *validation) validateBitmask(vals []*validatorImpl, src peer.ID, msg *Me
 | 
			
		||||
			rch <- validationThrottled
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	cancel()
 | 
			
		||||
 | 
			
		||||
	result := ValidationAccept
 | 
			
		||||
loop:
 | 
			
		||||
@ -472,14 +476,10 @@ func (v *validation) validateSingleBitmask(val *validatorImpl, src peer.ID, msg
 | 
			
		||||
 | 
			
		||||
func (val *validatorImpl) validateMsg(ctx context.Context, src peer.ID, msg *Message) ValidationResult {
 | 
			
		||||
	start := time.Now()
 | 
			
		||||
	defer func() {
 | 
			
		||||
		log.Debugf("validation done; took %s", time.Since(start))
 | 
			
		||||
	}()
 | 
			
		||||
	var cancel func() = nil
 | 
			
		||||
 | 
			
		||||
	if val.validateTimeout > 0 {
 | 
			
		||||
		var cancel func()
 | 
			
		||||
		ctx, cancel = context.WithTimeout(ctx, val.validateTimeout)
 | 
			
		||||
		defer cancel()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	r := val.validate(ctx, src, msg)
 | 
			
		||||
@ -489,10 +489,18 @@ func (val *validatorImpl) validateMsg(ctx context.Context, src peer.ID, msg *Mes
 | 
			
		||||
	case ValidationReject:
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case ValidationIgnore:
 | 
			
		||||
		log.Debugf("validation done; took %s", time.Since(start))
 | 
			
		||||
		if cancel != nil {
 | 
			
		||||
			cancel()
 | 
			
		||||
		}
 | 
			
		||||
		return r
 | 
			
		||||
 | 
			
		||||
	default:
 | 
			
		||||
		log.Warnf("Unexpected result from validator: %d; ignoring message", r)
 | 
			
		||||
		log.Debugf("validation done; took %s", time.Since(start))
 | 
			
		||||
		if cancel != nil {
 | 
			
		||||
			cancel()
 | 
			
		||||
		}
 | 
			
		||||
		return ValidationIgnore
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -72,11 +72,11 @@ func (v *BasicSeqnoValidator) validate(ctx context.Context, _ peer.ID, m *Messag
 | 
			
		||||
 | 
			
		||||
	// get the nonce and compare again with an exclusive lock before commiting (cf concurrent validation)
 | 
			
		||||
	v.mx.Lock()
 | 
			
		||||
	defer v.mx.Unlock()
 | 
			
		||||
 | 
			
		||||
	nonceBytes, err = v.meta.Get(ctx, p)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Warn("error retrieving peer nonce: %s", err)
 | 
			
		||||
		v.mx.Unlock()
 | 
			
		||||
		return ValidationIgnore
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -85,6 +85,7 @@ func (v *BasicSeqnoValidator) validate(ctx context.Context, _ peer.ID, m *Messag
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if seqno <= nonce {
 | 
			
		||||
		v.mx.Unlock()
 | 
			
		||||
		return ValidationIgnore
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@ -96,6 +97,6 @@ func (v *BasicSeqnoValidator) validate(ctx context.Context, _ peer.ID, m *Messag
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Warn("error storing peer nonce: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	v.mx.Unlock()
 | 
			
		||||
	return ValidationAccept
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user