Compare commits

...

27 Commits

Author SHA1 Message Date
crStiv
e3d89d55c1
Merge 7fbf83bacb into 80208ed9b7 2025-03-27 14:16:51 +08:00
Solovyov1796
80208ed9b7
Merge pull request #122 from 0glabs/testnet/v0.5.x
merge Testnet/v0.5.x to dev
2025-03-27 14:15:26 +08:00
Solovyov1796
96926b4cbf
Merge pull request #127 from 0glabs/mempool_priority
support txn replacement when mempool is full
2025-03-27 14:07:51 +08:00
Solovyov1796
da2e0feffb bump dependencies' version 2025-03-27 14:05:05 +08:00
Solovyov1796
2c436a7d45 allow to replace sender's only txn in mempool 2025-03-21 01:55:18 +08:00
Solovyov1796
4c48f7ea63 handle error if account is meeting for the first time. 2025-03-21 01:36:41 +08:00
Solovyov1796
2cc584be0b add new interface to query the uncommitted txn count for sender 2025-03-16 22:28:19 +08:00
Solovyov1796
d8e968146b change limit of txn count in mempool for each sender from 16 to 48 2025-03-16 22:27:34 +08:00
Solovyov1796
962943d32b support account nonce rewind 2025-03-12 21:13:23 +08:00
Solovyov1796
b42c82d59c add lock and support nonce rewind 2025-03-12 21:09:37 +08:00
Solovyov1796
08000544c9 add account nonce operator to rewind account nonce if tx removed by full of mempool 2025-03-12 21:07:58 +08:00
Solovyov1796
248db0f47c update priority nonce mempool using 2025-03-12 21:07:14 +08:00
Solovyov1796
0b027e10ed update 2025-03-11 09:56:10 +08:00
Solovyov1796
e41c65c92d update error message 2025-03-11 02:14:51 +08:00
Solovyov1796
c70b0a1c2b fix insert issue 2025-03-11 01:50:16 +08:00
Solovyov1796
9b171dbd4c fix issue about missing mempool config 2025-03-11 01:49:46 +08:00
Solovyov1796
4ff1ab24d1 improve app mempool 2025-03-11 00:03:17 +08:00
Solovyov1796
384d899eff add mempool tx replace callback 2025-03-10 20:34:02 +08:00
Solovyov1796
c2fdb3109e add tx info extracter 2025-03-10 20:32:07 +08:00
Solovyov1796
45508f5954 set max size for priority nonce mempool 2025-03-10 19:54:39 +08:00
Solovyov1796
72e8508651 bump ethermint 2025-03-10 18:30:12 +08:00
Solovyov1796
c066af2a47 fix txs limit in mempool 2025-03-10 17:33:31 +08:00
Solovyov1796
d1b83b5ac8 bump ethermint 2025-03-10 15:32:42 +08:00
Solovyov1796
dc888ceb78
Merge pull request #119 from 0glabs/add_mempool_limit
add mempool limit
2025-03-04 17:53:33 +08:00
Solovyov1796
ea3e4b84e8 add mempool limit 2025-03-04 15:13:54 +08:00
Solovyov1796
553d111f40
Merge pull request #111 from 0glabs/dev
merge dev to testnet/v0.5.x
2025-02-18 18:01:25 +08:00
crStiv
7fbf83bacb
Update README.md 2025-01-12 02:58:41 +01:00
7 changed files with 588 additions and 148 deletions

View File

@ -25,11 +25,58 @@ Continue reading [here](https://docs.0g.ai/intro) if you want to learn more abou
- If you want to run a validator node, DA node, or storage node, please refer to the [Run a Node Documentation](https://docs.0g.ai/run-a-node/overview).
## Getting Started
To get started with 0G Chain, you'll need to:
1. Read our [documentation](https://docs.0g.ai/intro)
2. Choose which component you want to work with (DA, Storage, Inference Serving, or Network)
3. Follow the setup instructions in the relevant documentation section
## Contributing
We welcome contributions from the community! Here's how you can contribute:
1. Fork the repository
2. Create your feature branch (`git checkout -b feature/AmazingFeature`)
3. Commit your changes (`git commit -m 'Add some AmazingFeature'`)
4. Push to the branch (`git push origin feature/AmazingFeature`)
5. Open a Pull Request
Please make sure to:
- Follow our coding standards
- Write clear commit messages
- Update documentation as needed
- Add tests for new features
## Technical Overview
0G Chain consists of several key components:
### Data Availability (DA) Layer
- Ultra high-performance with KZG commitments
- Quorum-based Data Availability Sampling
- Optimized for AI workloads
### Decentralized Storage
- Erasure coding for data reliability
- Efficient replication mechanisms
- Optimized for large-scale AI models
### Inference Serving
- Flexible framework for AI model deployment
- Support for model fine-tuning
- Distributed inference capabilities
### Network Layer
- High-performance P2P communication
- Low-latency data transfer
- Decentralized architecture
## Support and Additional Resources
We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
We're here to help you succeed in contributing to and building with 0G Chain:
### Official Links
- [Official Website](https://0g.ai)
- [Technical Documentation](https://docs.0g.ai)
- [GitHub Repository](https://github.com/0glabs)
### Communities
- [0G Telegram](https://t.me/web3_0glabs)
- [0G Discord](https://discord.com/invite/0glabs)
- [0G Discord](https://discord.com/invite/0glabs)

View File

@ -1,9 +1,9 @@
package app
import (
"errors"
"fmt"
"github.com/cockroachdb/errors"
abci "github.com/cometbft/cometbft/abci/types"
gethtypes "github.com/ethereum/go-ethereum/core/types"
evmtypes "github.com/evmos/ethermint/x/evm/types"
@ -97,11 +97,12 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}
}
iterator := h.mempool.Select(ctx, req.Txs)
selectedTxsSignersSeqs := make(map[string]uint64)
var selectedTxsNums int
for iterator != nil {
memTx := iterator.Tx()
var waitRemoveTxs []sdk.Tx
mempool.SelectBy(ctx, h.mempool, req.Txs, func(memTx sdk.Tx) bool {
sigs, err := memTx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
panic(fmt.Errorf("failed to get signatures: %w", err))
@ -157,47 +158,49 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
}
}
if !shouldAdd {
iterator = iterator.Next()
continue
}
// NOTE: Since transaction verification was already executed in CheckTx,
// which calls mempool.Insert, in theory everything in the pool should be
// valid. But some mempool implementations may insert invalid txs, so we
// check again.
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
if err != nil {
err := h.mempool.Remove(memTx)
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
panic(err)
}
} else {
stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz)
if stop {
break
}
txsLen := len(h.txSelector.SelectedTxs())
for sender, seq := range txSignersSeqs {
// If txsLen != selectedTxsNums is true, it means that we've
// added a new tx to the selected txs, so we need to update
// the sequence of the sender.
if txsLen != selectedTxsNums {
selectedTxsSignersSeqs[sender] = seq
} else if _, ok := selectedTxsSignersSeqs[sender]; !ok {
// The transaction hasn't been added but it passed the
// verification, so we know that the sequence is correct.
// So we set this sender's sequence to seq-1, in order
// to avoid unnecessary calls to PrepareProposalVerifyTx.
selectedTxsSignersSeqs[sender] = seq - 1
if shouldAdd {
// NOTE: Since transaction verification was already executed in CheckTx,
// which calls mempool.Insert, in theory everything in the pool should be
// valid. But some mempool implementations may insert invalid txs, so we
// check again.
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
if err != nil {
waitRemoveTxs = append(waitRemoveTxs, memTx)
} else {
stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz)
if stop {
return false
}
txsLen := len(h.txSelector.SelectedTxs())
for sender, seq := range txSignersSeqs {
// If txsLen != selectedTxsNums is true, it means that we've
// added a new tx to the selected txs, so we need to update
// the sequence of the sender.
if txsLen != selectedTxsNums {
selectedTxsSignersSeqs[sender] = seq
} else if _, ok := selectedTxsSignersSeqs[sender]; !ok {
// The transaction hasn't been added but it passed the
// verification, so we know that the sequence is correct.
// So we set this sender's sequence to seq-1, in order
// to avoid unnecessary calls to PrepareProposalVerifyTx.
selectedTxsSignersSeqs[sender] = seq - 1
}
}
selectedTxsNums = txsLen
}
selectedTxsNums = txsLen
}
iterator = iterator.Next()
return true
})
for i := range waitRemoveTxs {
err := h.mempool.Remove(waitRemoveTxs[i])
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
panic(err)
}
}
return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()}
}
}

View File

@ -1068,3 +1068,47 @@ func GetMaccPerms() map[string][]string {
}
return perms
}
type accountNonceOp struct {
ak evmtypes.AccountKeeper
}
type AccountNonceOp interface {
GetAccountNonce(ctx sdk.Context, address string) uint64
SetAccountNonce(ctx sdk.Context, address string, nonce uint64)
}
func NewAccountNonceOp(app *App) AccountNonceOp {
return &accountNonceOp{
ak: app.accountKeeper,
}
}
func (ano *accountNonceOp) GetAccountNonce(ctx sdk.Context, address string) uint64 {
bzAcc, err := sdk.AccAddressFromBech32(address)
if err != nil {
ctx.Logger().Error("GetAccountNonce: failed to parse address", "address", address, "error", err)
return 0
}
acc := ano.ak.GetAccount(ctx, bzAcc)
if acc == nil {
ctx.Logger().Error("GetAccountNonce: account not found", "address", address)
return 0
}
return acc.GetSequence()
}
func (ano *accountNonceOp) SetAccountNonce(ctx sdk.Context, address string, nonce uint64) {
bzAcc, err := sdk.AccAddressFromBech32(address)
if err != nil {
ctx.Logger().Error("SetAccountNonce: failed to parse address", "address", address, "nonce", nonce, "error", err)
return
}
acc := ano.ak.GetAccount(ctx, bzAcc)
if acc != nil {
acc.SetSequence(nonce)
ano.ak.SetAccount(ctx, acc)
} else {
ctx.Logger().Error("SetAccountNonce: account not found", "address", address)
}
}

View File

@ -2,10 +2,13 @@ package app
import (
"context"
"sync"
"fmt"
"math"
"github.com/huandu/skiplist"
"github.com/pkg/errors"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/mempool"
@ -14,9 +17,16 @@ import (
evmtypes "github.com/evmos/ethermint/x/evm/types"
)
const MAX_TXS_PRE_SENDER_IN_MEMPOOL int = 48
var (
_ mempool.Mempool = (*PriorityNonceMempool)(nil)
_ mempool.Iterator = (*PriorityNonceIterator)(nil)
errMempoolTxGasPriceTooLow = errors.New("gas price is too low")
errMempoolTooManyTxs = errors.New("tx sender has too many txs in mempool")
errMempoolIsFull = errors.New("mempool is full")
errTxInMempool = errors.New("tx already in mempool")
)
// PriorityNonceMempool is a mempool implementation that stores txs
@ -27,6 +37,7 @@ var (
// priority to other sender txs and must be partially ordered by both sender-nonce
// and priority.
type PriorityNonceMempool struct {
mtx sync.Mutex
priorityIndex *skiplist.SkipList
priorityCounts map[int64]int
senderIndices map[string]*skiplist.SkipList
@ -34,6 +45,12 @@ type PriorityNonceMempool struct {
onRead func(tx sdk.Tx)
txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool
maxTx int
senderTxCntLock sync.RWMutex
counterBySender map[string]int
txRecord map[txMeta]struct{}
txReplacedCallback func(ctx context.Context, oldTx, newTx *TxInfo)
}
type PriorityNonceIterator struct {
@ -120,6 +137,12 @@ func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption {
}
}
func PriorityNonceWithTxReplacedCallback(cb func(ctx context.Context, oldTx, newTx *TxInfo)) PriorityNonceMempoolOption {
return func(mp *PriorityNonceMempool) {
mp.txReplacedCallback = cb
}
}
// DefaultPriorityMempool returns a priorityNonceMempool with no options.
func DefaultPriorityMempool() mempool.Mempool {
return NewPriorityMempool()
@ -129,10 +152,12 @@ func DefaultPriorityMempool() mempool.Mempool {
// returns txs in a partial order by 2 dimensions; priority, and sender-nonce.
func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempool {
mp := &PriorityNonceMempool{
priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)),
priorityCounts: make(map[int64]int),
senderIndices: make(map[string]*skiplist.SkipList),
scores: make(map[txMeta]txMeta),
priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)),
priorityCounts: make(map[int64]int),
senderIndices: make(map[string]*skiplist.SkipList),
scores: make(map[txMeta]txMeta),
counterBySender: make(map[string]int),
txRecord: make(map[txMeta]struct{}),
}
for _, opt := range opts {
@ -165,57 +190,41 @@ func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx {
// Inserting a duplicate tx with a different priority overwrites the existing tx,
// changing the total order of the mempool.
func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
return mempool.ErrMempoolTxMaxCapacity
} else if mp.maxTx < 0 {
mp.mtx.Lock()
defer mp.mtx.Unlock()
// if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx {
// return mempool.ErrMempoolTxMaxCapacity
// } else
if mp.maxTx < 0 {
return nil
}
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return err
}
sdkContext := sdk.UnwrapSDKContext(ctx)
priority := sdkContext.Priority()
var sender string
var nonce uint64
if len(sigs) == 0 {
msgs := tx.GetMsgs()
if len(msgs) != 1 {
return fmt.Errorf("tx must have at least one signer")
}
msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return fmt.Errorf("tx must have at least one signer")
}
ethTx := msgEthTx.AsTransaction()
signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
ethSender, err := signer.Sender(ethTx)
if err != nil {
return fmt.Errorf("tx must have at least one signer")
}
sender = sdk.AccAddress(ethSender.Bytes()).String()
nonce = ethTx.Nonce()
} else {
sig := sigs[0]
sender = sdk.AccAddress(sig.PubKey.Address()).String()
nonce = sig.Sequence
txInfo, err := extractTxInfo(tx)
if err != nil {
return err
}
key := txMeta{nonce: nonce, priority: priority, sender: sender}
if !mp.canInsert(txInfo.Sender) {
return errors.Wrapf(errMempoolTooManyTxs, "[%d@%s]sender has too many txs in mempool", txInfo.Nonce, txInfo.Sender)
}
senderIndex, ok := mp.senderIndices[sender]
// init sender index if not exists
senderIndex, ok := mp.senderIndices[txInfo.Sender]
if !ok {
senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
}))
// initialize sender index if not found
mp.senderIndices[sender] = senderIndex
mp.senderIndices[txInfo.Sender] = senderIndex
}
newKey := txMeta{nonce: txInfo.Nonce, priority: priority, sender: txInfo.Sender}
// Since mp.priorityIndex is scored by priority, then sender, then nonce, a
// changed priority will create a new key, so we must remove the old key and
// re-insert it to avoid having the same tx with different priorityIndex indexed
@ -223,35 +232,155 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error {
//
// This O(log n) remove operation is rare and only happens when a tx's priority
// changes.
sk := txMeta{nonce: nonce, sender: sender}
if oldScore, txExists := mp.scores[sk]; txExists {
if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) {
return fmt.Errorf(
"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
oldScore.priority,
priority,
senderIndex.Get(key).Value.(sdk.Tx),
tx,
)
}
mp.priorityIndex.Remove(txMeta{
nonce: nonce,
sender: sender,
priority: oldScore.priority,
weight: oldScore.weight,
})
mp.priorityCounts[oldScore.priority]--
sk := txMeta{nonce: txInfo.Nonce, sender: txInfo.Sender}
if oldScore, txExists := mp.scores[sk]; txExists {
if oldScore.priority < priority {
oldTx := senderIndex.Get(newKey).Value.(sdk.Tx)
return mp.doTxReplace(ctx, newKey, oldScore, oldTx, tx)
}
return errors.Wrapf(errTxInMempool, "[%d@%s] tx already in mempool", txInfo.Nonce, txInfo.Sender)
} else {
mempoolSize := mp.priorityIndex.Len()
if mempoolSize >= mp.maxTx {
lowestPriority := mp.getLowestPriority()
// find one to replace
if lowestPriority > 0 && priority <= lowestPriority {
return errors.Wrapf(errMempoolTxGasPriceTooLow, "[%d@%s]tx with priority %d is too low, current lowest priority is %d", newKey.nonce, newKey.sender, priority, lowestPriority)
}
var maxIndexSize int
var lowerPriority int64 = math.MaxInt64
var selectedElement *skiplist.Element
for sender, index := range mp.senderIndices {
indexSize := index.Len()
if sender == txInfo.Sender {
continue
}
if indexSize > 0 {
tail := index.Back()
if tail != nil {
tailKey := tail.Key().(txMeta)
if tailKey.priority < lowerPriority {
lowerPriority = tailKey.priority
maxIndexSize = indexSize
selectedElement = tail
} else if tailKey.priority == lowerPriority {
if indexSize > maxIndexSize {
maxIndexSize = indexSize
selectedElement = tail
}
}
}
}
}
if selectedElement != nil {
key := selectedElement.Key().(txMeta)
replacedTx, _ := mp.doRemove(key, true)
// insert new tx
mp.doInsert(newKey, tx, true)
if mp.txReplacedCallback != nil && replacedTx != nil {
sdkContext.Logger().Debug("txn replaced caused by full of mempool", "old", fmt.Sprintf("%d@%s", key.nonce, key.sender), "new", fmt.Sprintf("%d@%s", newKey.nonce, newKey.sender), "mempoolSize", mempoolSize)
mp.txReplacedCallback(ctx,
&TxInfo{Sender: key.sender, Nonce: key.nonce, Tx: replacedTx},
&TxInfo{Sender: newKey.sender, Nonce: newKey.nonce, Tx: tx},
)
}
} else {
return errors.Wrapf(errMempoolIsFull, "%d@%s with priority%d", newKey.nonce, newKey.sender, newKey.priority)
}
} else {
mp.doInsert(newKey, tx, true)
}
return nil
}
}
func (mp *PriorityNonceMempool) doInsert(newKey txMeta, tx sdk.Tx, incrCnt bool) {
senderIndex, ok := mp.senderIndices[newKey.sender]
if !ok {
senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int {
return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce)
}))
// initialize sender index if not found
mp.senderIndices[newKey.sender] = senderIndex
}
mp.priorityCounts[priority]++
mp.priorityCounts[newKey.priority]++
newKey.senderElement = senderIndex.Set(newKey, tx)
// Since senderIndex is scored by nonce, a changed priority will overwrite the
// existing key.
key.senderElement = senderIndex.Set(key, tx)
mp.scores[txMeta{nonce: newKey.nonce, sender: newKey.sender}] = txMeta{priority: newKey.priority}
mp.priorityIndex.Set(newKey, tx)
mp.scores[sk] = txMeta{priority: priority}
mp.priorityIndex.Set(key, tx)
if incrCnt {
mp.incrSenderTxCnt(newKey.sender, newKey.nonce)
}
}
func (mp *PriorityNonceMempool) doRemove(oldKey txMeta, decrCnt bool) (sdk.Tx, error) {
scoreKey := txMeta{nonce: oldKey.nonce, sender: oldKey.sender}
score, ok := mp.scores[scoreKey]
if !ok {
return nil, errors.Wrapf(mempool.ErrTxNotFound, "%d@%s not found", oldKey.nonce, oldKey.sender)
}
tk := txMeta{nonce: oldKey.nonce, priority: score.priority, sender: oldKey.sender, weight: score.weight}
senderTxs, ok := mp.senderIndices[oldKey.sender]
if !ok {
return nil, fmt.Errorf("%d@%s not found", oldKey.nonce, oldKey.sender)
}
mp.priorityIndex.Remove(tk)
removedElem := senderTxs.Remove(tk)
delete(mp.scores, scoreKey)
mp.priorityCounts[score.priority]--
if decrCnt {
mp.decrSenderTxCnt(oldKey.sender, oldKey.nonce)
}
if removedElem == nil {
return nil, mempool.ErrTxNotFound
}
return removedElem.Value.(sdk.Tx), nil
}
func (mp *PriorityNonceMempool) doTxReplace(ctx context.Context, newMate, oldMate txMeta, oldTx, newTx sdk.Tx) error {
if mp.txReplacement != nil && !mp.txReplacement(oldMate.priority, newMate.priority, oldTx, newTx) {
return fmt.Errorf(
"tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v",
oldMate.priority,
newMate.priority,
oldTx,
newTx,
)
}
e := mp.priorityIndex.Remove(txMeta{
nonce: newMate.nonce,
sender: newMate.sender,
priority: oldMate.priority,
weight: oldMate.weight,
})
replacedTx := e.Value.(sdk.Tx)
mp.priorityCounts[oldMate.priority]--
mp.doInsert(newMate, newTx, false)
if mp.txReplacedCallback != nil && replacedTx != nil {
sdkContext := sdk.UnwrapSDKContext(ctx)
sdkContext.Logger().Debug("txn update", "txn", fmt.Sprintf("%d@%s", newMate.nonce, newMate.sender), "oldPriority", oldMate.priority, "newPriority", newMate.priority)
mp.txReplacedCallback(ctx,
&TxInfo{Sender: newMate.sender, Nonce: newMate.nonce, Tx: replacedTx},
&TxInfo{Sender: newMate.sender, Nonce: newMate.nonce, Tx: newTx},
)
}
return nil
}
@ -332,7 +461,24 @@ func (i *PriorityNonceIterator) Tx() sdk.Tx {
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.Iterator {
func (mp *PriorityNonceMempool) Select(ctx context.Context, txs [][]byte) mempool.Iterator {
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.doSelect(ctx, txs)
}
func (mp *PriorityNonceMempool) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
mp.mtx.Lock()
defer mp.mtx.Unlock()
iter := mp.doSelect(ctx, txs)
for iter != nil && callback(iter.Tx()) {
iter = iter.Next()
}
}
func (mp *PriorityNonceMempool) doSelect(_ context.Context, _ [][]byte) mempool.Iterator {
if mp.priorityIndex.Len() == 0 {
return nil
}
@ -347,6 +493,16 @@ func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.It
return iterator.iteratePriority()
}
func (mp *PriorityNonceMempool) GetSenderUncommittedTxnCount(ctx context.Context, sender string) int {
mp.mtx.Lock()
defer mp.mtx.Unlock()
if _, exists := mp.counterBySender[sender]; exists {
return mp.counterBySender[sender]
}
return 0
}
type reorderKey struct {
deleteKey txMeta
insertKey txMeta
@ -401,51 +557,34 @@ func senderWeight(senderCursor *skiplist.Element) int64 {
// CountTx returns the number of transactions in the mempool.
func (mp *PriorityNonceMempool) CountTx() int {
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.priorityIndex.Len()
}
// Remove removes a transaction from the mempool in O(log n) time, returning an
// error if unsuccessful.
func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
mp.mtx.Lock()
defer mp.mtx.Unlock()
txInfo, err := extractTxInfo(tx)
if err != nil {
return err
}
var sender string
var nonce uint64
if len(sigs) == 0 {
msgs := tx.GetMsgs()
if len(msgs) != 1 {
return fmt.Errorf("attempted to remove a tx with no signatures")
}
msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return fmt.Errorf("attempted to remove a tx with no signatures")
}
ethTx := msgEthTx.AsTransaction()
signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
ethSender, err := signer.Sender(ethTx)
if err != nil {
return fmt.Errorf("attempted to remove a tx with no signatures")
}
sender = sdk.AccAddress(ethSender.Bytes()).String()
nonce = ethTx.Nonce()
} else {
sig := sigs[0]
sender = sdk.AccAddress(sig.PubKey.Address()).String()
nonce = sig.Sequence
}
scoreKey := txMeta{nonce: nonce, sender: sender}
mp.decrSenderTxCnt(txInfo.Sender, txInfo.Nonce)
scoreKey := txMeta{nonce: txInfo.Nonce, sender: txInfo.Sender}
score, ok := mp.scores[scoreKey]
if !ok {
return mempool.ErrTxNotFound
}
tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight}
tk := txMeta{nonce: txInfo.Nonce, priority: score.priority, sender: txInfo.Sender, weight: score.weight}
senderTxs, ok := mp.senderIndices[sender]
senderTxs, ok := mp.senderIndices[txInfo.Sender]
if !ok {
return fmt.Errorf("sender %s not found", sender)
return fmt.Errorf("sender %s not found", txInfo.Sender)
}
mp.priorityIndex.Remove(tk)
@ -456,6 +595,73 @@ func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error {
return nil
}
func (mp *PriorityNonceMempool) getLowestPriority() int64 {
if mp.priorityIndex.Len() == 0 {
return 0
}
min := int64(math.MaxInt64)
for priority, count := range mp.priorityCounts {
if count > 0 {
if priority < min {
min = priority
}
}
}
return min
}
func (mp *PriorityNonceMempool) canInsert(sender string) bool {
mp.senderTxCntLock.RLock()
defer mp.senderTxCntLock.RUnlock()
if _, exists := mp.counterBySender[sender]; exists {
return mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL
}
return true
}
func (mp *PriorityNonceMempool) incrSenderTxCnt(sender string, nonce uint64) error {
mp.senderTxCntLock.Lock()
defer mp.senderTxCntLock.Unlock()
existsKey := txMeta{nonce: nonce, sender: sender}
if _, exists := mp.txRecord[existsKey]; !exists {
mp.txRecord[existsKey] = struct{}{}
if _, exists := mp.counterBySender[sender]; !exists {
mp.counterBySender[sender] = 1
} else {
if mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL {
mp.counterBySender[sender] += 1
} else {
return fmt.Errorf("tx sender has too many txs in mempool")
}
}
}
return nil
}
func (mp *PriorityNonceMempool) decrSenderTxCnt(sender string, nonce uint64) {
mp.senderTxCntLock.Lock()
defer mp.senderTxCntLock.Unlock()
existsKey := txMeta{nonce: nonce, sender: sender}
if _, exists := mp.txRecord[existsKey]; exists {
delete(mp.txRecord, existsKey)
if _, exists := mp.counterBySender[sender]; exists {
if mp.counterBySender[sender] > 1 {
mp.counterBySender[sender] -= 1
} else {
delete(mp.counterBySender, sender)
}
}
}
}
func IsEmpty(mempool mempool.Mempool) error {
mp := mempool.(*PriorityNonceMempool)
if mp.priorityIndex.Len() != 0 {
@ -486,3 +692,44 @@ func IsEmpty(mempool mempool.Mempool) error {
return nil
}
type TxInfo struct {
Sender string
Nonce uint64
Tx sdk.Tx
}
func extractTxInfo(tx sdk.Tx) (*TxInfo, error) {
var sender string
var nonce uint64
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return nil, err
}
if len(sigs) == 0 {
msgs := tx.GetMsgs()
if len(msgs) != 1 {
return nil, fmt.Errorf("tx must have at least one signer")
}
msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return nil, fmt.Errorf("tx must have at least one signer")
}
ethTx := msgEthTx.AsTransaction()
signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
ethSender, err := signer.Sender(ethTx)
if err != nil {
return nil, fmt.Errorf("tx must have at least one signer")
}
sender = sdk.AccAddress(ethSender.Bytes()).String()
nonce = ethTx.Nonce()
} else {
sig := sigs[0]
sender = sdk.AccAddress(sig.PubKey.Address()).String()
nonce = sig.Sequence
}
return &TxInfo{Sender: sender, Nonce: nonce, Tx: tx}, nil
}

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"errors"
"fmt"
"io"
@ -19,6 +20,7 @@ import (
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/store"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
"github.com/cosmos/cosmos-sdk/x/crisis"
ethermintflags "github.com/evmos/ethermint/server/flags"
"github.com/spf13/cast"
@ -26,6 +28,8 @@ import (
"github.com/0glabs/0g-chain/app"
"github.com/0glabs/0g-chain/app/params"
gethtypes "github.com/ethereum/go-ethereum/core/types"
evmtypes "github.com/evmos/ethermint/x/evm/types"
)
const (
@ -34,6 +38,8 @@ const (
flagSkipLoadLatest = "skip-load-latest"
)
var accountNonceOp app.AccountNonceOp
// appCreator holds functions used by the sdk server to control the 0g-chain app.
// The methods implement types in cosmos-sdk/server/types
type appCreator struct {
@ -107,8 +113,6 @@ func (ac appCreator) newApp(
skipLoadLatest = cast.ToBool(appOpts.Get(flagSkipLoadLatest))
}
mempool := app.NewPriorityMempool()
bApp := app.NewBaseApp(logger, db, ac.encodingConfig,
baseapp.SetPruning(pruningOpts),
baseapp.SetMinGasPrices(strings.Replace(cast.ToString(appOpts.Get(server.FlagMinGasPrices)), ";", ",", -1)),
@ -123,8 +127,32 @@ func (ac appCreator) newApp(
baseapp.SetIAVLDisableFastNode(cast.ToBool(iavlDisableFastNode)),
baseapp.SetIAVLLazyLoading(cast.ToBool(appOpts.Get(server.FlagIAVLLazyLoading))),
baseapp.SetChainID(chainID),
baseapp.SetMempool(mempool),
baseapp.SetTxInfoExtracter(extractTxInfo),
)
mempool := app.NewPriorityMempool(
app.PriorityNonceWithMaxTx(fixMempoolSize(appOpts)),
app.PriorityNonceWithTxReplacedCallback(func(ctx context.Context, oldTx, newTx *app.TxInfo) {
if oldTx.Sender != newTx.Sender {
sdkContext := sdk.UnwrapSDKContext(ctx)
if accountNonceOp != nil {
nonce := accountNonceOp.GetAccountNonce(sdkContext, oldTx.Sender)
if nonce > 0 {
accountNonceOp.SetAccountNonce(sdkContext, oldTx.Sender, nonce-1)
sdkContext.Logger().Debug("rewind the nonce of the account", "account", oldTx.Sender, "from", nonce, "to", nonce-1)
} else {
sdkContext.Logger().Info("First meeting account", "account", oldTx.Sender)
}
}
} else {
sdkContext := sdk.UnwrapSDKContext(ctx)
sdkContext.Logger().Info("tx replace", "account", oldTx.Sender, "nonce", oldTx.Nonce)
}
bApp.RegisterMempoolTxReplacedEvent(ctx, oldTx.Tx, newTx.Tx)
}),
)
bApp.SetMempool(mempool)
bApp.SetTxEncoder(ac.encodingConfig.TxConfig.TxEncoder())
abciProposalHandler := app.NewDefaultProposalHandler(mempool, bApp)
bApp.SetPrepareProposal(abciProposalHandler.PrepareProposalHandler())
@ -144,6 +172,8 @@ func (ac appCreator) newApp(
bApp,
)
accountNonceOp = app.NewAccountNonceOp(newApp)
return newApp
}
@ -199,3 +229,72 @@ func accAddressesFromBech32(addresses ...string) ([]sdk.AccAddress, error) {
}
return decodedAddresses, nil
}
var ErrMustHaveSigner error = errors.New("tx must have at least one signer")
func extractTxInfo(ctx sdk.Context, tx sdk.Tx) (*sdk.TxInfo, error) {
sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2()
if err != nil {
return nil, err
}
var sender string
var nonce uint64
var gasPrice uint64
var gasLimit uint64
var txType int32
if len(sigs) == 0 {
txType = 1
msgs := tx.GetMsgs()
if len(msgs) != 1 {
return nil, ErrMustHaveSigner
}
msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return nil, ErrMustHaveSigner
}
ethTx := msgEthTx.AsTransaction()
signer := gethtypes.NewEIP2930Signer(ethTx.ChainId())
ethSender, err := signer.Sender(ethTx)
if err != nil {
return nil, ErrMustHaveSigner
}
sender = sdk.AccAddress(ethSender.Bytes()).String()
nonce = ethTx.Nonce()
gasPrice = ethTx.GasPrice().Uint64()
gasLimit = ethTx.Gas()
} else {
sig := sigs[0]
sender = sdk.AccAddress(sig.PubKey.Address()).String()
nonce = sig.Sequence
}
return &sdk.TxInfo{
SignerAddress: sender,
Nonce: nonce,
GasLimit: gasLimit,
GasPrice: gasPrice,
TxType: txType,
}, nil
}
func fixMempoolSize(appOpts servertypes.AppOptions) int {
val1 := appOpts.Get("mempool.size")
val2 := appOpts.Get(server.FlagMempoolMaxTxs)
if val1 != nil && val2 != nil {
size1 := cast.ToInt(val1)
size2 := cast.ToInt(val2)
if size1 != size2 {
panic("the value of mempool.size and mempool.max-txs are different")
}
return size1
} else if val1 == nil && val2 == nil {
panic("not found mempool size in config")
} else if val1 == nil {
return cast.ToInt(val2)
} else { //if val2 == nil {
return cast.ToInt(val1)
}
}

10
go.mod
View File

@ -9,7 +9,6 @@ require (
cosmossdk.io/simapp v0.0.0-20231127212628-044ff4d8c015
github.com/Kava-Labs/opendb v0.0.0-20240719173129-a2f11f6d7e51
github.com/cenkalti/backoff/v4 v4.1.3
github.com/cockroachdb/errors v1.11.1
github.com/cometbft/cometbft v0.37.9
github.com/cometbft/cometbft-db v0.9.1
github.com/coniks-sys/coniks-go v0.0.0-20180722014011-11acf4819b71
@ -32,6 +31,7 @@ require (
github.com/huandu/skiplist v1.2.0
github.com/linxGnu/grocksdb v1.8.13
github.com/pelletier/go-toml/v2 v2.1.0
github.com/pkg/errors v0.9.1
github.com/shopspring/decimal v1.4.0
github.com/spf13/cast v1.6.0
github.com/spf13/cobra v1.8.0
@ -78,6 +78,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chzyer/readline v1.5.1 // indirect
github.com/cockroachdb/apd/v2 v2.0.2 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v1.1.0 // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
@ -174,7 +175,6 @@ require (
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
@ -238,11 +238,11 @@ replace (
// Use the cosmos keyring code
github.com/99designs/keyring => github.com/cosmos/keyring v1.2.0
// Use cometbft fork of tendermint
github.com/cometbft/cometbft => github.com/0glabs/cometbft v0.37.9-0glabs.1
github.com/cometbft/cometbft => github.com/0glabs/cometbft v0.37.9-0glabs.3
github.com/cometbft/cometbft-db => github.com/kava-labs/cometbft-db v0.9.1-kava.2
// Use cosmos-sdk fork with backported fix for unsafe-reset-all, staking transfer events, and custom tally handler support
// github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.46.11-kava.3
github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.47.10-0glabs.10
github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.47.10-0glabs.12
github.com/cosmos/iavl => github.com/kava-labs/iavl v1.2.0-kava.1
// See https://github.com/cosmos/cosmos-sdk/pull/13093
github.com/dgrijalva/jwt-go => github.com/golang-jwt/jwt/v4 v4.4.2
@ -250,7 +250,7 @@ replace (
// TODO: Tag before release
github.com/ethereum/go-ethereum => github.com/evmos/go-ethereum v1.10.26-evmos-rc2
// Use ethermint fork that respects min-gas-price with NoBaseFee true and london enabled, and includes eip712 support
github.com/evmos/ethermint => github.com/0glabs/ethermint v0.21.0-0g.v3.1.12
github.com/evmos/ethermint => github.com/0glabs/ethermint v0.21.0-0g.v3.1.15
// See https://github.com/cosmos/cosmos-sdk/pull/10401, https://github.com/cosmos/cosmos-sdk/commit/0592ba6158cd0bf49d894be1cef4faeec59e8320
github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.9.0
// Downgraded to avoid bugs in following commits which causes "version does not exist" errors

12
go.sum
View File

@ -209,12 +209,12 @@ filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek=
filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns=
git.sr.ht/~sircmpwn/getopt v0.0.0-20191230200459-23622cc906b3/go.mod h1:wMEGFFFNuPos7vHmWXfszqImLppbc0wEhh6JBfJIUgw=
git.sr.ht/~sircmpwn/go-bare v0.0.0-20210406120253-ab86bc2846d9/go.mod h1:BVJwbDfVjCjoFiKrhkei6NdGcZYpkDkdyCdg1ukytRA=
github.com/0glabs/cometbft v0.37.9-0glabs.1 h1:KQJG17Y21suKP3QNICLto4b5Ak73XbSmKxeLbg0ZM68=
github.com/0glabs/cometbft v0.37.9-0glabs.1/go.mod h1:j0Q3RqrCd+cztWCugs3obbzC4NyHGBPZZjtm/fWV00I=
github.com/0glabs/cosmos-sdk v0.47.10-0glabs.10 h1:NJp0RwczHBO4EvrQdDxxftHOgUDBtNh7M/vpaG7wFtQ=
github.com/0glabs/cosmos-sdk v0.47.10-0glabs.10/go.mod h1:KskIVnhXTFqrw7CDccMvx7To5KzUsOomIsQV7sPGOog=
github.com/0glabs/ethermint v0.21.0-0g.v3.1.12 h1:IRVTFhDEH2J5w8ywQW7obXQxYhJYib70SNgKqLOXikU=
github.com/0glabs/ethermint v0.21.0-0g.v3.1.12/go.mod h1:6e/gOcDLhvlDWK3JLJVBgki0gD6H4E1eG7l9byocgWA=
github.com/0glabs/cometbft v0.37.9-0glabs.3 h1:sobMz3C+OdFYNRQ3degfCZUHUzyuSPUIZqVMYgDtJs4=
github.com/0glabs/cometbft v0.37.9-0glabs.3/go.mod h1:j0Q3RqrCd+cztWCugs3obbzC4NyHGBPZZjtm/fWV00I=
github.com/0glabs/cosmos-sdk v0.47.10-0glabs.12 h1:mVUhlaGUPn8izK6TfdXD13xakN8+HGl3Y349YF6Kgqc=
github.com/0glabs/cosmos-sdk v0.47.10-0glabs.12/go.mod h1:KskIVnhXTFqrw7CDccMvx7To5KzUsOomIsQV7sPGOog=
github.com/0glabs/ethermint v0.21.0-0g.v3.1.15 h1:j3GwMVy1bjOb7TNyH7v7qOUu5LRl6oruZECIx9W77J0=
github.com/0glabs/ethermint v0.21.0-0g.v3.1.15/go.mod h1:6e/gOcDLhvlDWK3JLJVBgki0gD6H4E1eG7l9byocgWA=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4=
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM=