Compare commits

...

4 Commits

Author SHA1 Message Date
0xroy
38146e86d2
Merge dcc4d22c5e into 2947cb7ac6 2024-10-21 21:25:19 +08:00
Joel Liu
2947cb7ac6
Optimizing recover perf by reducing sync progress events (#244)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Optimizing recover perf by reducing sync progress events

* add log

* add log
2024-10-21 21:24:50 +08:00
peilun-conflux
39efb721c5
Remove sender from contract call. (#242)
This allows the RPC services to cache the results.
2024-10-21 16:58:50 +08:00
Roy Lu
dcc4d22c5e Updated README 2024-10-12 14:51:12 -07:00
7 changed files with 68 additions and 81 deletions

View File

@ -2,69 +2,32 @@
## Overview
0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer holds three important features:
0G Storage is a decentralized data storage system designed to address the challenges of high-throughput and low-latency data storage and retrieval, in areas such as AI and gaming.
* Buit-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* General purpose - It is designed to support atomic transactions, mutable kv stores as well as archive log systems to enable wide range of applications with various data types.
* Incentive - Instead of being just a decentralized database, 0G Storage introduces PoRA mining algorithm to incentivize storage network participants.
In addition, it forms the storage layer for the 0G data availability (DA) system, with the cross-layer integration abstracted away from Rollup and AppChain builders.
To dive deep into the technical details, continue reading [0G Storage Spec.](docs/)
## System Architecture
## Integration
0G Storage consists of two main components:
We provide a [SDK](https://github.com/0glabs/0g-js-storage-sdk) for users to easily integrate 0G Storage in their applications with the following features:
1. **Data Publishing Lane**: Ensures quick data availability and verification through the 0G Consensus network.
2. **Data Storage Lane**: Manages large data transfers and storage using an erasure-coding mechanism for redundancy and reliability.
* File Merkle Tree Class
* Flow Contract Types
* RPC methods support
* File upload
* Support browser environment
* Tests for different environments (In Progress)
* File download (In Progress)
Across the two lanes, 0G Storage supports the following features:
## Deployment
* **General Purpose Design**: Supports atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
* **Incentivized Participation**: Utilizes the PoRA (Proof of Random Access) mining algorithm to incentivize storage network participants.
Please refer to [Deployment](docs/run.md) page for detailed steps to compile and start a 0G Storage node.
For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://0g-doc-new.vercel.app/og-storage).
## Test
## Documentation
### Prerequisites
- If you want to run a node, please refer to the [Running a Node](https://0g-doc-new.vercel.app/run-a-node/storage-node) guide.
- If you want build a project using 0G storage, please refer to the [0G Storage SDK](https://0g-doc-new.vercel.app/build-with-0g/storage-sdk) guide.
* Required python version: 3.8, 3.9, 3.10, higher version is not guaranteed (e.g. failed to install `pysha3`).
* Install dependencies under root folder: `pip3 install -r requirements.txt`
## Support and Additional Resources
We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
### Dependencies
Python test framework will launch blockchain fullnodes at local for storage node to interact with. There are 2 kinds of fullnodes supported:
* Conflux eSpace node (by default).
* BSC node (geth).
For Conflux eSpace node, the test framework will automatically compile the binary at runtime, and copy the binary to `tests/tmp` folder. For BSC node, the test framework will automatically download the latest version binary from [github](https://github.com/bnb-chain/bsc/releases) to `tests/tmp` folder.
Alternatively, you could also manually copy specific version binaries (conflux or geth) to the `tests/tmp` folder. Note, do **NOT** copy released conflux binary on github, since block height of some CIPs are hardcoded.
For testing, it's also dependent on the following repos:
* [0G Storage Contract](https://github.com/0glabs/0g-storage-contracts): It essentially provides two abi interfaces for 0G Storage Node to interact with the on-chain contracts.
* ZgsFlow: It contains apis to submit chunk data.
* PoraMine: It contains apis to submit PoRA answers.
* [0G Storage Client](https://github.com/0glabs/0g-storage-client): It is used to interact with certain 0G Storage Nodes to upload/download files.
### Run Tests
Go to the `tests` folder and run the following command to run all tests:
```
python test_all.py
```
or, run any single test, e.g.
```
python sync_test.py
```
## Contributing
To make contributions to the project, please follow the guidelines [here](contributing.md).
### Communities
- [0G Telegram](https://t.me/web3_0glabs)
- [0G Discord](https://discord.com/invite/0glabs)

View File

@ -236,15 +236,19 @@ impl LogEntryFetcher {
.filter;
let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
.with_page_size(log_page_size);
debug!(
info!(
"start_recover starts, start={} end={}",
start_block_number, end_block_number
);
let (mut block_hash_sent, mut block_number_sent) = (None, None);
while let Some(maybe_log) = stream.next().await {
match maybe_log {
Ok(log) => {
let sync_progress =
if log.block_hash.is_some() && log.block_number.is_some() {
if block_hash_sent != log.block_hash
|| block_number_sent != log.block_number
{
let synced_block = LogFetchProgress::SyncedBlock((
log.block_number.unwrap().as_u64(),
log.block_hash.unwrap(),
@ -254,6 +258,9 @@ impl LogEntryFetcher {
Some(synced_block)
} else {
None
}
} else {
None
};
debug!("recover: progress={:?}", sync_progress);
@ -268,7 +275,12 @@ impl LogEntryFetcher {
log.block_number.expect("block number exist").as_u64(),
))
.and_then(|_| match sync_progress {
Some(b) => recover_tx.send(b),
Some(b) => {
recover_tx.send(b)?;
block_hash_sent = log.block_hash;
block_number_sent = log.block_number;
Ok(())
}
None => Ok(()),
})
{
@ -290,6 +302,8 @@ impl LogEntryFetcher {
}
}
}
info!("log recover end");
},
"log recover",
);

View File

@ -67,8 +67,8 @@ impl MinerConfig {
})
}
pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
let provider = Arc::new(Provider::new(
pub(crate) fn make_provider(&self) -> Result<Arc<Provider<RetryClient<Http>>>, String> {
Ok(Arc::new(Provider::new(
RetryClientBuilder::default()
.rate_limit_retries(self.rate_limit_retries)
.timeout_retries(self.timeout_retries)
@ -78,7 +78,11 @@ impl MinerConfig {
.map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
Box::new(HttpRateLimitRetryPolicy),
),
));
)))
}
pub(crate) async fn make_signing_provider(&self) -> Result<MineServiceMiddleware, String> {
let provider = self.make_provider()?;
let chain_id = provider
.get_chainid()
.await

View File

@ -1,6 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};
use ethereum_types::H256;
use ethers::prelude::{Http, Provider, RetryClient};
use tokio::time::{sleep, Duration, Instant};
use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
@ -12,14 +13,14 @@ use storage_async::Store;
use task_executor::TaskExecutor;
use zgs_spec::SECTORS_PER_SEAL;
use crate::config::{MineServiceMiddleware, MinerConfig};
use crate::config::MinerConfig;
const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;
pub struct Sealer {
flow_contract: ZgsFlow<MineServiceMiddleware>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
store: Arc<Store>,
context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
last_context_flow_length: u64,
@ -29,7 +30,7 @@ pub struct Sealer {
impl Sealer {
pub fn spawn(
executor: TaskExecutor,
provider: Arc<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
store: Arc<Store>,
config: &MinerConfig,
miner_id: H256,

View File

@ -33,11 +33,13 @@ impl MineService {
config: MinerConfig,
store: Arc<Store>,
) -> Result<broadcast::Sender<MinerMessage>, String> {
let provider = Arc::new(config.make_provider().await?);
let provider = config.make_provider()?;
let signing_provider = Arc::new(config.make_signing_provider().await?);
let (msg_send, msg_recv) = broadcast::channel(1024);
let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
let miner_id =
check_and_request_miner_id(&config, store.as_ref(), &signing_provider).await?;
debug!("miner id setting complete.");
let mine_context_receiver = MineContextWatcher::spawn(
@ -61,6 +63,7 @@ impl MineService {
mine_answer_receiver,
mine_context_receiver,
provider.clone(),
signing_provider,
store.clone(),
&config,
);

View File

@ -2,6 +2,7 @@ use contract_interface::PoraAnswer;
use contract_interface::{PoraMine, ZgsFlow};
use ethereum_types::U256;
use ethers::contract::ContractCall;
use ethers::prelude::{Http, Provider, RetryClient};
use ethers::providers::PendingTransaction;
use hex::ToHex;
use shared_types::FlowRangeProof;
@ -24,7 +25,7 @@ pub struct Submitter {
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
mine_contract: PoraMine<MineServiceMiddleware>,
flow_contract: ZgsFlow<MineServiceMiddleware>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
default_gas_limit: Option<U256>,
store: Arc<Store>,
}
@ -34,11 +35,12 @@ impl Submitter {
executor: TaskExecutor,
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
mine_context_receiver: broadcast::Receiver<MineContextMessage>,
provider: Arc<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
signing_provider: Arc<MineServiceMiddleware>,
store: Arc<Store>,
config: &MinerConfig,
) {
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
let mine_contract = PoraMine::new(config.mine_address, signing_provider);
let flow_contract = ZgsFlow::new(config.flow_address, provider);
let default_gas_limit = config.submission_gas;

View File

@ -14,13 +14,13 @@ use tokio::{
try_join,
};
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
use ethers::prelude::{Http, RetryClient};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::{ops::DerefMut, str::FromStr};
use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
pub type MineContextMessage = Option<PoraPuzzle>;
lazy_static! {
@ -29,9 +29,9 @@ lazy_static! {
}
pub struct MineContextWatcher {
provider: Arc<MineServiceMiddleware>,
flow_contract: ZgsFlow<MineServiceMiddleware>,
mine_contract: PoraMine<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
mine_contract: PoraMine<Provider<RetryClient<Http>>>,
mine_context_sender: broadcast::Sender<MineContextMessage>,
last_report: MineContextMessage,
@ -44,7 +44,7 @@ impl MineContextWatcher {
pub fn spawn(
executor: TaskExecutor,
msg_recv: broadcast::Receiver<MinerMessage>,
provider: Arc<MineServiceMiddleware>,
provider: Arc<Provider<RetryClient<Http>>>,
config: &MinerConfig,
) -> broadcast::Receiver<MineContextMessage> {
let mine_contract = PoraMine::new(config.mine_address, provider.clone());