Compare commits

...

4 Commits

Author SHA1 Message Date
0xroy
38146e86d2
Merge dcc4d22c5e into 2947cb7ac6 2024-10-21 21:25:19 +08:00
Joel Liu
2947cb7ac6
Optimizing recover perf by reducing sync progress events (#244)
* Optimizing recover perf by reducing sync progress events

* add log

* add log
2024-10-21 21:24:50 +08:00
peilun-conflux
39efb721c5
Remove sender from contract call. (#242)
This allows the RPC services to cache the results.
2024-10-21 16:58:50 +08:00
Roy Lu
dcc4d22c5e Updated README 2024-10-12 14:51:12 -07:00
7 changed files with 68 additions and 81 deletions

View File

@@ -2,69 +2,32 @@
 ## Overview
-0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer holds three important features:
+0G Storage is a decentralized data storage system designed to address the challenges of high-throughput and low-latency data storage and retrieval, in areas such as AI and gaming.
-* Buit-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
+In addition, it forms the storage layer for the 0G data availability (DA) system, with the cross-layer integration abstracted away from Rollup and AppChain builders.
-* General purpose - It is designed to support atomic transactions, mutable kv stores as well as archive log systems to enable wide range of applications with various data types.
-* Incentive - Instead of being just a decentralized database, 0G Storage introduces PoRA mining algorithm to incentivize storage network participants.
-To dive deep into the technical details, continue reading [0G Storage Spec.](docs/)
+## System Architecture
-## Integration
+0G Storage consists of two main components:
-We provide a [SDK](https://github.com/0glabs/0g-js-storage-sdk) for users to easily integrate 0G Storage in their applications with the following features:
+1. **Data Publishing Lane**: Ensures quick data availability and verification through the 0G Consensus network.
+2. **Data Storage Lane**: Manages large data transfers and storage using an erasure-coding mechanism for redundancy and reliability.
-* File Merkle Tree Class
+Across the two lanes, 0G Storage supports the following features:
-* Flow Contract Types
-* RPC methods support
-* File upload
-* Support browser environment
-* Tests for different environments (In Progress)
-* File download (In Progress)
-## Deployment
+* **General Purpose Design**: Supports atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
+* **Incentivized Participation**: Utilizes the PoRA (Proof of Random Access) mining algorithm to incentivize storage network participants.
-Please refer to [Deployment](docs/run.md) page for detailed steps to compile and start a 0G Storage node.
+For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://0g-doc-new.vercel.app/og-storage).
-## Test
+## Documentation
-### Prerequisites
+- If you want to run a node, please refer to the [Running a Node](https://0g-doc-new.vercel.app/run-a-node/storage-node) guide.
+- If you want build a project using 0G storage, please refer to the [0G Storage SDK](https://0g-doc-new.vercel.app/build-with-0g/storage-sdk) guide.
-* Required python version: 3.8, 3.9, 3.10, higher version is not guaranteed (e.g. failed to install `pysha3`).
+## Support and Additional Resources
-* Install dependencies under root folder: `pip3 install -r requirements.txt`
+We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
-### Dependencies
+### Communities
-Python test framework will launch blockchain fullnodes at local for storage node to interact with. There are 2 kinds of fullnodes supported:
+- [0G Telegram](https://t.me/web3_0glabs)
-* Conflux eSpace node (by default).
+- [0G Discord](https://discord.com/invite/0glabs)
-* BSC node (geth).
-For Conflux eSpace node, the test framework will automatically compile the binary at runtime, and copy the binary to `tests/tmp` folder. For BSC node, the test framework will automatically download the latest version binary from [github](https://github.com/bnb-chain/bsc/releases) to `tests/tmp` folder.
-Alternatively, you could also manually copy specific version binaries (conflux or geth) to the `tests/tmp` folder. Note, do **NOT** copy released conflux binary on github, since block height of some CIPs are hardcoded.
-For testing, it's also dependent on the following repos:
-* [0G Storage Contract](https://github.com/0glabs/0g-storage-contracts): It essentially provides two abi interfaces for 0G Storage Node to interact with the on-chain contracts.
-  * ZgsFlow: It contains apis to submit chunk data.
-  * PoraMine: It contains apis to submit PoRA answers.
-* [0G Storage Client](https://github.com/0glabs/0g-storage-client): It is used to interact with certain 0G Storage Nodes to upload/download files.
-### Run Tests
-Go to the `tests` folder and run the following command to run all tests:
-```
-python test_all.py
-```
-or, run any single test, e.g.
-```
-python sync_test.py
-```
-## Contributing
-To make contributions to the project, please follow the guidelines [here](contributing.md).
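The new README's Data Storage Lane relies on erasure coding for redundancy. As a minimal illustration of the idea only (production systems, 0G included, would use a Reed-Solomon-style code, not this), a single XOR parity shard is enough to rebuild any one lost data shard:

```rust
// Illustrative sketch: one XOR parity shard over equal-length data shards.
// Losing any single data shard is recoverable by XOR-ing the parity with
// the survivors, because XOR is its own inverse.

fn xor_parity(shards: &[Vec<u8>]) -> Vec<u8> {
    let mut parity = vec![0u8; shards[0].len()];
    for shard in shards {
        for (p, b) in parity.iter_mut().zip(shard) {
            *p ^= *b;
        }
    }
    parity
}

/// Rebuild the one missing shard from the surviving shards plus the parity.
fn recover(surviving: &[Vec<u8>], parity: &[u8]) -> Vec<u8> {
    let mut lost = parity.to_vec();
    for shard in surviving {
        for (l, b) in lost.iter_mut().zip(shard) {
            *l ^= *b;
        }
    }
    lost
}

fn main() {
    let shards = vec![b"0g-data-a".to_vec(), b"0g-data-b".to_vec(), b"0g-data-c".to_vec()];
    let parity = xor_parity(&shards);
    // Drop shard 1 and rebuild it from the other shards plus the parity.
    let rebuilt = recover(&[shards[0].clone(), shards[2].clone()], &parity);
    assert_eq!(rebuilt, shards[1]);
}
```

Real erasure codes generalize this to k-of-n recovery: any k of the n stored shards suffice, which is what lets the storage lane tolerate multiple node failures.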

View File

@@ -236,22 +236,29 @@ impl LogEntryFetcher {
             .filter;
         let mut stream = LogQuery::new(&provider, &filter, log_query_delay)
             .with_page_size(log_page_size);
-        debug!(
+        info!(
             "start_recover starts, start={} end={}",
             start_block_number, end_block_number
         );
+        let (mut block_hash_sent, mut block_number_sent) = (None, None);
         while let Some(maybe_log) = stream.next().await {
             match maybe_log {
                 Ok(log) => {
                     let sync_progress =
                         if log.block_hash.is_some() && log.block_number.is_some() {
-                            let synced_block = LogFetchProgress::SyncedBlock((
-                                log.block_number.unwrap().as_u64(),
-                                log.block_hash.unwrap(),
-                                None,
-                            ));
-                            progress = log.block_number.unwrap().as_u64();
-                            Some(synced_block)
+                            if block_hash_sent != log.block_hash
+                                || block_number_sent != log.block_number
+                            {
+                                let synced_block = LogFetchProgress::SyncedBlock((
+                                    log.block_number.unwrap().as_u64(),
+                                    log.block_hash.unwrap(),
+                                    None,
+                                ));
+                                progress = log.block_number.unwrap().as_u64();
+                                Some(synced_block)
+                            } else {
+                                None
+                            }
                         } else {
                             None
                         };
@@ -268,7 +275,12 @@ impl LogEntryFetcher {
                         log.block_number.expect("block number exist").as_u64(),
                     ))
                     .and_then(|_| match sync_progress {
-                        Some(b) => recover_tx.send(b),
+                        Some(b) => {
+                            recover_tx.send(b)?;
+                            block_hash_sent = log.block_hash;
+                            block_number_sent = log.block_number;
+                            Ok(())
+                        }
                         None => Ok(()),
                     })
                 {
@@ -290,6 +302,8 @@ impl LogEntryFetcher {
                     }
                 }
             }
+            info!("log recover end");
        },
        "log recover",
    );
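The optimization in #244 above cuts sync-progress traffic during recovery by remembering the last (block number, block hash) pair that was sent and suppressing repeats, since every log in the same block would otherwise emit an identical `SyncedBlock` event. A self-contained sketch of that dedup logic (simplified: it updates the sent markers eagerly, whereas the patch updates them only after a successful send):

```rust
// Sketch of the duplicate-suppression idea with stand-in types
// (SyncedBlock here is hypothetical, not the crate's LogFetchProgress).

#[derive(Debug, PartialEq, Eq)]
struct SyncedBlock(u64, [u8; 32]);

struct ProgressDeduper {
    block_number_sent: Option<u64>,
    block_hash_sent: Option<[u8; 32]>,
}

impl ProgressDeduper {
    fn new() -> Self {
        Self { block_number_sent: None, block_hash_sent: None }
    }

    /// Emit a progress event only when (number, hash) differs from the last
    /// pair sent, mirroring the `block_hash_sent`/`block_number_sent` check.
    fn next_event(&mut self, number: u64, hash: [u8; 32]) -> Option<SyncedBlock> {
        if self.block_number_sent != Some(number) || self.block_hash_sent != Some(hash) {
            self.block_number_sent = Some(number);
            self.block_hash_sent = Some(hash);
            Some(SyncedBlock(number, hash))
        } else {
            None
        }
    }
}

fn main() {
    let mut d = ProgressDeduper::new();
    let h = [0u8; 32];
    assert!(d.next_event(1, h).is_some()); // first log of block 1: event sent
    assert!(d.next_event(1, h).is_none()); // later logs of block 1: suppressed
    assert!(d.next_event(2, h).is_some()); // new block: event sent again
}
```

Since a block with many matching logs previously produced one event per log, collapsing them to one event per block is what reduces recovery overhead.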

View File

@@ -67,8 +67,8 @@ impl MinerConfig {
         })
     }

-    pub(crate) async fn make_provider(&self) -> Result<MineServiceMiddleware, String> {
-        let provider = Arc::new(Provider::new(
+    pub(crate) fn make_provider(&self) -> Result<Arc<Provider<RetryClient<Http>>>, String> {
+        Ok(Arc::new(Provider::new(
             RetryClientBuilder::default()
                 .rate_limit_retries(self.rate_limit_retries)
                 .timeout_retries(self.timeout_retries)
@@ -78,7 +78,11 @@ impl MinerConfig {
                     .map_err(|e| format!("Cannot parse blockchain endpoint: {:?}", e))?,
                 Box::new(HttpRateLimitRetryPolicy),
             ),
-        ));
+        )))
+    }
+
+    pub(crate) async fn make_signing_provider(&self) -> Result<MineServiceMiddleware, String> {
+        let provider = self.make_provider()?;
         let chain_id = provider
             .get_chainid()
             .await
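The refactor above splits provider construction in two: `make_provider` now returns a plain, shareable RPC provider with no signer attached, and `make_signing_provider` layers the signer on top only for call sites that actually submit transactions. Read-only contract calls then carry no sender, which is what lets RPC services cache their results (#242). A toy sketch of the pattern with stand-in types (deliberately not ethers-rs):

```rust
// Sketch: a cheap, shareable read-only provider, plus a signing wrapper
// built on top of it only where transactions must be sent.
use std::sync::Arc;

struct Provider {
    endpoint: String,
}

impl Provider {
    /// Read path: no signer involved, identical for every caller, cacheable.
    fn call(&self, method: &str) -> String {
        format!("call {} via {}", method, self.endpoint)
    }
}

fn make_provider(endpoint: &str) -> Arc<Provider> {
    Arc::new(Provider { endpoint: endpoint.to_string() })
}

struct SigningProvider {
    inner: Arc<Provider>,
    signer: String,
}

impl SigningProvider {
    /// Write path: reuses the shared provider and adds a signature.
    fn send_tx(&self, tx: &str) -> String {
        format!("{} signed by {}", self.inner.call(tx), self.signer)
    }
}

fn make_signing_provider(inner: Arc<Provider>, signer: &str) -> SigningProvider {
    SigningProvider { inner, signer: signer.to_string() }
}

fn main() {
    let provider = make_provider("http://localhost:8545");
    let signing = make_signing_provider(provider.clone(), "miner-key");
    assert_eq!(provider.call("eth_chainId"), "call eth_chainId via http://localhost:8545");
    assert!(signing.send_tx("submit_answer").ends_with("signed by miner-key"));
}
```

The later file diffs follow from this split: components that only read (Sealer, MineContextWatcher, ZgsFlow queries) take the bare provider type, while PoraMine submission keeps the signing middleware.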

View File

@@ -1,6 +1,7 @@
 use std::{collections::BTreeMap, sync::Arc};

 use ethereum_types::H256;
+use ethers::prelude::{Http, Provider, RetryClient};
 use tokio::time::{sleep, Duration, Instant};

 use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
@@ -12,14 +13,14 @@ use storage_async::Store;
 use task_executor::TaskExecutor;
 use zgs_spec::SECTORS_PER_SEAL;

-use crate::config::{MineServiceMiddleware, MinerConfig};
+use crate::config::MinerConfig;

 const DB_QUERY_PERIOD_ON_NO_TASK: u64 = 1;
 const DB_QUERY_PERIOD_ON_ERROR: u64 = 5;
 const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;

 pub struct Sealer {
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
     store: Arc<Store>,
     context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
     last_context_flow_length: u64,
@@ -29,7 +30,7 @@ pub struct Sealer {
 impl Sealer {
     pub fn spawn(
         executor: TaskExecutor,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
         store: Arc<Store>,
         config: &MinerConfig,
         miner_id: H256,

View File

@@ -33,11 +33,13 @@ impl MineService {
         config: MinerConfig,
         store: Arc<Store>,
     ) -> Result<broadcast::Sender<MinerMessage>, String> {
-        let provider = Arc::new(config.make_provider().await?);
+        let provider = config.make_provider()?;
+        let signing_provider = Arc::new(config.make_signing_provider().await?);

         let (msg_send, msg_recv) = broadcast::channel(1024);

-        let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
+        let miner_id =
+            check_and_request_miner_id(&config, store.as_ref(), &signing_provider).await?;
         debug!("miner id setting complete.");

         let mine_context_receiver = MineContextWatcher::spawn(
@@ -61,6 +63,7 @@ impl MineService {
             mine_answer_receiver,
             mine_context_receiver,
             provider.clone(),
+            signing_provider,
             store.clone(),
             &config,
         );

View File

@@ -2,6 +2,7 @@ use contract_interface::PoraAnswer;
 use contract_interface::{PoraMine, ZgsFlow};
 use ethereum_types::U256;
 use ethers::contract::ContractCall;
+use ethers::prelude::{Http, Provider, RetryClient};
 use ethers::providers::PendingTransaction;
 use hex::ToHex;
 use shared_types::FlowRangeProof;
@@ -24,7 +25,7 @@ pub struct Submitter {
     mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
     mine_context_receiver: broadcast::Receiver<MineContextMessage>,
     mine_contract: PoraMine<MineServiceMiddleware>,
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
     default_gas_limit: Option<U256>,
     store: Arc<Store>,
 }
@@ -34,11 +35,12 @@ impl Submitter {
         executor: TaskExecutor,
         mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
         mine_context_receiver: broadcast::Receiver<MineContextMessage>,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
+        signing_provider: Arc<MineServiceMiddleware>,
         store: Arc<Store>,
         config: &MinerConfig,
     ) {
-        let mine_contract = PoraMine::new(config.mine_address, provider.clone());
+        let mine_contract = PoraMine::new(config.mine_address, signing_provider);
         let flow_contract = ZgsFlow::new(config.flow_address, provider);
         let default_gas_limit = config.submission_gas;

View File

@@ -14,13 +14,13 @@ use tokio::{
     try_join,
 };

-use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};
+use ethers::prelude::{Http, RetryClient};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 use std::{ops::DerefMut, str::FromStr};
+use crate::{config::MineServiceMiddleware, mine::PoraPuzzle, MinerConfig, MinerMessage};

 pub type MineContextMessage = Option<PoraPuzzle>;

 lazy_static! {
@@ -29,9 +29,9 @@ lazy_static! {
 }

 pub struct MineContextWatcher {
-    provider: Arc<MineServiceMiddleware>,
-    flow_contract: ZgsFlow<MineServiceMiddleware>,
-    mine_contract: PoraMine<MineServiceMiddleware>,
+    provider: Arc<Provider<RetryClient<Http>>>,
+    flow_contract: ZgsFlow<Provider<RetryClient<Http>>>,
+    mine_contract: PoraMine<Provider<RetryClient<Http>>>,

     mine_context_sender: broadcast::Sender<MineContextMessage>,
     last_report: MineContextMessage,
@@ -44,7 +44,7 @@ impl MineContextWatcher {
     pub fn spawn(
         executor: TaskExecutor,
         msg_recv: broadcast::Receiver<MinerMessage>,
-        provider: Arc<MineServiceMiddleware>,
+        provider: Arc<Provider<RetryClient<Http>>>,
         config: &MinerConfig,
     ) -> broadcast::Receiver<MineContextMessage> {
         let mine_contract = PoraMine::new(config.mine_address, provider.clone());