mirror of
https://github.com/0glabs/0g-chain.git
synced 2025-04-04 15:55:23 +00:00
Compare commits
6 Commits
0b027e10ed
...
2cc584be0b
Author | SHA1 | Date | |
---|---|---|---|
![]() |
2cc584be0b | ||
![]() |
d8e968146b | ||
![]() |
962943d32b | ||
![]() |
b42c82d59c | ||
![]() |
08000544c9 | ||
![]() |
248db0f47c |
@ -1,9 +1,9 @@
|
|||||||
package app
|
package app
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/cockroachdb/errors"
|
|
||||||
abci "github.com/cometbft/cometbft/abci/types"
|
abci "github.com/cometbft/cometbft/abci/types"
|
||||||
gethtypes "github.com/ethereum/go-ethereum/core/types"
|
gethtypes "github.com/ethereum/go-ethereum/core/types"
|
||||||
evmtypes "github.com/evmos/ethermint/x/evm/types"
|
evmtypes "github.com/evmos/ethermint/x/evm/types"
|
||||||
@ -97,11 +97,12 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
|
|||||||
return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}
|
return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}
|
||||||
}
|
}
|
||||||
|
|
||||||
iterator := h.mempool.Select(ctx, req.Txs)
|
|
||||||
selectedTxsSignersSeqs := make(map[string]uint64)
|
selectedTxsSignersSeqs := make(map[string]uint64)
|
||||||
var selectedTxsNums int
|
var selectedTxsNums int
|
||||||
for iterator != nil {
|
|
||||||
memTx := iterator.Tx()
|
var waitRemoveTxs []sdk.Tx
|
||||||
|
|
||||||
|
mempool.SelectBy(ctx, h.mempool, req.Txs, func(memTx sdk.Tx) bool {
|
||||||
sigs, err := memTx.(signing.SigVerifiableTx).GetSignaturesV2()
|
sigs, err := memTx.(signing.SigVerifiableTx).GetSignaturesV2()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("failed to get signatures: %w", err))
|
panic(fmt.Errorf("failed to get signatures: %w", err))
|
||||||
@ -157,25 +158,18 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !shouldAdd {
|
if shouldAdd {
|
||||||
iterator = iterator.Next()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE: Since transaction verification was already executed in CheckTx,
|
// NOTE: Since transaction verification was already executed in CheckTx,
|
||||||
// which calls mempool.Insert, in theory everything in the pool should be
|
// which calls mempool.Insert, in theory everything in the pool should be
|
||||||
// valid. But some mempool implementations may insert invalid txs, so we
|
// valid. But some mempool implementations may insert invalid txs, so we
|
||||||
// check again.
|
// check again.
|
||||||
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
|
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err := h.mempool.Remove(memTx)
|
waitRemoveTxs = append(waitRemoveTxs, memTx)
|
||||||
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz)
|
stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz)
|
||||||
if stop {
|
if stop {
|
||||||
break
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
txsLen := len(h.txSelector.SelectedTxs())
|
txsLen := len(h.txSelector.SelectedTxs())
|
||||||
@ -195,9 +189,18 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
|
|||||||
}
|
}
|
||||||
selectedTxsNums = txsLen
|
selectedTxsNums = txsLen
|
||||||
}
|
}
|
||||||
|
|
||||||
iterator = iterator.Next()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
for i := range waitRemoveTxs {
|
||||||
|
err := h.mempool.Remove(waitRemoveTxs[i])
|
||||||
|
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}
|
return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
28
app/app.go
28
app/app.go
@ -1068,3 +1068,31 @@ func GetMaccPerms() map[string][]string {
|
|||||||
}
|
}
|
||||||
return perms
|
return perms
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type accountNonceOp struct {
|
||||||
|
ak evmtypes.AccountKeeper
|
||||||
|
}
|
||||||
|
|
||||||
|
type AccountNonceOp interface {
|
||||||
|
GetAccountNonce(ctx sdk.Context, address string) uint64
|
||||||
|
SetAccountNonce(ctx sdk.Context, address string, nonce uint64)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewAccountNonceOp(app *App) AccountNonceOp {
|
||||||
|
return &accountNonceOp{
|
||||||
|
ak: app.accountKeeper,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ano *accountNonceOp) GetAccountNonce(ctx sdk.Context, address string) uint64 {
|
||||||
|
bzAcc, _ := sdk.AccAddressFromBech32(address)
|
||||||
|
acc := ano.ak.GetAccount(ctx, bzAcc)
|
||||||
|
return acc.GetSequence()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ano *accountNonceOp) SetAccountNonce(ctx sdk.Context, address string, nonce uint64) {
|
||||||
|
bzAcc, _ := sdk.AccAddressFromBech32(address)
|
||||||
|
acc := ano.ak.GetAccount(ctx, bzAcc)
|
||||||
|
acc.SetSequence(nonce)
|
||||||
|
ano.ak.SetAccount(ctx, acc)
|
||||||
|
}
|
||||||
|
@ -17,7 +17,7 @@ import (
|
|||||||
evmtypes "github.com/evmos/ethermint/x/evm/types"
|
evmtypes "github.com/evmos/ethermint/x/evm/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
const MAX_TXS_PRE_SENDER_IN_MEMPOOL int = 16
|
const MAX_TXS_PRE_SENDER_IN_MEMPOOL int = 48
|
||||||
|
|
||||||
var (
|
var (
|
||||||
_ mempool.Mempool = (*PriorityNonceMempool)(nil)
|
_ mempool.Mempool = (*PriorityNonceMempool)(nil)
|
||||||
@ -26,6 +26,7 @@ var (
|
|||||||
errMempoolTxGasPriceTooLow = errors.New("gas price is too low")
|
errMempoolTxGasPriceTooLow = errors.New("gas price is too low")
|
||||||
errMempoolTooManyTxs = errors.New("tx sender has too many txs in mempool")
|
errMempoolTooManyTxs = errors.New("tx sender has too many txs in mempool")
|
||||||
errMempoolIsFull = errors.New("mempool is full")
|
errMempoolIsFull = errors.New("mempool is full")
|
||||||
|
errTxInMempool = errors.New("tx already in mempool")
|
||||||
)
|
)
|
||||||
|
|
||||||
// PriorityNonceMempool is a mempool implementation that stores txs
|
// PriorityNonceMempool is a mempool implementation that stores txs
|
||||||
@ -36,6 +37,7 @@ var (
|
|||||||
// priority to other sender txs and must be partially ordered by both sender-nonce
|
// priority to other sender txs and must be partially ordered by both sender-nonce
|
||||||
// and priority.
|
// and priority.
|
||||||
type PriorityNonceMempool struct {
|
type PriorityNonceMempool struct {
|
||||||
|
mtx sync.Mutex
|
||||||
priorityIndex *skiplist.SkipList
|
priorityIndex *skiplist.SkipList
|
||||||
priorityCounts map[int64]int
|
priorityCounts map[int64]int
|
||||||
senderIndices map[string]*skiplist.SkipList
|
senderIndices map[string]*skiplist.SkipList
|
||||||
@ -48,7 +50,7 @@ type PriorityNonceMempool struct {
|
|||||||
counterBySender map[string]int
|
counterBySender map[string]int
|
||||||
txRecord map[txMeta]struct{}
|
txRecord map[txMeta]struct{}
|
||||||
|
|
||||||
txReplacedCallback func(ctx context.Context, oldTx, newTx sdk.Tx)
|
txReplacedCallback func(ctx context.Context, oldTx, newTx *TxInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
type PriorityNonceIterator struct {
|
type PriorityNonceIterator struct {
|
||||||
@ -135,7 +137,7 @@ func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func PriorityNonceWithTxReplacedCallback(cb func(ctx context.Context, oldTx, newTx sdk.Tx)) PriorityNonceMempoolOption {
|
func PriorityNonceWithTxReplacedCallback(cb func(ctx context.Context, oldTx, newTx *TxInfo)) PriorityNonceMempoolOption {
|
||||||
return func(mp *PriorityNonceMempool) {
|
return func(mp *PriorityNonceMempool) {
|
||||||
mp.txReplacedCallback = cb
|
mp.txReplacedCallback = cb
|
||||||
}
|
}
|
||||||
@ -188,6 +190,9 @@ func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx {
|
|||||||
// Inserting a duplicate tx with a different priority overwrites the existing tx,
|
// Inserting a duplicate tx with a different priority overwrites the existing tx,
|
||||||
// changing the total order of the mempool.
|
// changing the total order of the mempool.
|
||||||
func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
||||||
|
mp.mtx.Lock()
|
||||||
|
defer mp.mtx.Unlock()
|
||||||
|
|
||||||
// if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
|
// if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
|
||||||
// return mempool.ErrMempoolTxMaxCapacity
|
// return mempool.ErrMempoolTxMaxCapacity
|
||||||
// } else
|
// } else
|
||||||
@ -197,27 +202,28 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
|||||||
|
|
||||||
sdkContext := sdk.UnwrapSDKContext(ctx)
|
sdkContext := sdk.UnwrapSDKContext(ctx)
|
||||||
priority := sdkContext.Priority()
|
priority := sdkContext.Priority()
|
||||||
|
|
||||||
txInfo, err := extractTxInfo(tx)
|
txInfo, err := extractTxInfo(tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !mp.canInsert(txInfo.sender) {
|
if !mp.canInsert(txInfo.Sender) {
|
||||||
return errors.Wrapf(errMempoolTooManyTxs, "sender %s has too many txs in mempool", txInfo.sender)
|
return errors.Wrapf(errMempoolTooManyTxs, "[%d@%s]sender has too many txs in mempool", txInfo.Nonce, txInfo.Sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
// init sender index if not exists
|
// init sender index if not exists
|
||||||
senderIndex, ok := mp.senderIndices[txInfo.sender]
|
senderIndex, ok := mp.senderIndices[txInfo.Sender]
|
||||||
if !ok {
|
if !ok {
|
||||||
senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
|
senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
|
||||||
return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
|
return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
|
||||||
}))
|
}))
|
||||||
|
|
||||||
// initialize sender index if not found
|
// initialize sender index if not found
|
||||||
mp.senderIndices[txInfo.sender] = senderIndex
|
mp.senderIndices[txInfo.Sender] = senderIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
newKey := txMeta{nonce: txInfo.nonce, priority: priority, sender: txInfo.sender}
|
newKey := txMeta{nonce: txInfo.Nonce, priority: priority, sender: txInfo.Sender}
|
||||||
|
|
||||||
// Since mp.priorityIndex is scored by priority, then sender, then nonce, a
|
// Since mp.priorityIndex is scored by priority, then sender, then nonce, a
|
||||||
// changed priority will create a new key, so we must remove the old key and
|
// changed priority will create a new key, so we must remove the old key and
|
||||||
@ -227,17 +233,20 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
|||||||
// This O(log n) remove operation is rare and only happens when a tx's priority
|
// This O(log n) remove operation is rare and only happens when a tx's priority
|
||||||
// changes.
|
// changes.
|
||||||
|
|
||||||
sk := txMeta{nonce: txInfo.nonce, sender: txInfo.sender}
|
sk := txMeta{nonce: txInfo.Nonce, sender: txInfo.Sender}
|
||||||
if oldScore, txExists := mp.scores[sk]; txExists {
|
if oldScore, txExists := mp.scores[sk]; txExists {
|
||||||
|
if oldScore.priority < priority {
|
||||||
oldTx := senderIndex.Get(newKey).Value.(sdk.Tx)
|
oldTx := senderIndex.Get(newKey).Value.(sdk.Tx)
|
||||||
return mp.doTxReplace(ctx, newKey, oldScore, oldTx, tx)
|
return mp.doTxReplace(ctx, newKey, oldScore, oldTx, tx)
|
||||||
|
}
|
||||||
|
return errors.Wrapf(errTxInMempool, "[%d@%s] tx already in mempool", txInfo.Nonce, txInfo.Sender)
|
||||||
} else {
|
} else {
|
||||||
mempoolSize := mp.CountTx()
|
mempoolSize := mp.priorityIndex.Len()
|
||||||
if mempoolSize >= mp.maxTx {
|
if mempoolSize >= mp.maxTx {
|
||||||
lowestPriority := mp.GetLowestPriority()
|
lowestPriority := mp.getLowestPriority()
|
||||||
// find one to replace
|
// find one to replace
|
||||||
if lowestPriority > 0 && priority <= lowestPriority {
|
if lowestPriority > 0 && priority <= lowestPriority {
|
||||||
return errors.Wrapf(errMempoolTxGasPriceTooLow, "tx with priority %d is too low, current lowest priority is %d", priority, lowestPriority)
|
return errors.Wrapf(errMempoolTxGasPriceTooLow, "[%d@%s]tx with priority %d is too low, current lowest priority is %d", newKey.nonce, newKey.sender, priority, lowestPriority)
|
||||||
}
|
}
|
||||||
|
|
||||||
var maxIndexSize int
|
var maxIndexSize int
|
||||||
@ -245,7 +254,7 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
|||||||
var selectedElement *skiplist.Element
|
var selectedElement *skiplist.Element
|
||||||
for sender, index := range mp.senderIndices {
|
for sender, index := range mp.senderIndices {
|
||||||
indexSize := index.Len()
|
indexSize := index.Len()
|
||||||
if sender == txInfo.sender {
|
if sender == txInfo.Sender {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -275,12 +284,16 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
|
|||||||
mp.doInsert(newKey, tx, true)
|
mp.doInsert(newKey, tx, true)
|
||||||
|
|
||||||
if mp.txReplacedCallback != nil && replacedTx != nil {
|
if mp.txReplacedCallback != nil && replacedTx != nil {
|
||||||
mp.txReplacedCallback(ctx, replacedTx, tx)
|
sdkContext.Logger().Debug("txn replaced caused by full of mempool", "old", fmt.Sprintf("%d@%s", key.nonce, key.sender), "new", fmt.Sprintf("%d@%s", newKey.nonce, newKey.sender), "mempoolSize", mempoolSize)
|
||||||
|
mp.txReplacedCallback(ctx,
|
||||||
|
&TxInfo{Sender: key.sender, Nonce: key.nonce, Tx: replacedTx},
|
||||||
|
&TxInfo{Sender: newKey.sender, Nonce: newKey.nonce, Tx: tx},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// not found any index more than 1 except sender's index
|
// not found any index more than 1 except sender's index
|
||||||
// We do not replace the sender's only tx in the mempool
|
// We do not replace the sender's only tx in the mempool
|
||||||
return errMempoolIsFull
|
return errors.Wrapf(errMempoolIsFull, "%d@%s with priority%d", newKey.nonce, newKey.sender, newKey.priority)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
mp.doInsert(newKey, tx, true)
|
mp.doInsert(newKey, tx, true)
|
||||||
@ -315,13 +328,13 @@ func (mp *PriorityNonceMempool) doRemove(oldKey txMeta, decrCnt bool) (sdk.Tx, e
|
|||||||
scoreKey := txMeta{nonce: oldKey.nonce, sender: oldKey.sender}
|
scoreKey := txMeta{nonce: oldKey.nonce, sender: oldKey.sender}
|
||||||
score, ok := mp.scores[scoreKey]
|
score, ok := mp.scores[scoreKey]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, mempool.ErrTxNotFound
|
return nil, errors.Wrapf(mempool.ErrTxNotFound, "%d@%s not found", oldKey.nonce, oldKey.sender)
|
||||||
}
|
}
|
||||||
tk := txMeta{nonce: oldKey.nonce, priority: score.priority, sender: oldKey.sender, weight: score.weight}
|
tk := txMeta{nonce: oldKey.nonce, priority: score.priority, sender: oldKey.sender, weight: score.weight}
|
||||||
|
|
||||||
senderTxs, ok := mp.senderIndices[oldKey.sender]
|
senderTxs, ok := mp.senderIndices[oldKey.sender]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("sender %s not found", oldKey.sender)
|
return nil, fmt.Errorf("%d@%s not found", oldKey.nonce, oldKey.sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.priorityIndex.Remove(tk)
|
mp.priorityIndex.Remove(tk)
|
||||||
@ -363,7 +376,12 @@ func (mp *PriorityNonceMempool) doTxReplace(ctx context.Context, newMate, oldMat
|
|||||||
mp.doInsert(newMate, newTx, false)
|
mp.doInsert(newMate, newTx, false)
|
||||||
|
|
||||||
if mp.txReplacedCallback != nil && replacedTx != nil {
|
if mp.txReplacedCallback != nil && replacedTx != nil {
|
||||||
mp.txReplacedCallback(ctx, replacedTx, newTx)
|
sdkContext := sdk.UnwrapSDKContext(ctx)
|
||||||
|
sdkContext.Logger().Debug("txn update", "txn", fmt.Sprintf("%d@%s", newMate.nonce, newMate.sender), "oldPriority", oldMate.priority, "newPriority", newMate.priority)
|
||||||
|
mp.txReplacedCallback(ctx,
|
||||||
|
&TxInfo{Sender: newMate.sender, Nonce: newMate.nonce, Tx: replacedTx},
|
||||||
|
&TxInfo{Sender: newMate.sender, Nonce: newMate.nonce, Tx: newTx},
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -445,7 +463,24 @@ func (i *PriorityNonceIterator) Tx() sdk.Tx {
|
|||||||
//
|
//
|
||||||
// NOTE: It is not safe to use this iterator while removing transactions from
|
// NOTE: It is not safe to use this iterator while removing transactions from
|
||||||
// the underlying mempool.
|
// the underlying mempool.
|
||||||
func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.Iterator {
|
func (mp *PriorityNonceMempool) Select(ctx context.Context, txs [][]byte) mempool.Iterator {
|
||||||
|
mp.mtx.Lock()
|
||||||
|
defer mp.mtx.Unlock()
|
||||||
|
|
||||||
|
return mp.doSelect(ctx, txs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *PriorityNonceMempool) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
|
||||||
|
mp.mtx.Lock()
|
||||||
|
defer mp.mtx.Unlock()
|
||||||
|
|
||||||
|
iter := mp.doSelect(ctx, txs)
|
||||||
|
for iter != nil && callback(iter.Tx()) {
|
||||||
|
iter = iter.Next()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mp *PriorityNonceMempool) doSelect(_ context.Context, _ [][]byte) mempool.Iterator {
|
||||||
if mp.priorityIndex.Len() == 0 {
|
if mp.priorityIndex.Len() == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -460,6 +495,16 @@ func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.It
|
|||||||
return iterator.iteratePriority()
|
return iterator.iteratePriority()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *PriorityNonceMempool) GetSenderUncommittedTxnCount(ctx context.Context, sender string) int {
|
||||||
|
mp.mtx.Lock()
|
||||||
|
defer mp.mtx.Unlock()
|
||||||
|
|
||||||
|
if _, exists := mp.counterBySender[sender]; exists {
|
||||||
|
return mp.counterBySender[sender]
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
type reorderKey struct {
|
type reorderKey struct {
|
||||||
deleteKey txMeta
|
deleteKey txMeta
|
||||||
insertKey txMeta
|
insertKey txMeta
|
||||||
@ -514,29 +559,34 @@ func senderWeight(senderCursor *skiplist.Element) int64 {
|
|||||||
|
|
||||||
// CountTx returns the number of transactions in the mempool.
|
// CountTx returns the number of transactions in the mempool.
|
||||||
func (mp *PriorityNonceMempool) CountTx() int {
|
func (mp *PriorityNonceMempool) CountTx() int {
|
||||||
|
mp.mtx.Lock()
|
||||||
|
defer mp.mtx.Unlock()
|
||||||
return mp.priorityIndex.Len()
|
return mp.priorityIndex.Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove removes a transaction from the mempool in O(log n) time, returning an
|
// Remove removes a transaction from the mempool in O(log n) time, returning an
|
||||||
// error if unsuccessful.
|
// error if unsuccessful.
|
||||||
func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
|
func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
|
||||||
|
mp.mtx.Lock()
|
||||||
|
defer mp.mtx.Unlock()
|
||||||
|
|
||||||
txInfo, err := extractTxInfo(tx)
|
txInfo, err := extractTxInfo(tx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.decrSenderTxCnt(txInfo.sender, txInfo.nonce)
|
mp.decrSenderTxCnt(txInfo.Sender, txInfo.Nonce)
|
||||||
|
|
||||||
scoreKey := txMeta{nonce: txInfo.nonce, sender: txInfo.sender}
|
scoreKey := txMeta{nonce: txInfo.Nonce, sender: txInfo.Sender}
|
||||||
score, ok := mp.scores[scoreKey]
|
score, ok := mp.scores[scoreKey]
|
||||||
if !ok {
|
if !ok {
|
||||||
return mempool.ErrTxNotFound
|
return mempool.ErrTxNotFound
|
||||||
}
|
}
|
||||||
tk := txMeta{nonce: txInfo.nonce, priority: score.priority, sender: txInfo.sender, weight: score.weight}
|
tk := txMeta{nonce: txInfo.Nonce, priority: score.priority, sender: txInfo.Sender, weight: score.weight}
|
||||||
|
|
||||||
senderTxs, ok := mp.senderIndices[txInfo.sender]
|
senderTxs, ok := mp.senderIndices[txInfo.Sender]
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("sender %s not found", txInfo.sender)
|
return fmt.Errorf("sender %s not found", txInfo.Sender)
|
||||||
}
|
}
|
||||||
|
|
||||||
mp.priorityIndex.Remove(tk)
|
mp.priorityIndex.Remove(tk)
|
||||||
@ -547,7 +597,7 @@ func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *PriorityNonceMempool) GetLowestPriority() int64 {
|
func (mp *PriorityNonceMempool) getLowestPriority() int64 {
|
||||||
if mp.priorityIndex.Len() == 0 {
|
if mp.priorityIndex.Len() == 0 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
@ -645,12 +695,13 @@ func IsEmpty(mempool mempool.Mempool) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type txInfo struct {
|
type TxInfo struct {
|
||||||
sender string
|
Sender string
|
||||||
nonce uint64
|
Nonce uint64
|
||||||
|
Tx sdk.Tx
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractTxInfo(tx sdk.Tx) (*txInfo, error) {
|
func extractTxInfo(tx sdk.Tx) (*TxInfo, error) {
|
||||||
var sender string
|
var sender string
|
||||||
var nonce uint64
|
var nonce uint64
|
||||||
|
|
||||||
@ -682,5 +733,5 @@ func extractTxInfo(tx sdk.Tx) (*txInfo, error) {
|
|||||||
nonce = sig.Sequence
|
nonce = sig.Sequence
|
||||||
}
|
}
|
||||||
|
|
||||||
return &txInfo{sender: sender, nonce: nonce}, nil
|
return &TxInfo{Sender: sender, Nonce: nonce, Tx: tx}, nil
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,8 @@ const (
|
|||||||
flagSkipLoadLatest = "skip-load-latest"
|
flagSkipLoadLatest = "skip-load-latest"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var accountNonceOp app.AccountNonceOp
|
||||||
|
|
||||||
// appCreator holds functions used by the sdk server to control the 0g-chain app.
|
// appCreator holds functions used by the sdk server to control the 0g-chain app.
|
||||||
// The methods implement types in cosmos-sdk/server/types
|
// The methods implement types in cosmos-sdk/server/types
|
||||||
type appCreator struct {
|
type appCreator struct {
|
||||||
@ -130,8 +132,19 @@ func (ac appCreator) newApp(
|
|||||||
|
|
||||||
mempool := app.NewPriorityMempool(
|
mempool := app.NewPriorityMempool(
|
||||||
app.PriorityNonceWithMaxTx(fixMempoolSize(appOpts)),
|
app.PriorityNonceWithMaxTx(fixMempoolSize(appOpts)),
|
||||||
app.PriorityNonceWithTxReplacedCallback(func(ctx context.Context, oldTx, newTx sdk.Tx) {
|
app.PriorityNonceWithTxReplacedCallback(func(ctx context.Context, oldTx, newTx *app.TxInfo) {
|
||||||
bApp.RegisterMempoolTxReplacedEvent(ctx, oldTx, newTx)
|
if oldTx.Sender != newTx.Sender {
|
||||||
|
sdkContext := sdk.UnwrapSDKContext(ctx)
|
||||||
|
if accountNonceOp != nil {
|
||||||
|
nonce := accountNonceOp.GetAccountNonce(sdkContext, oldTx.Sender)
|
||||||
|
accountNonceOp.SetAccountNonce(sdkContext, oldTx.Sender, nonce-1)
|
||||||
|
sdkContext.Logger().Debug("rewind the nonce of the account", "account", oldTx.Sender, "from", nonce, "to", nonce-1)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sdkContext := sdk.UnwrapSDKContext(ctx)
|
||||||
|
sdkContext.Logger().Info("tx replace", "account", oldTx.Sender, "nonce", oldTx.Nonce)
|
||||||
|
}
|
||||||
|
bApp.RegisterMempoolTxReplacedEvent(ctx, oldTx.Tx, newTx.Tx)
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
bApp.SetMempool(mempool)
|
bApp.SetMempool(mempool)
|
||||||
@ -155,6 +168,8 @@ func (ac appCreator) newApp(
|
|||||||
bApp,
|
bApp,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
accountNonceOp = app.NewAccountNonceOp(newApp)
|
||||||
|
|
||||||
return newApp
|
return newApp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user