Compare commits

..

No commits in common. "96926b4cbf25f9a3fb4935359a8b53a3ae798b21" and "dc888ceb783e36791caa4838eaba85e02972ea86" have entirely different histories.

6 changed files with 162 additions and 533 deletions

View File

@ -1,9 +1,9 @@
package app package app
import ( import (
"errors"
"fmt" "fmt"
"github.com/cockroachdb/errors"
abci "github.com/cometbft/cometbft/abci/types" abci "github.com/cometbft/cometbft/abci/types"
gethtypes "github.com/ethereum/go-ethereum/core/types" gethtypes "github.com/ethereum/go-ethereum/core/types"
evmtypes "github.com/evmos/ethermint/x/evm/types" evmtypes "github.com/evmos/ethermint/x/evm/types"
@ -97,12 +97,11 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}
} }
iterator := h.mempool.Select(ctx, req.Txs)
selectedTxsSignersSeqs := make(map[string]uint64) selectedTxsSignersSeqs := make(map[string]uint64)
var selectedTxsNums int var selectedTxsNums int
for iterator != nil {
var waitRemoveTxs []sdk.Tx memTx := iterator.Tx()
mempool.SelectBy(ctx, h.mempool, req.Txs, func(memTx sdk.Tx) bool {
sigs, err := memTx.(signing.SigVerifiableTx).GetSignaturesV2() sigs, err := memTx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil { if err != nil {
panic(fmt.Errorf("failed to get signatures: %w", err)) panic(fmt.Errorf("failed to get signatures: %w", err))
@ -158,49 +157,47 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
} }
} }
if shouldAdd { if !shouldAdd {
// NOTE: Since transaction verification was already executed in CheckTx, iterator = iterator.Next()
// which calls mempool.Insert, in theory everything in the pool should be continue
// valid. But some mempool implementations may insert invalid txs, so we }
// check again.
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
if err != nil {
waitRemoveTxs = append(waitRemoveTxs, memTx)
} else {
stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz)
if stop {
return false
}
txsLen := len(h.txSelector.SelectedTxs()) // NOTE: Since transaction verification was already executed in CheckTx,
for sender, seq := range txSignersSeqs { // which calls mempool.Insert, in theory everything in the pool should be
// If txsLen != selectedTxsNums is true, it means that we've // valid. But some mempool implementations may insert invalid txs, so we
// added a new tx to the selected txs, so we need to update // check again.
// the sequence of the sender. txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
if txsLen != selectedTxsNums { if err != nil {
selectedTxsSignersSeqs[sender] = seq err := h.mempool.Remove(memTx)
} else if _, ok := selectedTxsSignersSeqs[sender]; !ok { if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
// The transaction hasn't been added but it passed the panic(err)
// verification, so we know that the sequence is correct.
// So we set this sender's sequence to seq-1, in order
// to avoid unnecessary calls to PrepareProposalVerifyTx.
selectedTxsSignersSeqs[sender] = seq - 1
}
}
selectedTxsNums = txsLen
} }
} else {
stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz)
if stop {
break
}
txsLen := len(h.txSelector.SelectedTxs())
for sender, seq := range txSignersSeqs {
// If txsLen != selectedTxsNums is true, it means that we've
// added a new tx to the selected txs, so we need to update
// the sequence of the sender.
if txsLen != selectedTxsNums {
selectedTxsSignersSeqs[sender] = seq
} else if _, ok := selectedTxsSignersSeqs[sender]; !ok {
// The transaction hasn't been added but it passed the
// verification, so we know that the sequence is correct.
// So we set this sender's sequence to seq-1, in order
// to avoid unnecessary calls to PrepareProposalVerifyTx.
selectedTxsSignersSeqs[sender] = seq - 1
}
}
selectedTxsNums = txsLen
} }
return true iterator = iterator.Next()
})
for i := range waitRemoveTxs {
err := h.mempool.Remove(waitRemoveTxs[i])
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
panic(err)
}
} }
return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}
} }
} }

View File

@ -1068,47 +1068,3 @@ func GetMaccPerms() map[string][]string {
} }
return perms return perms
} }
type accountNonceOp struct {
ak evmtypes.AccountKeeper
}
type AccountNonceOp interface {
GetAccountNonce(ctx sdk.Context, address string) uint64
SetAccountNonce(ctx sdk.Context, address string, nonce uint64)
}
func NewAccountNonceOp(app *App) AccountNonceOp {
return &accountNonceOp{
ak: app.accountKeeper,
}
}
func (ano *accountNonceOp) GetAccountNonce(ctx sdk.Context, address string) uint64 {
bzAcc, err := sdk.AccAddressFromBech32(address)
if err != nil {
ctx.Logger().Error("GetAccountNonce: failed to parse address", "address", address, "error", err)
return 0
}
acc := ano.ak.GetAccount(ctx, bzAcc)
if acc == nil {
ctx.Logger().Error("GetAccountNonce: account not found", "address", address)
return 0
}
return acc.GetSequence()
}
func (ano *accountNonceOp) SetAccountNonce(ctx sdk.Context, address string, nonce uint64) {
bzAcc, err := sdk.AccAddressFromBech32(address)
if err != nil {
ctx.Logger().Error("SetAccountNonce: failed to parse address", "address", address, "nonce", nonce, "error", err)
return
}
acc := ano.ak.GetAccount(ctx, bzAcc)
if acc != nil {
acc.SetSequence(nonce)
ano.ak.SetAccount(ctx, acc)
} else {
ctx.Logger().Error("SetAccountNonce: account not found", "address", address)
}
}

View File

@ -2,13 +2,10 @@ package app
import ( import (
"context" "context"
"sync"
"fmt" "fmt"
"math" "math"
"github.com/huandu/skiplist" "github.com/huandu/skiplist"
"github.com/pkg/errors"
sdk "github.com/cosmos/cosmos-sdk/types" sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool" "github.com/cosmos/cosmos-sdk/types/mempool"
@ -17,16 +14,11 @@ import (
evmtypes "github.com/evmos/ethermint/x/evm/types" evmtypes "github.com/evmos/ethermint/x/evm/types"
) )
const MAX_TXS_PRE_SENDER_IN_MEMPOOL int = 48 const MAX_TXS_PRE_SENDER_IN_MEMPOOL int = 10
var ( var (
_ mempool.Mempool = (*PriorityNonceMempool)(nil) _ mempool.Mempool = (*PriorityNonceMempool)(nil)
_ mempool.Iterator = (*PriorityNonceIterator)(nil) _ mempool.Iterator = (*PriorityNonceIterator)(nil)
errMempoolTxGasPriceTooLow = errors.New("gas price is too low")
errMempoolTooManyTxs = errors.New("tx sender has too many txs in mempool")
errMempoolIsFull = errors.New("mempool is full")
errTxInMempool = errors.New("tx already in mempool")
) )
// PriorityNonceMempool is a mempool implementation that stores txs // PriorityNonceMempool is a mempool implementation that stores txs
@ -37,20 +29,14 @@ var (
// priority to other sender txs and must be partially ordered by both sender-nonce // priority to other sender txs and must be partially ordered by both sender-nonce
// and priority. // and priority.
type PriorityNonceMempool struct { type PriorityNonceMempool struct {
mtx sync.Mutex priorityIndex *skiplist.SkipList
priorityIndex *skiplist.SkipList priorityCounts map[int64]int
priorityCounts map[int64]int senderIndices map[string]*skiplist.SkipList
senderIndices map[string]*skiplist.SkipList scores map[txMeta]txMeta
scores map[txMeta]txMeta onRead func(tx sdk.Tx)
onRead func(tx sdk.Tx) txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool
txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool maxTx int
maxTx int
senderTxCntLock sync.RWMutex
counterBySender map[string]int counterBySender map[string]int
txRecord map[txMeta]struct{}
txReplacedCallback func(ctx context.Context, oldTx, newTx *TxInfo)
} }
type PriorityNonceIterator struct { type PriorityNonceIterator struct {
@ -137,12 +123,6 @@ func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption {
} }
} }
func PriorityNonceWithTxReplacedCallback(cb func(ctx context.Context, oldTx, newTx *TxInfo)) PriorityNonceMempoolOption {
return func(mp *PriorityNonceMempool) {
mp.txReplacedCallback = cb
}
}
// DefaultPriorityMempool returns a priorityNonceMempool with no options. // DefaultPriorityMempool returns a priorityNonceMempool with no options.
func DefaultPriorityMempool() mempool.Mempool { func DefaultPriorityMempool() mempool.Mempool {
return NewPriorityMempool() return NewPriorityMempool()
@ -157,7 +137,6 @@ func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempoo
senderIndices: make(map[string]*skiplist.SkipList), senderIndices: make(map[string]*skiplist.SkipList),
scores: make(map[txMeta]txMeta), scores: make(map[txMeta]txMeta),
counterBySender: make(map[string]int), counterBySender: make(map[string]int),
txRecord: make(map[txMeta]struct{}),
} }
for _, opt := range opts { for _, opt := range opts {
@ -190,41 +169,67 @@ func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx {
// Inserting a duplicate tx with a different priority overwrites the existing tx, // Inserting a duplicate tx with a different priority overwrites the existing tx,
// changing the total order of the mempool. // changing the total order of the mempool.
func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
mp.mtx.Lock() if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
defer mp.mtx.Unlock() return mempool.ErrMempoolTxMaxCapacity
} else if mp.maxTx < 0 {
// if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
// return mempool.ErrMempoolTxMaxCapacity
// } else
if mp.maxTx < 0 {
return nil return nil
} }
sdkContext := sdk.UnwrapSDKContext(ctx) sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
priority := sdkContext.Priority()
txInfo, err := extractTxInfo(tx)
if err != nil { if err != nil {
return err return err
} }
sdkContext := sdk.UnwrapSDKContext(ctx)
priority := sdkContext.Priority()
if !mp.canInsert(txInfo.Sender) { var sender string
return errors.Wrapf(errMempoolTooManyTxs, "[%d@%s]sender has too many txs in mempool", txInfo.Nonce, txInfo.Sender) var nonce uint64
if len(sigs) == 0 {
msgs := tx.GetMsgs()
if len(msgs) != 1 {
return fmt.Errorf("tx must have at least one signer")
}
msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return fmt.Errorf("tx must have at least one signer")
}
ethTx := msgEthTx.AsTransaction()
signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
ethSender, err := signer.Sender(ethTx)
if err != nil {
return fmt.Errorf("tx must have at least one signer")
}
sender = sdk.AccAddress(ethSender.Bytes()).String()
nonce = ethTx.Nonce()
} else {
sig := sigs[0]
sender = sdk.AccAddress(sig.PubKey.Address()).String()
nonce = sig.Sequence
} }
// init sender index if not exists if _, exists := mp.counterBySender[sender]; !exists {
senderIndex, ok := mp.senderIndices[txInfo.Sender] mp.counterBySender[sender] = 1
} else {
if mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL {
mp.counterBySender[sender] += 1
} else {
return fmt.Errorf("tx sender has too many txs in mempool")
}
}
key := txMeta{nonce: nonce, priority: priority, sender: sender}
senderIndex, ok := mp.senderIndices[sender]
if !ok { if !ok {
senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int { senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce) return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
})) }))
// initialize sender index if not found // initialize sender index if not found
mp.senderIndices[txInfo.Sender] = senderIndex mp.senderIndices[sender] = senderIndex
} }
newKey := txMeta{nonce: txInfo.Nonce, priority: priority, sender: txInfo.Sender}
// Since mp.priorityIndex is scored by priority, then sender, then nonce, a // Since mp.priorityIndex is scored by priority, then sender, then nonce, a
// changed priority will create a new key, so we must remove the old key and // changed priority will create a new key, so we must remove the old key and
// re-insert it to avoid having the same tx with different priorityIndex indexed // re-insert it to avoid having the same tx with different priorityIndex indexed
@ -232,155 +237,35 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
// //
// This O(log n) remove operation is rare and only happens when a tx's priority // This O(log n) remove operation is rare and only happens when a tx's priority
// changes. // changes.
sk := txMeta{nonce: nonce, sender: sender}
sk := txMeta{nonce: txInfo.Nonce, sender: txInfo.Sender}
if oldScore, txExists := mp.scores[sk]; txExists { if oldScore, txExists := mp.scores[sk]; txExists {
if oldScore.priority < priority { if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) {
oldTx := senderIndex.Get(newKey).Value.(sdk.Tx) return fmt.Errorf(
return mp.doTxReplace(ctx, newKey, oldScore, oldTx, tx) "tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
oldScore.priority,
priority,
senderIndex.Get(key).Value.(sdk.Tx),
tx,
)
} }
return errors.Wrapf(errTxInMempool, "[%d@%s] tx already in mempool", txInfo.Nonce, txInfo.Sender)
} else {
mempoolSize := mp.priorityIndex.Len()
if mempoolSize >= mp.maxTx {
lowestPriority := mp.getLowestPriority()
// find one to replace
if lowestPriority > 0 && priority <= lowestPriority {
return errors.Wrapf(errMempoolTxGasPriceTooLow, "[%d@%s]tx with priority %d is too low, current lowest priority is %d", newKey.nonce, newKey.sender, priority, lowestPriority)
}
var maxIndexSize int mp.priorityIndex.Remove(txMeta{
var lowerPriority int64 = math.MaxInt64 nonce: nonce,
var selectedElement *skiplist.Element sender: sender,
for sender, index := range mp.senderIndices { priority: oldScore.priority,
indexSize := index.Len() weight: oldScore.weight,
if sender == txInfo.Sender { })
continue mp.priorityCounts[oldScore.priority]--
}
if indexSize > 0 {
tail := index.Back()
if tail != nil {
tailKey := tail.Key().(txMeta)
if tailKey.priority < lowerPriority {
lowerPriority = tailKey.priority
maxIndexSize = indexSize
selectedElement = tail
} else if tailKey.priority == lowerPriority {
if indexSize > maxIndexSize {
maxIndexSize = indexSize
selectedElement = tail
}
}
}
}
}
if selectedElement != nil {
key := selectedElement.Key().(txMeta)
replacedTx, _ := mp.doRemove(key, true)
// insert new tx
mp.doInsert(newKey, tx, true)
if mp.txReplacedCallback != nil && replacedTx != nil {
sdkContext.Logger().Debug("txn replaced caused by full of mempool", "old", fmt.Sprintf("%d@%s", key.nonce, key.sender), "new", fmt.Sprintf("%d@%s", newKey.nonce, newKey.sender), "mempoolSize", mempoolSize)
mp.txReplacedCallback(ctx,
&TxInfo{Sender: key.sender, Nonce: key.nonce, Tx: replacedTx},
&TxInfo{Sender: newKey.sender, Nonce: newKey.nonce, Tx: tx},
)
}
} else {
return errors.Wrapf(errMempoolIsFull, "%d@%s with priority%d", newKey.nonce, newKey.sender, newKey.priority)
}
} else {
mp.doInsert(newKey, tx, true)
}
return nil
}
}
func (mp *PriorityNonceMempool) doInsert(newKey txMeta, tx sdk.Tx, incrCnt bool) {
senderIndex, ok := mp.senderIndices[newKey.sender]
if !ok {
senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
}))
// initialize sender index if not found
mp.senderIndices[newKey.sender] = senderIndex
} }
mp.priorityCounts[newKey.priority]++ mp.priorityCounts[priority]++
newKey.senderElement = senderIndex.Set(newKey, tx)
mp.scores[txMeta{nonce: newKey.nonce, sender: newKey.sender}] = txMeta{priority: newKey.priority} // Since senderIndex is scored by nonce, a changed priority will overwrite the
mp.priorityIndex.Set(newKey, tx) // existing key.
key.senderElement = senderIndex.Set(key, tx)
if incrCnt { mp.scores[sk] = txMeta{priority: priority}
mp.incrSenderTxCnt(newKey.sender, newKey.nonce) mp.priorityIndex.Set(key, tx)
}
}
func (mp *PriorityNonceMempool) doRemove(oldKey txMeta, decrCnt bool) (sdk.Tx, error) {
scoreKey := txMeta{nonce: oldKey.nonce, sender: oldKey.sender}
score, ok := mp.scores[scoreKey]
if !ok {
return nil, errors.Wrapf(mempool.ErrTxNotFound, "%d@%s not found", oldKey.nonce, oldKey.sender)
}
tk := txMeta{nonce: oldKey.nonce, priority: score.priority, sender: oldKey.sender, weight: score.weight}
senderTxs, ok := mp.senderIndices[oldKey.sender]
if !ok {
return nil, fmt.Errorf("%d@%s not found", oldKey.nonce, oldKey.sender)
}
mp.priorityIndex.Remove(tk)
removedElem := senderTxs.Remove(tk)
delete(mp.scores, scoreKey)
mp.priorityCounts[score.priority]--
if decrCnt {
mp.decrSenderTxCnt(oldKey.sender, oldKey.nonce)
}
if removedElem == nil {
return nil, mempool.ErrTxNotFound
}
return removedElem.Value.(sdk.Tx), nil
}
func (mp *PriorityNonceMempool) doTxReplace(ctx context.Context, newMate, oldMate txMeta, oldTx, newTx sdk.Tx) error {
if mp.txReplacement != nil && !mp.txReplacement(oldMate.priority, newMate.priority, oldTx, newTx) {
return fmt.Errorf(
"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
oldMate.priority,
newMate.priority,
oldTx,
newTx,
)
}
e := mp.priorityIndex.Remove(txMeta{
nonce: newMate.nonce,
sender: newMate.sender,
priority: oldMate.priority,
weight: oldMate.weight,
})
replacedTx := e.Value.(sdk.Tx)
mp.priorityCounts[oldMate.priority]--
mp.doInsert(newMate, newTx, false)
if mp.txReplacedCallback != nil && replacedTx != nil {
sdkContext := sdk.UnwrapSDKContext(ctx)
sdkContext.Logger().Debug("txn update", "txn", fmt.Sprintf("%d@%s", newMate.nonce, newMate.sender), "oldPriority", oldMate.priority, "newPriority", newMate.priority)
mp.txReplacedCallback(ctx,
&TxInfo{Sender: newMate.sender, Nonce: newMate.nonce, Tx: replacedTx},
&TxInfo{Sender: newMate.sender, Nonce: newMate.nonce, Tx: newTx},
)
}
return nil return nil
} }
@ -461,24 +346,7 @@ func (i *PriorityNonceIterator) Tx() sdk.Tx {
// //
// NOTE: It is not safe to use this iterator while removing transactions from // NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool. // the underlying mempool.
func (mp *PriorityNonceMempool) Select(ctx context.Context, txs [][]byte) mempool.Iterator { func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.Iterator {
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.doSelect(ctx, txs)
}
func (mp *PriorityNonceMempool) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
mp.mtx.Lock()
defer mp.mtx.Unlock()
iter := mp.doSelect(ctx, txs)
for iter != nil && callback(iter.Tx()) {
iter = iter.Next()
}
}
func (mp *PriorityNonceMempool) doSelect(_ context.Context, _ [][]byte) mempool.Iterator {
if mp.priorityIndex.Len() == 0 { if mp.priorityIndex.Len() == 0 {
return nil return nil
} }
@ -493,16 +361,6 @@ func (mp *PriorityNonceMempool) doSelect(_ context.Context, _ [][]byte) mempool.
return iterator.iteratePriority() return iterator.iteratePriority()
} }
func (mp *PriorityNonceMempool) GetSenderUncommittedTxnCount(ctx context.Context, sender string) int {
mp.mtx.Lock()
defer mp.mtx.Unlock()
if _, exists := mp.counterBySender[sender]; exists {
return mp.counterBySender[sender]
}
return 0
}
type reorderKey struct { type reorderKey struct {
deleteKey txMeta deleteKey txMeta
insertKey txMeta insertKey txMeta
@ -557,34 +415,51 @@ func senderWeight(senderCursor *skiplist.Element) int64 {
// CountTx returns the number of transactions in the mempool. // CountTx returns the number of transactions in the mempool.
func (mp *PriorityNonceMempool) CountTx() int { func (mp *PriorityNonceMempool) CountTx() int {
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.priorityIndex.Len() return mp.priorityIndex.Len()
} }
// Remove removes a transaction from the mempool in O(log n) time, returning an // Remove removes a transaction from the mempool in O(log n) time, returning an
// error if unsuccessful. // error if unsuccessful.
func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
mp.mtx.Lock() sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
defer mp.mtx.Unlock()
txInfo, err := extractTxInfo(tx)
if err != nil { if err != nil {
return err return err
} }
var sender string
var nonce uint64
if len(sigs) == 0 {
msgs := tx.GetMsgs()
if len(msgs) != 1 {
return fmt.Errorf("attempted to remove a tx with no signatures")
}
msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return fmt.Errorf("attempted to remove a tx with no signatures")
}
ethTx := msgEthTx.AsTransaction()
signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
ethSender, err := signer.Sender(ethTx)
if err != nil {
return fmt.Errorf("attempted to remove a tx with no signatures")
}
sender = sdk.AccAddress(ethSender.Bytes()).String()
nonce = ethTx.Nonce()
} else {
sig := sigs[0]
sender = sdk.AccAddress(sig.PubKey.Address()).String()
nonce = sig.Sequence
}
mp.decrSenderTxCnt(txInfo.Sender, txInfo.Nonce) scoreKey := txMeta{nonce: nonce, sender: sender}
scoreKey := txMeta{nonce: txInfo.Nonce, sender: txInfo.Sender}
score, ok := mp.scores[scoreKey] score, ok := mp.scores[scoreKey]
if !ok { if !ok {
return mempool.ErrTxNotFound return mempool.ErrTxNotFound
} }
tk := txMeta{nonce: txInfo.Nonce, priority: score.priority, sender: txInfo.Sender, weight: score.weight} tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight}
senderTxs, ok := mp.senderIndices[txInfo.Sender] senderTxs, ok := mp.senderIndices[sender]
if !ok { if !ok {
return fmt.Errorf("sender %s not found", txInfo.Sender) return fmt.Errorf("sender %s not found", sender)
} }
mp.priorityIndex.Remove(tk) mp.priorityIndex.Remove(tk)
@ -592,76 +467,17 @@ func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
delete(mp.scores, scoreKey) delete(mp.scores, scoreKey)
mp.priorityCounts[score.priority]-- mp.priorityCounts[score.priority]--
return nil
}
func (mp *PriorityNonceMempool) getLowestPriority() int64 {
if mp.priorityIndex.Len() == 0 {
return 0
}
min := int64(math.MaxInt64)
for priority, count := range mp.priorityCounts {
if count > 0 {
if priority < min {
min = priority
}
}
}
return min
}
func (mp *PriorityNonceMempool) canInsert(sender string) bool {
mp.senderTxCntLock.RLock()
defer mp.senderTxCntLock.RUnlock()
if _, exists := mp.counterBySender[sender]; exists { if _, exists := mp.counterBySender[sender]; exists {
return mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL if mp.counterBySender[sender] > 1 {
} mp.counterBySender[sender] -= 1
return true
}
func (mp *PriorityNonceMempool) incrSenderTxCnt(sender string, nonce uint64) error {
mp.senderTxCntLock.Lock()
defer mp.senderTxCntLock.Unlock()
existsKey := txMeta{nonce: nonce, sender: sender}
if _, exists := mp.txRecord[existsKey]; !exists {
mp.txRecord[existsKey] = struct{}{}
if _, exists := mp.counterBySender[sender]; !exists {
mp.counterBySender[sender] = 1
} else { } else {
if mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL { delete(mp.counterBySender, sender)
mp.counterBySender[sender] += 1
} else {
return fmt.Errorf("tx sender has too many txs in mempool")
}
} }
} }
return nil return nil
} }
func (mp *PriorityNonceMempool) decrSenderTxCnt(sender string, nonce uint64) {
mp.senderTxCntLock.Lock()
defer mp.senderTxCntLock.Unlock()
existsKey := txMeta{nonce: nonce, sender: sender}
if _, exists := mp.txRecord[existsKey]; exists {
delete(mp.txRecord, existsKey)
if _, exists := mp.counterBySender[sender]; exists {
if mp.counterBySender[sender] > 1 {
mp.counterBySender[sender] -= 1
} else {
delete(mp.counterBySender, sender)
}
}
}
}
func IsEmpty(mempool mempool.Mempool) error { func IsEmpty(mempool mempool.Mempool) error {
mp := mempool.(*PriorityNonceMempool) mp := mempool.(*PriorityNonceMempool)
if mp.priorityIndex.Len() != 0 { if mp.priorityIndex.Len() != 0 {
@ -692,44 +508,3 @@ func IsEmpty(mempool mempool.Mempool) error {
return nil return nil
} }
type TxInfo struct {
Sender string
Nonce uint64
Tx sdk.Tx
}
func extractTxInfo(tx sdk.Tx) (*TxInfo, error) {
var sender string
var nonce uint64
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return nil, err
}
if len(sigs) == 0 {
msgs := tx.GetMsgs()
if len(msgs) != 1 {
return nil, fmt.Errorf("tx must have at least one signer")
}
msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return nil, fmt.Errorf("tx must have at least one signer")
}
ethTx := msgEthTx.AsTransaction()
signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
ethSender, err := signer.Sender(ethTx)
if err != nil {
return nil, fmt.Errorf("tx must have at least one signer")
}
sender = sdk.AccAddress(ethSender.Bytes()).String()
nonce = ethTx.Nonce()
} else {
sig := sigs[0]
sender = sdk.AccAddress(sig.PubKey.Address()).String()
nonce = sig.Sequence
}
return &TxInfo{Sender: sender, Nonce: nonce, Tx: tx}, nil
}

View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -20,7 +19,6 @@ import (
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/store" "github.com/cosmos/cosmos-sdk/store"
sdk "github.com/cosmos/cosmos-sdk/types" sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
"github.com/cosmos/cosmos-sdk/x/crisis" "github.com/cosmos/cosmos-sdk/x/crisis"
ethermintflags "github.com/evmos/ethermint/server/flags" ethermintflags "github.com/evmos/ethermint/server/flags"
"github.com/spf13/cast" "github.com/spf13/cast"
@ -28,8 +26,6 @@ import (
"github.com/0glabs/0g-chain/app" "github.com/0glabs/0g-chain/app"
"github.com/0glabs/0g-chain/app/params" "github.com/0glabs/0g-chain/app/params"
gethtypes "github.com/ethereum/go-ethereum/core/types"
evmtypes "github.com/evmos/ethermint/x/evm/types"
) )
const ( const (
@ -38,8 +34,6 @@ const (
flagSkipLoadLatest = "skip-load-latest" flagSkipLoadLatest = "skip-load-latest"
) )
var accountNonceOp app.AccountNonceOp
// appCreator holds functions used by the sdk server to control the 0g-chain app. // appCreator holds functions used by the sdk server to control the 0g-chain app.
// The methods implement types in cosmos-sdk/server/types // The methods implement types in cosmos-sdk/server/types
type appCreator struct { type appCreator struct {
@ -113,6 +107,8 @@ func (ac appCreator) newApp(
skipLoadLatest = cast.ToBool(appOpts.Get(flagSkipLoadLatest)) skipLoadLatest = cast.ToBool(appOpts.Get(flagSkipLoadLatest))
} }
mempool := app.NewPriorityMempool()
bApp := app.NewBaseApp(logger, db, ac.encodingConfig, bApp := app.NewBaseApp(logger, db, ac.encodingConfig,
baseapp.SetPruning(pruningOpts), baseapp.SetPruning(pruningOpts),
baseapp.SetMinGasPrices(strings.Replace(cast.ToString(appOpts.Get(server.FlagMinGasPrices)), ";", ",", -1)), baseapp.SetMinGasPrices(strings.Replace(cast.ToString(appOpts.Get(server.FlagMinGasPrices)), ";", ",", -1)),
@ -127,32 +123,8 @@ func (ac appCreator) newApp(
baseapp.SetIAVLDisableFastNode(cast.ToBool(iavlDisableFastNode)), baseapp.SetIAVLDisableFastNode(cast.ToBool(iavlDisableFastNode)),
baseapp.SetIAVLLazyLoading(cast.ToBool(appOpts.Get(server.FlagIAVLLazyLoading))), baseapp.SetIAVLLazyLoading(cast.ToBool(appOpts.Get(server.FlagIAVLLazyLoading))),
baseapp.SetChainID(chainID), baseapp.SetChainID(chainID),
baseapp.SetTxInfoExtracter(extractTxInfo), baseapp.SetMempool(mempool),
) )
mempool := app.NewPriorityMempool(
app.PriorityNonceWithMaxTx(fixMempoolSize(appOpts)),
app.PriorityNonceWithTxReplacedCallback(func(ctx context.Context, oldTx, newTx *app.TxInfo) {
if oldTx.Sender != newTx.Sender {
sdkContext := sdk.UnwrapSDKContext(ctx)
if accountNonceOp != nil {
nonce := accountNonceOp.GetAccountNonce(sdkContext, oldTx.Sender)
if nonce > 0 {
accountNonceOp.SetAccountNonce(sdkContext, oldTx.Sender, nonce-1)
sdkContext.Logger().Debug("rewind the nonce of the account", "account", oldTx.Sender, "from", nonce, "to", nonce-1)
} else {
sdkContext.Logger().Info("First meeting account", "account", oldTx.Sender)
}
}
} else {
sdkContext := sdk.UnwrapSDKContext(ctx)
sdkContext.Logger().Info("tx replace", "account", oldTx.Sender, "nonce", oldTx.Nonce)
}
bApp.RegisterMempoolTxReplacedEvent(ctx, oldTx.Tx, newTx.Tx)
}),
)
bApp.SetMempool(mempool)
bApp.SetTxEncoder(ac.encodingConfig.TxConfig.TxEncoder()) bApp.SetTxEncoder(ac.encodingConfig.TxConfig.TxEncoder())
abciProposalHandler := app.NewDefaultProposalHandler(mempool, bApp) abciProposalHandler := app.NewDefaultProposalHandler(mempool, bApp)
bApp.SetPrepareProposal(abciProposalHandler.PrepareProposalHandler()) bApp.SetPrepareProposal(abciProposalHandler.PrepareProposalHandler())
@ -172,8 +144,6 @@ func (ac appCreator) newApp(
bApp, bApp,
) )
accountNonceOp = app.NewAccountNonceOp(newApp)
return newApp return newApp
} }
@ -229,72 +199,3 @@ func accAddressesFromBech32(addresses ...string) ([]sdk.AccAddress, error) {
} }
return decodedAddresses, nil return decodedAddresses, nil
} }
var ErrMustHaveSigner error = errors.New("tx must have at least one signer")
func extractTxInfo(ctx sdk.Context, tx sdk.Tx) (*sdk.TxInfo, error) {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return nil, err
}
var sender string
var nonce uint64
var gasPrice uint64
var gasLimit uint64
var txType int32
if len(sigs) == 0 {
txType = 1
msgs := tx.GetMsgs()
if len(msgs) != 1 {
return nil, ErrMustHaveSigner
}
msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return nil, ErrMustHaveSigner
}
ethTx := msgEthTx.AsTransaction()
signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
ethSender, err := signer.Sender(ethTx)
if err != nil {
return nil, ErrMustHaveSigner
}
sender = sdk.AccAddress(ethSender.Bytes()).String()
nonce = ethTx.Nonce()
gasPrice = ethTx.GasPrice().Uint64()
gasLimit = ethTx.Gas()
} else {
sig := sigs[0]
sender = sdk.AccAddress(sig.PubKey.Address()).String()
nonce = sig.Sequence
}
return &sdk.TxInfo{
SignerAddress: sender,
Nonce: nonce,
GasLimit: gasLimit,
GasPrice: gasPrice,
TxType: txType,
}, nil
}
func fixMempoolSize(appOpts servertypes.AppOptions) int {
val1 := appOpts.Get("mempool.size")
val2 := appOpts.Get(server.FlagMempoolMaxTxs)
if val1 != nil && val2 != nil {
size1 := cast.ToInt(val1)
size2 := cast.ToInt(val2)
if size1 != size2 {
panic("the value of mempool.size and mempool.max-txs are different")
}
return size1
} else if val1 == nil && val2 == nil {
panic("not found mempool size in config")
} else if val1 == nil {
return cast.ToInt(val2)
} else { //if val2 == nil {
return cast.ToInt(val1)
}
}

10
go.mod
View File

@ -9,6 +9,7 @@ require (
cosmossdk.io/simapp v0.0.0-20231127212628-044ff4d8c015 cosmossdk.io/simapp v0.0.0-20231127212628-044ff4d8c015
github.com/Kava-Labs/opendb v0.0.0-20240719173129-a2f11f6d7e51 github.com/Kava-Labs/opendb v0.0.0-20240719173129-a2f11f6d7e51
github.com/cenkalti/backoff/v4 v4.1.3 github.com/cenkalti/backoff/v4 v4.1.3
github.com/cockroachdb/errors v1.11.1
github.com/cometbft/cometbft v0.37.9 github.com/cometbft/cometbft v0.37.9
github.com/cometbft/cometbft-db v0.9.1 github.com/cometbft/cometbft-db v0.9.1
github.com/coniks-sys/coniks-go v0.0.0-20180722014011-11acf4819b71 github.com/coniks-sys/coniks-go v0.0.0-20180722014011-11acf4819b71
@ -31,7 +32,6 @@ require (
github.com/huandu/skiplist v1.2.0 github.com/huandu/skiplist v1.2.0
github.com/linxGnu/grocksdb v1.8.13 github.com/linxGnu/grocksdb v1.8.13
github.com/pelletier/go-toml/v2 v2.1.0 github.com/pelletier/go-toml/v2 v2.1.0
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.4.0 github.com/shopspring/decimal v1.4.0
github.com/spf13/cast v1.6.0 github.com/spf13/cast v1.6.0
github.com/spf13/cobra v1.8.0 github.com/spf13/cobra v1.8.0
@ -78,7 +78,6 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chzyer/readline v1.5.1 // indirect github.com/chzyer/readline v1.5.1 // indirect
github.com/cockroachdb/apd/v2 v2.0.2 // indirect github.com/cockroachdb/apd/v2 v2.0.2 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v1.1.0 // indirect github.com/cockroachdb/pebble v1.1.0 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect github.com/cockroachdb/redact v1.1.5 // indirect
@ -175,6 +174,7 @@ require (
github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/client_model v0.6.1 // indirect
@ -238,11 +238,11 @@ replace (
// Use the cosmos keyring code // Use the cosmos keyring code
github.com/99designs/keyring => github.com/cosmos/keyring v1.2.0 github.com/99designs/keyring => github.com/cosmos/keyring v1.2.0
// Use cometbft fork of tendermint // Use cometbft fork of tendermint
github.com/cometbft/cometbft => github.com/0glabs/cometbft v0.37.9-0glabs.3 github.com/cometbft/cometbft => github.com/0glabs/cometbft v0.37.9-0glabs.1
github.com/cometbft/cometbft-db => github.com/kava-labs/cometbft-db v0.9.1-kava.2 github.com/cometbft/cometbft-db => github.com/kava-labs/cometbft-db v0.9.1-kava.2
// Use cosmos-sdk fork with backported fix for unsafe-reset-all, staking transfer events, and custom tally handler support // Use cosmos-sdk fork with backported fix for unsafe-reset-all, staking transfer events, and custom tally handler support
// github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.46.11-kava.3 // github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.46.11-kava.3
github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.47.10-0glabs.12 github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.47.10-0glabs.10
github.com/cosmos/iavl => github.com/kava-labs/iavl v1.2.0-kava.1 github.com/cosmos/iavl => github.com/kava-labs/iavl v1.2.0-kava.1
// See https://github.com/cosmos/cosmos-sdk/pull/13093 // See https://github.com/cosmos/cosmos-sdk/pull/13093
github.com/dgrijalva/jwt-go => github.com/golang-jwt/jwt/v4 v4.4.2 github.com/dgrijalva/jwt-go => github.com/golang-jwt/jwt/v4 v4.4.2
@ -250,7 +250,7 @@ replace (
// TODO: Tag before release // TODO: Tag before release
github.com/ethereum/go-ethereum => github.com/evmos/go-ethereum v1.10.26-evmos-rc2 github.com/ethereum/go-ethereum => github.com/evmos/go-ethereum v1.10.26-evmos-rc2
// Use ethermint fork that respects min-gas-price with NoBaseFee true and london enabled, and includes eip712 support // Use ethermint fork that respects min-gas-price with NoBaseFee true and london enabled, and includes eip712 support
github.com/evmos/ethermint => github.com/0glabs/ethermint v0.21.0-0g.v3.1.15 github.com/evmos/ethermint => github.com/0glabs/ethermint v0.21.0-0g.v3.1.12
// See https://github.com/cosmos/cosmos-sdk/pull/10401, https://github.com/cosmos/cosmos-sdk/commit/0592ba6158cd0bf49d894be1cef4faeec59e8320 // See https://github.com/cosmos/cosmos-sdk/pull/10401, https://github.com/cosmos/cosmos-sdk/commit/0592ba6158cd0bf49d894be1cef4faeec59e8320
github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.9.0 github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.9.0
// Downgraded to avoid bugs in following commits which causes "version does not exist" errors // Downgraded to avoid bugs in following commits which causes "version does not exist" errors

12
go.sum
View File

@ -209,12 +209,12 @@ filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek=
filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
git.sr.ht/~sircmpwn/getopt v0.0.0-20191230200459-23622cc906b3/go.mod h1:wMEGFFFNuPos7vHmWXfszqImLppbc0wEhh6JBfJIUgw= git.sr.ht/~sircmpwn/getopt v0.0.0-20191230200459-23622cc906b3/go.mod h1:wMEGFFFNuPos7vHmWXfszqImLppbc0wEhh6JBfJIUgw=
git.sr.ht/~sircmpwn/go-bare v0.0.0-20210406120253-ab86bc2846d9/go.mod h1:BVJwbDfVjCjoFiKrhkei6NdGcZYpkDkdyCdg1ukytRA= git.sr.ht/~sircmpwn/go-bare v0.0.0-20210406120253-ab86bc2846d9/go.mod h1:BVJwbDfVjCjoFiKrhkei6NdGcZYpkDkdyCdg1ukytRA=
github.com/0glabs/cometbft v0.37.9-0glabs.3 h1:sobMz3C+OdFYNRQ3degfCZUHUzyuSPUIZqVMYgDtJs4= github.com/0glabs/cometbft v0.37.9-0glabs.1 h1:KQJG17Y21suKP3QNICLto4b5Ak73XbSmKxeLbg0ZM68=
github.com/0glabs/cometbft v0.37.9-0glabs.3/go.mod h1:j0Q3RqrCd+cztWCugs3obbzC4NyHGBPZZjtm/fWV00I= github.com/0glabs/cometbft v0.37.9-0glabs.1/go.mod h1:j0Q3RqrCd+cztWCugs3obbzC4NyHGBPZZjtm/fWV00I=
github.com/0glabs/cosmos-sdk v0.47.10-0glabs.12 h1:mVUhlaGUPn8izK6TfdXD13xakN8+HGl3Y349YF6Kgqc= github.com/0glabs/cosmos-sdk v0.47.10-0glabs.10 h1:NJp0RwczHBO4EvrQdDxxftHOgUDBtNh7M/vpaG7wFtQ=
github.com/0glabs/cosmos-sdk v0.47.10-0glabs.12/go.mod h1:KskIVnhXTFqrw7CDccMvx7To5KzUsOomIsQV7sPGOog= github.com/0glabs/cosmos-sdk v0.47.10-0glabs.10/go.mod h1:KskIVnhXTFqrw7CDccMvx7To5KzUsOomIsQV7sPGOog=
github.com/0glabs/ethermint v0.21.0-0g.v3.1.15 h1:j3GwMVy1bjOb7TNyH7v7qOUu5LRl6oruZECIx9W77J0= github.com/0glabs/ethermint v0.21.0-0g.v3.1.12 h1:IRVTFhDEH2J5w8ywQW7obXQxYhJYib70SNgKqLOXikU=
github.com/0glabs/ethermint v0.21.0-0g.v3.1.15/go.mod h1:6e/gOcDLhvlDWK3JLJVBgki0gD6H4E1eG7l9byocgWA= github.com/0glabs/ethermint v0.21.0-0g.v3.1.12/go.mod h1:6e/gOcDLhvlDWK3JLJVBgki0gD6H4E1eG7l9byocgWA=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4=
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM=