Compare commits

...

23 Commits

Author SHA1 Message Date
Peter Zhang
acd17bcfa7 move backtrace 2024-11-13 17:02:23 +08:00
Peter Zhang
8c57422be1 add data to leaves size 2024-11-13 17:02:23 +08:00
Peter Zhang
22b9eb62c8 add backtrace 2024-11-13 17:02:23 +08:00
Peter Zhang
1b5b5210f2 add detailed metrics in slow operations 2024-11-13 17:02:23 +08:00
Peter Zhang
abecfc4c90 add detailed metrics in slow operations 2024-11-13 17:02:23 +08:00
Peter Zhang
6954c55bf6 add detailed metrics in slow operations 2024-11-13 17:02:23 +08:00
Peter Zhang
672840b25f add detailed metrics for storage layer 2024-11-13 17:02:23 +08:00
Peter Zhang
94eacf5094 add detailed metrics for storage layer 2024-11-13 17:02:23 +08:00
Peter Zhang
93f487b67c add detailed metrics for storage layer 2024-11-13 17:02:23 +08:00
Peter Zhang
bea72edecb add detailed metrics for storage layer 2024-11-13 17:02:23 +08:00
Peter Zhang
75c56b503a add detailed metrics in slow operations 2024-11-13 17:02:23 +08:00
Peter Zhang
8fa9929003 add detailed metrics in slow operations 2024-11-13 17:02:21 +08:00
Peter Zhang
24e5663e63 format code 2024-11-13 17:01:06 +08:00
Peter Zhang
382e6a1226 add detailed metrics in slow operations 2024-11-13 17:01:06 +08:00
Bo QIU
d93f453d50
RPC return file pruned info (#266)
* RPC return file pruned info

* return tx status in atomic manner

* fix clippy
2024-11-13 16:55:57 +08:00
leopardracer
16e70bde68
fix: typos in documentation files (#265)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Update README.md
2024-11-13 09:07:06 +08:00
Bo QIU
1de7afec14
Add more metrics for network unbounded channel (#264)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* Add metrics for file finalization in chunk pool

* Add metrics for network unbounded channel
2024-11-12 17:25:49 +08:00
Joel Liu
0c493880ee
add timeout for rpc connections (#263)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-11-09 13:37:09 +08:00
0g-peterzhb
fae2d5efb6
@peter/db split (#262)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
* do not pad tx during sync phase

* add pad data store

* support async padding after sync phase

* split misc

* add sleep for the next loop
2024-11-08 22:06:45 +08:00
Bo QIU
3fd800275a
Improve rate limit for UDP discovery (#261)
Some checks failed
abi-consistent-check / build-and-compare (push) Has been cancelled
code-coverage / unittest-cov (push) Has been cancelled
rust / check (push) Has been cancelled
rust / test (push) Has been cancelled
rust / lints (push) Has been cancelled
functional-test / test (push) Has been cancelled
2024-11-06 18:25:08 +08:00
Bo QIU
baf0521c99
copy key file to bootnode folder in python tests (#260)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
2024-11-06 14:01:47 +08:00
Bo QIU
bcbd8b3baa
Ban peer if failed to decode pubsub message (#259)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run
2024-11-05 15:10:44 +08:00
Bo QIU
cae5b62440
Hotfix for python tests caused by unexpected file deletion (#258)
* Hotfix for python tests caused by unexpected file deletion

* add more info when launch blockchain node failed

* add stdout if blockchain launch failed

* seek stdout and err to 0 if failed to launch blockchain

* Improve zg chain port to avoid port conflict in parallel execution

* fix float issue

* Fix py failures
2024-11-05 13:49:58 +08:00
63 changed files with 751 additions and 373 deletions

1
.gitignore vendored
View File

@ -4,6 +4,5 @@
/.idea
tests/**/__pycache__
tests/tmp/**
tests/config/zgs
.vscode/*.json
/0g-storage-contracts-dev

9
Cargo.lock generated
View File

@ -226,6 +226,7 @@ dependencies = [
"itertools 0.13.0",
"lazy_static",
"lru 0.12.5",
"metrics",
"once_cell",
"serde",
"tiny-keccak",
@ -912,7 +913,9 @@ dependencies = [
"anyhow",
"async-lock 2.8.0",
"hashlink 0.8.4",
"lazy_static",
"log_entry_sync",
"metrics",
"network",
"shared_types",
"storage-async",
@ -4652,12 +4655,14 @@ dependencies = [
"jsonrpsee",
"lazy_static",
"metrics",
"reqwest",
"serde_json",
"shared_types",
"storage",
"task_executor",
"thiserror",
"tokio",
"url",
]
[[package]]
@ -5028,6 +5033,7 @@ dependencies = [
name = "network"
version = "0.2.0"
dependencies = [
"channel",
"directory",
"dirs 4.0.0",
"discv5",
@ -7300,8 +7306,10 @@ dependencies = [
"kvdb",
"kvdb-memorydb",
"kvdb-rocksdb",
"lazy_static",
"merkle_light",
"merkle_tree",
"metrics",
"once_cell",
"parking_lot 0.12.3",
"rand 0.8.5",
@ -7324,6 +7332,7 @@ name = "storage-async"
version = "0.1.0"
dependencies = [
"anyhow",
"backtrace",
"eth2_ssz",
"shared_types",
"storage",

View File

@ -4,7 +4,7 @@
0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer holds three important features:
* Buit-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* Built-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* General purpose - It is designed to support atomic transactions, mutable kv stores as well as archive log systems to enable a wide range of applications with various data types.
* Incentive - Instead of being just a decentralized database, 0G Storage introduces the PoRA mining algorithm to incentivize storage network participants.

View File

@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] }
lazy_static = "1.4.0"
tracing = "0.1.36"
once_cell = "1.19.0"
metrics = { workspace = true }
itertools = "0.13.0"
lru = "0.12.5"

View File

@ -1,4 +1,5 @@
mod merkle_tree;
mod metrics;
mod node_manager;
mod proof;
mod sha3;
@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;
use tracing::{trace, warn};
use crate::merkle_tree::MerkleTreeWrite;
@ -145,6 +147,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
pub fn append(&mut self, new_leaf: E) {
let start_time = Instant::now();
if new_leaf == E::null() {
// appending null is not allowed.
return;
@ -152,10 +155,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.node_manager.start_transaction();
self.node_manager.push_node(0, new_leaf);
self.recompute_after_append_leaves(self.leaves() - 1);
self.node_manager.commit();
metrics::APPEND.update_since(start_time);
}
pub fn append_list(&mut self, leaf_list: Vec<E>) {
let start_time = Instant::now();
if leaf_list.contains(&E::null()) {
// appending null is not allowed.
return;
@ -165,6 +171,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.node_manager.append_nodes(0, &leaf_list);
self.recompute_after_append_leaves(start_index);
self.node_manager.commit();
metrics::APPEND_LIST.update_since(start_time);
}
/// Append a leaf list by providing their intermediate node hash.
@ -173,6 +180,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
/// Other nodes in the subtree will be set to `null` nodes.
/// TODO: Optimize to avoid storing the `null` nodes?
pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
let start_time = Instant::now();
if subtree_root == E::null() {
// appending null is not allowed.
bail!("subtree_root is null");
@ -182,10 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.append_subtree_inner(subtree_depth, subtree_root)?;
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
self.node_manager.commit();
metrics::APPEND_SUBTREE.update_since(start_time);
Ok(())
}
pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
let start_time = Instant::now();
if subtree_list.iter().any(|(_, root)| root == &E::null()) {
// appending null is not allowed.
bail!("subtree_list contains null");
@ -197,12 +208,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
self.recompute_after_append_subtree(start_index, subtree_depth - 1);
}
self.node_manager.commit();
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
Ok(())
}
/// Change the value of the last leaf and return the new merkle root.
/// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
pub fn update_last(&mut self, updated_leaf: E) {
let start_time = Instant::now();
if updated_leaf == E::null() {
// updating to null is not allowed.
return;
@ -216,6 +230,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
}
self.recompute_after_append_leaves(self.leaves() - 1);
self.node_manager.commit();
metrics::UPDATE_LAST.update_since(start_time);
}
/// Fill an unknown `null` leaf with its real value.

View File

@ -0,0 +1,11 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
// Lazily-initialised timers for the write operations of `AppendMerkleTree`.
// Each timer is created via `register_timer` under the string name given below and is
// updated by the method of the same name through `update_since(start_time)`, where
// `start_time` is captured at the top of that method.
// NOTE(review): the methods return early (null leaf / null subtree root) before reaching
// `update_since`, so rejected calls do not appear to be recorded -- confirm this is intended.
lazy_static::lazy_static! {
// Updated at the end of `append`.
pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
// Updated at the end of `append_list`.
pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
// Updated at the end of `append_subtree`, just before it returns `Ok(())`.
pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
// Updated at the end of `append_subtree_list`, just before it returns `Ok(())`.
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
// Updated at the end of `update_last`.
pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
}

View File

@ -15,19 +15,13 @@ abigen!(
);
#[cfg(feature = "dev")]
abigen!(
ZgsFlow,
"../../0g-storage-contracts-dev/artifacts/contracts/dataFlow/Flow.sol/Flow.json"
);
abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json");
#[cfg(feature = "dev")]
abigen!(
PoraMine,
"../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json"
);
abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json");
#[cfg(feature = "dev")]
abigen!(
ChunkLinearReward,
"../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
"../../storage-contracts-abis/ChunkLinearReward.json"
);

View File

@ -1,6 +1,6 @@
# Proof of Random Access
The ZeroGravity network adopts a Proof of Random Access (PoRA) mechanism to incentivize miners to store data. By requiring miners to answer randomly produced queries to archived data chunks, the PoRA mechanism establishes the relation between mining proof generation power and data storage. Miners answer the queries repeatedly and computes an output digest for each loaded chunk util find a digest that satisfies the mining difficulty (i.e., has enough leading zeros). PoRA will stress the miners' disk I/O and reduce their capability to respond user queries. So 0G Storage adopts intermittent mining, in which a mining epoch starts with a block generation at a specific block height on the host chain and stops when a valid PoRA is submitted to the 0G Storage contract.
The ZeroGravity network adopts a Proof of Random Access (PoRA) mechanism to incentivize miners to store data. By requiring miners to answer randomly produced queries to archived data chunks, the PoRA mechanism establishes the relation between mining proof generation power and data storage. Miners answer the queries repeatedly and compute an output digest for each loaded chunk until they find a digest that satisfies the mining difficulty (i.e., has enough leading zeros). PoRA will stress the miners' disk I/O and reduce their capability to respond to user queries. So 0G Storage adopts intermittent mining, in which a mining epoch starts with a block generation at a specific block height on the host chain and stops when a valid PoRA is submitted to the 0G Storage contract.
In a strawman design, a PoRA iteration consists of a computing stage and a loading stage. In the computing stage, a miner computes a random recall position (the universal offset in the flow) based on an arbitrarily picked random nonce and a mining status read from the host chain. In the loading stage, a miner loads the archived data chunks at the given recall position, and computes the output digest by hashing the tuple of mining status and the data chunks. If the output digest satisfies the target difficulty, the miner can construct a legitimate PoRA, which consists of the chosen random nonce, the loaded data chunk and the proof for the correctness of the data chunk, and submit it to the mining contract.

View File

@ -27,4 +27,4 @@ The mining process of 0G Storage requires to prove data accessibility to random
## Data Flow
In 0G Storage, committed data are organized sequentially. Such a sequence of data is called a data flow, which can be interpreted as a list of data entries or equivalently a sequence of fixed-size data sectors. Thus, every piece of data in ZeroGravity can be indexed conveniently with a universal offset. This offset will be used to sample challenges in the mining process of PoRA. The default data flow is called the "main flow" of ZeroGravity. It incorporates all new log entries (unless otherwise specified) in an append-only manner. There are also specialized flows that only accept some category of log entries, e.g. data related to a specifc application. The most significant advantage of specialized flows is a consecutive addressing space, which may be crucial in some use cases. Furthermore, a specialized flow can apply customized storage price, which is typically significantly higher than the floor price of the default flow, and hence achieves better data availability and reliability.
In 0G Storage, committed data are organized sequentially. Such a sequence of data is called a data flow, which can be interpreted as a list of data entries or equivalently a sequence of fixed-size data sectors. Thus, every piece of data in ZeroGravity can be indexed conveniently with a universal offset. This offset will be used to sample challenges in the mining process of PoRA. The default data flow is called the "main flow" of ZeroGravity. It incorporates all new log entries (unless otherwise specified) in an append-only manner. There are also specialized flows that only accept some category of log entries, e.g. data related to a specific application. The most significant advantage of specialized flows is a consecutive addressing space, which may be crucial in some use cases. Furthermore, a specialized flow can apply customized storage price, which is typically significantly higher than the floor price of the default flow, and hence achieves better data availability and reliability.

View File

@ -13,3 +13,5 @@ tokio = { version = "1.19.2", features = ["sync"] }
async-lock = "2.5.0"
hashlink = "0.8.0"
tracing = "0.1.35"
lazy_static = "1.4.0"
metrics = { workspace = true }

View File

@ -1,11 +1,16 @@
use super::mem_pool::MemoryChunkPool;
use crate::mem_pool::FileID;
use anyhow::Result;
use network::NetworkMessage;
use metrics::{Histogram, Sample};
use network::{NetworkMessage, NetworkSender};
use shared_types::{ChunkArray, FileProof};
use std::{sync::Arc, time::SystemTime};
use std::{sync::Arc, time::Instant};
use storage_async::{ShardConfig, Store};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::UnboundedReceiver;
// Metrics for the chunk pool handler.
lazy_static::lazy_static! {
// Histogram registered as "chunk_pool_finalize_file_latency".
// It is updated via `update_since(start)`, where `start` is taken immediately before the
// `log_store.finalize_tx_with_hash(..)` call, and only on the path that does not return
// `Ok(false)` early.
// NOTE(review): 0.015 and 1024 are presumably the exponential-decay factor and the
// reservoir size of the sample -- confirm against the `metrics` crate.
pub static ref FINALIZE_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("chunk_pool_finalize_file_latency", 1024);
}
/// Handle the cached file when uploaded completely and verified from blockchain.
/// Generally, the file will be persisted into log store.
@ -13,7 +18,7 @@ pub struct ChunkPoolHandler {
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>,
sender: NetworkSender,
}
impl ChunkPoolHandler {
@ -21,7 +26,7 @@ impl ChunkPoolHandler {
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>,
sender: NetworkSender,
) -> Self {
ChunkPoolHandler {
receiver,
@ -68,7 +73,7 @@ impl ChunkPoolHandler {
}
}
let start = SystemTime::now();
let start = Instant::now();
if !self
.log_store
.finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash)
@ -77,8 +82,9 @@ impl ChunkPoolHandler {
return Ok(false);
}
let elapsed = start.elapsed()?;
let elapsed = start.elapsed();
debug!(?id, ?elapsed, "Transaction finalized");
FINALIZE_FILE_LATENCY.update_since(start);
// always remove file from pool after transaction finalized
self.mem_pool.remove_file(&id.root).await;

View File

@ -29,7 +29,7 @@ impl Config {
pub fn unbounded(
config: Config,
log_store: Arc<storage_async::Store>,
network_send: tokio::sync::mpsc::UnboundedSender<network::NetworkMessage>,
network_send: network::NetworkSender,
) -> (Arc<MemoryChunkPool>, ChunkPoolHandler) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();

View File

@ -24,3 +24,5 @@ futures-util = "0.3.28"
thiserror = "1.0.44"
lazy_static = "1.4.0"
metrics = { workspace = true }
reqwest = {version = "0.11", features = ["json"]}
url = { version = "2.4", default-features = false }

View File

@ -1,3 +1,5 @@
use std::time::Duration;
use crate::ContractAddress;
pub struct LogSyncConfig {
@ -34,6 +36,9 @@ pub struct LogSyncConfig {
pub watch_loop_wait_time_ms: u64,
// force to sync log from start block number
pub force_log_sync_from_start_block_number: bool,
// the timeout for blockchain rpc connection
pub blockchain_rpc_timeout: Duration,
}
#[derive(Clone)]
@ -61,6 +66,7 @@ impl LogSyncConfig {
remove_finalized_block_interval_minutes: u64,
watch_loop_wait_time_ms: u64,
force_log_sync_from_start_block_number: bool,
blockchain_rpc_timeout: Duration,
) -> Self {
Self {
rpc_endpoint_url,
@ -77,6 +83,7 @@ impl LogSyncConfig {
remove_finalized_block_interval_minutes,
watch_loop_wait_time_ms,
force_log_sync_from_start_block_number,
blockchain_rpc_timeout,
}
}
}

View File

@ -1,6 +1,6 @@
use crate::sync_manager::log_query::LogQuery;
use crate::sync_manager::RETRY_WAIT_MS;
use crate::ContractAddress;
use crate::sync_manager::{metrics, RETRY_WAIT_MS};
use crate::{ContractAddress, LogSyncConfig};
use anyhow::{anyhow, bail, Result};
use append_merkle::{Algorithm, Sha3Algorithm};
use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow};
@ -12,17 +12,13 @@ use futures::StreamExt;
use jsonrpsee::tracing::{debug, error, info, warn};
use shared_types::{DataRoot, Transaction};
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
use task_executor::TaskExecutor;
use tokio::{
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
},
time::Instant,
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
RwLock,
};
pub struct LogEntryFetcher {
@ -34,28 +30,29 @@ pub struct LogEntryFetcher {
}
impl LogEntryFetcher {
pub async fn new(
url: &str,
contract_address: ContractAddress,
log_page_size: u64,
confirmation_delay: u64,
rate_limit_retries: u32,
timeout_retries: u32,
initial_backoff: u64,
) -> Result<Self> {
pub async fn new(config: &LogSyncConfig) -> Result<Self> {
let provider = Arc::new(Provider::new(
RetryClientBuilder::default()
.rate_limit_retries(rate_limit_retries)
.timeout_retries(timeout_retries)
.initial_backoff(Duration::from_millis(initial_backoff))
.build(Http::from_str(url)?, Box::new(HttpRateLimitRetryPolicy)),
.rate_limit_retries(config.rate_limit_retries)
.timeout_retries(config.timeout_retries)
.initial_backoff(Duration::from_millis(config.initial_backoff))
.build(
Http::new_with_client(
url::Url::parse(&config.rpc_endpoint_url)?,
reqwest::Client::builder()
.timeout(config.blockchain_rpc_timeout)
.connect_timeout(config.blockchain_rpc_timeout)
.build()?,
),
Box::new(HttpRateLimitRetryPolicy),
),
));
// TODO: `error` types are removed from the ABI json file.
Ok(Self {
contract_address,
contract_address: config.contract_address,
provider,
log_page_size,
confirmation_delay,
log_page_size: config.log_page_size,
confirmation_delay: config.confirmation_block_count,
})
}
@ -242,6 +239,7 @@ impl LogEntryFetcher {
);
let (mut block_hash_sent, mut block_number_sent) = (None, None);
while let Some(maybe_log) = stream.next().await {
let start_time = Instant::now();
match maybe_log {
Ok(log) => {
let sync_progress =
@ -301,6 +299,7 @@ impl LogEntryFetcher {
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
}
}
metrics::RECOVER_LOG.update_since(start_time);
}
info!("log recover end");

View File

@ -1,7 +1,13 @@
use std::sync::Arc;
use metrics::{register_timer, Timer};
use metrics::{register_timer, Gauge, GaugeUsize, Timer};
lazy_static::lazy_static! {
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc<dyn Timer> = register_timer("log_manager_handle_data_transaction");
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
pub static ref RECOVER_LOG: Arc<dyn Timer> = register_timer("log_entry_sync_manager_recover_log");
}

View File

@ -26,6 +26,7 @@ const RETRY_WAIT_MS: u64 = 500;
// Each tx has less than 10KB, so the cache size should be acceptable.
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
const CATCH_UP_END_GAP: u64 = 10;
const CHECK_ROOT_INTERVAL: u64 = 500;
/// Errors while handle data
#[derive(Error, Debug)]
@ -86,16 +87,7 @@ impl LogSyncManager {
.expect("shutdown send error")
},
async move {
let log_fetcher = LogEntryFetcher::new(
&config.rpc_endpoint_url,
config.contract_address,
config.log_page_size,
config.confirmation_block_count,
config.rate_limit_retries,
config.timeout_retries,
config.initial_backoff,
)
.await?;
let log_fetcher = LogEntryFetcher::new(&config).await?;
let data_cache = DataCache::new(config.cache_config.clone());
let block_hash_cache = Arc::new(RwLock::new(
@ -277,6 +269,9 @@ impl LogSyncManager {
.remove_finalized_block_interval_minutes,
);
// start the pad data store
log_sync_manager.store.start_padding(&executor_clone);
let (watch_progress_tx, watch_progress_rx) =
tokio::sync::mpsc::unbounded_channel();
let watch_rx = log_sync_manager.log_fetcher.start_watch(
@ -408,6 +403,7 @@ impl LogSyncManager {
}
LogFetchProgress::Transaction((tx, block_number)) => {
let mut stop = false;
let start_time = Instant::now();
match self.put_tx(tx.clone()).await {
Some(false) => stop = true,
Some(true) => {
@ -441,6 +437,8 @@ impl LogSyncManager {
// no receivers will be created.
warn!("log sync broadcast error, error={:?}", e);
}
metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time);
}
LogFetchProgress::Reverted(reverted) => {
self.process_reverted(reverted).await;
@ -453,7 +451,6 @@ impl LogSyncManager {
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
let start_time = Instant::now();
let result = self.store.put_tx(tx.clone());
metrics::STORE_PUT_TX.update_since(start_time);
if let Err(e) = result {
error!("put_tx error: e={:?}", e);
@ -509,42 +506,50 @@ impl LogSyncManager {
}
}
self.data_cache.garbage_collect(self.next_tx_seq);
self.next_tx_seq += 1;
// Check if the computed data root matches on-chain state.
// If the call fails, we won't check the root here and return `true` directly.
let flow_contract = self.log_fetcher.flow_contract();
match flow_contract
.get_flow_root_by_tx_seq(tx.seq.into())
.call()
.await
{
Ok(contract_root_bytes) => {
let contract_root = H256::from_slice(&contract_root_bytes);
// contract_root is zero for tx submitted before upgrading.
if !contract_root.is_zero() {
match self.store.get_context() {
Ok((local_root, _)) => {
if contract_root != local_root {
error!(
?contract_root,
?local_root,
"local flow root and on-chain flow root mismatch"
);
return false;
if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
let flow_contract = self.log_fetcher.flow_contract();
match flow_contract
.get_flow_root_by_tx_seq(tx.seq.into())
.call()
.await
{
Ok(contract_root_bytes) => {
let contract_root = H256::from_slice(&contract_root_bytes);
// contract_root is zero for tx submitted before upgrading.
if !contract_root.is_zero() {
match self.store.get_context() {
Ok((local_root, _)) => {
if contract_root != local_root {
error!(
?contract_root,
?local_root,
"local flow root and on-chain flow root mismatch"
);
return false;
}
}
Err(e) => {
warn!(?e, "fail to read the local flow root");
}
}
Err(e) => {
warn!(?e, "fail to read the local flow root");
}
}
}
}
Err(e) => {
warn!(?e, "fail to read the on-chain flow root");
Err(e) => {
warn!(?e, "fail to read the on-chain flow root");
}
}
}
metrics::STORE_PUT_TX_SPEED_IN_BYTES
.update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize);
metrics::STORE_PUT_TX.update_since(start_time);
true
}
}

View File

@ -5,17 +5,20 @@ use ethereum_types::Address;
use ethers::contract::ContractCall;
use ethers::contract::EthEvent;
use std::sync::Arc;
use storage::log_store::log_manager::DATA_DB_KEY;
use storage::H256;
use storage_async::Store;
const MINER_ID: &str = "mine.miner_id";
pub async fn load_miner_id(store: &Store) -> storage::error::Result<Option<H256>> {
store.get_config_decoded(&MINER_ID).await
store.get_config_decoded(&MINER_ID, DATA_DB_KEY).await
}
async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> {
store.set_config_encoded(&MINER_ID, miner_id).await
store
.set_config_encoded(&MINER_ID, miner_id, DATA_DB_KEY)
.await
}
pub(crate) async fn check_and_request_miner_id(

View File

@ -3,13 +3,12 @@ use crate::monitor::Monitor;
use crate::sealer::Sealer;
use crate::submitter::Submitter;
use crate::{config::MinerConfig, mine::PoraService, watcher::MineContextWatcher};
use network::NetworkMessage;
use network::NetworkSender;
use std::sync::Arc;
use std::time::Duration;
use storage::config::ShardConfig;
use storage_async::Store;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
#[derive(Clone, Debug)]
pub enum MinerMessage {
@ -29,7 +28,7 @@ pub struct MineService;
impl MineService {
pub async fn spawn(
executor: task_executor::TaskExecutor,
_network_send: mpsc::UnboundedSender<NetworkMessage>,
_network_send: NetworkSender,
config: MinerConfig,
store: Arc<Store>,
) -> Result<broadcast::Sender<MinerMessage>, String> {

View File

@ -41,6 +41,7 @@ if-addrs = "0.10.1"
slog = "2.7.0"
igd = "0.12.1"
duration-str = "0.5.1"
channel = { path = "../../common/channel" }
[dependencies.libp2p]
version = "0.45.1"

View File

@ -592,7 +592,7 @@ where
// peer that originally published the message.
match PubsubMessage::decode(&gs_msg.topic, &gs_msg.data) {
Err(e) => {
debug!(topic = ?gs_msg.topic, error = ?e, "Could not decode gossipsub message");
debug!(topic = ?gs_msg.topic, %propagation_source, error = ?e, "Could not decode gossipsub message");
//reject the message
if let Err(e) = self.gossipsub.report_message_validation_result(
&id,
@ -601,6 +601,24 @@ where
) {
warn!(message_id = %id, peer_id = %propagation_source, error = ?e, "Failed to report message validation");
}
self.peer_manager.report_peer(
&propagation_source,
PeerAction::Fatal,
ReportSource::Gossipsub,
None,
"gossipsub message decode error",
);
if let Some(source) = &gs_msg.source {
self.peer_manager.report_peer(
source,
PeerAction::Fatal,
ReportSource::Gossipsub,
None,
"gossipsub message decode error",
);
}
}
Ok(msg) => {
// Notify the network

View File

@ -157,8 +157,8 @@ impl Default for Config {
let filter_rate_limiter = Some(
discv5::RateLimiterBuilder::new()
.total_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second
.ip_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second
.node_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second
.ip_n_every(9, Duration::from_secs(1)) // Allow bursts, average 9 per second
.node_n_every(8, Duration::from_secs(1)) // Allow bursts, average 8 per second
.build()
.expect("The total rate limit has been specified"),
);

View File

@ -159,3 +159,10 @@ pub enum NetworkMessage {
udp_socket: Option<SocketAddr>,
},
}
// Sending half of the network message channel: `channel::metrics::Sender` specialised to
// `NetworkMessage`. Used in signatures in place of
// `tokio::sync::mpsc::UnboundedSender<NetworkMessage>`.
pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
// Receiving half of the network message channel: `channel::metrics::Receiver` specialised
// to `NetworkMessage`.
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
// Creates the channel used to pass `NetworkMessage`s and returns its
// `(NetworkSender, NetworkReceiver)` pair.
// Delegates to `channel::metrics::unbounded_channel` with the name "network".
// NOTE(review): the name is presumably used to label the channel's metrics -- confirm
// against the `channel` crate.
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
channel::metrics::unbounded_channel("network")
}

View File

@ -3,10 +3,9 @@
//! Currently supported strategies:
//! - UPnP
use crate::{NetworkConfig, NetworkMessage};
use crate::{NetworkConfig, NetworkMessage, NetworkSender};
use if_addrs::get_if_addrs;
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
use tokio::sync::mpsc;
/// Configuration required to construct the UPnP port mappings.
pub struct UPnPConfig {
@ -36,10 +35,7 @@ impl UPnPConfig {
}
/// Attempts to construct external port mappings with UPnP.
pub fn construct_upnp_mappings(
config: UPnPConfig,
network_send: mpsc::UnboundedSender<NetworkMessage>,
) {
pub fn construct_upnp_mappings(config: UPnPConfig, network_send: NetworkSender) {
info!("UPnP Attempting to initialise routes");
match igd::search_gateway(Default::default()) {
Err(e) => info!(error = %e, "UPnP not available"),

View File

@ -4,7 +4,7 @@ use crate::discovery::enr;
use crate::multiaddr::Protocol;
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, ReqId};
use crate::types::{error, GossipKind};
use crate::{EnrExt, NetworkMessage};
use crate::{EnrExt, NetworkSender};
use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource};
use futures::prelude::*;
use libp2p::core::{
@ -21,7 +21,6 @@ use std::io::prelude::*;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
@ -60,7 +59,7 @@ pub struct Context<'a> {
impl<AppReqId: ReqId> Service<AppReqId> {
pub async fn new(
executor: task_executor::TaskExecutor,
network_sender: UnboundedSender<NetworkMessage>,
network_sender: NetworkSender,
ctx: Context<'_>,
) -> error::Result<(Arc<NetworkGlobals>, Keypair, Self)> {
trace!("Libp2p Service starting");

View File

@ -1,6 +1,7 @@
#![cfg(test)]
use libp2p::gossipsub::GossipsubConfigBuilder;
use network::new_network_channel;
use network::Enr;
use network::EnrExt;
use network::Multiaddr;
@ -22,7 +23,6 @@ pub mod swarm;
type ReqId = usize;
use tempfile::Builder as TempBuilder;
use tokio::sync::mpsc::unbounded_channel;
#[allow(unused)]
pub struct Libp2pInstance(LibP2PService<ReqId>, exit_future::Signal);
@ -72,7 +72,7 @@ pub async fn build_libp2p_instance(rt: Weak<Runtime>, boot_nodes: Vec<Enr>) -> L
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx);
let libp2p_context = network::Context { config: &config };
let (sender, _) = unbounded_channel();
let (sender, _) = new_network_channel();
Libp2pInstance(
LibP2PService::new(executor, sender, libp2p_context)
.await

View File

@ -11,7 +11,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
use storage::log_store::log_manager::PORA_CHUNK_SIZE;
use storage::log_store::log_manager::{DATA_DB_KEY, FLOW_DB_KEY, PORA_CHUNK_SIZE};
use storage_async::Store;
use task_executor::TaskExecutor;
use tokio::sync::{broadcast, mpsc};
@ -252,7 +252,7 @@ impl Pruner {
.update_shard_config(self.config.shard_config)
.await;
self.store
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config, DATA_DB_KEY)
.await
}
@ -265,17 +265,22 @@ impl Pruner {
.set_config_encoded(
&FIRST_REWARDABLE_CHUNK_KEY,
&(new_first_rewardable_chunk, new_first_tx_seq),
FLOW_DB_KEY,
)
.await
}
}
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store.get_config_decoded(&SHARD_CONFIG_KEY).await
store
.get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY)
.await
}
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
store
.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY, FLOW_DB_KEY)
.await
}
#[derive(Debug)]

View File

@ -16,7 +16,7 @@ use network::{
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
PublicKey, PubsubMessage, Request, RequestId, Response,
};
use network::{Multiaddr, PeerAction, ReportSource};
use network::{Multiaddr, NetworkSender, PeerAction, ReportSource};
use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID};
use storage::config::ShardConfig;
use storage_async::Store;
@ -88,7 +88,7 @@ pub struct Libp2pEventHandler {
/// A collection of global variables, accessible outside of the network service.
network_globals: Arc<NetworkGlobals>,
/// A channel to the router service.
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
/// A channel to the syncing service.
sync_send: SyncSender,
/// A channel to the RPC chunk pool service.
@ -112,7 +112,7 @@ impl Libp2pEventHandler {
pub fn new(
config: Config,
network_globals: Arc<NetworkGlobals>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
sync_send: SyncSender,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
local_keypair: Keypair,
@ -1010,10 +1010,12 @@ mod tests {
use network::{
discovery::{CombinedKey, ConnectionId},
discv5::enr::EnrBuilder,
new_network_channel,
rpc::{GetChunksRequest, StatusMessage, SubstreamId},
types::FindFile,
CombinedKeyExt, Keypair, MessageAcceptance, MessageId, Multiaddr, NetworkGlobals,
NetworkMessage, PeerId, PubsubMessage, Request, RequestId, Response, SyncId,
NetworkMessage, NetworkReceiver, PeerId, PubsubMessage, Request, RequestId, Response,
SyncId,
};
use shared_types::{timestamp_now, ChunkArray, ChunkArrayWithProof, FlowRangeProof, TxID};
use storage::{
@ -1035,8 +1037,8 @@ mod tests {
runtime: TestRuntime,
network_globals: Arc<NetworkGlobals>,
keypair: Keypair,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
network_send: NetworkSender,
network_recv: NetworkReceiver,
sync_send: SyncSender,
sync_recv: SyncReceiver,
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
@ -1050,12 +1052,11 @@ mod tests {
fn default() -> Self {
let runtime = TestRuntime::default();
let (network_globals, keypair) = Context::new_network_globals();
let (network_send, network_recv) = mpsc::unbounded_channel();
let (network_send, network_recv) = new_network_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(LogConfig::default(), executor).unwrap();
let store = LogManager::memorydb(LogConfig::default()).unwrap();
Self {
runtime,
network_globals: Arc::new(network_globals),

View File

@ -5,11 +5,9 @@ use chunk_pool::ChunkPoolMessage;
use file_location_cache::FileLocationCache;
use futures::{channel::mpsc::Sender, prelude::*};
use miner::MinerMessage;
use network::types::NewFile;
use network::PubsubMessage;
use network::{
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
Service as LibP2PService, Swarm,
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
};
use pruner::PrunerMessage;
use shared_types::timestamp_now;
@ -33,7 +31,7 @@ pub struct RouterService {
network_globals: Arc<NetworkGlobals>,
/// The receiver channel for Zgs to communicate with the network service.
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
network_recv: NetworkReceiver,
/// The receiver channel for Zgs to communicate with the pruner service.
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
@ -57,8 +55,8 @@ impl RouterService {
executor: task_executor::TaskExecutor,
libp2p: LibP2PService<RequestId>,
network_globals: Arc<NetworkGlobals>,
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_recv: NetworkReceiver,
network_send: NetworkSender,
sync_send: SyncSender,
_miner_send: Option<broadcast::Sender<MinerMessage>>,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,

View File

@ -17,15 +17,13 @@ use file_location_cache::FileLocationCache;
use futures::channel::mpsc::Sender;
use jsonrpsee::core::RpcResult;
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
use network::NetworkGlobals;
use network::NetworkMessage;
use network::{NetworkGlobals, NetworkMessage, NetworkSender};
use std::error::Error;
use std::sync::Arc;
use storage_async::Store;
use sync::{SyncRequest, SyncResponse, SyncSender};
use task_executor::ShutdownReason;
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use zgs::RpcServer as ZgsRpcServer;
use zgs_miner::MinerMessage;
@ -42,7 +40,7 @@ pub struct Context {
pub config: RPCConfig,
pub file_location_cache: Arc<FileLocationCache>,
pub network_globals: Arc<NetworkGlobals>,
pub network_send: UnboundedSender<NetworkMessage>,
pub network_send: NetworkSender,
pub sync_send: SyncSender,
pub chunk_pool: Arc<MemoryChunkPool>,
pub log_store: Arc<Store>,

View File

@ -53,6 +53,8 @@ pub struct FileInfo {
pub finalized: bool,
pub is_cached: bool,
pub uploaded_seg_num: usize,
/// Whether file is pruned, in which case `finalized` will be `false`.
pub pruned: bool,
}
#[derive(Debug, Serialize, Deserialize)]

View File

@ -8,6 +8,7 @@ use jsonrpsee::core::RpcResult;
use shared_types::{DataRoot, FlowProof, Transaction, TxSeqOrRoot, CHUNK_SIZE};
use std::fmt::{Debug, Formatter, Result};
use storage::config::ShardConfig;
use storage::log_store::tx_store::TxStatus;
use storage::{try_option, H256};
pub struct RpcServerImpl {
@ -245,7 +246,17 @@ impl RpcServerImpl {
}
async fn get_file_info_by_tx(&self, tx: Transaction) -> RpcResult<FileInfo> {
let finalized = self.ctx.log_store.check_tx_completed(tx.seq).await?;
let (finalized, pruned) = match self
.ctx
.log_store
.get_store()
.get_tx_status(TxSeqOrRoot::TxSeq(tx.seq))?
{
Some(TxStatus::Finalized) => (true, false),
Some(TxStatus::Pruned) => (false, true),
None => (false, false),
};
let (uploaded_seg_num, is_cached) = match self
.ctx
.chunk_pool
@ -254,7 +265,7 @@ impl RpcServerImpl {
{
Some(v) => v,
_ => (
if finalized {
if finalized || pruned {
let chunks_per_segment = self.ctx.config.chunks_per_segment;
let (num_segments, _) = SegmentWithProof::split_file_into_segments(
tx.size as usize,
@ -273,6 +284,7 @@ impl RpcServerImpl {
finalized,
is_cached,
uploaded_seg_num,
pruned,
})
}

View File

@ -4,8 +4,8 @@ use file_location_cache::FileLocationCache;
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
use network::{
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
Service as LibP2PService,
self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver,
NetworkSender, RequestId, Service as LibP2PService,
};
use pruner::{Pruner, PrunerConfig, PrunerMessage};
use router::RouterService;
@ -27,15 +27,12 @@ macro_rules! require {
}
struct NetworkComponents {
send: mpsc::UnboundedSender<NetworkMessage>,
send: NetworkSender,
globals: Arc<NetworkGlobals>,
keypair: Keypair,
// note: these will be owned by the router service
owned: Option<(
LibP2PService<RequestId>,
mpsc::UnboundedReceiver<NetworkMessage>,
)>,
owned: Option<(LibP2PService<RequestId>, NetworkReceiver)>,
}
struct SyncComponents {
@ -89,10 +86,9 @@ impl ClientBuilder {
/// Initializes in-memory storage.
pub fn with_memory_store(mut self) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
// TODO(zz): Set config.
let store = Arc::new(
LogManager::memorydb(LogConfig::default(), executor)
LogManager::memorydb(LogConfig::default())
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
);
@ -110,13 +106,11 @@ impl ClientBuilder {
/// Initializes RocksDB storage.
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
let store = Arc::new(
LogManager::rocksdb(
config.log_config.clone(),
config.db_dir.join("flow_db"),
config.db_dir.join("data_db"),
executor,
)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
);
@ -147,7 +141,7 @@ impl ClientBuilder {
let service_context = network::Context { config };
// construct communication channel
let (send, recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (send, recv) = new_network_channel();
// launch libp2p service
let (globals, keypair, libp2p) =

View File

@ -146,6 +146,7 @@ impl ZgsConfig {
self.remove_finalized_block_interval_minutes,
self.watch_loop_wait_time_ms,
self.force_log_sync_from_start_block_number,
Duration::from_secs(self.blockchain_rpc_timeout_secs),
))
}

View File

@ -49,6 +49,8 @@ build_config! {
(remove_finalized_block_interval_minutes, (u64), 30)
(watch_loop_wait_time_ms, (u64), 500)
(blockchain_rpc_timeout_secs, (u64), 120)
// chunk pool
(chunk_pool_write_window_size, (usize), 4)
(chunk_pool_max_cached_chunks_all, (usize), 4*1024*1024) // 1G

View File

@ -10,4 +10,5 @@ storage = { path = "../storage" }
task_executor = { path = "../../common/task_executor" }
tokio = { version = "1.19.2", features = ["sync"] }
tracing = "0.1.35"
eth2_ssz = "0.4.0"
eth2_ssz = "0.4.0"
backtrace = "0.3"

View File

@ -2,6 +2,7 @@
extern crate tracing;
use anyhow::bail;
use backtrace::Backtrace;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
};
@ -74,9 +75,11 @@ impl Store {
pub async fn get_config_decoded<K: AsRef<[u8]> + Send + Sync, T: Decode + Send + 'static>(
&self,
key: &K,
dest: &str,
) -> Result<Option<T>> {
let key = key.as_ref().to_vec();
self.spawn(move |store| store.get_config_decoded(&key))
let dest = dest.to_string();
self.spawn(move |store| store.get_config_decoded(&key, &dest))
.await
}
@ -84,10 +87,12 @@ impl Store {
&self,
key: &K,
value: &T,
dest: &str,
) -> anyhow::Result<()> {
let key = key.as_ref().to_vec();
let value = value.as_ssz_bytes();
self.spawn(move |store| store.set_config(&key, &value))
let dest = dest.to_string();
self.spawn(move |store| store.set_config(&key, &value, &dest))
.await
}
@ -135,6 +140,9 @@ impl Store {
{
let store = self.store.clone();
let (tx, rx) = oneshot::channel();
let mut backtrace = Backtrace::new();
let frames = backtrace.frames().to_vec();
backtrace = frames.into();
self.executor.spawn_blocking(
move || {
@ -142,6 +150,7 @@ impl Store {
let res = f(&*store);
if tx.send(res).is_err() {
warn!("Backtrace: {:?}", backtrace);
error!("Unable to complete async storage operation: the receiver dropped");
}
},

View File

@ -31,6 +31,8 @@ parking_lot = "0.12.3"
serde_json = "1.0.127"
tokio = { version = "1.38.0", features = ["full"] }
task_executor = { path = "../../common/task_executor" }
lazy_static = "1.4.0"
metrics = { workspace = true }
once_cell = { version = "1.19.0", features = [] }
[dev-dependencies]

View File

@ -14,25 +14,16 @@ use storage::{
},
LogManager,
};
use task_executor::test_utils::TestRuntime;
fn write_performance(c: &mut Criterion) {
if Path::new("db_write").exists() {
fs::remove_dir_all("db_write").unwrap();
}
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(
LogConfig::default(),
"db_flow_write",
"db_data_write",
executor,
)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
LogManager::rocksdb(LogConfig::default(), "db_flow_write", "db_data_write")
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));
let chunk_count = 2048;
@ -114,19 +105,10 @@ fn read_performance(c: &mut Criterion) {
fs::remove_dir_all("db_read").unwrap();
}
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
LogManager::rocksdb(
LogConfig::default(),
"db_flow_read",
"db_data_read",
executor,
)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
LogManager::rocksdb(LogConfig::default(), "db_flow_read", "db_data_read")
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
.unwrap(),
));
let tx_size = 1000;

View File

@ -2,16 +2,55 @@ use anyhow::{anyhow, Result};
use kvdb::{DBKey, DBOp};
use ssz::{Decode, Encode};
use crate::log_store::log_manager::{COL_MISC, DATA_DB_KEY, FLOW_DB_KEY};
use crate::LogManager;
use super::log_manager::COL_MISC;
/// Runs a single `COL_MISC` operation against the database selected by `$dest`.
///
/// `$dest` must equal `DATA_DB_KEY` (uses `$self.data_db`) or `FLOW_DB_KEY`
/// (uses `$self.flow_db`); any other value makes the *enclosing function*
/// return `Err(anyhow!("Invalid destination"))`.
///
/// Supported forms:
/// * `get, key`          - read a value from `COL_MISC`.
/// * `put, key, value`   - write a value into `COL_MISC`.
/// * `delete, key`       - remove a key from `COL_MISC`.
/// * `transaction, tx`   - move `tx.ops` into a fresh DB transaction and write it.
macro_rules! db_operation {
    // Internal rule: resolve the destination name to a database handle.
    // Listed first so the leading `@db` marker is matched before the public arms.
    (@db $self:expr, $dest:expr) => {
        match $dest {
            DATA_DB_KEY => &$self.data_db,
            FLOW_DB_KEY => &$self.flow_db,
            _ => return Err(anyhow!("Invalid destination")),
        }
    };
    ($self:expr, $dest:expr, get, $key:expr) => {{
        let target = db_operation!(@db $self, $dest);
        Ok(target.get(COL_MISC, $key)?)
    }};
    ($self:expr, $dest:expr, put, $key:expr, $value:expr) => {{
        let target = db_operation!(@db $self, $dest);
        Ok(target.put(COL_MISC, $key, $value)?)
    }};
    ($self:expr, $dest:expr, delete, $key:expr) => {{
        let target = db_operation!(@db $self, $dest);
        Ok(target.delete(COL_MISC, $key)?)
    }};
    ($self:expr, $dest:expr, transaction, $tx:expr) => {{
        let target = db_operation!(@db $self, $dest);
        // Re-home the caller's queued operations onto a transaction of the chosen DB.
        let mut pending = target.transaction();
        pending.ops = $tx.ops;
        Ok(target.write(pending)?)
    }};
}
pub trait Configurable {
fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()>;
fn remove_config(&self, key: &[u8]) -> Result<()>;
fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>>;
fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()>;
fn remove_config(&self, key: &[u8], dest: &str) -> Result<()>;
fn exec_configs(&self, tx: ConfigTx) -> Result<()>;
fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()>;
}
#[derive(Default)]
@ -41,8 +80,12 @@ impl ConfigTx {
}
pub trait ConfigurableExt: Configurable {
fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(&self, key: &K) -> Result<Option<T>> {
match self.get_config(key.as_ref())? {
fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(
&self,
key: &K,
dest: &str,
) -> Result<Option<T>> {
match self.get_config(key.as_ref(), dest)? {
Some(val) => Ok(Some(
T::from_ssz_bytes(&val).map_err(|e| anyhow!("SSZ decode error: {:?}", e))?,
)),
@ -50,36 +93,36 @@ pub trait ConfigurableExt: Configurable {
}
}
fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(&self, key: &K, value: &T) -> Result<()> {
self.set_config(key.as_ref(), &value.as_ssz_bytes())
fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(
&self,
key: &K,
value: &T,
dest: &str,
) -> Result<()> {
self.set_config(key.as_ref(), &value.as_ssz_bytes(), dest)
}
fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K) -> Result<()> {
self.remove_config(key.as_ref())
fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K, dest: &str) -> Result<()> {
self.remove_config(key.as_ref(), dest)
}
}
impl<T: ?Sized + Configurable> ConfigurableExt for T {}
impl Configurable for LogManager {
fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.flow_db.get(COL_MISC, key)?)
fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>> {
db_operation!(self, dest, get, key)
}
fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.flow_db.put(COL_MISC, key, value)?;
Ok(())
fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()> {
db_operation!(self, dest, put, key, value)
}
fn remove_config(&self, key: &[u8]) -> Result<()> {
Ok(self.flow_db.delete(COL_MISC, key)?)
fn remove_config(&self, key: &[u8], dest: &str) -> Result<()> {
db_operation!(self, dest, delete, key)
}
fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
let mut db_tx = self.flow_db.transaction();
db_tx.ops = tx.ops;
self.flow_db.write(db_tx)?;
Ok(())
fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()> {
db_operation!(self, dest, transaction, tx)
}
}

View File

@ -1,12 +1,14 @@
use super::load_chunk::EntryBatch;
use super::seal_task_manager::SealTaskManager;
use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
use crate::error::Error;
use crate::log_store::load_chunk::EntryBatch;
use crate::log_store::log_manager::{
bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, COL_PAD_DATA_LIST,
COL_PAD_DATA_SYNC_HEIGH, PORA_CHUNK_SIZE,
};
use crate::log_store::seal_task_manager::SealTaskManager;
use crate::log_store::{
metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
};
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::{try_option, ZgsKeyValueDB};
use any::Any;
use anyhow::{anyhow, bail, Result};
@ -20,19 +22,22 @@ use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Instant;
use std::{any, cmp};
use tracing::{debug, error, trace};
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
pub struct FlowStore {
flow_db: Arc<FlowDBStore>,
data_db: Arc<FlowDBStore>,
seal_manager: SealTaskManager,
config: FlowConfig,
}
impl FlowStore {
pub fn new(data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
Self {
flow_db,
data_db,
seal_manager: Default::default(),
config,
@ -44,6 +49,7 @@ impl FlowStore {
batch_index: usize,
subtree_list: Vec<(usize, usize, DataRoot)>,
) -> Result<()> {
let start_time = Instant::now();
let mut batch = self
.data_db
.get_entry_batch(batch_index as u64)?
@ -51,7 +57,7 @@ impl FlowStore {
batch.set_subtree_list(subtree_list);
self.data_db
.put_entry_raw(vec![(batch_index as u64, batch)])?;
metrics::INSERT_SUBTREE_LIST.update_since(start_time);
Ok(())
}
@ -199,12 +205,21 @@ impl FlowRead for FlowStore {
fn get_shard_config(&self) -> ShardConfig {
*self.config.shard_config.read()
}
/// Returns the pad-data list stored under `start_index`, read from `flow_db`.
///
/// This is a plain delegation to `FlowDBStore::get_pad_data`; `Ok(None)` is
/// returned when nothing is stored under that key.
// NOTE(review): the underlying store names this key `tx_seq` - confirm that
// callers pass a transaction sequence number here.
fn get_pad_data(&self, start_index: u64) -> crate::error::Result<Option<Vec<PadPair>>> {
self.flow_db.get_pad_data(start_index)
}
/// Returns the persisted pad-data sync height, read from `data_db`.
///
/// Delegates to `FlowDBStore::get_pad_data_sync_height`; `Ok(None)` means no
/// height has been recorded yet.
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
self.data_db.get_pad_data_sync_height()
}
}
impl FlowWrite for FlowStore {
/// Return the roots of completed chunks. The order is guaranteed to be increasing
/// by chunk index.
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
let start_time = Instant::now();
let mut to_seal_set = self.seal_manager.to_seal_set.write();
trace!("append_entries: {} {}", data.start_index, data.data.len());
if data.data.len() % BYTES_PER_SECTOR != 0 {
@ -247,6 +262,8 @@ impl FlowWrite for FlowStore {
batch_list.push((chunk_index, batch));
}
metrics::APPEND_ENTRIES.update_since(start_time);
self.data_db.put_entry_batch_list(batch_list)
}
@ -266,6 +283,14 @@ impl FlowWrite for FlowStore {
fn update_shard_config(&self, shard_config: ShardConfig) {
*self.config.shard_config.write() = shard_config;
}
/// Persists the pad-data list `data_sizes` for `tx_seq` into `flow_db`.
///
/// Delegates to `FlowDBStore::put_pad_data`.
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> crate::error::Result<()> {
self.flow_db.put_pad_data(data_sizes, tx_seq)
}
/// Persists `sync_index` as the pad-data sync height in `data_db`.
///
/// Delegates to `FlowDBStore::put_pad_data_sync_height`.
fn put_pad_data_sync_height(&self, sync_index: u64) -> crate::error::Result<()> {
self.data_db.put_pad_data_sync_height(sync_index)
}
}
impl FlowSeal for FlowStore {
@ -343,6 +368,12 @@ impl FlowSeal for FlowStore {
}
}
/// One pending padding range, SSZ-encodable so it can be persisted per transaction.
///
/// The padding task turns each pair into a zero-filled `ChunkArray` of
/// `data_size` bytes starting at `start_index`.
#[derive(Debug, PartialEq, DeriveEncode, DeriveDecode)]
pub struct PadPair {
// Flow index at which the zero padding starts (used as `ChunkArray::start_index`).
pub start_index: u64,
// Length of the padding in bytes (used as the length of the zero-filled buffer).
pub data_size: u64,
}
pub struct FlowDBStore {
kvdb: Arc<dyn ZgsKeyValueDB>,
}
@ -356,6 +387,7 @@ impl FlowDBStore {
&self,
batch_list: Vec<(u64, EntryBatch)>,
) -> Result<Vec<(u64, DataRoot)>> {
let start_time = Instant::now();
let mut completed_batches = Vec::new();
let mut tx = self.kvdb.transaction();
for (batch_index, batch) in batch_list {
@ -370,6 +402,7 @@ impl FlowDBStore {
}
}
self.kvdb.write(tx)?;
metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
Ok(completed_batches)
}
@ -443,6 +476,48 @@ impl FlowDBStore {
}
Ok(self.kvdb.write(tx)?)
}
/// Stores the pad list for `tx_seq` in `COL_PAD_DATA_LIST`.
///
/// The value is the concatenation of every pair's SSZ encoding, in slice
/// order; the key is the big-endian encoding of `tx_seq`.
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()> {
    // Serialise all pairs back-to-back into one value.
    let encoded: Vec<u8> = data_sizes
        .iter()
        .flat_map(|pair| pair.as_ssz_bytes())
        .collect();

    let mut db_tx = self.kvdb.transaction();
    db_tx.put(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes(), &encoded);
    self.kvdb.write(db_tx)?;
    Ok(())
}
/// Records `tx_seq` as the pad-data sync height.
///
/// The value is written big-endian under the fixed key `sync_height` in
/// `COL_PAD_DATA_SYNC_HEIGH`.
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()> {
    let height_bytes = tx_seq.to_be_bytes();
    let mut db_tx = self.kvdb.transaction();
    db_tx.put(COL_PAD_DATA_SYNC_HEIGH, b"sync_height", &height_bytes);
    self.kvdb.write(db_tx)?;
    Ok(())
}
/// Loads the pad-data sync height stored under the key `sync_height`.
///
/// Returns `Ok(None)` when the key is absent, and an error when the stored
/// value cannot be converted to the 8 bytes of a big-endian `u64`.
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
    let raw = match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? {
        Some(raw) => raw,
        None => return Ok(None),
    };
    // The conversion fails unless the stored value is exactly 8 bytes long.
    let height_bytes: [u8; 8] = raw.try_into().map_err(|e| anyhow!("{:?}", e))?;
    Ok(Some(u64::from_be_bytes(height_bytes)))
}
fn get_pad_data(&self, tx_seq: u64) -> Result<Option<Vec<PadPair>>> {
match self.kvdb.get(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes())? {
Some(v) => Ok(Some(
Vec::<PadPair>::from_ssz_bytes(&v).map_err(Error::from)?,
)),
None => Ok(None),
}
}
}
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]

View File

@ -1,10 +1,11 @@
use super::tx_store::BlockHashAndSubmissionIndex;
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
use crate::log_store::tx_store::TransactionStore;
use crate::log_store::flow_store::{
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
};
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore, TxStatus};
use crate::log_store::{
FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
};
use crate::{try_option, ZgsKeyValueDB};
use anyhow::{anyhow, bail, Result};
@ -20,30 +21,38 @@ use rayon::prelude::ParallelSlice;
use shared_types::{
bytes_to_chunks, compute_padded_chunk_size, compute_segment_size, Chunk, ChunkArray,
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
TxSeqOrRoot,
};
use std::cmp::Ordering;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, error, info, instrument, trace, warn};
use crate::log_store::metrics;
/// 256 Bytes
pub const ENTRY_SIZE: usize = 256;
/// 1024 Entries.
pub const PORA_CHUNK_SIZE: usize = 1024;
pub const COL_TX: u32 = 0;
pub const COL_ENTRY_BATCH: u32 = 1;
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2;
pub const COL_ENTRY_BATCH_ROOT: u32 = 3;
pub const COL_TX_COMPLETED: u32 = 4;
pub const COL_MISC: u32 = 5;
pub const COL_SEAL_CONTEXT: u32 = 6;
pub const COL_FLOW_MPT_NODES: u32 = 7;
pub const COL_BLOCK_PROGRESS: u32 = 8;
pub const COL_TX: u32 = 0; // flow db
pub const COL_ENTRY_BATCH: u32 = 1; // data db
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2; // flow db
pub const COL_TX_COMPLETED: u32 = 3; // data db
pub const COL_MISC: u32 = 4; // flow db & data db
pub const COL_FLOW_MPT_NODES: u32 = 5; // flow db
pub const COL_BLOCK_PROGRESS: u32 = 6; // flow db
pub const COL_PAD_DATA_LIST: u32 = 7; // flow db
pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 8; // data db
pub const COL_NUM: u32 = 9;
pub const DATA_DB_KEY: &str = "data_db";
pub const FLOW_DB_KEY: &str = "flow_db";
const PAD_DELAY: Duration = Duration::from_secs(2);
// Process at most 1M entries (256MB) pad data at a time.
const PAD_MAX_SIZE: usize = 1 << 20;
@ -62,10 +71,10 @@ pub struct UpdateFlowMessage {
pub struct LogManager {
pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
tx_store: TransactionStore,
flow_store: Arc<FlowStore>,
merkle: RwLock<MerkleManager>,
sender: mpsc::Sender<UpdateFlowMessage>,
}
struct MerkleManager {
@ -188,6 +197,7 @@ impl LogStoreChunkWrite for LogManager {
chunks: ChunkArray,
maybe_file_proof: Option<FlowProof>,
) -> Result<bool> {
let start_time = Instant::now();
let mut merkle = self.merkle.write();
let tx = self
.tx_store
@ -219,6 +229,7 @@ impl LogStoreChunkWrite for LogManager {
tx.start_entry_index,
)?;
}
metrics::PUT_CHUNKS.update_since(start_time);
Ok(true)
}
@ -249,6 +260,7 @@ impl LogStoreWrite for LogManager {
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
///
fn put_tx(&self, tx: Transaction) -> Result<()> {
let start_time = Instant::now();
let mut merkle = self.merkle.write();
debug!("put_tx: tx={:?}", tx);
let expected_seq = self.tx_store.next_tx_seq();
@ -264,7 +276,12 @@ impl LogStoreWrite for LogManager {
}
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
// TODO(zz): Should we validate received tx?
self.append_subtree_list(tx.start_entry_index, tx.merkle_nodes.clone(), &mut merkle)?;
self.append_subtree_list(
tx.seq,
tx.start_entry_index,
tx.merkle_nodes.clone(),
&mut merkle,
)?;
merkle.commit_merkle(tx.seq)?;
debug!(
"commit flow root: root={:?}",
@ -278,6 +295,7 @@ impl LogStoreWrite for LogManager {
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
}
}
metrics::PUT_TX.update_since(start_time);
Ok(())
}
@ -308,6 +326,7 @@ impl LogStoreWrite for LogManager {
}
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
let start_time = Instant::now();
trace!(
"finalize_tx_with_hash: tx_seq={} tx_hash={:?}",
tx_seq,
@ -336,6 +355,7 @@ impl LogStoreWrite for LogManager {
if same_root_seq_list.first() == Some(&tx_seq) {
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
}
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
Ok(true)
} else {
bail!("finalize tx hash with data missing: tx_seq={}", tx_seq)
@ -401,6 +421,42 @@ impl LogStoreWrite for LogManager {
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
self.flow_store.submit_seal_result(answers)
}
/// Spawns the background task (named `pad_tx`) that replays persisted pad data.
///
/// The task starts from the stored pad-data sync height (or 0 when none is
/// stored) and loops forever: for each index it reads the pad list, appends a
/// zero-filled `ChunkArray` for every `PadPair`, records the index as the new
/// sync height, and moves on to the next index. When reading the pad list
/// fails it sleeps for `PAD_DELAY` and retries the same index.
///
/// The `unwrap()` calls mean a failure to read the initial height, to append
/// entries, or to store the height panics inside the spawned task.
// NOTE(review): on restart the loop resumes at the stored height itself, so
// the last recorded index is processed a second time - confirm that
// re-appending its zero padding is harmless.
fn start_padding(&self, executor: &task_executor::TaskExecutor) {
let store = self.flow_store.clone();
executor.spawn(
async move {
let current_height = store.get_pad_data_sync_height().unwrap();
let mut start_index = current_height.unwrap_or(0);
loop {
match store.get_pad_data(start_index) {
std::result::Result::Ok(data) => {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
// NOTE(review): `Ok(None)` (no pad list stored for this index) falls through
// to the height update below, so the cursor advances without sleeping -
// confirm this cannot skip indices whose pad data has not been written yet.
if let Some(data) = data {
for pad in data {
store
.append_entries(ChunkArray {
data: vec![0; pad.data_size as usize],
start_index: pad.start_index,
})
.unwrap();
}
};
// Persist progress only after every pad of this index has been appended.
store.put_pad_data_sync_height(start_index).unwrap();
start_index += 1;
}
std::result::Result::Err(_) => {
// Read failed: keep the same index and retry after a delay.
debug!("Unable to get pad data, start_index={}", start_index);
tokio::time::sleep(PAD_DELAY).await;
}
};
}
},
"pad_tx",
);
}
}
impl LogStoreChunkRead for LogManager {
@ -526,6 +582,17 @@ impl LogStoreRead for LogManager {
}))
}
/// Looks up the status of a transaction given either its sequence number or
/// its data root.
///
/// A data root is first resolved to the first transaction sequence recorded
/// for that root; the status is then fetched from the transaction store.
// NOTE(review): `try_option!` is defined elsewhere; it presumably returns
// `Ok(None)` early when the root is unknown - confirm.
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>> {
    let resolved_seq = match tx_seq_or_data_root {
        TxSeqOrRoot::Root(data_root) => {
            let first_seq = self.tx_store.get_first_tx_seq_by_data_root(&data_root)?;
            try_option!(first_seq)
        }
        TxSeqOrRoot::TxSeq(seq) => seq,
    };
    self.tx_store.get_tx_status(resolved_seq)
}
fn check_tx_completed(&self, tx_seq: u64) -> crate::error::Result<bool> {
self.tx_store.check_tx_completed(tx_seq)
}
@ -614,31 +681,33 @@ impl LogManager {
config: LogConfig,
flow_path: impl AsRef<Path>,
data_path: impl AsRef<Path>,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let mut db_config = DatabaseConfig::with_columns(COL_NUM);
db_config.enable_statistics = true;
let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
Self::new(flow_db_source, data_db_source, config, executor)
Self::new(flow_db_source, data_db_source, config)
}
pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
pub fn memorydb(config: LogConfig) -> Result<Self> {
let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
Self::new(flow_db, data_db, config, executor)
Self::new(flow_db, data_db, config)
}
fn new(
flow_db_source: Arc<dyn ZgsKeyValueDB>,
data_db_source: Arc<dyn ZgsKeyValueDB>,
config: LogConfig,
executor: task_executor::TaskExecutor,
) -> Result<Self> {
let tx_store = TransactionStore::new(flow_db_source.clone())?;
let tx_store = TransactionStore::new(data_db_source.clone())?;
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
let flow_store = Arc::new(FlowStore::new(
flow_db.clone(),
data_db.clone(),
config.flow.clone(),
));
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
// first and call `put_tx` later.
let next_tx_seq = tx_store.next_tx_seq();
@ -739,18 +808,14 @@ impl LogManager {
last_chunk_merkle,
});
let (sender, receiver) = mpsc::channel();
let mut log_manager = Self {
let log_manager = Self {
flow_db: flow_db_source,
data_db: data_db_source,
tx_store,
flow_store,
merkle,
sender,
};
log_manager.start_receiver(receiver, executor);
if let Some(tx) = last_tx_to_insert {
log_manager.put_tx(tx)?;
}
@ -765,40 +830,6 @@ impl LogManager {
Ok(log_manager)
}
fn start_receiver(
&mut self,
rx: mpsc::Receiver<UpdateFlowMessage>,
executor: task_executor::TaskExecutor,
) {
let flow_store = self.flow_store.clone();
executor.spawn(
async move {
loop {
match rx.recv() {
std::result::Result::Ok(data) => {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
flow_store
.append_entries(ChunkArray {
data: vec![0; data.pad_data],
start_index: data.tx_start_flow_index,
})
.unwrap();
}
std::result::Result::Err(_) => {
debug!("Log manager inner channel closed");
break;
}
};
}
},
"pad_tx",
);
// Wait for the spawned thread to finish
// let _ = handle.join().expect("Thread panicked");
}
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
match maybe_root {
None => self.gen_proof_at_version(flow_index, None),
@ -854,6 +885,7 @@ impl LogManager {
#[instrument(skip(self, merkle))]
fn append_subtree_list(
&self,
tx_seq: u64,
tx_start_index: u64,
merkle_list: Vec<(usize, DataRoot)>,
merkle: &mut MerkleManager,
@ -861,8 +893,9 @@ impl LogManager {
if merkle_list.is_empty() {
return Ok(());
}
let start_time = Instant::now();
self.pad_tx(tx_start_index, &mut *merkle)?;
self.pad_tx(tx_seq, tx_start_index, &mut *merkle)?;
for (subtree_depth, subtree_root) in merkle_list {
let subtree_size = 1 << (subtree_depth - 1);
@ -896,12 +929,15 @@ impl LogManager {
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
}
}
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
Ok(())
}
#[instrument(skip(self, merkle))]
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
fn pad_tx(&self, tx_seq: u64, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
// Check if we need to pad the flow.
let start_time = Instant::now();
let mut tx_start_flow_index =
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
let pad_size = tx_start_index - tx_start_flow_index;
@ -910,6 +946,7 @@ impl LogManager {
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves()
);
let mut pad_list = vec![];
if pad_size != 0 {
for pad_data in Self::padding(pad_size as usize) {
let mut is_full_empty = true;
@ -954,10 +991,10 @@ impl LogManager {
let data_size = pad_data.len() / ENTRY_SIZE;
if is_full_empty {
self.sender.send(UpdateFlowMessage {
pad_data: pad_data.len(),
tx_start_flow_index,
})?;
pad_list.push(PadPair {
data_size: pad_data.len() as u64,
start_index: tx_start_flow_index,
});
} else {
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
@ -979,6 +1016,10 @@ impl LogManager {
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves()
);
self.flow_store.put_pad_data(&pad_list, tx_seq)?;
metrics::PAD_TX.update_since(start_time);
Ok(())
}
@ -1107,6 +1148,8 @@ impl LogManager {
}
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
let start_time = Instant::now();
let mut merkle = self.merkle.write();
let shard_config = self.flow_store.get_shard_config();
// We have all the data need for this tx, so just copy them.
@ -1155,6 +1198,8 @@ impl LogManager {
for (seq, _) in to_tx_offset_list {
self.tx_store.finalize_tx(seq)?;
}
metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
Ok(())
}
@ -1227,6 +1272,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
}
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
let start_time = Instant::now();
if leaf_data.len() % ENTRY_SIZE != 0 {
bail!("merkle_tree: mismatched data size");
}
@ -1242,6 +1288,9 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
.map(Sha3Algorithm::leaf)
.collect()
};
metrics::DATA_TO_MERKLE_LEAVES_SIZE.update(leaf_data.len());
metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time);
Ok(r)
}

View File

@ -0,0 +1,41 @@
use std::sync::Arc;
use metrics::{register_timer, Gauge, GaugeUsize, Timer};
// Lazily-registered metrics for the log store. Each timer is registered once under the
// string name given to `register_timer`; call sites record a duration with
// `update_since(start_time)`.
lazy_static::lazy_static! {
// Timers registered directly under the `log_store_` prefix.
pub static ref PUT_TX: Arc<dyn Timer> = register_timer("log_store_put_tx");
pub static ref PUT_CHUNKS: Arc<dyn Timer> = register_timer("log_store_put_chunks");
pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");
// Timers registered under the `log_store_log_manager_` prefix.
pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
register_timer("log_store_log_manager_check_tx_completed");
pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
register_timer("log_store_log_manager_append_subtree_list");
pub static ref DATA_TO_MERKLE_LEAVES: Arc<dyn Timer> =
register_timer("log_store_log_manager_data_to_merkle_leaves");
pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
register_timer("log_store_log_manager_copy_tx_and_finalize");
pub static ref PAD_TX: Arc<dyn Timer> = register_timer("log_store_log_manager_pad_tx");
// Timers registered under the `log_store_flow_store_` prefix.
pub static ref PUT_BATCH_ROOT_LIST: Arc<dyn Timer> = register_timer("log_store_flow_store_put_batch_root_list");
pub static ref INSERT_SUBTREE_LIST: Arc<dyn Timer> =
register_timer("log_store_flow_store_insert_subtree_list");
pub static ref PUT_MPT_NODE: Arc<dyn Timer> = register_timer("log_store_flow_store_put_mpt_node");
pub static ref PUT_ENTRY_BATCH_LIST: Arc<dyn Timer> =
register_timer("log_store_flow_store_put_entry_batch_list");
pub static ref APPEND_ENTRIES: Arc<dyn Timer> = register_timer("log_store_flow_store_append_entries");
// Registered under the log manager prefix even though it is declared after the flow store group.
pub static ref FINALIZE_TX_WITH_HASH: Arc<dyn Timer> = register_timer("log_store_log_manager_finalize_tx_with_hash");
// Gauge (not a timer): `data_to_merkle_leaves` updates it with the byte length of its input.
pub static ref DATA_TO_MERKLE_LEAVES_SIZE: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_store_data_to_merkle_leaves_size");
}

View File

@ -1,20 +1,22 @@
use crate::config::ShardConfig;
use ethereum_types::H256;
use flow_store::PadPair;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
Transaction,
Transaction, TxSeqOrRoot,
};
use zgs_spec::{BYTES_PER_SEAL, SEALS_PER_LOAD};
use crate::error::Result;
use self::tx_store::BlockHashAndSubmissionIndex;
use self::tx_store::{BlockHashAndSubmissionIndex, TxStatus};
pub mod config;
mod flow_store;
mod load_chunk;
pub mod log_manager;
mod metrics;
mod seal_task_manager;
#[cfg(test)]
mod tests;
@ -56,6 +58,8 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool>;
fn get_tx_status(&self, tx_seq_or_data_root: TxSeqOrRoot) -> Result<Option<TxStatus>>;
fn next_tx_seq(&self) -> u64;
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>>;
@ -158,6 +162,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
fn update_shard_config(&self, shard_config: ShardConfig);
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
fn start_padding(&self, executor: &task_executor::TaskExecutor);
}
pub trait LogStoreChunkWrite {
@ -217,6 +223,10 @@ pub trait FlowRead {
fn get_num_entries(&self) -> Result<u64>;
fn get_shard_config(&self) -> ShardConfig;
fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
}
pub trait FlowWrite {
@ -231,6 +241,10 @@ pub trait FlowWrite {
/// Update the shard config.
fn update_shard_config(&self, shard_config: ShardConfig);
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
}
pub struct SealTask {
@ -269,3 +283,23 @@ pub trait FlowSeal {
pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
impl<T: FlowRead + FlowWrite + FlowSeal> Flow for T {}
/// Read-only access to stored padding records (`PadPair` entries).
pub trait PadDataStoreRead {
/// Looks up the pad entries stored for `start_index`.
///
/// NOTE(review): `Ok(None)` presumably means nothing is recorded for that key -- confirm
/// against the implementation.
fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
/// Returns the stored pad-data sync height, if one exists.
///
/// NOTE(review): the matching setter takes a `tx_seq`, so this is presumably a transaction
/// sequence number -- confirm.
fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
}
/// Write access to stored padding records, plus the hook that starts padding work.
pub trait PadDataStoreWrite {
/// Stores the pad entries in `data_sizes` together with the transaction sequence `tx_seq`.
///
/// NOTE(review): how the entries are keyed and whether the write is atomic is not visible
/// from this declaration -- check the implementation.
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
/// Stores `tx_seq` as the pad-data sync height.
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
/// Starts padding using `executor`.
///
/// NOTE(review): presumably spawns a background task on the executor; requires `&mut self`,
/// unlike the other methods of this trait -- confirm the intended usage.
fn start_padding(&mut self, executor: &task_executor::TaskExecutor);
}
/// Umbrella trait combining pad-data read and write access with `config::Configurable`,
/// restricted to types that are `Send + Sync + 'static`. It declares no methods of its own.
pub trait PadDataStore:
PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static
{
}
// Blanket implementation: every type that satisfies all of the supertrait bounds is a
// `PadDataStore` automatically, so no type needs to implement it by hand.
impl<T: PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static>
PadDataStore for T
{
}

View File

@ -9,15 +9,10 @@ use rand::random;
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
use std::cmp;
use task_executor::test_utils::TestRuntime;
#[test]
fn test_put_get() {
let config = LogConfig::default();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let store = LogManager::memorydb(config.clone(), executor).unwrap();
let store = LogManager::memorydb(config.clone()).unwrap();
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
// Aligned with size.
let start_offset = 1024;
@ -174,10 +169,7 @@ fn test_put_tx() {
fn create_store() -> LogManager {
let config = LogConfig::default();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(config, executor).unwrap()
LogManager::memorydb(config).unwrap()
}
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {

View File

@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
};
use crate::log_store::metrics;
use crate::{try_option, LogManager, ZgsKeyValueDB};
use anyhow::{anyhow, Result};
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
@ -15,14 +16,38 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tracing::{error, instrument};
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
const NEXT_TX_KEY: &str = "next_tx_seq";
const LOG_LATEST_BLOCK_NUMBER_KEY: &str = "log_latest_block_number_key";
const TX_STATUS_FINALIZED: u8 = 0;
const TX_STATUS_PRUNED: u8 = 1;
/// Status of a transaction as recorded in the `COL_TX_COMPLETED` column.
///
/// The status is stored as a single byte: `0` for [`TxStatus::Finalized`] and `1` for
/// [`TxStatus::Pruned`].
///
/// The enum is a plain field-less value, so it derives the common traits: callers can
/// debug-print it, compare it with `==`, and copy it freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxStatus {
    /// The transaction has been finalized (stored as byte `0`).
    Finalized,
    /// The transaction has been pruned (stored as byte `1`).
    Pruned,
}
impl From<TxStatus> for u8 {
    /// Encodes a status as the single byte that is written to the key-value store.
    fn from(status: TxStatus) -> u8 {
        match status {
            TxStatus::Pruned => 1,
            TxStatus::Finalized => 0,
        }
    }
}
impl TryFrom<u8> for TxStatus {
    type Error = anyhow::Error;

    /// Decodes a stored status byte: `0` is `Finalized`, `1` is `Pruned`.
    ///
    /// Any other byte is rejected with an error that names the offending value.
    fn try_from(raw: u8) -> std::result::Result<Self, Self::Error> {
        let status = match raw {
            0 => TxStatus::Finalized,
            1 => TxStatus::Pruned,
            other => return Err(anyhow!("invalid value for tx status {}", other)),
        };
        Ok(status)
    }
}
#[derive(Clone, Debug)]
pub struct BlockHashAndSubmissionIndex {
@ -51,6 +76,8 @@ impl TransactionStore {
#[instrument(skip(self))]
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
let start_time = Instant::now();
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
// The last tx is inserted again, so no need to process it.
@ -86,6 +113,7 @@ impl TransactionStore {
);
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
self.kvdb.write(db_tx)?;
metrics::TX_STORE_PUT.update_since(start_time);
Ok(old_tx_seq_list)
}
@ -163,24 +191,38 @@ impl TransactionStore {
Ok(self.kvdb.put(
COL_TX_COMPLETED,
&tx_seq.to_be_bytes(),
&[TX_STATUS_FINALIZED],
&[TxStatus::Finalized.into()],
)?)
}
#[instrument(skip(self))]
pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
Ok(self
.kvdb
.put(COL_TX_COMPLETED, &tx_seq.to_be_bytes(), &[TX_STATUS_PRUNED])?)
Ok(self.kvdb.put(
COL_TX_COMPLETED,
&tx_seq.to_be_bytes(),
&[TxStatus::Pruned.into()],
)?)
}
/// Reads the status recorded for `tx_seq` in `COL_TX_COMPLETED`.
///
/// The key is the big-endian encoding of `tx_seq`. Only the first byte of the stored value
/// is decoded; an empty value yields `Ok(None)`, and a byte that is not a valid `TxStatus`
/// yields an error. When the lookup finds no entry, `try_option!` returns early from this
/// function (presumably with `Ok(None)` -- confirm against the macro definition).
pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
    let key = tx_seq.to_be_bytes();
    let stored = try_option!(self.kvdb.get(COL_TX_COMPLETED, &key)?);
    // Option<Result<..>> -> Result<Option<..>>: no first byte is `Ok(None)`, a bad byte is `Err`.
    stored
        .first()
        .map(|byte| TxStatus::try_from(*byte))
        .transpose()
}
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
== Some(vec![TX_STATUS_FINALIZED]))
let start_time = Instant::now();
let status = self.get_tx_status(tx_seq)?;
metrics::CHECK_TX_COMPLETED.update_since(start_time);
Ok(matches!(status, Some(TxStatus::Finalized)))
}
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())? == Some(vec![TX_STATUS_PRUNED]))
let status = self.get_tx_status(tx_seq)?;
Ok(matches!(status, Some(TxStatus::Pruned)))
}
pub fn next_tx_seq(&self) -> u64 {

View File

@ -1,7 +1,10 @@
use super::tx_store::TxStore;
use anyhow::Result;
use std::sync::Arc;
use storage::log_store::config::{ConfigTx, ConfigurableExt};
use storage::log_store::{
config::{ConfigTx, ConfigurableExt},
log_manager::DATA_DB_KEY,
};
use storage_async::Store;
use tokio::sync::RwLock;
@ -66,10 +69,10 @@ impl SyncStore {
let store = async_store.get_store();
// load next_tx_seq
let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?;
let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)?;
// load max_tx_seq
let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ)?;
let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ, DATA_DB_KEY)?;
Ok((next_tx_seq, max_tx_seq))
}
@ -77,13 +80,13 @@ impl SyncStore {
pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
let async_store = self.store.write().await;
let store = async_store.get_store();
store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq)
store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq, DATA_DB_KEY)
}
pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> {
let async_store = self.store.write().await;
let store = async_store.get_store();
store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq, DATA_DB_KEY)
}
pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> {
@ -114,7 +117,7 @@ impl SyncStore {
}
let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
if removed {
Ok(InsertResult::Upgraded)
@ -128,7 +131,7 @@ impl SyncStore {
}
let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
if removed {
Ok(InsertResult::Downgraded)
@ -151,7 +154,7 @@ impl SyncStore {
let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
Ok(added)
}

View File

@ -1,6 +1,7 @@
use anyhow::Result;
use rand::Rng;
use storage::log_store::config::{ConfigTx, ConfigurableExt};
use storage::log_store::log_manager::DATA_DB_KEY;
use storage::log_store::Store;
/// TxStore is used to store pending transactions that to be synchronized in advance.
@ -32,11 +33,11 @@ impl TxStore {
}
fn index_of(&self, store: &dyn Store, tx_seq: u64) -> Result<Option<usize>> {
store.get_config_decoded(&self.key_seq_to_index(tx_seq))
store.get_config_decoded(&self.key_seq_to_index(tx_seq), DATA_DB_KEY)
}
fn at(&self, store: &dyn Store, index: usize) -> Result<Option<u64>> {
store.get_config_decoded(&self.key_index_to_seq(index))
store.get_config_decoded(&self.key_index_to_seq(index), DATA_DB_KEY)
}
pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> {
@ -45,7 +46,7 @@ impl TxStore {
pub fn count(&self, store: &dyn Store) -> Result<usize> {
store
.get_config_decoded(&self.key_count)
.get_config_decoded(&self.key_count, DATA_DB_KEY)
.map(|x| x.unwrap_or(0))
}
@ -70,7 +71,7 @@ impl TxStore {
if let Some(db_tx) = db_tx {
db_tx.append(&mut tx);
} else {
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
}
Ok(true)
@ -130,7 +131,7 @@ impl TxStore {
if let Some(db_tx) = db_tx {
db_tx.append(&mut tx);
} else {
store.exec_configs(tx)?;
store.exec_configs(tx, DATA_DB_KEY)?;
}
Ok(true)

View File

@ -1,12 +1,11 @@
use network::{NetworkMessage, PeerAction, PeerId, PubsubMessage, ReportSource};
use tokio::sync::mpsc;
use network::{NetworkMessage, NetworkSender, PeerAction, PeerId, PubsubMessage, ReportSource};
pub struct SyncNetworkContext {
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
}
impl SyncNetworkContext {
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>) -> Self {
pub fn new(network_send: NetworkSender) -> Self {
Self { network_send }
}

View File

@ -750,13 +750,13 @@ mod tests {
use crate::test_util::create_2_store;
use crate::test_util::tests::create_file_location_cache;
use libp2p::identity;
use network::{new_network_channel, NetworkReceiver};
use network::{ReportSource, Request};
use storage::log_store::log_manager::LogConfig;
use storage::log_store::log_manager::LogManager;
use storage::log_store::LogStoreRead;
use storage::H256;
use task_executor::{test_utils::TestRuntime, TaskExecutor};
use tokio::sync::mpsc::{self, UnboundedReceiver};
#[test]
fn test_status() {
@ -1649,7 +1649,7 @@ mod tests {
fn create_default_controller(
task_executor: TaskExecutor,
peer_id: Option<PeerId>,
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {
) -> (SerialSyncController, NetworkReceiver) {
let tx_id = TxID {
seq: 0,
hash: H256::random(),
@ -1657,7 +1657,7 @@ mod tests {
let num_chunks = 123;
let config = LogConfig::default();
let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap());
let store = Arc::new(LogManager::memorydb(config).unwrap());
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
}
@ -1668,8 +1668,8 @@ mod tests {
store: Arc<LogManager>,
tx_id: TxID,
num_chunks: usize,
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
) -> (SerialSyncController, NetworkReceiver) {
let (network_send, network_recv) = new_network_channel();
let ctx = Arc::new(SyncNetworkContext::new(network_send));
let peer_id = match peer_id {

View File

@ -10,10 +10,9 @@ use libp2p::swarm::DialError;
use log_entry_sync::LogSyncEvent;
use network::rpc::methods::FileAnnouncement;
use network::types::{AnnounceChunks, FindFile, NewFile};
use network::PubsubMessage;
use network::{
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
PeerRequestId, SyncId as RequestId,
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, NetworkSender,
PeerId, PeerRequestId, PubsubMessage, SyncId as RequestId,
};
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID};
use std::sync::atomic::Ordering;
@ -27,7 +26,7 @@ use storage::error::Result as StorageResult;
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
use storage::log_store::Store as LogStore;
use storage_async::Store;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::sync::{broadcast, oneshot};
pub type SyncSender = channel::Sender<SyncMessage, SyncRequest, SyncResponse>;
pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse>;
@ -142,7 +141,7 @@ pub struct SyncService {
impl SyncService {
pub async fn spawn(
executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>,
@ -163,7 +162,7 @@ impl SyncService {
pub async fn spawn_with_config(
config: Config,
executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>,
network_send: NetworkSender,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>,
@ -897,7 +896,9 @@ mod tests {
use crate::test_util::tests::create_file_location_cache;
use libp2p::identity;
use network::discovery::ConnectionId;
use network::new_network_channel;
use network::rpc::SubstreamId;
use network::NetworkReceiver;
use network::ReportSource;
use shared_types::ChunkArray;
use shared_types::Transaction;
@ -909,8 +910,6 @@ mod tests {
use storage::log_store::LogStoreRead;
use storage::H256;
use task_executor::test_utils::TestRuntime;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
struct TestSyncRuntime {
runtime: TestRuntime,
@ -924,8 +923,8 @@ mod tests {
init_peer_id: PeerId,
file_location_cache: Arc<FileLocationCache>,
network_send: UnboundedSender<NetworkMessage>,
network_recv: UnboundedReceiver<NetworkMessage>,
network_send: NetworkSender,
network_recv: NetworkReceiver,
event_send: broadcast::Sender<LogSyncEvent>,
catch_up_end_recv: Option<oneshot::Receiver<()>>,
}
@ -942,7 +941,7 @@ mod tests {
let (store, peer_store, txs, data) = create_2_store(chunk_counts);
let init_data = data[0].clone();
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (network_send, network_recv) = new_network_channel();
let (event_send, _) = broadcast::channel(16);
let (_, catch_up_end_recv) = oneshot::channel();
@ -1006,7 +1005,7 @@ mod tests {
let file_location_cache: Arc<FileLocationCache> =
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (network_send, mut network_recv) = new_network_channel();
let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService {
@ -1035,7 +1034,7 @@ mod tests {
let file_location_cache: Arc<FileLocationCache> =
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (network_send, mut network_recv) = new_network_channel();
let (_, sync_recv) = channel::Channel::unbounded("test");
let mut sync = SyncService {
@ -1348,15 +1347,13 @@ mod tests {
let config = LogConfig::default();
let executor = runtime.task_executor.clone();
let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap());
let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let file_location_cache: Arc<FileLocationCache> =
create_file_location_cache(init_peer_id, vec![]);
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
let (network_send, mut network_recv) = new_network_channel();
let (_event_send, event_recv) = broadcast::channel(16);
let (_, catch_up_end_recv) = oneshot::channel();
let sync_send = SyncService::spawn_with_config(
@ -1783,7 +1780,7 @@ mod tests {
}
async fn receive_chunk_request(
network_recv: &mut UnboundedReceiver<NetworkMessage>,
network_recv: &mut NetworkReceiver,
sync_send: &SyncSender,
peer_store: Arc<LogManager>,
init_peer_id: PeerId,

View File

@ -9,8 +9,6 @@ use storage::{
LogManager,
};
use task_executor::test_utils::TestRuntime;
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
/// The first store is for local node, and data not stored. The second store is for peers, and all
/// transactions are finalized for file sync.
@ -24,11 +22,8 @@ pub fn create_2_store(
Vec<Vec<u8>>,
) {
let config = LogConfig::default();
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap();
let mut peer_store = LogManager::memorydb(config, executor).unwrap();
let mut store = LogManager::memorydb(config.clone()).unwrap();
let mut peer_store = LogManager::memorydb(config).unwrap();
let mut offset = 1;
let mut txs = vec![];
@ -120,10 +115,7 @@ pub mod tests {
impl TestStoreRuntime {
pub fn new_store() -> impl LogStore {
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
LogManager::memorydb(LogConfig::default(), executor).unwrap()
LogManager::memorydb(LogConfig::default()).unwrap()
}
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {

View File

@ -1,3 +1,5 @@
import os
from web3 import Web3
ZGS_CONFIG = {
@ -22,6 +24,8 @@ ZGS_CONFIG = {
}
}
CONFIG_DIR = os.path.dirname(__file__)
ZGS_KEY_FILE = os.path.join(CONFIG_DIR, "zgs", "network", "key")
ZGS_NODEID = "16Uiu2HAmLkGFUbNFYdhuSbTQ5hmnPjFXx2zUDtwQ2uihHpN9YNNe"
BSC_CONFIG = dict(

View File

@ -0,0 +1 @@
Y<13><><02><><EFBFBD>Ң<>- <0A><>r<>7<EFBFBD><37>jq<6A>p<>}<7D>

View File

@ -3,7 +3,7 @@
import os
import time
from config.node_config import ZGS_NODEID
from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
@ -17,13 +17,9 @@ class NetworkDiscoveryTest(TestFramework):
self.num_nodes = 3
# setup for node 0 as bootnode
tests_dir = os.path.dirname(__file__)
network_dir = os.path.join(tests_dir, "config", "zgs", "network")
self.zgs_node_key_files = [ZGS_KEY_FILE]
bootnode_port = p2p_port(0)
self.zgs_node_configs[0] = {
# load pre-defined keypair
"network_dir": network_dir,
# enable UDP discovery relevant configs
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,

View File

@ -3,7 +3,7 @@
import os
import time
from config.node_config import ZGS_NODEID
from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
from test_framework.test_framework import TestFramework
from utility.utils import p2p_port
@ -17,13 +17,9 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
self.num_nodes = 2
# setup for node 0 as bootnode
tests_dir = os.path.dirname(__file__)
network_dir = os.path.join(tests_dir, "config", "zgs", "network")
self.zgs_node_key_files = [ZGS_KEY_FILE]
bootnode_port = p2p_port(0)
self.zgs_node_configs[0] = {
# load pre-defined keypair
"network_dir": network_dir,
# enable UDP discovery relevant configs
"network_enr_address": "127.0.0.1",
"network_enr_tcp_port": bootnode_port,

View File

@ -40,14 +40,18 @@ class PrunerTest(TestFramework):
for i in range(len(segments)):
client_index = i % 2
self.nodes[client_index].zgs_upload_segment(segments[i])
wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root) is not None)
wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"])
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root)["finalized"])
self.nodes[2].admin_start_sync_file(0)
self.nodes[3].admin_start_sync_file(0)
wait_until(lambda: self.nodes[2].sync_status_is_completed_or_unknown(0))
wait_until(lambda: self.nodes[2].zgs_get_file_info(data_root) is not None)
wait_until(lambda: self.nodes[2].zgs_get_file_info(data_root)["finalized"])
wait_until(lambda: self.nodes[3].sync_status_is_completed_or_unknown(0))
wait_until(lambda: self.nodes[3].zgs_get_file_info(data_root) is not None)
wait_until(lambda: self.nodes[3].zgs_get_file_info(data_root)["finalized"])
for i in range(len(segments)):

View File

@ -128,10 +128,14 @@ class TestNode:
poll_per_s = 4
for _ in range(poll_per_s * self.rpc_timeout):
if self.process.poll() is not None:
self.stderr.seek(0)
self.stdout.seek(0)
raise FailedToStartError(
self._node_msg(
"exited with status {} during initialization".format(
self.process.returncode
"exited with status {} during initialization \n\nstderr: {}\n\nstdout: {}\n\n".format(
self.process.returncode,
self.stderr.read(),
self.stdout.read(),
)
)
)

View File

@ -55,6 +55,7 @@ class TestFramework:
self.lifetime_seconds = 3600
self.launch_wait_seconds = 1
self.num_deployed_contracts = 0
self.zgs_node_key_files = []
# Set default binary path
binary_ext = ".exe" if is_windows_platform() else ""
@ -190,6 +191,10 @@ class TestFramework:
else:
updated_config = {}
zgs_node_key_file = None
if i < len(self.zgs_node_key_files):
zgs_node_key_file = self.zgs_node_key_files[i]
assert os.path.exists(self.zgs_binary), (
"%s should be exist" % self.zgs_binary
)
@ -202,6 +207,7 @@ class TestFramework:
self.mine_contract.address(),
self.reward_contract.address(),
self.log,
key_file=zgs_node_key_file,
)
self.nodes.append(node)
node.setup_config()

View File

@ -3,14 +3,9 @@ import subprocess
import tempfile
from test_framework.blockchain_node import BlockChainNodeType, BlockchainNode
from utility.utils import blockchain_rpc_port, arrange_port
from utility.utils import blockchain_p2p_port, blockchain_rpc_port, blockchain_ws_port, blockchain_rpc_port_tendermint, pprof_port
from utility.build_binary import build_zg
ZGNODE_PORT_CATEGORY_WS = 0
ZGNODE_PORT_CATEGORY_P2P = 1
ZGNODE_PORT_CATEGORY_RPC = 2
ZGNODE_PORT_CATEGORY_PPROF = 3
def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
assert num_nodes > 0, "Invalid number of blockchain nodes: %s" % num_nodes
@ -26,7 +21,7 @@ def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
os.mkdir(zgchaind_dir)
log_file = tempfile.NamedTemporaryFile(dir=zgchaind_dir, delete=False, prefix="init_genesis_", suffix=".log")
p2p_port_start = arrange_port(ZGNODE_PORT_CATEGORY_P2P, 0)
p2p_port_start = blockchain_p2p_port(0)
ret = subprocess.run(
args=["bash", shell_script, zgchaind_dir, str(num_nodes), str(p2p_port_start)],
@ -71,13 +66,13 @@ class ZGNode(BlockchainNode):
# overwrite json rpc http port: 8545
"--json-rpc.address", "127.0.0.1:%s" % blockchain_rpc_port(index),
# overwrite json rpc ws port: 8546
"--json-rpc.ws-address", "127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_WS, index),
"--json-rpc.ws-address", "127.0.0.1:%s" % blockchain_ws_port(index),
# overwrite p2p port: 26656
"--p2p.laddr", "tcp://127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_P2P, index),
"--p2p.laddr", "tcp://127.0.0.1:%s" % blockchain_p2p_port(index),
# overwrite rpc port: 26657
"--rpc.laddr", "tcp://127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_RPC, index),
"--rpc.laddr", "tcp://127.0.0.1:%s" % blockchain_rpc_port_tendermint(index),
# overwrite pprof port: 6060
"--rpc.pprof_laddr", "127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_PPROF, index),
"--rpc.pprof_laddr", "127.0.0.1:%s" % pprof_port(index),
"--log_level", "debug"
]

View File

@ -24,6 +24,7 @@ class ZgsNode(TestNode):
log,
rpc_timeout=10,
libp2p_nodes=None,
key_file=None,
):
local_conf = ZGS_CONFIG.copy()
if libp2p_nodes is None:
@ -54,6 +55,7 @@ class ZgsNode(TestNode):
# Overwrite with personalized configs.
update_config(local_conf, updated_config)
data_dir = os.path.join(root_dir, "zgs_node" + str(index))
self.key_file = key_file
rpc_url = "http://" + rpc_listen_address
super().__init__(
NodeType.Zgs,
@ -68,10 +70,16 @@ class ZgsNode(TestNode):
def setup_config(self):
    """Create the node's data directory and write its configuration files.

    Writes the log filter file named by ``config["log_config_file"]``. When a
    key file was supplied, it is copied into a fresh ``network`` directory
    under the data directory. The TOML config is written last.
    """
    os.mkdir(self.data_dir)

    log_filter_path = os.path.join(self.data_dir, self.config["log_config_file"])
    with open(log_filter_path, "w") as log_filter_file:
        log_filter_file.write("trace,hyper=info,h2=info")

    key_file = self.key_file
    if key_file is not None:
        # The key file is copied into <data_dir>/network.
        network_dir = os.path.join(self.data_dir, "network")
        os.mkdir(network_dir)
        shutil.copy(key_file, network_dir)

    initialize_toml_config(self.config_file, self.config)
def wait_for_rpc_connection(self):

View File

@ -11,6 +11,7 @@ class PortMin:
MAX_NODES = 100
MAX_BLOCKCHAIN_NODES = 50
def p2p_port(n):
@ -23,18 +24,25 @@ def rpc_port(n):
def blockchain_p2p_port(n):
return PortMin.n + 2 * MAX_NODES + n
assert n <= MAX_BLOCKCHAIN_NODES
return PortMin.n + MAX_NODES + MAX_BLOCKCHAIN_NODES + n
def blockchain_rpc_port(n):
return PortMin.n + 3 * MAX_NODES + n
return PortMin.n + MAX_NODES + 2 * MAX_BLOCKCHAIN_NODES + n
def blockchain_rpc_port_core(n):
return PortMin.n + 4 * MAX_NODES + n
return PortMin.n + MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n
def arrange_port(category: int, node_index: int) -> int:
return PortMin.n + (100 + category) * MAX_NODES + node_index
def blockchain_ws_port(n):
    """Return the JSON-RPC WebSocket port for blockchain node ``n``."""
    category_offset = MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES
    return PortMin.n + category_offset + n
def blockchain_rpc_port_tendermint(n):
    """Return the port passed as ``--rpc.laddr`` for blockchain node ``n``."""
    category_offset = MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES
    return PortMin.n + category_offset + n
def pprof_port(n):
    """Return the pprof listen port for blockchain node ``n``."""
    category_offset = MAX_NODES + 6 * MAX_BLOCKCHAIN_NODES
    return PortMin.n + category_offset + n
def wait_until(predicate, *, attempts=float("inf"), timeout=float("inf"), lock=None):
if attempts == float("inf") and timeout == float("inf"):

View File

@ -28,7 +28,7 @@ Different identity schemes can be used to define the node id and signatures. Cur
## Signing Algorithms
User's wishing to implement their own singing algorithms simply need to
Users wishing to implement their own signing algorithms simply need to
implement the `EnrKey` trait and apply it to an `Enr`.
By default, `k256::SigningKey` implement `EnrKey` and can be used to sign and