mirror of
				https://github.com/0glabs/0g-chain.git
				synced 2025-11-03 21:27:26 +00:00 
			
		
		
		
	improve app mempool
This commit is contained in:
		
							parent
							
								
									384d899eff
								
							
						
					
					
						commit
						4ff1ab24d1
					
				@ -2,8 +2,10 @@ package app
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"math"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/huandu/skiplist"
 | 
			
		||||
 | 
			
		||||
@ -19,6 +21,10 @@ const MAX_TXS_PRE_SENDER_IN_MEMPOOL int = 16
 | 
			
		||||
var (
 | 
			
		||||
	_ mempool.Mempool  = (*PriorityNonceMempool)(nil)
 | 
			
		||||
	_ mempool.Iterator = (*PriorityNonceIterator)(nil)
 | 
			
		||||
 | 
			
		||||
	errMempoolTxGasPriceTooLow = errors.New("gas price is too low")
 | 
			
		||||
	errMempoolTooManyTxs       = errors.New("tx sender has too many txs in mempool")
 | 
			
		||||
	errMempoolIsFull           = errors.New("mempool is full")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// PriorityNonceMempool is a mempool implementation that stores txs
 | 
			
		||||
@ -29,13 +35,15 @@ var (
 | 
			
		||||
// priority to other sender txs and must be partially ordered by both sender-nonce
 | 
			
		||||
// and priority.
 | 
			
		||||
type PriorityNonceMempool struct {
 | 
			
		||||
	priorityIndex   *skiplist.SkipList
 | 
			
		||||
	priorityCounts  map[int64]int
 | 
			
		||||
	senderIndices   map[string]*skiplist.SkipList
 | 
			
		||||
	scores          map[txMeta]txMeta
 | 
			
		||||
	onRead          func(tx sdk.Tx)
 | 
			
		||||
	txReplacement   func(op, np int64, oTx, nTx sdk.Tx) bool
 | 
			
		||||
	maxTx           int
 | 
			
		||||
	priorityIndex  *skiplist.SkipList
 | 
			
		||||
	priorityCounts map[int64]int
 | 
			
		||||
	senderIndices  map[string]*skiplist.SkipList
 | 
			
		||||
	scores         map[txMeta]txMeta
 | 
			
		||||
	onRead         func(tx sdk.Tx)
 | 
			
		||||
	txReplacement  func(op, np int64, oTx, nTx sdk.Tx) bool
 | 
			
		||||
	maxTx          int
 | 
			
		||||
 | 
			
		||||
	senderTxCntLock sync.RWMutex
 | 
			
		||||
	counterBySender map[string]int
 | 
			
		||||
	txRecord        map[txMeta]struct{}
 | 
			
		||||
 | 
			
		||||
@ -179,73 +187,37 @@ func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx {
 | 
			
		||||
// Inserting a duplicate tx with a different priority overwrites the existing tx,
 | 
			
		||||
// changing the total order of the mempool.
 | 
			
		||||
func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
 | 
			
		||||
	if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
 | 
			
		||||
		return mempool.ErrMempoolTxMaxCapacity
 | 
			
		||||
	} else if mp.maxTx < 0 {
 | 
			
		||||
	// if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
 | 
			
		||||
	// 	return mempool.ErrMempoolTxMaxCapacity
 | 
			
		||||
	// } else
 | 
			
		||||
	if mp.maxTx < 0 {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
 | 
			
		||||
	sdkContext := sdk.UnwrapSDKContext(ctx)
 | 
			
		||||
	priority := sdkContext.Priority()
 | 
			
		||||
	txInfo, err := extractTxInfo(tx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	sdkContext := sdk.UnwrapSDKContext(ctx)
 | 
			
		||||
	priority := sdkContext.Priority()
 | 
			
		||||
 | 
			
		||||
	var replacedTx sdk.Tx
 | 
			
		||||
	var sender string
 | 
			
		||||
	var nonce uint64
 | 
			
		||||
 | 
			
		||||
	if len(sigs) == 0 {
 | 
			
		||||
		msgs := tx.GetMsgs()
 | 
			
		||||
		if len(msgs) != 1 {
 | 
			
		||||
			return fmt.Errorf("tx must have at least one signer")
 | 
			
		||||
		}
 | 
			
		||||
		msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return fmt.Errorf("tx must have at least one signer")
 | 
			
		||||
		}
 | 
			
		||||
		ethTx := msgEthTx.AsTransaction()
 | 
			
		||||
		signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
 | 
			
		||||
		ethSender, err := signer.Sender(ethTx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("tx must have at least one signer")
 | 
			
		||||
		}
 | 
			
		||||
		sender = sdk.AccAddress(ethSender.Bytes()).String()
 | 
			
		||||
		nonce = ethTx.Nonce()
 | 
			
		||||
	} else {
 | 
			
		||||
		sig := sigs[0]
 | 
			
		||||
		sender = sdk.AccAddress(sig.PubKey.Address()).String()
 | 
			
		||||
		nonce = sig.Sequence
 | 
			
		||||
	if !mp.canInsert(txInfo.sender) {
 | 
			
		||||
		return errMempoolTooManyTxs
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	existsKey := txMeta{nonce: nonce, sender: sender}
 | 
			
		||||
	if _, exists := mp.txRecord[existsKey]; !exists {
 | 
			
		||||
		mp.txRecord[existsKey] = struct{}{}
 | 
			
		||||
 | 
			
		||||
		if _, exists := mp.counterBySender[sender]; !exists {
 | 
			
		||||
			mp.counterBySender[sender] = 1
 | 
			
		||||
		} else {
 | 
			
		||||
			if mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL {
 | 
			
		||||
				mp.counterBySender[sender] += 1
 | 
			
		||||
			} else {
 | 
			
		||||
				return fmt.Errorf("tx sender has too many txs in mempool")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	key := txMeta{nonce: nonce, priority: priority, sender: sender}
 | 
			
		||||
 | 
			
		||||
	senderIndex, ok := mp.senderIndices[sender]
 | 
			
		||||
	// init sender index if not exists
 | 
			
		||||
	senderIndex, ok := mp.senderIndices[txInfo.sender]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
 | 
			
		||||
			return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
 | 
			
		||||
		}))
 | 
			
		||||
 | 
			
		||||
		// initialize sender index if not found
 | 
			
		||||
		mp.senderIndices[sender] = senderIndex
 | 
			
		||||
		mp.senderIndices[txInfo.sender] = senderIndex
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	newKey := txMeta{nonce: txInfo.nonce, priority: priority, sender: txInfo.sender}
 | 
			
		||||
 | 
			
		||||
	// Since mp.priorityIndex is scored by priority, then sender, then nonce, a
 | 
			
		||||
	// changed priority will create a new key, so we must remove the old key and
 | 
			
		||||
	// re-insert it to avoid having the same tx with different priorityIndex indexed
 | 
			
		||||
@ -254,39 +226,143 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
 | 
			
		||||
	// This O(log n) remove operation is rare and only happens when a tx's priority
 | 
			
		||||
	// changes.
 | 
			
		||||
 | 
			
		||||
	sk := txMeta{nonce: nonce, sender: sender}
 | 
			
		||||
	sk := txMeta{nonce: txInfo.nonce, sender: txInfo.sender}
 | 
			
		||||
	if oldScore, txExists := mp.scores[sk]; txExists {
 | 
			
		||||
		if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) {
 | 
			
		||||
			return fmt.Errorf(
 | 
			
		||||
				"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
 | 
			
		||||
				oldScore.priority,
 | 
			
		||||
				priority,
 | 
			
		||||
				senderIndex.Get(key).Value.(sdk.Tx),
 | 
			
		||||
				tx,
 | 
			
		||||
			)
 | 
			
		||||
		}
 | 
			
		||||
		oldTx := senderIndex.Get(newKey).Value.(sdk.Tx)
 | 
			
		||||
		return mp.doTxReplace(ctx, senderIndex, newKey, oldScore, oldTx, tx)
 | 
			
		||||
	} else {
 | 
			
		||||
		mempoolSize := mp.CountTx()
 | 
			
		||||
		if mempoolSize >= mp.maxTx {
 | 
			
		||||
			lowestPriority := mp.GetLowestPriority()
 | 
			
		||||
			// find one to replace
 | 
			
		||||
			if priority <= lowestPriority {
 | 
			
		||||
				return errMempoolTxGasPriceTooLow
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
		e := mp.priorityIndex.Remove(txMeta{
 | 
			
		||||
			nonce:    nonce,
 | 
			
		||||
			sender:   sender,
 | 
			
		||||
			priority: oldScore.priority,
 | 
			
		||||
			weight:   oldScore.weight,
 | 
			
		||||
		})
 | 
			
		||||
		replacedTx = e.Value.(sdk.Tx)
 | 
			
		||||
		mp.priorityCounts[oldScore.priority]--
 | 
			
		||||
			var maxIndexSize int
 | 
			
		||||
			var lowerPriority int64 = math.MaxInt64
 | 
			
		||||
			var selectedElement *skiplist.Element
 | 
			
		||||
			for sender, index := range mp.senderIndices {
 | 
			
		||||
				indexSize := index.Len()
 | 
			
		||||
				if sender == txInfo.sender {
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				if indexSize > 1 {
 | 
			
		||||
					tail := index.Back()
 | 
			
		||||
					if tail != nil {
 | 
			
		||||
						tailKey := tail.Key().(txMeta)
 | 
			
		||||
						if tailKey.priority < lowerPriority {
 | 
			
		||||
							lowerPriority = tailKey.priority
 | 
			
		||||
							maxIndexSize = indexSize
 | 
			
		||||
							selectedElement = tail
 | 
			
		||||
						} else if tailKey.priority == lowerPriority {
 | 
			
		||||
							if indexSize > maxIndexSize {
 | 
			
		||||
								maxIndexSize = indexSize
 | 
			
		||||
								selectedElement = tail
 | 
			
		||||
							}
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if selectedElement != nil {
 | 
			
		||||
				key := selectedElement.Key().(txMeta)
 | 
			
		||||
				replacedTx, _ := mp.doRemove(key, true)
 | 
			
		||||
 | 
			
		||||
				// insert new tx
 | 
			
		||||
				mp.doInsert(newKey, tx, true)
 | 
			
		||||
 | 
			
		||||
				if mp.txReplacedCallback != nil && replacedTx != nil {
 | 
			
		||||
					mp.txReplacedCallback(ctx, replacedTx, tx)
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				// not found any index more than 1 except sender's index
 | 
			
		||||
				// We do not replace the sender's only tx in the mempool
 | 
			
		||||
				return errMempoolIsFull
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			mp.doInsert(newKey, tx, true)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *PriorityNonceMempool) doInsert(newKey txMeta, tx sdk.Tx, incrCnt bool) {
 | 
			
		||||
	senderIndex, ok := mp.senderIndices[newKey.sender]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
 | 
			
		||||
			return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
 | 
			
		||||
		}))
 | 
			
		||||
 | 
			
		||||
		// initialize sender index if not found
 | 
			
		||||
		mp.senderIndices[newKey.sender] = senderIndex
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mp.priorityCounts[priority]++
 | 
			
		||||
	mp.priorityCounts[newKey.priority]++
 | 
			
		||||
	newKey.senderElement = senderIndex.Set(newKey, tx)
 | 
			
		||||
 | 
			
		||||
	// Since senderIndex is scored by nonce, a changed priority will overwrite the
 | 
			
		||||
	// existing key.
 | 
			
		||||
	key.senderElement = senderIndex.Set(key, tx)
 | 
			
		||||
	mp.scores[txMeta{nonce: newKey.nonce, sender: newKey.sender}] = txMeta{priority: newKey.priority}
 | 
			
		||||
	mp.priorityIndex.Set(newKey, tx)
 | 
			
		||||
 | 
			
		||||
	mp.scores[sk] = txMeta{priority: priority}
 | 
			
		||||
	mp.priorityIndex.Set(key, tx)
 | 
			
		||||
	if incrCnt {
 | 
			
		||||
		mp.incrSenderTxCnt(newKey.sender, newKey.nonce)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *PriorityNonceMempool) doRemove(oldKey txMeta, decrCnt bool) (sdk.Tx, error) {
 | 
			
		||||
	scoreKey := txMeta{nonce: oldKey.nonce, sender: oldKey.sender}
 | 
			
		||||
	score, ok := mp.scores[scoreKey]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil, mempool.ErrTxNotFound
 | 
			
		||||
	}
 | 
			
		||||
	tk := txMeta{nonce: oldKey.nonce, priority: score.priority, sender: oldKey.sender, weight: score.weight}
 | 
			
		||||
 | 
			
		||||
	senderTxs, ok := mp.senderIndices[oldKey.sender]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return nil, fmt.Errorf("sender %s not found", oldKey.sender)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mp.priorityIndex.Remove(tk)
 | 
			
		||||
	removedElem := senderTxs.Remove(tk)
 | 
			
		||||
	delete(mp.scores, scoreKey)
 | 
			
		||||
	mp.priorityCounts[score.priority]--
 | 
			
		||||
 | 
			
		||||
	if decrCnt {
 | 
			
		||||
		mp.decrSenderTxCnt(oldKey.sender, oldKey.nonce)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if removedElem == nil {
 | 
			
		||||
		return nil, mempool.ErrTxNotFound
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return removedElem.Value.(sdk.Tx), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *PriorityNonceMempool) doTxReplace(ctx context.Context, index *skiplist.SkipList, newMate, oldMate txMeta, oldTx, newTx sdk.Tx) error {
 | 
			
		||||
	if mp.txReplacement != nil && !mp.txReplacement(oldMate.priority, newMate.priority, oldTx, newTx) {
 | 
			
		||||
		return fmt.Errorf(
 | 
			
		||||
			"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
 | 
			
		||||
			oldMate.priority,
 | 
			
		||||
			newMate.priority,
 | 
			
		||||
			oldTx,
 | 
			
		||||
			newTx,
 | 
			
		||||
		)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	e := mp.priorityIndex.Remove(txMeta{
 | 
			
		||||
		nonce:    newMate.nonce,
 | 
			
		||||
		sender:   newMate.sender,
 | 
			
		||||
		priority: oldMate.priority,
 | 
			
		||||
		weight:   oldMate.weight,
 | 
			
		||||
	})
 | 
			
		||||
	replacedTx := e.Value.(sdk.Tx)
 | 
			
		||||
	mp.priorityCounts[oldMate.priority]--
 | 
			
		||||
 | 
			
		||||
	mp.doInsert(newMate, newTx, false)
 | 
			
		||||
 | 
			
		||||
	if mp.txReplacedCallback != nil && replacedTx != nil {
 | 
			
		||||
		mp.txReplacedCallback(ctx, replacedTx, tx)
 | 
			
		||||
		mp.txReplacedCallback(ctx, replacedTx, newTx)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
@ -443,34 +519,81 @@ func (mp *PriorityNonceMempool) CountTx() int {
 | 
			
		||||
// Remove removes a transaction from the mempool in O(log n) time, returning an
 | 
			
		||||
// error if unsuccessful.
 | 
			
		||||
func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
 | 
			
		||||
	sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
 | 
			
		||||
	txInfo, err := extractTxInfo(tx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	var sender string
 | 
			
		||||
	var nonce uint64
 | 
			
		||||
	if len(sigs) == 0 {
 | 
			
		||||
		msgs := tx.GetMsgs()
 | 
			
		||||
		if len(msgs) != 1 {
 | 
			
		||||
			return fmt.Errorf("attempted to remove a tx with no signatures")
 | 
			
		||||
		}
 | 
			
		||||
		msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return fmt.Errorf("attempted to remove a tx with no signatures")
 | 
			
		||||
		}
 | 
			
		||||
		ethTx := msgEthTx.AsTransaction()
 | 
			
		||||
		signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
 | 
			
		||||
		ethSender, err := signer.Sender(ethTx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("attempted to remove a tx with no signatures")
 | 
			
		||||
		}
 | 
			
		||||
		sender = sdk.AccAddress(ethSender.Bytes()).String()
 | 
			
		||||
		nonce = ethTx.Nonce()
 | 
			
		||||
	} else {
 | 
			
		||||
		sig := sigs[0]
 | 
			
		||||
		sender = sdk.AccAddress(sig.PubKey.Address()).String()
 | 
			
		||||
		nonce = sig.Sequence
 | 
			
		||||
 | 
			
		||||
	mp.decrSenderTxCnt(txInfo.sender, txInfo.nonce)
 | 
			
		||||
 | 
			
		||||
	scoreKey := txMeta{nonce: txInfo.nonce, sender: txInfo.sender}
 | 
			
		||||
	score, ok := mp.scores[scoreKey]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return mempool.ErrTxNotFound
 | 
			
		||||
	}
 | 
			
		||||
	tk := txMeta{nonce: txInfo.nonce, priority: score.priority, sender: txInfo.sender, weight: score.weight}
 | 
			
		||||
 | 
			
		||||
	senderTxs, ok := mp.senderIndices[txInfo.sender]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return fmt.Errorf("sender %s not found", txInfo.sender)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mp.priorityIndex.Remove(tk)
 | 
			
		||||
	senderTxs.Remove(tk)
 | 
			
		||||
	delete(mp.scores, scoreKey)
 | 
			
		||||
	mp.priorityCounts[score.priority]--
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *PriorityNonceMempool) GetLowestPriority() int64 {
 | 
			
		||||
	min := int64(math.MaxInt64)
 | 
			
		||||
	for priority, count := range mp.priorityCounts {
 | 
			
		||||
		if count > 0 {
 | 
			
		||||
			if priority < min {
 | 
			
		||||
				min = priority
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return min
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *PriorityNonceMempool) canInsert(sender string) bool {
 | 
			
		||||
	mp.senderTxCntLock.RLock()
 | 
			
		||||
	defer mp.senderTxCntLock.RUnlock()
 | 
			
		||||
 | 
			
		||||
	if _, exists := mp.counterBySender[sender]; exists {
 | 
			
		||||
		return mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *PriorityNonceMempool) incrSenderTxCnt(sender string, nonce uint64) error {
 | 
			
		||||
	mp.senderTxCntLock.Lock()
 | 
			
		||||
	defer mp.senderTxCntLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	existsKey := txMeta{nonce: nonce, sender: sender}
 | 
			
		||||
	if _, exists := mp.txRecord[existsKey]; !exists {
 | 
			
		||||
		mp.txRecord[existsKey] = struct{}{}
 | 
			
		||||
 | 
			
		||||
		if _, exists := mp.counterBySender[sender]; !exists {
 | 
			
		||||
			mp.counterBySender[sender] = 1
 | 
			
		||||
		} else {
 | 
			
		||||
			if mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL {
 | 
			
		||||
				mp.counterBySender[sender] += 1
 | 
			
		||||
			} else {
 | 
			
		||||
				return fmt.Errorf("tx sender has too many txs in mempool")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (mp *PriorityNonceMempool) decrSenderTxCnt(sender string, nonce uint64) {
 | 
			
		||||
	mp.senderTxCntLock.Lock()
 | 
			
		||||
	defer mp.senderTxCntLock.Unlock()
 | 
			
		||||
 | 
			
		||||
	existsKey := txMeta{nonce: nonce, sender: sender}
 | 
			
		||||
	if _, exists := mp.txRecord[existsKey]; exists {
 | 
			
		||||
@ -484,25 +607,6 @@ func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	scoreKey := txMeta{nonce: nonce, sender: sender}
 | 
			
		||||
	score, ok := mp.scores[scoreKey]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return mempool.ErrTxNotFound
 | 
			
		||||
	}
 | 
			
		||||
	tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight}
 | 
			
		||||
 | 
			
		||||
	senderTxs, ok := mp.senderIndices[sender]
 | 
			
		||||
	if !ok {
 | 
			
		||||
		return fmt.Errorf("sender %s not found", sender)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mp.priorityIndex.Remove(tk)
 | 
			
		||||
	senderTxs.Remove(tk)
 | 
			
		||||
	delete(mp.scores, scoreKey)
 | 
			
		||||
	mp.priorityCounts[score.priority]--
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func IsEmpty(mempool mempool.Mempool) error {
 | 
			
		||||
@ -535,3 +639,43 @@ func IsEmpty(mempool mempool.Mempool) error {
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type txInfo struct {
 | 
			
		||||
	sender string
 | 
			
		||||
	nonce  uint64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func extractTxInfo(tx sdk.Tx) (*txInfo, error) {
 | 
			
		||||
	var sender string
 | 
			
		||||
	var nonce uint64
 | 
			
		||||
 | 
			
		||||
	sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(sigs) == 0 {
 | 
			
		||||
		msgs := tx.GetMsgs()
 | 
			
		||||
		if len(msgs) != 1 {
 | 
			
		||||
			return nil, fmt.Errorf("tx must have at least one signer")
 | 
			
		||||
		}
 | 
			
		||||
		msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return nil, fmt.Errorf("tx must have at least one signer")
 | 
			
		||||
		}
 | 
			
		||||
		ethTx := msgEthTx.AsTransaction()
 | 
			
		||||
		signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
 | 
			
		||||
		ethSender, err := signer.Sender(ethTx)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, fmt.Errorf("tx must have at least one signer")
 | 
			
		||||
		}
 | 
			
		||||
		sender = sdk.AccAddress(ethSender.Bytes()).String()
 | 
			
		||||
		nonce = ethTx.Nonce()
 | 
			
		||||
	} else {
 | 
			
		||||
		sig := sigs[0]
 | 
			
		||||
		sender = sdk.AccAddress(sig.PubKey.Address()).String()
 | 
			
		||||
		nonce = sig.Sequence
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &txInfo{sender: sender, nonce: nonce}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user