Mirror of https://github.com/0glabs/0g-storage-node.git (synced 2025-04-04 15:35:18 +00:00)

Compare commits: 55a4270623 ... 0f36f3ee80 (21 commits)

* 0f36f3ee80
* 6c4dbcd61a
* 306005279a
* e1b31841cb
* 59f187b150
* 483d5691ed
* 7742af3fa5
* 8351ce2c06
* a0ad6d7b6a
* aba8b9c1ae
* 4df2ebb208
* 21ebec4a41
* 774133284f
* 16e70bde68
* 1de7afec14
* 0c493880ee
* fae2d5efb6
* 3fd800275a
* baf0521c99
* bcbd8b3baa
* cae5b62440
.gitignore (vendored): 1 line changed
@@ -4,6 +4,5 @@
/.idea
tests/**/__pycache__
tests/tmp/**
tests/config/zgs
.vscode/*.json
/0g-storage-contracts-dev
Cargo.lock (generated): 9 lines changed
@@ -226,6 +226,7 @@ dependencies = [
 "itertools 0.13.0",
 "lazy_static",
 "lru 0.12.5",
 "metrics",
 "once_cell",
 "serde",
 "tiny-keccak",
@@ -912,7 +913,9 @@ dependencies = [
 "anyhow",
 "async-lock 2.8.0",
 "hashlink 0.8.4",
 "lazy_static",
 "log_entry_sync",
 "metrics",
 "network",
 "shared_types",
 "storage-async",
@@ -4652,12 +4655,14 @@ dependencies = [
 "jsonrpsee",
 "lazy_static",
 "metrics",
 "reqwest",
 "serde_json",
 "shared_types",
 "storage",
 "task_executor",
 "thiserror",
 "tokio",
 "url",
]

[[package]]
@@ -5028,6 +5033,7 @@ dependencies = [
name = "network"
version = "0.2.0"
dependencies = [
 "channel",
 "directory",
 "dirs 4.0.0",
 "discv5",
@@ -7300,8 +7306,10 @@ dependencies = [
 "kvdb",
 "kvdb-memorydb",
 "kvdb-rocksdb",
 "lazy_static",
 "merkle_light",
 "merkle_tree",
 "metrics",
 "once_cell",
 "parking_lot 0.12.3",
 "rand 0.8.5",
@@ -7324,6 +7332,7 @@ name = "storage-async"
version = "0.1.0"
dependencies = [
 "anyhow",
 "backtrace",
 "eth2_ssz",
 "shared_types",
 "storage",
@@ -4,7 +4,7 @@

0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer holds three important features:

* Buit-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* Built-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* General purpose - It is designed to support atomic transactions, mutable kv stores as well as archive log systems to enable wide range of applications with various data types.
* Incentive - Instead of being just a decentralized database, 0G Storage introduces PoRA mining algorithm to incentivize storage network participants.
@@ -13,5 +13,8 @@ serde = { version = "1.0.137", features = ["derive"] }
lazy_static = "1.4.0"
tracing = "0.1.36"
once_cell = "1.19.0"

metrics = { workspace = true }

itertools = "0.13.0"
lru = "0.12.5"
@@ -1,4 +1,5 @@
mod merkle_tree;
mod metrics;
mod node_manager;
mod proof;
mod sha3;
@@ -10,6 +11,7 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;
use tracing::{trace, warn};

use crate::merkle_tree::MerkleTreeWrite;
@@ -145,6 +147,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
    }

    pub fn append(&mut self, new_leaf: E) {
        let start_time = Instant::now();
        if new_leaf == E::null() {
            // appending null is not allowed.
            return;
@@ -152,10 +155,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
        self.node_manager.start_transaction();
        self.node_manager.push_node(0, new_leaf);
        self.recompute_after_append_leaves(self.leaves() - 1);

        self.node_manager.commit();
        metrics::APPEND.update_since(start_time);
    }

    pub fn append_list(&mut self, leaf_list: Vec<E>) {
        let start_time = Instant::now();
        if leaf_list.contains(&E::null()) {
            // appending null is not allowed.
            return;
@@ -165,6 +171,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
        self.node_manager.append_nodes(0, &leaf_list);
        self.recompute_after_append_leaves(start_index);
        self.node_manager.commit();
        metrics::APPEND_LIST.update_since(start_time);
    }

    /// Append a leaf list by providing their intermediate node hash.
@@ -173,6 +180,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
    /// Other nodes in the subtree will be set to `null` nodes.
    /// TODO: Optimize to avoid storing the `null` nodes?
    pub fn append_subtree(&mut self, subtree_depth: usize, subtree_root: E) -> Result<()> {
        let start_time = Instant::now();
        if subtree_root == E::null() {
            // appending null is not allowed.
            bail!("subtree_root is null");
@@ -182,10 +190,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
        self.append_subtree_inner(subtree_depth, subtree_root)?;
        self.recompute_after_append_subtree(start_index, subtree_depth - 1);
        self.node_manager.commit();
        metrics::APPEND_SUBTREE.update_since(start_time);

        Ok(())
    }

    pub fn append_subtree_list(&mut self, subtree_list: Vec<(usize, E)>) -> Result<()> {
        let start_time = Instant::now();
        if subtree_list.iter().any(|(_, root)| root == &E::null()) {
            // appending null is not allowed.
            bail!("subtree_list contains null");
@@ -197,12 +208,15 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
            self.recompute_after_append_subtree(start_index, subtree_depth - 1);
        }
        self.node_manager.commit();
        metrics::APPEND_SUBTREE_LIST.update_since(start_time);

        Ok(())
    }

    /// Change the value of the last leaf and return the new merkle root.
    /// This is needed if our merkle-tree in memory only keeps intermediate nodes instead of real leaves.
    pub fn update_last(&mut self, updated_leaf: E) {
        let start_time = Instant::now();
        if updated_leaf == E::null() {
            // updating to null is not allowed.
            return;
@@ -216,6 +230,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
        }
        self.recompute_after_append_leaves(self.leaves() - 1);
        self.node_manager.commit();
        metrics::UPDATE_LAST.update_since(start_time);
    }

    /// Fill an unknown `null` leaf with its real value.
common/append_merkle/src/metrics.rs (new file): 11 lines
@@ -0,0 +1,11 @@
use std::sync::Arc;

use metrics::{register_timer, Timer};

lazy_static::lazy_static! {
    pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
    pub static ref APPEND_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_list");
    pub static ref APPEND_SUBTREE: Arc<dyn Timer> = register_timer("append_merkle_append_subtree");
    pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> = register_timer("append_merkle_append_subtree_list");
    pub static ref UPDATE_LAST: Arc<dyn Timer> = register_timer("append_merkle_update_last");
}
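The pattern this changeset introduces is to register each named timer once via `lazy_static` and then record elapsed time at every call site with `update_since`. Below is a minimal self-contained sketch of that idea; the `Timer` trait, `LogTimer`, and `register_timer` here are simplified stand-ins for the repository's internal `metrics` crate, not its actual API, and the only external dependency assumed is the `lazy_static` crate.

```rust
// Simplified stand-in illustrating the register-once / update-per-call pattern
// used throughout this diff. Not the repository's real `metrics` crate.
use std::sync::Arc;
use std::time::Instant;

pub trait Timer: Send + Sync {
    fn update_since(&self, start: Instant);
}

struct LogTimer {
    name: &'static str,
}

impl Timer for LogTimer {
    fn update_since(&self, start: Instant) {
        // A real implementation would feed a histogram; here we just print.
        println!("{}: {:?}", self.name, start.elapsed());
    }
}

pub fn register_timer(name: &'static str) -> Arc<dyn Timer> {
    Arc::new(LogTimer { name })
}

lazy_static::lazy_static! {
    pub static ref APPEND: Arc<dyn Timer> = register_timer("append_merkle_append");
}

fn append_leaf() {
    let start_time = Instant::now();
    // ... do the actual append work here ...
    APPEND.update_since(start_time);
}

fn main() {
    append_leaf();
}
```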
@@ -15,19 +15,13 @@ abigen!(
);

#[cfg(feature = "dev")]
abigen!(
    ZgsFlow,
    "../../0g-storage-contracts-dev/artifacts/contracts/dataFlow/Flow.sol/Flow.json"
);
abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json");

#[cfg(feature = "dev")]
abigen!(
    PoraMine,
    "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json"
);
abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json");

#[cfg(feature = "dev")]
abigen!(
    ChunkLinearReward,
    "../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
    "../../storage-contracts-abis/ChunkLinearReward.json"
);
@@ -1,6 +1,6 @@
# Proof of Random Access

The ZeroGravity network adopts a Proof of Random Access (PoRA) mechanism to incentivize miners to store data. By requiring miners to answer randomly produced queries to archived data chunks, the PoRA mechanism establishes the relation between mining proof generation power and data storage. Miners answer the queries repeatedly and computes an output digest for each loaded chunk util find a digest that satisfies the mining difficulty (i.e., has enough leading zeros). PoRA will stress the miners' disk I/O and reduce their capability to respond user queries. So 0G Storage adopts intermittent mining, in which a mining epoch starts with a block generation at a specific block height on the host chain and stops when a valid PoRA is submitted to the 0G Storage contract.
The ZeroGravity network adopts a Proof of Random Access (PoRA) mechanism to incentivize miners to store data. By requiring miners to answer randomly produced queries to archived data chunks, the PoRA mechanism establishes the relation between mining proof generation power and data storage. Miners answer the queries repeatedly and computes an output digest for each loaded chunk until find a digest that satisfies the mining difficulty (i.e., has enough leading zeros). PoRA will stress the miners' disk I/O and reduce their capability to respond user queries. So 0G Storage adopts intermittent mining, in which a mining epoch starts with a block generation at a specific block height on the host chain and stops when a valid PoRA is submitted to the 0G Storage contract.

In a strawman design, a PoRA iteration consists of a computing stage and a loading stage. In the computing stage, a miner computes a random recall position (the universal offset in the flow) based on an arbitrary picked random nonce and a mining status read from the host chain. In the loading stage, a miner loads the archived data chunks at the given recall position, and computes output digest by hashing the tuple of mining status and the data chunks. If the output digest satisfies the target difficulty, the miner can construct a legitimate PoRA consists of the chosen random nonce, the loaded data chunk and the proof for the correctness of data chunk to the mining contract.
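The strawman iteration described above amounts to a simple nonce loop: pick a nonce, derive a recall position, hash the mining status together with the loaded chunk, and accept the nonce if the digest has enough leading zero bits. The sketch below is an illustration only; the hash function, struct fields, and difficulty check are placeholders, not the repository's actual PoRA implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Placeholder for the mining status read from the host chain.
struct MiningStatus {
    epoch_seed: u64,
    difficulty_leading_zero_bits: u32,
}

/// Placeholder for the archived flow: a list of fixed-size data chunks.
fn load_chunk(flow: &[Vec<u8>], recall_position: usize) -> &[u8] {
    &flow[recall_position % flow.len()]
}

/// Toy digest; the real design hashes a cryptographic function over the
/// tuple of mining status and chunk data and compares against a 256-bit target.
fn digest(status: &MiningStatus, nonce: u64, chunk: &[u8]) -> u64 {
    let mut h = DefaultHasher::new();
    status.epoch_seed.hash(&mut h);
    nonce.hash(&mut h);
    chunk.hash(&mut h);
    h.finish()
}

/// One strawman PoRA search: the computing stage picks a recall position from
/// the nonce, the loading stage hashes the loaded chunk, and the loop stops
/// when the digest has enough leading zeros.
fn mine(status: &MiningStatus, flow: &[Vec<u8>]) -> Option<(u64, usize)> {
    for nonce in 0..1_000_000u64 {
        // Computing stage: derive the universal recall offset from the nonce.
        let recall_position = (status.epoch_seed ^ nonce) as usize;
        // Loading stage: read the chunk at that offset and hash it with the status.
        let chunk = load_chunk(flow, recall_position);
        let d = digest(status, nonce, chunk);
        if d.leading_zeros() >= status.difficulty_leading_zero_bits {
            return Some((nonce, recall_position % flow.len()));
        }
    }
    None
}

fn main() {
    let flow: Vec<Vec<u8>> = (0..16).map(|i| vec![i as u8; 256]).collect();
    let status = MiningStatus { epoch_seed: 42, difficulty_leading_zero_bits: 12 };
    if let Some((nonce, pos)) = mine(&status, &flow) {
        println!("found nonce {nonce} at recall position {pos}");
    }
}
```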
@@ -27,4 +27,4 @@ The mining process of 0G Storage requires to prove data accessibility to random

## Data Flow

In 0G Storage, committed data are organized sequentially. Such a sequence of data is called a data flow, which can be interpreted as a list of data entries or equivalently a sequence of fixed-size data sectors. Thus, every piece of data in ZeroGravity can be indexed conveniently with a universal offset. This offset will be used to sample challenges in the mining process of PoRA. The default data flow is called the "main flow" of ZeroGravity. It incorporates all new log entries (unless otherwise specified) in an append-only manner. There are also specialized flows that only accept some category of log entries, e.g. data related to a specifc application. The most significant advantage of specialized flows is a consecutive addressing space, which may be crucial in some use cases. Furthermore, a specialized flow can apply customized storage price, which is typically significantly higher than the floor price of the default flow, and hence achieves better data availability and reliability.
In 0G Storage, committed data are organized sequentially. Such a sequence of data is called a data flow, which can be interpreted as a list of data entries or equivalently a sequence of fixed-size data sectors. Thus, every piece of data in ZeroGravity can be indexed conveniently with a universal offset. This offset will be used to sample challenges in the mining process of PoRA. The default data flow is called the "main flow" of ZeroGravity. It incorporates all new log entries (unless otherwise specified) in an append-only manner. There are also specialized flows that only accept some category of log entries, e.g. data related to a specific application. The most significant advantage of specialized flows is a consecutive addressing space, which may be crucial in some use cases. Furthermore, a specialized flow can apply customized storage price, which is typically significantly higher than the floor price of the default flow, and hence achieves better data availability and reliability.
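As a rough illustration of how a universal offset addresses data in a flow of fixed-size sectors, the sketch below converts an offset into a (load chunk, sector-within-chunk) pair. The 256-byte sector size and 1024-sector chunk size match constants that appear later in this diff (`ENTRY_SIZE`, `PORA_CHUNK_SIZE`); the function names are otherwise made up for the example.

```rust
/// Sector and chunk sizes as they appear elsewhere in this changeset
/// (`ENTRY_SIZE` = 256 bytes, `PORA_CHUNK_SIZE` = 1024 entries).
const ENTRY_SIZE: u64 = 256;
const PORA_CHUNK_SIZE: u64 = 1024;

/// Illustrative only: split a universal sector offset in the flow into the
/// index of the load chunk that holds it and the sector index inside that chunk.
fn locate_sector(universal_offset: u64) -> (u64, u64) {
    let chunk_index = universal_offset / PORA_CHUNK_SIZE;
    let sector_in_chunk = universal_offset % PORA_CHUNK_SIZE;
    (chunk_index, sector_in_chunk)
}

/// Illustrative only: byte position of a sector in the flow.
fn byte_offset(universal_offset: u64) -> u64 {
    universal_offset * ENTRY_SIZE
}

fn main() {
    let offset = 5_000;
    let (chunk, sector) = locate_sector(offset);
    println!(
        "sector {offset} lives in load chunk {chunk}, slot {sector}, at byte {}",
        byte_offset(offset)
    );
}
```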
@@ -13,3 +13,5 @@ tokio = { version = "1.19.2", features = ["sync"] }
async-lock = "2.5.0"
hashlink = "0.8.0"
tracing = "0.1.35"
lazy_static = "1.4.0"
metrics = { workspace = true }
@@ -1,11 +1,16 @@
use super::mem_pool::MemoryChunkPool;
use crate::mem_pool::FileID;
use anyhow::Result;
use network::NetworkMessage;
use metrics::{Histogram, Sample};
use network::{NetworkMessage, NetworkSender};
use shared_types::{ChunkArray, FileProof};
use std::{sync::Arc, time::SystemTime};
use std::{sync::Arc, time::Instant};
use storage_async::{ShardConfig, Store};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::mpsc::UnboundedReceiver;

lazy_static::lazy_static! {
    pub static ref FINALIZE_FILE_LATENCY: Arc<dyn Histogram> = Sample::ExpDecay(0.015).register("chunk_pool_finalize_file_latency", 1024);
}

/// Handle the cached file when uploaded completely and verified from blockchain.
/// Generally, the file will be persisted into log store.
@@ -13,7 +18,7 @@ pub struct ChunkPoolHandler {
    receiver: UnboundedReceiver<ChunkPoolMessage>,
    mem_pool: Arc<MemoryChunkPool>,
    log_store: Arc<Store>,
    sender: UnboundedSender<NetworkMessage>,
    sender: NetworkSender,
}

impl ChunkPoolHandler {
@@ -21,7 +26,7 @@ impl ChunkPoolHandler {
        receiver: UnboundedReceiver<ChunkPoolMessage>,
        mem_pool: Arc<MemoryChunkPool>,
        log_store: Arc<Store>,
        sender: UnboundedSender<NetworkMessage>,
        sender: NetworkSender,
    ) -> Self {
        ChunkPoolHandler {
            receiver,
@@ -68,7 +73,7 @@ impl ChunkPoolHandler {
            }
        }

        let start = SystemTime::now();
        let start = Instant::now();
        if !self
            .log_store
            .finalize_tx_with_hash(id.tx_id.seq, id.tx_id.hash)
@@ -77,8 +82,9 @@ impl ChunkPoolHandler {
            return Ok(false);
        }

        let elapsed = start.elapsed()?;
        let elapsed = start.elapsed();
        debug!(?id, ?elapsed, "Transaction finalized");
        FINALIZE_FILE_LATENCY.update_since(start);

        // always remove file from pool after transaction finalized
        self.mem_pool.remove_file(&id.root).await;
@@ -29,7 +29,7 @@ impl Config {
pub fn unbounded(
    config: Config,
    log_store: Arc<storage_async::Store>,
    network_send: tokio::sync::mpsc::UnboundedSender<network::NetworkMessage>,
    network_send: network::NetworkSender,
) -> (Arc<MemoryChunkPool>, ChunkPoolHandler) {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
@@ -24,3 +24,5 @@ futures-util = "0.3.28"
thiserror = "1.0.44"
lazy_static = "1.4.0"
metrics = { workspace = true }
reqwest = { version = "0.11", features = ["json"] }
url = { version = "2.4", default-features = false }
@@ -1,3 +1,5 @@
use std::time::Duration;

use crate::ContractAddress;

pub struct LogSyncConfig {
@@ -34,6 +36,9 @@ pub struct LogSyncConfig {
    pub watch_loop_wait_time_ms: u64,
    // force to sync log from start block number
    pub force_log_sync_from_start_block_number: bool,

    // the timeout for blockchain rpc connection
    pub blockchain_rpc_timeout: Duration,
}

#[derive(Clone)]
@@ -61,6 +66,7 @@ impl LogSyncConfig {
        remove_finalized_block_interval_minutes: u64,
        watch_loop_wait_time_ms: u64,
        force_log_sync_from_start_block_number: bool,
        blockchain_rpc_timeout: Duration,
    ) -> Self {
        Self {
            rpc_endpoint_url,
@@ -77,6 +83,7 @@ impl LogSyncConfig {
            remove_finalized_block_interval_minutes,
            watch_loop_wait_time_ms,
            force_log_sync_from_start_block_number,
            blockchain_rpc_timeout,
        }
    }
}
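The new `blockchain_rpc_timeout` field is threaded into the HTTP transport that the log fetcher builds, as the later hunks in this diff show with `reqwest::Client::builder().timeout(..)` and `Http::new_with_client(..)`. A hedged sketch of that wiring, assuming the `ethers`, `reqwest`, `url`, and `anyhow` crates and omitting the retry-client wrapper used in the real code, might look like this:

```rust
use std::time::Duration;

use ethers::providers::{Http, Provider};

/// Sketch only: build an HTTP provider whose connect and request timeouts come
/// from the config value added in this change. Error handling is simplified.
fn build_provider(
    rpc_endpoint_url: &str,
    blockchain_rpc_timeout: Duration,
) -> anyhow::Result<Provider<Http>> {
    let client = reqwest::Client::builder()
        .timeout(blockchain_rpc_timeout)          // per-request timeout
        .connect_timeout(blockchain_rpc_timeout)  // TCP connect timeout
        .build()?;
    let transport = Http::new_with_client(url::Url::parse(rpc_endpoint_url)?, client);
    Ok(Provider::new(transport))
}
```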
@ -1,6 +1,6 @@
|
||||
use crate::sync_manager::log_query::LogQuery;
|
||||
use crate::sync_manager::RETRY_WAIT_MS;
|
||||
use crate::ContractAddress;
|
||||
use crate::sync_manager::{metrics, RETRY_WAIT_MS};
|
||||
use crate::{ContractAddress, LogSyncConfig};
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use append_merkle::{Algorithm, Sha3Algorithm};
|
||||
use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow};
|
||||
@ -12,17 +12,13 @@ use futures::StreamExt;
|
||||
use jsonrpsee::tracing::{debug, error, info, warn};
|
||||
use shared_types::{DataRoot, Transaction};
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use storage::log_store::{tx_store::BlockHashAndSubmissionIndex, Store};
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::{
|
||||
sync::{
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
RwLock,
|
||||
},
|
||||
time::Instant,
|
||||
use tokio::sync::{
|
||||
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
RwLock,
|
||||
};
|
||||
|
||||
pub struct LogEntryFetcher {
|
||||
@ -34,28 +30,29 @@ pub struct LogEntryFetcher {
|
||||
}
|
||||
|
||||
impl LogEntryFetcher {
|
||||
pub async fn new(
|
||||
url: &str,
|
||||
contract_address: ContractAddress,
|
||||
log_page_size: u64,
|
||||
confirmation_delay: u64,
|
||||
rate_limit_retries: u32,
|
||||
timeout_retries: u32,
|
||||
initial_backoff: u64,
|
||||
) -> Result<Self> {
|
||||
pub async fn new(config: &LogSyncConfig) -> Result<Self> {
|
||||
let provider = Arc::new(Provider::new(
|
||||
RetryClientBuilder::default()
|
||||
.rate_limit_retries(rate_limit_retries)
|
||||
.timeout_retries(timeout_retries)
|
||||
.initial_backoff(Duration::from_millis(initial_backoff))
|
||||
.build(Http::from_str(url)?, Box::new(HttpRateLimitRetryPolicy)),
|
||||
.rate_limit_retries(config.rate_limit_retries)
|
||||
.timeout_retries(config.timeout_retries)
|
||||
.initial_backoff(Duration::from_millis(config.initial_backoff))
|
||||
.build(
|
||||
Http::new_with_client(
|
||||
url::Url::parse(&config.rpc_endpoint_url)?,
|
||||
reqwest::Client::builder()
|
||||
.timeout(config.blockchain_rpc_timeout)
|
||||
.connect_timeout(config.blockchain_rpc_timeout)
|
||||
.build()?,
|
||||
),
|
||||
Box::new(HttpRateLimitRetryPolicy),
|
||||
),
|
||||
));
|
||||
// TODO: `error` types are removed from the ABI json file.
|
||||
Ok(Self {
|
||||
contract_address,
|
||||
contract_address: config.contract_address,
|
||||
provider,
|
||||
log_page_size,
|
||||
confirmation_delay,
|
||||
log_page_size: config.log_page_size,
|
||||
confirmation_delay: config.confirmation_block_count,
|
||||
})
|
||||
}
|
||||
|
||||
@ -242,6 +239,7 @@ impl LogEntryFetcher {
|
||||
);
|
||||
let (mut block_hash_sent, mut block_number_sent) = (None, None);
|
||||
while let Some(maybe_log) = stream.next().await {
|
||||
let start_time = Instant::now();
|
||||
match maybe_log {
|
||||
Ok(log) => {
|
||||
let sync_progress =
|
||||
@ -301,6 +299,7 @@ impl LogEntryFetcher {
|
||||
tokio::time::sleep(Duration::from_millis(RETRY_WAIT_MS)).await;
|
||||
}
|
||||
}
|
||||
metrics::RECOVER_LOG.update_since(start_time);
|
||||
}
|
||||
|
||||
info!("log recover end");
|
||||
|
@ -1,7 +1,13 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use metrics::{register_timer, Timer};
|
||||
use metrics::{register_timer, Gauge, GaugeUsize, Timer};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_store_put_tx");
|
||||
pub static ref LOG_MANAGER_HANDLE_DATA_TRANSACTION: Arc<dyn Timer> = register_timer("log_manager_handle_data_transaction");
|
||||
|
||||
pub static ref STORE_PUT_TX: Arc<dyn Timer> = register_timer("log_entry_sync_manager_put_tx_inner");
|
||||
|
||||
pub static ref STORE_PUT_TX_SPEED_IN_BYTES: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_entry_sync_manager_put_tx_speed_in_bytes");
|
||||
|
||||
pub static ref RECOVER_LOG: Arc<dyn Timer> = register_timer("log_entry_sync_manager_recover_log");
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ const RETRY_WAIT_MS: u64 = 500;
|
||||
// Each tx has less than 10KB, so the cache size should be acceptable.
|
||||
const BROADCAST_CHANNEL_CAPACITY: usize = 25000;
|
||||
const CATCH_UP_END_GAP: u64 = 10;
|
||||
const CHECK_ROOT_INTERVAL: u64 = 500;
|
||||
|
||||
/// Errors while handle data
|
||||
#[derive(Error, Debug)]
|
||||
@ -86,16 +87,7 @@ impl LogSyncManager {
|
||||
.expect("shutdown send error")
|
||||
},
|
||||
async move {
|
||||
let log_fetcher = LogEntryFetcher::new(
|
||||
&config.rpc_endpoint_url,
|
||||
config.contract_address,
|
||||
config.log_page_size,
|
||||
config.confirmation_block_count,
|
||||
config.rate_limit_retries,
|
||||
config.timeout_retries,
|
||||
config.initial_backoff,
|
||||
)
|
||||
.await?;
|
||||
let log_fetcher = LogEntryFetcher::new(&config).await?;
|
||||
let data_cache = DataCache::new(config.cache_config.clone());
|
||||
|
||||
let block_hash_cache = Arc::new(RwLock::new(
|
||||
@ -277,6 +269,9 @@ impl LogSyncManager {
|
||||
.remove_finalized_block_interval_minutes,
|
||||
);
|
||||
|
||||
// start the pad data store
|
||||
log_sync_manager.store.start_padding(&executor_clone);
|
||||
|
||||
let (watch_progress_tx, watch_progress_rx) =
|
||||
tokio::sync::mpsc::unbounded_channel();
|
||||
let watch_rx = log_sync_manager.log_fetcher.start_watch(
|
||||
@ -408,6 +403,7 @@ impl LogSyncManager {
|
||||
}
|
||||
LogFetchProgress::Transaction((tx, block_number)) => {
|
||||
let mut stop = false;
|
||||
let start_time = Instant::now();
|
||||
match self.put_tx(tx.clone()).await {
|
||||
Some(false) => stop = true,
|
||||
Some(true) => {
|
||||
@ -441,6 +437,8 @@ impl LogSyncManager {
|
||||
// no receivers will be created.
|
||||
warn!("log sync broadcast error, error={:?}", e);
|
||||
}
|
||||
|
||||
metrics::LOG_MANAGER_HANDLE_DATA_TRANSACTION.update_since(start_time);
|
||||
}
|
||||
LogFetchProgress::Reverted(reverted) => {
|
||||
self.process_reverted(reverted).await;
|
||||
@ -453,7 +451,6 @@ impl LogSyncManager {
|
||||
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
|
||||
let start_time = Instant::now();
|
||||
let result = self.store.put_tx(tx.clone());
|
||||
metrics::STORE_PUT_TX.update_since(start_time);
|
||||
|
||||
if let Err(e) = result {
|
||||
error!("put_tx error: e={:?}", e);
|
||||
@ -509,42 +506,50 @@ impl LogSyncManager {
|
||||
}
|
||||
}
|
||||
self.data_cache.garbage_collect(self.next_tx_seq);
|
||||
|
||||
self.next_tx_seq += 1;
|
||||
|
||||
// Check if the computed data root matches on-chain state.
|
||||
// If the call fails, we won't check the root here and return `true` directly.
|
||||
let flow_contract = self.log_fetcher.flow_contract();
|
||||
match flow_contract
|
||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||
.call()
|
||||
.await
|
||||
{
|
||||
Ok(contract_root_bytes) => {
|
||||
let contract_root = H256::from_slice(&contract_root_bytes);
|
||||
// contract_root is zero for tx submitted before upgrading.
|
||||
if !contract_root.is_zero() {
|
||||
match self.store.get_context() {
|
||||
Ok((local_root, _)) => {
|
||||
if contract_root != local_root {
|
||||
error!(
|
||||
?contract_root,
|
||||
?local_root,
|
||||
"local flow root and on-chain flow root mismatch"
|
||||
);
|
||||
return false;
|
||||
if self.next_tx_seq % CHECK_ROOT_INTERVAL == 0 {
|
||||
let flow_contract = self.log_fetcher.flow_contract();
|
||||
|
||||
match flow_contract
|
||||
.get_flow_root_by_tx_seq(tx.seq.into())
|
||||
.call()
|
||||
.await
|
||||
{
|
||||
Ok(contract_root_bytes) => {
|
||||
let contract_root = H256::from_slice(&contract_root_bytes);
|
||||
// contract_root is zero for tx submitted before upgrading.
|
||||
if !contract_root.is_zero() {
|
||||
match self.store.get_context() {
|
||||
Ok((local_root, _)) => {
|
||||
if contract_root != local_root {
|
||||
error!(
|
||||
?contract_root,
|
||||
?local_root,
|
||||
"local flow root and on-chain flow root mismatch"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the local flow root");
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the local flow root");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the on-chain flow root");
|
||||
Err(e) => {
|
||||
warn!(?e, "fail to read the on-chain flow root");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics::STORE_PUT_TX_SPEED_IN_BYTES
|
||||
.update((tx.size * 1000 / start_time.elapsed().as_micros() as u64) as usize);
|
||||
metrics::STORE_PUT_TX.update_since(start_time);
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
@ -5,17 +5,20 @@ use ethereum_types::Address;
|
||||
use ethers::contract::ContractCall;
|
||||
use ethers::contract::EthEvent;
|
||||
use std::sync::Arc;
|
||||
use storage::log_store::log_manager::DATA_DB_KEY;
|
||||
use storage::H256;
|
||||
use storage_async::Store;
|
||||
|
||||
const MINER_ID: &str = "mine.miner_id";
|
||||
|
||||
pub async fn load_miner_id(store: &Store) -> storage::error::Result<Option<H256>> {
|
||||
store.get_config_decoded(&MINER_ID).await
|
||||
store.get_config_decoded(&MINER_ID, DATA_DB_KEY).await
|
||||
}
|
||||
|
||||
async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> {
|
||||
store.set_config_encoded(&MINER_ID, miner_id).await
|
||||
store
|
||||
.set_config_encoded(&MINER_ID, miner_id, DATA_DB_KEY)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn check_and_request_miner_id(
|
||||
|
@ -3,13 +3,12 @@ use crate::monitor::Monitor;
|
||||
use crate::sealer::Sealer;
|
||||
use crate::submitter::Submitter;
|
||||
use crate::{config::MinerConfig, mine::PoraService, watcher::MineContextWatcher};
|
||||
use network::NetworkMessage;
|
||||
use network::NetworkSender;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use storage::config::ShardConfig;
|
||||
use storage_async::Store;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum MinerMessage {
|
||||
@ -29,7 +28,7 @@ pub struct MineService;
|
||||
impl MineService {
|
||||
pub async fn spawn(
|
||||
executor: task_executor::TaskExecutor,
|
||||
_network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
_network_send: NetworkSender,
|
||||
config: MinerConfig,
|
||||
store: Arc<Store>,
|
||||
) -> Result<broadcast::Sender<MinerMessage>, String> {
|
||||
|
@ -41,6 +41,7 @@ if-addrs = "0.10.1"
|
||||
slog = "2.7.0"
|
||||
igd = "0.12.1"
|
||||
duration-str = "0.5.1"
|
||||
channel = { path = "../../common/channel" }
|
||||
|
||||
[dependencies.libp2p]
|
||||
version = "0.45.1"
|
||||
|
@ -592,7 +592,7 @@ where
|
||||
// peer that originally published the message.
|
||||
match PubsubMessage::decode(&gs_msg.topic, &gs_msg.data) {
|
||||
Err(e) => {
|
||||
debug!(topic = ?gs_msg.topic, error = ?e, "Could not decode gossipsub message");
|
||||
debug!(topic = ?gs_msg.topic, %propagation_source, error = ?e, "Could not decode gossipsub message");
|
||||
//reject the message
|
||||
if let Err(e) = self.gossipsub.report_message_validation_result(
|
||||
&id,
|
||||
@ -601,6 +601,24 @@ where
|
||||
) {
|
||||
warn!(message_id = %id, peer_id = %propagation_source, error = ?e, "Failed to report message validation");
|
||||
}
|
||||
|
||||
self.peer_manager.report_peer(
|
||||
&propagation_source,
|
||||
PeerAction::Fatal,
|
||||
ReportSource::Gossipsub,
|
||||
None,
|
||||
"gossipsub message decode error",
|
||||
);
|
||||
|
||||
if let Some(source) = &gs_msg.source {
|
||||
self.peer_manager.report_peer(
|
||||
source,
|
||||
PeerAction::Fatal,
|
||||
ReportSource::Gossipsub,
|
||||
None,
|
||||
"gossipsub message decode error",
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(msg) => {
|
||||
// Notify the network
|
||||
|
@ -157,8 +157,8 @@ impl Default for Config {
|
||||
let filter_rate_limiter = Some(
|
||||
discv5::RateLimiterBuilder::new()
|
||||
.total_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second
|
||||
.ip_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second
|
||||
.node_n_every(300, Duration::from_secs(1)) // Allow bursts, average 300 per second
|
||||
.ip_n_every(9, Duration::from_secs(1)) // Allow bursts, average 9 per second
|
||||
.node_n_every(8, Duration::from_secs(1)) // Allow bursts, average 8 per second
|
||||
.build()
|
||||
.expect("The total rate limit has been specified"),
|
||||
);
|
||||
|
@ -159,3 +159,10 @@ pub enum NetworkMessage {
|
||||
udp_socket: Option<SocketAddr>,
|
||||
},
|
||||
}
|
||||
|
||||
pub type NetworkSender = channel::metrics::Sender<NetworkMessage>;
|
||||
pub type NetworkReceiver = channel::metrics::Receiver<NetworkMessage>;
|
||||
|
||||
pub fn new_network_channel() -> (NetworkSender, NetworkReceiver) {
|
||||
channel::metrics::unbounded_channel("network")
|
||||
}
|
||||
|
@ -3,10 +3,9 @@
|
||||
//! Currently supported strategies:
|
||||
//! - UPnP
|
||||
|
||||
use crate::{NetworkConfig, NetworkMessage};
|
||||
use crate::{NetworkConfig, NetworkMessage, NetworkSender};
|
||||
use if_addrs::get_if_addrs;
|
||||
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Configuration required to construct the UPnP port mappings.
|
||||
pub struct UPnPConfig {
|
||||
@ -36,10 +35,7 @@ impl UPnPConfig {
|
||||
}
|
||||
|
||||
/// Attempts to construct external port mappings with UPnP.
|
||||
pub fn construct_upnp_mappings(
|
||||
config: UPnPConfig,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
) {
|
||||
pub fn construct_upnp_mappings(config: UPnPConfig, network_send: NetworkSender) {
|
||||
info!("UPnP Attempting to initialise routes");
|
||||
match igd::search_gateway(Default::default()) {
|
||||
Err(e) => info!(error = %e, "UPnP not available"),
|
||||
|
@ -4,7 +4,7 @@ use crate::discovery::enr;
|
||||
use crate::multiaddr::Protocol;
|
||||
use crate::rpc::{GoodbyeReason, RPCResponseErrorCode, ReqId};
|
||||
use crate::types::{error, GossipKind};
|
||||
use crate::{EnrExt, NetworkMessage};
|
||||
use crate::{EnrExt, NetworkSender};
|
||||
use crate::{NetworkConfig, NetworkGlobals, PeerAction, ReportSource};
|
||||
use futures::prelude::*;
|
||||
use libp2p::core::{
|
||||
@ -21,7 +21,6 @@ use std::io::prelude::*;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
|
||||
|
||||
@ -60,7 +59,7 @@ pub struct Context<'a> {
|
||||
impl<AppReqId: ReqId> Service<AppReqId> {
|
||||
pub async fn new(
|
||||
executor: task_executor::TaskExecutor,
|
||||
network_sender: UnboundedSender<NetworkMessage>,
|
||||
network_sender: NetworkSender,
|
||||
ctx: Context<'_>,
|
||||
) -> error::Result<(Arc<NetworkGlobals>, Keypair, Self)> {
|
||||
trace!("Libp2p Service starting");
|
||||
|
@ -1,6 +1,7 @@
|
||||
#![cfg(test)]
|
||||
|
||||
use libp2p::gossipsub::GossipsubConfigBuilder;
|
||||
use network::new_network_channel;
|
||||
use network::Enr;
|
||||
use network::EnrExt;
|
||||
use network::Multiaddr;
|
||||
@ -22,7 +23,6 @@ pub mod swarm;
|
||||
type ReqId = usize;
|
||||
|
||||
use tempfile::Builder as TempBuilder;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
#[allow(unused)]
|
||||
pub struct Libp2pInstance(LibP2PService<ReqId>, exit_future::Signal);
|
||||
@ -72,7 +72,7 @@ pub async fn build_libp2p_instance(rt: Weak<Runtime>, boot_nodes: Vec<Enr>) -> L
|
||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||
let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx);
|
||||
let libp2p_context = network::Context { config: &config };
|
||||
let (sender, _) = unbounded_channel();
|
||||
let (sender, _) = new_network_channel();
|
||||
Libp2pInstance(
|
||||
LibP2PService::new(executor, sender, libp2p_context)
|
||||
.await
|
||||
|
@ -11,7 +11,7 @@ use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
|
||||
use storage::log_store::log_manager::PORA_CHUNK_SIZE;
|
||||
use storage::log_store::log_manager::{DATA_DB_KEY, FLOW_DB_KEY, PORA_CHUNK_SIZE};
|
||||
use storage_async::Store;
|
||||
use task_executor::TaskExecutor;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
@ -252,7 +252,7 @@ impl Pruner {
|
||||
.update_shard_config(self.config.shard_config)
|
||||
.await;
|
||||
self.store
|
||||
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
|
||||
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config, DATA_DB_KEY)
|
||||
.await
|
||||
}
|
||||
|
||||
@ -265,17 +265,22 @@ impl Pruner {
|
||||
.set_config_encoded(
|
||||
&FIRST_REWARDABLE_CHUNK_KEY,
|
||||
&(new_first_rewardable_chunk, new_first_tx_seq),
|
||||
FLOW_DB_KEY,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
|
||||
store.get_config_decoded(&SHARD_CONFIG_KEY).await
|
||||
store
|
||||
.get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
|
||||
store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
|
||||
store
|
||||
.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY, FLOW_DB_KEY)
|
||||
.await
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -16,7 +16,7 @@ use network::{
|
||||
Keypair, MessageAcceptance, MessageId, NetworkGlobals, NetworkMessage, PeerId, PeerRequestId,
|
||||
PublicKey, PubsubMessage, Request, RequestId, Response,
|
||||
};
|
||||
use network::{Multiaddr, PeerAction, ReportSource};
|
||||
use network::{Multiaddr, NetworkSender, PeerAction, ReportSource};
|
||||
use shared_types::{bytes_to_chunks, timestamp_now, NetworkIdentity, TxID};
|
||||
use storage::config::ShardConfig;
|
||||
use storage_async::Store;
|
||||
@ -88,7 +88,7 @@ pub struct Libp2pEventHandler {
|
||||
/// A collection of global variables, accessible outside of the network service.
|
||||
network_globals: Arc<NetworkGlobals>,
|
||||
/// A channel to the router service.
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
network_send: NetworkSender,
|
||||
/// A channel to the syncing service.
|
||||
sync_send: SyncSender,
|
||||
/// A channel to the RPC chunk pool service.
|
||||
@ -112,7 +112,7 @@ impl Libp2pEventHandler {
|
||||
pub fn new(
|
||||
config: Config,
|
||||
network_globals: Arc<NetworkGlobals>,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
network_send: NetworkSender,
|
||||
sync_send: SyncSender,
|
||||
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
|
||||
local_keypair: Keypair,
|
||||
@ -1010,10 +1010,12 @@ mod tests {
|
||||
use network::{
|
||||
discovery::{CombinedKey, ConnectionId},
|
||||
discv5::enr::EnrBuilder,
|
||||
new_network_channel,
|
||||
rpc::{GetChunksRequest, StatusMessage, SubstreamId},
|
||||
types::FindFile,
|
||||
CombinedKeyExt, Keypair, MessageAcceptance, MessageId, Multiaddr, NetworkGlobals,
|
||||
NetworkMessage, PeerId, PubsubMessage, Request, RequestId, Response, SyncId,
|
||||
NetworkMessage, NetworkReceiver, PeerId, PubsubMessage, Request, RequestId, Response,
|
||||
SyncId,
|
||||
};
|
||||
use shared_types::{timestamp_now, ChunkArray, ChunkArrayWithProof, FlowRangeProof, TxID};
|
||||
use storage::{
|
||||
@ -1035,8 +1037,8 @@ mod tests {
|
||||
runtime: TestRuntime,
|
||||
network_globals: Arc<NetworkGlobals>,
|
||||
keypair: Keypair,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
||||
network_send: NetworkSender,
|
||||
network_recv: NetworkReceiver,
|
||||
sync_send: SyncSender,
|
||||
sync_recv: SyncReceiver,
|
||||
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
|
||||
@ -1050,12 +1052,11 @@ mod tests {
|
||||
fn default() -> Self {
|
||||
let runtime = TestRuntime::default();
|
||||
let (network_globals, keypair) = Context::new_network_globals();
|
||||
let (network_send, network_recv) = mpsc::unbounded_channel();
|
||||
let (network_send, network_recv) = new_network_channel();
|
||||
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
|
||||
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let executor = runtime.task_executor.clone();
|
||||
let store = LogManager::memorydb(LogConfig::default(), executor).unwrap();
|
||||
let store = LogManager::memorydb(LogConfig::default()).unwrap();
|
||||
Self {
|
||||
runtime,
|
||||
network_globals: Arc::new(network_globals),
|
||||
|
@ -5,11 +5,9 @@ use chunk_pool::ChunkPoolMessage;
|
||||
use file_location_cache::FileLocationCache;
|
||||
use futures::{channel::mpsc::Sender, prelude::*};
|
||||
use miner::MinerMessage;
|
||||
use network::types::NewFile;
|
||||
use network::PubsubMessage;
|
||||
use network::{
|
||||
BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage, RequestId,
|
||||
Service as LibP2PService, Swarm,
|
||||
types::NewFile, BehaviourEvent, Keypair, Libp2pEvent, NetworkGlobals, NetworkMessage,
|
||||
NetworkReceiver, NetworkSender, PubsubMessage, RequestId, Service as LibP2PService, Swarm,
|
||||
};
|
||||
use pruner::PrunerMessage;
|
||||
use shared_types::timestamp_now;
|
||||
@ -33,7 +31,7 @@ pub struct RouterService {
|
||||
network_globals: Arc<NetworkGlobals>,
|
||||
|
||||
/// The receiver channel for Zgs to communicate with the network service.
|
||||
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
||||
network_recv: NetworkReceiver,
|
||||
|
||||
/// The receiver channel for Zgs to communicate with the pruner service.
|
||||
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
|
||||
@ -57,8 +55,8 @@ impl RouterService {
|
||||
executor: task_executor::TaskExecutor,
|
||||
libp2p: LibP2PService<RequestId>,
|
||||
network_globals: Arc<NetworkGlobals>,
|
||||
network_recv: mpsc::UnboundedReceiver<NetworkMessage>,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
network_recv: NetworkReceiver,
|
||||
network_send: NetworkSender,
|
||||
sync_send: SyncSender,
|
||||
_miner_send: Option<broadcast::Sender<MinerMessage>>,
|
||||
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
|
||||
|
@ -17,15 +17,13 @@ use file_location_cache::FileLocationCache;
|
||||
use futures::channel::mpsc::Sender;
|
||||
use jsonrpsee::core::RpcResult;
|
||||
use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle};
|
||||
use network::NetworkGlobals;
|
||||
use network::NetworkMessage;
|
||||
use network::{NetworkGlobals, NetworkMessage, NetworkSender};
|
||||
use std::error::Error;
|
||||
use std::sync::Arc;
|
||||
use storage_async::Store;
|
||||
use sync::{SyncRequest, SyncResponse, SyncSender};
|
||||
use task_executor::ShutdownReason;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use zgs::RpcServer as ZgsRpcServer;
|
||||
use zgs_miner::MinerMessage;
|
||||
|
||||
@ -42,7 +40,7 @@ pub struct Context {
|
||||
pub config: RPCConfig,
|
||||
pub file_location_cache: Arc<FileLocationCache>,
|
||||
pub network_globals: Arc<NetworkGlobals>,
|
||||
pub network_send: UnboundedSender<NetworkMessage>,
|
||||
pub network_send: NetworkSender,
|
||||
pub sync_send: SyncSender,
|
||||
pub chunk_pool: Arc<MemoryChunkPool>,
|
||||
pub log_store: Arc<Store>,
|
||||
|
@ -4,8 +4,8 @@ use file_location_cache::FileLocationCache;
|
||||
use log_entry_sync::{LogSyncConfig, LogSyncEvent, LogSyncManager};
|
||||
use miner::{MineService, MinerConfig, MinerMessage, ShardConfig};
|
||||
use network::{
|
||||
self, Keypair, NetworkConfig, NetworkGlobals, NetworkMessage, RequestId,
|
||||
Service as LibP2PService,
|
||||
self, new_network_channel, Keypair, NetworkConfig, NetworkGlobals, NetworkReceiver,
|
||||
NetworkSender, RequestId, Service as LibP2PService,
|
||||
};
|
||||
use pruner::{Pruner, PrunerConfig, PrunerMessage};
|
||||
use router::RouterService;
|
||||
@ -27,15 +27,12 @@ macro_rules! require {
|
||||
}
|
||||
|
||||
struct NetworkComponents {
|
||||
send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
send: NetworkSender,
|
||||
globals: Arc<NetworkGlobals>,
|
||||
keypair: Keypair,
|
||||
|
||||
// note: these will be owned by the router service
|
||||
owned: Option<(
|
||||
LibP2PService<RequestId>,
|
||||
mpsc::UnboundedReceiver<NetworkMessage>,
|
||||
)>,
|
||||
owned: Option<(LibP2PService<RequestId>, NetworkReceiver)>,
|
||||
}
|
||||
|
||||
struct SyncComponents {
|
||||
@ -89,10 +86,9 @@ impl ClientBuilder {
|
||||
|
||||
/// Initializes in-memory storage.
|
||||
pub fn with_memory_store(mut self) -> Result<Self, String> {
|
||||
let executor = require!("sync", self, runtime_context).clone().executor;
|
||||
// TODO(zz): Set config.
|
||||
let store = Arc::new(
|
||||
LogManager::memorydb(LogConfig::default(), executor)
|
||||
LogManager::memorydb(LogConfig::default())
|
||||
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
|
||||
);
|
||||
|
||||
@ -110,13 +106,11 @@ impl ClientBuilder {
|
||||
|
||||
/// Initializes RocksDB storage.
|
||||
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
|
||||
let executor = require!("sync", self, runtime_context).clone().executor;
|
||||
let store = Arc::new(
|
||||
LogManager::rocksdb(
|
||||
config.log_config.clone(),
|
||||
config.db_dir.join("flow_db"),
|
||||
config.db_dir.join("data_db"),
|
||||
executor,
|
||||
)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
|
||||
);
|
||||
@ -147,7 +141,7 @@ impl ClientBuilder {
|
||||
let service_context = network::Context { config };
|
||||
|
||||
// construct communication channel
|
||||
let (send, recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
||||
let (send, recv) = new_network_channel();
|
||||
|
||||
// launch libp2p service
|
||||
let (globals, keypair, libp2p) =
|
||||
|
@ -146,6 +146,7 @@ impl ZgsConfig {
|
||||
self.remove_finalized_block_interval_minutes,
|
||||
self.watch_loop_wait_time_ms,
|
||||
self.force_log_sync_from_start_block_number,
|
||||
Duration::from_secs(self.blockchain_rpc_timeout_secs),
|
||||
))
|
||||
}
|
||||
|
||||
|
@ -49,6 +49,8 @@ build_config! {
|
||||
(remove_finalized_block_interval_minutes, (u64), 30)
|
||||
(watch_loop_wait_time_ms, (u64), 500)
|
||||
|
||||
(blockchain_rpc_timeout_secs, (u64), 120)
|
||||
|
||||
// chunk pool
|
||||
(chunk_pool_write_window_size, (usize), 4)
|
||||
(chunk_pool_max_cached_chunks_all, (usize), 4*1024*1024) // 1G
|
||||
|
@ -10,4 +10,5 @@ storage = { path = "../storage" }
|
||||
task_executor = { path = "../../common/task_executor" }
|
||||
tokio = { version = "1.19.2", features = ["sync"] }
|
||||
tracing = "0.1.35"
|
||||
eth2_ssz = "0.4.0"
|
||||
eth2_ssz = "0.4.0"
|
||||
backtrace = "0.3"
|
@ -2,6 +2,7 @@
|
||||
extern crate tracing;
|
||||
|
||||
use anyhow::bail;
|
||||
use backtrace::Backtrace;
|
||||
use shared_types::{
|
||||
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
|
||||
};
|
||||
@ -74,9 +75,11 @@ impl Store {
|
||||
pub async fn get_config_decoded<K: AsRef<[u8]> + Send + Sync, T: Decode + Send + 'static>(
|
||||
&self,
|
||||
key: &K,
|
||||
dest: &str,
|
||||
) -> Result<Option<T>> {
|
||||
let key = key.as_ref().to_vec();
|
||||
self.spawn(move |store| store.get_config_decoded(&key))
|
||||
let dest = dest.to_string();
|
||||
self.spawn(move |store| store.get_config_decoded(&key, &dest))
|
||||
.await
|
||||
}
|
||||
|
||||
@ -84,10 +87,12 @@ impl Store {
|
||||
&self,
|
||||
key: &K,
|
||||
value: &T,
|
||||
dest: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let key = key.as_ref().to_vec();
|
||||
let value = value.as_ssz_bytes();
|
||||
self.spawn(move |store| store.set_config(&key, &value))
|
||||
let dest = dest.to_string();
|
||||
self.spawn(move |store| store.set_config(&key, &value, &dest))
|
||||
.await
|
||||
}
|
||||
|
||||
@ -148,8 +153,16 @@ impl Store {
|
||||
WORKER_TASK_NAME,
|
||||
);
|
||||
|
||||
rx.await
|
||||
.unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string())))
|
||||
rx.await.unwrap_or_else(|_| {
|
||||
let mut backtrace = Backtrace::new();
|
||||
let mut frames = backtrace.frames().to_vec();
|
||||
if frames.len() > 3 {
|
||||
frames.drain(0..); //Remove the first 3 unnecessary frames to simplify// backtrace
|
||||
backtrace = frames.into();
|
||||
Some(format!("{:?}", backtrace));
|
||||
}
|
||||
bail!(error::Error::Custom("Receiver error".to_string()))
|
||||
})
|
||||
}
|
||||
|
||||
// FIXME(zz): Refactor the lock and async call here.
|
||||
|
@ -31,6 +31,8 @@ parking_lot = "0.12.3"
|
||||
serde_json = "1.0.127"
|
||||
tokio = { version = "1.38.0", features = ["full"] }
|
||||
task_executor = { path = "../../common/task_executor" }
|
||||
lazy_static = "1.4.0"
|
||||
metrics = { workspace = true }
|
||||
once_cell = { version = "1.19.0", features = [] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
@ -14,25 +14,16 @@ use storage::{
|
||||
},
|
||||
LogManager,
|
||||
};
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
|
||||
fn write_performance(c: &mut Criterion) {
|
||||
if Path::new("db_write").exists() {
|
||||
fs::remove_dir_all("db_write").unwrap();
|
||||
}
|
||||
let runtime = TestRuntime::default();
|
||||
|
||||
let executor = runtime.task_executor.clone();
|
||||
|
||||
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
|
||||
LogManager::rocksdb(
|
||||
LogConfig::default(),
|
||||
"db_flow_write",
|
||||
"db_data_write",
|
||||
executor,
|
||||
)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||
.unwrap(),
|
||||
LogManager::rocksdb(LogConfig::default(), "db_flow_write", "db_data_write")
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||
.unwrap(),
|
||||
));
|
||||
|
||||
let chunk_count = 2048;
|
||||
@ -114,19 +105,10 @@ fn read_performance(c: &mut Criterion) {
|
||||
fs::remove_dir_all("db_read").unwrap();
|
||||
}
|
||||
|
||||
let runtime = TestRuntime::default();
|
||||
|
||||
let executor = runtime.task_executor.clone();
|
||||
|
||||
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
|
||||
LogManager::rocksdb(
|
||||
LogConfig::default(),
|
||||
"db_flow_read",
|
||||
"db_data_read",
|
||||
executor,
|
||||
)
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||
.unwrap(),
|
||||
LogManager::rocksdb(LogConfig::default(), "db_flow_read", "db_data_read")
|
||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||
.unwrap(),
|
||||
));
|
||||
|
||||
let tx_size = 1000;
|
||||
|
@ -2,16 +2,55 @@ use anyhow::{anyhow, Result};
|
||||
use kvdb::{DBKey, DBOp};
|
||||
use ssz::{Decode, Encode};
|
||||
|
||||
use crate::log_store::log_manager::{COL_MISC, DATA_DB_KEY, FLOW_DB_KEY};
|
||||
use crate::LogManager;
|
||||
|
||||
use super::log_manager::COL_MISC;
|
||||
macro_rules! db_operation {
|
||||
($self:expr, $dest:expr, get, $key:expr) => {{
|
||||
let db = match $dest {
|
||||
DATA_DB_KEY => &$self.data_db,
|
||||
FLOW_DB_KEY => &$self.flow_db,
|
||||
_ => return Err(anyhow!("Invalid destination")),
|
||||
};
|
||||
Ok(db.get(COL_MISC, $key)?)
|
||||
}};
|
||||
|
||||
($self:expr, $dest:expr, put, $key:expr, $value:expr) => {{
|
||||
let db = match $dest {
|
||||
DATA_DB_KEY => &$self.data_db,
|
||||
FLOW_DB_KEY => &$self.flow_db,
|
||||
_ => return Err(anyhow!("Invalid destination")),
|
||||
};
|
||||
Ok(db.put(COL_MISC, $key, $value)?)
|
||||
}};
|
||||
|
||||
($self:expr, $dest:expr, delete, $key:expr) => {{
|
||||
let db = match $dest {
|
||||
DATA_DB_KEY => &$self.data_db,
|
||||
FLOW_DB_KEY => &$self.flow_db,
|
||||
_ => return Err(anyhow!("Invalid destination")),
|
||||
};
|
||||
Ok(db.delete(COL_MISC, $key)?)
|
||||
}};
|
||||
|
||||
($self:expr, $dest:expr, transaction, $tx:expr) => {{
|
||||
let db = match $dest {
|
||||
DATA_DB_KEY => &$self.data_db,
|
||||
FLOW_DB_KEY => &$self.flow_db,
|
||||
_ => return Err(anyhow!("Invalid destination")),
|
||||
};
|
||||
let mut db_tx = db.transaction();
|
||||
db_tx.ops = $tx.ops;
|
||||
Ok(db.write(db_tx)?)
|
||||
}};
|
||||
}
|
||||
|
||||
pub trait Configurable {
|
||||
fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
|
||||
fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()>;
|
||||
fn remove_config(&self, key: &[u8]) -> Result<()>;
|
||||
fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>>;
|
||||
fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()>;
|
||||
fn remove_config(&self, key: &[u8], dest: &str) -> Result<()>;
|
||||
|
||||
fn exec_configs(&self, tx: ConfigTx) -> Result<()>;
|
||||
fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()>;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@ -41,8 +80,12 @@ impl ConfigTx {
|
||||
}
|
||||
|
||||
pub trait ConfigurableExt: Configurable {
|
||||
fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(&self, key: &K) -> Result<Option<T>> {
|
||||
match self.get_config(key.as_ref())? {
|
||||
fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(
|
||||
&self,
|
||||
key: &K,
|
||||
dest: &str,
|
||||
) -> Result<Option<T>> {
|
||||
match self.get_config(key.as_ref(), dest)? {
|
||||
Some(val) => Ok(Some(
|
||||
T::from_ssz_bytes(&val).map_err(|e| anyhow!("SSZ decode error: {:?}", e))?,
|
||||
)),
|
||||
@ -50,36 +93,36 @@ pub trait ConfigurableExt: Configurable {
|
||||
}
|
||||
}
|
||||
|
||||
fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(&self, key: &K, value: &T) -> Result<()> {
|
||||
self.set_config(key.as_ref(), &value.as_ssz_bytes())
|
||||
fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(
|
||||
&self,
|
||||
key: &K,
|
||||
value: &T,
|
||||
dest: &str,
|
||||
) -> Result<()> {
|
||||
self.set_config(key.as_ref(), &value.as_ssz_bytes(), dest)
|
||||
}
|
||||
|
||||
fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K) -> Result<()> {
|
||||
self.remove_config(key.as_ref())
|
||||
fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K, dest: &str) -> Result<()> {
|
||||
self.remove_config(key.as_ref(), dest)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized + Configurable> ConfigurableExt for T {}
|
||||
|
||||
impl Configurable for LogManager {
|
||||
fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||
Ok(self.flow_db.get(COL_MISC, key)?)
|
||||
fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>> {
|
||||
db_operation!(self, dest, get, key)
|
||||
}
|
||||
|
||||
fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
self.flow_db.put(COL_MISC, key, value)?;
|
||||
Ok(())
|
||||
fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()> {
|
||||
db_operation!(self, dest, put, key, value)
|
||||
}
|
||||
|
||||
fn remove_config(&self, key: &[u8]) -> Result<()> {
|
||||
Ok(self.flow_db.delete(COL_MISC, key)?)
|
||||
fn remove_config(&self, key: &[u8], dest: &str) -> Result<()> {
|
||||
db_operation!(self, dest, delete, key)
|
||||
}
|
||||
|
||||
fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
|
||||
let mut db_tx = self.flow_db.transaction();
|
||||
db_tx.ops = tx.ops;
|
||||
self.flow_db.write(db_tx)?;
|
||||
|
||||
Ok(())
|
||||
fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()> {
|
||||
db_operation!(self, dest, transaction, tx)
|
||||
}
|
||||
}
|
||||
|
@ -1,12 +1,14 @@
|
||||
use super::load_chunk::EntryBatch;
|
||||
use super::seal_task_manager::SealTaskManager;
|
||||
use super::{MineLoadChunk, SealAnswer, SealTask};
|
||||
use crate::config::ShardConfig;
|
||||
use crate::error::Error;
|
||||
use crate::log_store::load_chunk::EntryBatch;
|
||||
use crate::log_store::log_manager::{
|
||||
bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
|
||||
bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, COL_PAD_DATA_LIST,
|
||||
COL_PAD_DATA_SYNC_HEIGH, PORA_CHUNK_SIZE,
|
||||
};
|
||||
use crate::log_store::seal_task_manager::SealTaskManager;
|
||||
use crate::log_store::{
|
||||
metrics, FlowRead, FlowSeal, FlowWrite, MineLoadChunk, SealAnswer, SealTask,
|
||||
};
|
||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
|
||||
use crate::{try_option, ZgsKeyValueDB};
|
||||
use any::Any;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
@ -20,19 +22,22 @@ use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
|
||||
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::{any, cmp};
|
||||
use tracing::{debug, error, trace};
|
||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||
|
||||
pub struct FlowStore {
|
||||
flow_db: Arc<FlowDBStore>,
|
||||
data_db: Arc<FlowDBStore>,
|
||||
seal_manager: SealTaskManager,
|
||||
config: FlowConfig,
|
||||
}
|
||||
|
||||
impl FlowStore {
|
||||
pub fn new(data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
|
||||
pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
|
||||
Self {
|
||||
flow_db,
|
||||
data_db,
|
||||
seal_manager: Default::default(),
|
||||
config,
|
||||
@ -44,6 +49,7 @@ impl FlowStore {
|
||||
batch_index: usize,
|
||||
subtree_list: Vec<(usize, usize, DataRoot)>,
|
||||
) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
let mut batch = self
|
||||
.data_db
|
||||
.get_entry_batch(batch_index as u64)?
|
||||
@ -51,7 +57,7 @@ impl FlowStore {
|
||||
batch.set_subtree_list(subtree_list);
|
||||
self.data_db
|
||||
.put_entry_raw(vec![(batch_index as u64, batch)])?;
|
||||
|
||||
metrics::INSERT_SUBTREE_LIST.update_since(start_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -199,12 +205,21 @@ impl FlowRead for FlowStore {
|
||||
fn get_shard_config(&self) -> ShardConfig {
|
||||
*self.config.shard_config.read()
|
||||
}
|
||||
|
||||
fn get_pad_data(&self, start_index: u64) -> crate::error::Result<Option<Vec<PadPair>>> {
|
||||
self.flow_db.get_pad_data(start_index)
|
||||
}
|
||||
|
||||
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
|
||||
self.data_db.get_pad_data_sync_height()
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowWrite for FlowStore {
|
||||
/// Return the roots of completed chunks. The order is guaranteed to be increasing
|
||||
/// by chunk index.
|
||||
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
|
||||
let start_time = Instant::now();
|
||||
let mut to_seal_set = self.seal_manager.to_seal_set.write();
|
||||
trace!("append_entries: {} {}", data.start_index, data.data.len());
|
||||
if data.data.len() % BYTES_PER_SECTOR != 0 {
|
||||
@ -247,6 +262,8 @@ impl FlowWrite for FlowStore {
|
||||
|
||||
batch_list.push((chunk_index, batch));
|
||||
}
|
||||
|
||||
metrics::APPEND_ENTRIES.update_since(start_time);
|
||||
self.data_db.put_entry_batch_list(batch_list)
|
||||
}
|
||||
|
||||
@ -266,6 +283,14 @@ impl FlowWrite for FlowStore {
|
||||
fn update_shard_config(&self, shard_config: ShardConfig) {
|
||||
*self.config.shard_config.write() = shard_config;
|
||||
}
|
||||
|
||||
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> crate::error::Result<()> {
|
||||
self.flow_db.put_pad_data(data_sizes, tx_seq)
|
||||
}
|
||||
|
||||
fn put_pad_data_sync_height(&self, sync_index: u64) -> crate::error::Result<()> {
|
||||
self.data_db.put_pad_data_sync_height(sync_index)
|
||||
}
|
||||
}
|
||||
|
||||
impl FlowSeal for FlowStore {
|
||||
@ -343,6 +368,12 @@ impl FlowSeal for FlowStore {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, DeriveEncode, DeriveDecode)]
|
||||
pub struct PadPair {
|
||||
pub start_index: u64,
|
||||
pub data_size: u64,
|
||||
}
|
||||
|
||||
pub struct FlowDBStore {
|
||||
kvdb: Arc<dyn ZgsKeyValueDB>,
|
||||
}
|
||||
@ -356,6 +387,7 @@ impl FlowDBStore {
|
||||
&self,
|
||||
batch_list: Vec<(u64, EntryBatch)>,
|
||||
) -> Result<Vec<(u64, DataRoot)>> {
|
||||
let start_time = Instant::now();
|
||||
let mut completed_batches = Vec::new();
|
||||
let mut tx = self.kvdb.transaction();
|
||||
for (batch_index, batch) in batch_list {
|
||||
@ -370,6 +402,7 @@ impl FlowDBStore {
|
||||
}
|
||||
}
|
||||
self.kvdb.write(tx)?;
|
||||
metrics::PUT_ENTRY_BATCH_LIST.update_since(start_time);
|
||||
Ok(completed_batches)
|
||||
}
|
||||
|
||||
@ -443,6 +476,48 @@ impl FlowDBStore {
|
||||
}
|
||||
Ok(self.kvdb.write(tx)?)
|
||||
}
|
||||
|
||||
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
for item in data_sizes {
|
||||
buffer.extend(item.as_ssz_bytes());
|
||||
}
|
||||
|
||||
tx.put(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes(), &buffer);
|
||||
self.kvdb.write(tx)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()> {
|
||||
let mut tx = self.kvdb.transaction();
|
||||
tx.put(
|
||||
COL_PAD_DATA_SYNC_HEIGH,
|
||||
b"sync_height",
|
||||
&tx_seq.to_be_bytes(),
|
||||
);
|
||||
self.kvdb.write(tx)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
|
||||
match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? {
|
||||
Some(v) => Ok(Some(u64::from_be_bytes(
|
||||
v.try_into().map_err(|e| anyhow!("{:?}", e))?,
|
||||
))),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_pad_data(&self, tx_seq: u64) -> Result<Option<Vec<PadPair>>> {
|
||||
match self.kvdb.get(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes())? {
|
||||
Some(v) => Ok(Some(
|
||||
Vec::<PadPair>::from_ssz_bytes(&v).map_err(Error::from)?,
|
||||
)),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
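Aside (not part of the patch): the pad list persisted by put_pad_data above is simply the SSZ bytes of each PadPair concatenated under the tx_seq key, and get_pad_data decodes the stored value back as a Vec<PadPair>. A minimal round-trip sketch, assuming the same ssz / ssz_derive crates already imported in this file; the standalone main is illustrative only.

use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};

#[derive(Debug, PartialEq, DeriveEncode, DeriveDecode)]
pub struct PadPair {
    pub start_index: u64,
    pub data_size: u64,
}

fn main() {
    let pads = vec![
        PadPair { start_index: 1024, data_size: 256 },
        PadPair { start_index: 2048, data_size: 512 },
    ];

    // Mirrors put_pad_data: concatenate each item's SSZ bytes into one value.
    let mut buffer = Vec::new();
    for item in &pads {
        buffer.extend(item.as_ssz_bytes());
    }

    // Mirrors get_pad_data: decode the stored value back into a Vec<PadPair>.
    let decoded = Vec::<PadPair>::from_ssz_bytes(&buffer).expect("valid ssz");
    assert_eq!(decoded, pads);
}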
|
||||
|
||||
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
|
||||
|
@ -1,10 +1,11 @@
|
||||
use super::tx_store::BlockHashAndSubmissionIndex;
|
||||
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
|
||||
use crate::config::ShardConfig;
|
||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
|
||||
use crate::log_store::tx_store::TransactionStore;
|
||||
use crate::log_store::flow_store::{
|
||||
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
|
||||
};
|
||||
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore};
|
||||
use crate::log_store::{
|
||||
FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
|
||||
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
|
||||
LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
|
||||
};
|
||||
use crate::{try_option, ZgsKeyValueDB};
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
@ -24,26 +25,33 @@ use shared_types::{
|
||||
use std::cmp::Ordering;
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::mpsc;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
|
||||
use crate::log_store::metrics;
|
||||
|
||||
/// 256 Bytes
|
||||
pub const ENTRY_SIZE: usize = 256;
|
||||
/// 1024 Entries.
|
||||
pub const PORA_CHUNK_SIZE: usize = 1024;
|
||||
|
||||
pub const COL_TX: u32 = 0;
|
||||
pub const COL_ENTRY_BATCH: u32 = 1;
|
||||
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2;
|
||||
pub const COL_ENTRY_BATCH_ROOT: u32 = 3;
|
||||
pub const COL_TX_COMPLETED: u32 = 4;
|
||||
pub const COL_MISC: u32 = 5;
|
||||
pub const COL_SEAL_CONTEXT: u32 = 6;
|
||||
pub const COL_FLOW_MPT_NODES: u32 = 7;
|
||||
pub const COL_BLOCK_PROGRESS: u32 = 8;
|
||||
pub const COL_TX: u32 = 0; // flow db
|
||||
pub const COL_ENTRY_BATCH: u32 = 1; // data db
|
||||
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2; // flow db
|
||||
pub const COL_TX_COMPLETED: u32 = 3; // data db
|
||||
pub const COL_MISC: u32 = 4; // flow db & data db
|
||||
pub const COL_FLOW_MPT_NODES: u32 = 5; // flow db
|
||||
pub const COL_BLOCK_PROGRESS: u32 = 6; // flow db
|
||||
pub const COL_PAD_DATA_LIST: u32 = 7; // flow db
|
||||
pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 8; // data db
|
||||
pub const COL_NUM: u32 = 9;
|
||||
|
||||
pub const DATA_DB_KEY: &str = "data_db";
|
||||
pub const FLOW_DB_KEY: &str = "flow_db";
|
||||
const PAD_DELAY: Duration = Duration::from_secs(2);
|
||||
|
||||
// Process at most 1M entries (256MB) pad data at a time.
|
||||
const PAD_MAX_SIZE: usize = 1 << 20;
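The constants above describe one column layout shared by two RocksDB instances, with the trailing comments recording which database actually uses each column. A compressed sketch of opening both databases with that layout, matching LogManager::rocksdb further down; the paths and the anyhow error type are placeholders.

use kvdb_rocksdb::{Database, DatabaseConfig};
use std::sync::Arc;

fn open_flow_and_data_dbs(
    flow_path: &str,
    data_path: &str,
) -> anyhow::Result<(Arc<Database>, Arc<Database>)> {
    // Both databases share the same COL_NUM-column layout; which columns each
    // one actually populates is documented by the comments on COL_* above.
    let mut db_config = DatabaseConfig::with_columns(COL_NUM);
    db_config.enable_statistics = true;
    let flow_db = Arc::new(Database::open(&db_config, flow_path)?);
    let data_db = Arc::new(Database::open(&db_config, data_path)?);
    Ok((flow_db, data_db))
}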
|
||||
|
||||
@ -62,10 +70,10 @@ pub struct UpdateFlowMessage {
|
||||
|
||||
pub struct LogManager {
|
||||
pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
|
||||
pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
|
||||
tx_store: TransactionStore,
|
||||
flow_store: Arc<FlowStore>,
|
||||
merkle: RwLock<MerkleManager>,
|
||||
sender: mpsc::Sender<UpdateFlowMessage>,
|
||||
}
|
||||
|
||||
struct MerkleManager {
|
||||
@ -188,6 +196,7 @@ impl LogStoreChunkWrite for LogManager {
|
||||
chunks: ChunkArray,
|
||||
maybe_file_proof: Option<FlowProof>,
|
||||
) -> Result<bool> {
|
||||
let start_time = Instant::now();
|
||||
let mut merkle = self.merkle.write();
|
||||
let tx = self
|
||||
.tx_store
|
||||
@ -219,6 +228,7 @@ impl LogStoreChunkWrite for LogManager {
|
||||
tx.start_entry_index,
|
||||
)?;
|
||||
}
|
||||
metrics::PUT_CHUNKS.update_since(start_time);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
@ -249,6 +259,7 @@ impl LogStoreWrite for LogManager {
|
||||
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
|
||||
///
|
||||
fn put_tx(&self, tx: Transaction) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
let mut merkle = self.merkle.write();
|
||||
debug!("put_tx: tx={:?}", tx);
|
||||
let expected_seq = self.tx_store.next_tx_seq();
|
||||
@ -264,7 +275,12 @@ impl LogStoreWrite for LogManager {
|
||||
}
|
||||
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
|
||||
// TODO(zz): Should we validate received tx?
|
||||
self.append_subtree_list(tx.start_entry_index, tx.merkle_nodes.clone(), &mut merkle)?;
|
||||
self.append_subtree_list(
|
||||
tx.seq,
|
||||
tx.start_entry_index,
|
||||
tx.merkle_nodes.clone(),
|
||||
&mut merkle,
|
||||
)?;
|
||||
merkle.commit_merkle(tx.seq)?;
|
||||
debug!(
|
||||
"commit flow root: root={:?}",
|
||||
@ -278,6 +294,7 @@ impl LogStoreWrite for LogManager {
|
||||
self.copy_tx_and_finalize(old_tx_seq, vec![tx.seq])?;
|
||||
}
|
||||
}
|
||||
metrics::PUT_TX.update_since(start_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -308,6 +325,7 @@ impl LogStoreWrite for LogManager {
|
||||
}
|
||||
|
||||
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
|
||||
let start_time = Instant::now();
|
||||
trace!(
|
||||
"finalize_tx_with_hash: tx_seq={} tx_hash={:?}",
|
||||
tx_seq,
|
||||
@ -336,6 +354,7 @@ impl LogStoreWrite for LogManager {
|
||||
if same_root_seq_list.first() == Some(&tx_seq) {
|
||||
self.copy_tx_and_finalize(tx_seq, same_root_seq_list[1..].to_vec())?;
|
||||
}
|
||||
metrics::FINALIZE_TX_WITH_HASH.update_since(start_time);
|
||||
Ok(true)
|
||||
} else {
|
||||
bail!("finalize tx hash with data missing: tx_seq={}", tx_seq)
|
||||
@ -401,6 +420,42 @@ impl LogStoreWrite for LogManager {
|
||||
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
|
||||
self.flow_store.submit_seal_result(answers)
|
||||
}
|
||||
|
||||
fn start_padding(&self, executor: &task_executor::TaskExecutor) {
|
||||
let store = self.flow_store.clone();
|
||||
executor.spawn(
|
||||
async move {
|
||||
let current_height = store.get_pad_data_sync_height().unwrap();
|
||||
let mut start_index = current_height.unwrap_or(0);
|
||||
loop {
|
||||
match store.get_pad_data(start_index) {
|
||||
std::result::Result::Ok(data) => {
|
||||
// Update the flow database.
|
||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||
// subtrees with data known.
|
||||
if let Some(data) = data {
|
||||
for pad in data {
|
||||
store
|
||||
.append_entries(ChunkArray {
|
||||
data: vec![0; pad.data_size as usize],
|
||||
start_index: pad.start_index,
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
};
|
||||
store.put_pad_data_sync_height(start_index).unwrap();
|
||||
start_index += 1;
|
||||
}
|
||||
std::result::Result::Err(_) => {
|
||||
debug!("Unable to get pad data, start_index={}", start_index);
|
||||
tokio::time::sleep(PAD_DELAY).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
},
|
||||
"pad_tx",
|
||||
);
|
||||
}
|
||||
}
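Usage note (assumed, not shown in this diff): start_padding is now a LogStoreWrite method that takes a task_executor::TaskExecutor, so a caller is expected to invoke it once after the store is opened to spawn the background pad loop shown above.

use crate::log_store::LogStoreWrite;

// Hypothetical call site: spawn the "pad_tx" task that replays persisted
// PadPair lists into the data flow, resuming from the stored sync height.
fn spawn_padding(store: &LogManager, executor: &task_executor::TaskExecutor) {
    store.start_padding(executor);
}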
|
||||
|
||||
impl LogStoreChunkRead for LogManager {
|
||||
@ -614,31 +669,33 @@ impl LogManager {
|
||||
config: LogConfig,
|
||||
flow_path: impl AsRef<Path>,
|
||||
data_path: impl AsRef<Path>,
|
||||
executor: task_executor::TaskExecutor,
|
||||
) -> Result<Self> {
|
||||
let mut db_config = DatabaseConfig::with_columns(COL_NUM);
|
||||
db_config.enable_statistics = true;
|
||||
let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
|
||||
let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
|
||||
Self::new(flow_db_source, data_db_source, config, executor)
|
||||
Self::new(flow_db_source, data_db_source, config)
|
||||
}
|
||||
|
||||
pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
|
||||
pub fn memorydb(config: LogConfig) -> Result<Self> {
|
||||
let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
|
||||
let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
|
||||
Self::new(flow_db, data_db, config, executor)
|
||||
Self::new(flow_db, data_db, config)
|
||||
}
|
||||
|
||||
fn new(
|
||||
flow_db_source: Arc<dyn ZgsKeyValueDB>,
|
||||
data_db_source: Arc<dyn ZgsKeyValueDB>,
|
||||
config: LogConfig,
|
||||
executor: task_executor::TaskExecutor,
|
||||
) -> Result<Self> {
|
||||
let tx_store = TransactionStore::new(flow_db_source.clone())?;
|
||||
let tx_store = TransactionStore::new(data_db_source.clone())?;
|
||||
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
|
||||
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
|
||||
let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
|
||||
let flow_store = Arc::new(FlowStore::new(
|
||||
flow_db.clone(),
|
||||
data_db.clone(),
|
||||
config.flow.clone(),
|
||||
));
|
||||
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
|
||||
// first and call `put_tx` later.
|
||||
let next_tx_seq = tx_store.next_tx_seq();
|
||||
@ -739,18 +796,14 @@ impl LogManager {
|
||||
last_chunk_merkle,
|
||||
});
|
||||
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
|
||||
let mut log_manager = Self {
|
||||
let log_manager = Self {
|
||||
flow_db: flow_db_source,
|
||||
data_db: data_db_source,
|
||||
tx_store,
|
||||
flow_store,
|
||||
merkle,
|
||||
sender,
|
||||
};
|
||||
|
||||
log_manager.start_receiver(receiver, executor);
|
||||
|
||||
if let Some(tx) = last_tx_to_insert {
|
||||
log_manager.put_tx(tx)?;
|
||||
}
|
||||
@ -765,40 +818,6 @@ impl LogManager {
|
||||
Ok(log_manager)
|
||||
}
|
||||
|
||||
fn start_receiver(
|
||||
&mut self,
|
||||
rx: mpsc::Receiver<UpdateFlowMessage>,
|
||||
executor: task_executor::TaskExecutor,
|
||||
) {
|
||||
let flow_store = self.flow_store.clone();
|
||||
executor.spawn(
|
||||
async move {
|
||||
loop {
|
||||
match rx.recv() {
|
||||
std::result::Result::Ok(data) => {
|
||||
// Update the flow database.
|
||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||
// subtrees with data known.
|
||||
flow_store
|
||||
.append_entries(ChunkArray {
|
||||
data: vec![0; data.pad_data],
|
||||
start_index: data.tx_start_flow_index,
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
std::result::Result::Err(_) => {
|
||||
debug!("Log manager inner channel closed");
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
},
|
||||
"pad_tx",
|
||||
);
|
||||
// Wait for the spawned thread to finish
|
||||
// let _ = handle.join().expect("Thread panicked");
|
||||
}
|
||||
|
||||
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
|
||||
match maybe_root {
|
||||
None => self.gen_proof_at_version(flow_index, None),
|
||||
@ -854,6 +873,7 @@ impl LogManager {
|
||||
#[instrument(skip(self, merkle))]
|
||||
fn append_subtree_list(
|
||||
&self,
|
||||
tx_seq: u64,
|
||||
tx_start_index: u64,
|
||||
merkle_list: Vec<(usize, DataRoot)>,
|
||||
merkle: &mut MerkleManager,
|
||||
@ -861,8 +881,9 @@ impl LogManager {
|
||||
if merkle_list.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let start_time = Instant::now();
|
||||
|
||||
self.pad_tx(tx_start_index, &mut *merkle)?;
|
||||
self.pad_tx(tx_seq, tx_start_index, &mut *merkle)?;
|
||||
|
||||
for (subtree_depth, subtree_root) in merkle_list {
|
||||
let subtree_size = 1 << (subtree_depth - 1);
|
||||
@ -896,12 +917,15 @@ impl LogManager {
|
||||
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
|
||||
}
|
||||
}
|
||||
|
||||
metrics::APPEND_SUBTREE_LIST.update_since(start_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip(self, merkle))]
|
||||
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
|
||||
fn pad_tx(&self, tx_seq: u64, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
|
||||
// Check if we need to pad the flow.
|
||||
let start_time = Instant::now();
|
||||
let mut tx_start_flow_index =
|
||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
|
||||
let pad_size = tx_start_index - tx_start_flow_index;
|
||||
@ -910,6 +934,7 @@ impl LogManager {
|
||||
merkle.pora_chunks_merkle.leaves(),
|
||||
merkle.last_chunk_merkle.leaves()
|
||||
);
|
||||
let mut pad_list = vec![];
|
||||
if pad_size != 0 {
|
||||
for pad_data in Self::padding(pad_size as usize) {
|
||||
let mut is_full_empty = true;
|
||||
@ -954,10 +979,10 @@ impl LogManager {
|
||||
|
||||
let data_size = pad_data.len() / ENTRY_SIZE;
|
||||
if is_full_empty {
|
||||
self.sender.send(UpdateFlowMessage {
|
||||
pad_data: pad_data.len(),
|
||||
tx_start_flow_index,
|
||||
})?;
|
||||
pad_list.push(PadPair {
|
||||
data_size: pad_data.len() as u64,
|
||||
start_index: tx_start_flow_index,
|
||||
});
|
||||
} else {
|
||||
// Update the flow database.
|
||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||
@ -979,6 +1004,10 @@ impl LogManager {
|
||||
merkle.pora_chunks_merkle.leaves(),
|
||||
merkle.last_chunk_merkle.leaves()
|
||||
);
|
||||
|
||||
self.flow_store.put_pad_data(&pad_list, tx_seq)?;
|
||||
|
||||
metrics::PAD_TX.update_since(start_time);
|
||||
Ok(())
|
||||
}
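For intuition, a tiny worked example of the pad-size arithmetic at the top of pad_tx; all numbers are made up, with PORA_CHUNK_SIZE = 1024 taken from the constants above.

fn main() {
    // The flow currently ends at entry 1024 + 476 = 1500, while the new tx is
    // anchored at entry 2048, so 548 zero entries of padding are required.
    let last_chunk_start_index: u64 = 1024; // multiple of PORA_CHUNK_SIZE
    let last_chunk_leaves: u64 = 476;       // leaves already in the last chunk
    let tx_start_index: u64 = 2048;

    let tx_start_flow_index = last_chunk_start_index + last_chunk_leaves;
    let pad_size = tx_start_index - tx_start_flow_index;
    assert_eq!(pad_size, 548);
}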
|
||||
|
||||
@ -1107,6 +1136,8 @@ impl LogManager {
|
||||
}
|
||||
|
||||
fn copy_tx_and_finalize(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut merkle = self.merkle.write();
|
||||
let shard_config = self.flow_store.get_shard_config();
|
||||
// We have all the data need for this tx, so just copy them.
|
||||
@ -1155,6 +1186,8 @@ impl LogManager {
|
||||
for (seq, _) in to_tx_offset_list {
|
||||
self.tx_store.finalize_tx(seq)?;
|
||||
}
|
||||
|
||||
metrics::COPY_TX_AND_FINALIZE.update_since(start_time);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -1227,6 +1260,7 @@ pub fn sub_merkle_tree(leaf_data: &[u8]) -> Result<FileMerkleTree> {
|
||||
}
|
||||
|
||||
pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
|
||||
let start_time = Instant::now();
|
||||
if leaf_data.len() % ENTRY_SIZE != 0 {
|
||||
bail!("merkle_tree: mismatched data size");
|
||||
}
|
||||
@ -1242,6 +1276,9 @@ pub fn data_to_merkle_leaves(leaf_data: &[u8]) -> Result<Vec<H256>> {
|
||||
.map(Sha3Algorithm::leaf)
|
||||
.collect()
|
||||
};
|
||||
|
||||
metrics::DATA_TO_MERKLE_LEAVES_SIZE.update(leaf_data.len());
|
||||
metrics::DATA_TO_MERKLE_LEAVES.update_since(start_time);
|
||||
Ok(r)
|
||||
}
|
||||
|
||||
|
node/storage/src/log_store/metrics.rs (new file, 41 lines)
@ -0,0 +1,41 @@
use std::sync::Arc;

use metrics::{register_timer, Gauge, GaugeUsize, Timer};

lazy_static::lazy_static! {
    pub static ref PUT_TX: Arc<dyn Timer> = register_timer("log_store_put_tx");

    pub static ref PUT_CHUNKS: Arc<dyn Timer> = register_timer("log_store_put_chunks");

    pub static ref TX_STORE_PUT: Arc<dyn Timer> = register_timer("log_store_tx_store_put_tx");

    pub static ref CHECK_TX_COMPLETED: Arc<dyn Timer> =
        register_timer("log_store_log_manager_check_tx_completed");

    pub static ref APPEND_SUBTREE_LIST: Arc<dyn Timer> =
        register_timer("log_store_log_manager_append_subtree_list");

    pub static ref DATA_TO_MERKLE_LEAVES: Arc<dyn Timer> =
        register_timer("log_store_log_manager_data_to_merkle_leaves");

    pub static ref COPY_TX_AND_FINALIZE: Arc<dyn Timer> =
        register_timer("log_store_log_manager_copy_tx_and_finalize");

    pub static ref PAD_TX: Arc<dyn Timer> = register_timer("log_store_log_manager_pad_tx");

    pub static ref PUT_BATCH_ROOT_LIST: Arc<dyn Timer> = register_timer("log_store_flow_store_put_batch_root_list");

    pub static ref INSERT_SUBTREE_LIST: Arc<dyn Timer> =
        register_timer("log_store_flow_store_insert_subtree_list");

    pub static ref PUT_MPT_NODE: Arc<dyn Timer> = register_timer("log_store_flow_store_put_mpt_node");

    pub static ref PUT_ENTRY_BATCH_LIST: Arc<dyn Timer> =
        register_timer("log_store_flow_store_put_entry_batch_list");

    pub static ref APPEND_ENTRIES: Arc<dyn Timer> = register_timer("log_store_flow_store_append_entries");

    pub static ref FINALIZE_TX_WITH_HASH: Arc<dyn Timer> = register_timer("log_store_log_manager_finalize_tx_with_hash");

    pub static ref DATA_TO_MERKLE_LEAVES_SIZE: Arc<dyn Gauge<usize>> = GaugeUsize::register("log_store_data_to_merkle_leaves_size");
}
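All of these are consumed through the same pattern added across log_manager.rs, flow_store.rs and tx_store.rs: record an Instant when the operation starts and call update_since once it finishes. A minimal sketch, assuming only the workspace metrics API used above; the wrapper function itself is hypothetical.

use std::time::Instant;

use crate::log_store::metrics;

// Hypothetical helper that times an arbitrary operation and reports it
// against the PUT_TX timer, exactly like the inlined call sites in this diff.
fn timed_put_tx<T>(op: impl FnOnce() -> anyhow::Result<T>) -> anyhow::Result<T> {
    let start_time = Instant::now();
    let result = op()?;
    metrics::PUT_TX.update_since(start_time);
    Ok(result)
}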
|
@ -1,6 +1,7 @@
|
||||
use crate::config::ShardConfig;
|
||||
|
||||
use ethereum_types::H256;
|
||||
use flow_store::PadPair;
|
||||
use shared_types::{
|
||||
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
||||
Transaction,
|
||||
@ -15,6 +16,7 @@ pub mod config;
|
||||
mod flow_store;
|
||||
mod load_chunk;
|
||||
pub mod log_manager;
|
||||
mod metrics;
|
||||
mod seal_task_manager;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@ -158,6 +160,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
|
||||
fn update_shard_config(&self, shard_config: ShardConfig);
|
||||
|
||||
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
|
||||
|
||||
fn start_padding(&self, executor: &task_executor::TaskExecutor);
|
||||
}
|
||||
|
||||
pub trait LogStoreChunkWrite {
|
||||
@ -217,6 +221,10 @@ pub trait FlowRead {
|
||||
fn get_num_entries(&self) -> Result<u64>;
|
||||
|
||||
fn get_shard_config(&self) -> ShardConfig;
|
||||
|
||||
fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
|
||||
|
||||
fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
|
||||
}
|
||||
|
||||
pub trait FlowWrite {
|
||||
@ -231,6 +239,10 @@ pub trait FlowWrite {
|
||||
|
||||
/// Update the shard config.
|
||||
fn update_shard_config(&self, shard_config: ShardConfig);
|
||||
|
||||
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
|
||||
|
||||
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
|
||||
}
|
||||
|
||||
pub struct SealTask {
|
||||
@ -269,3 +281,23 @@ pub trait FlowSeal {
|
||||
|
||||
pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
|
||||
impl<T: FlowRead + FlowWrite + FlowSeal> Flow for T {}
|
||||
|
||||
pub trait PadDataStoreRead {
    fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
    fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
}

pub trait PadDataStoreWrite {
    fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
    fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
    fn start_padding(&mut self, executor: &task_executor::TaskExecutor);
}

pub trait PadDataStore:
    PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static
{
}
impl<T: PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static>
    PadDataStore for T
{
}
|
@ -9,15 +9,10 @@ use rand::random;
|
||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
||||
use std::cmp;
|
||||
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
|
||||
#[test]
|
||||
fn test_put_get() {
|
||||
let config = LogConfig::default();
|
||||
let runtime = TestRuntime::default();
|
||||
|
||||
let executor = runtime.task_executor.clone();
|
||||
let store = LogManager::memorydb(config.clone(), executor).unwrap();
|
||||
let store = LogManager::memorydb(config.clone()).unwrap();
|
||||
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
|
||||
// Aligned with size.
|
||||
let start_offset = 1024;
|
||||
@ -174,10 +169,7 @@ fn test_put_tx() {
|
||||
|
||||
fn create_store() -> LogManager {
|
||||
let config = LogConfig::default();
|
||||
let runtime = TestRuntime::default();
|
||||
let executor = runtime.task_executor.clone();
|
||||
|
||||
LogManager::memorydb(config, executor).unwrap()
|
||||
LogManager::memorydb(config).unwrap()
|
||||
}
|
||||
|
||||
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {
|
||||
|
@ -3,6 +3,7 @@ use crate::log_store::log_manager::{
|
||||
data_to_merkle_leaves, sub_merkle_tree, COL_BLOCK_PROGRESS, COL_MISC, COL_TX, COL_TX_COMPLETED,
|
||||
COL_TX_DATA_ROOT_INDEX, ENTRY_SIZE, PORA_CHUNK_SIZE,
|
||||
};
|
||||
use crate::log_store::metrics;
|
||||
use crate::{try_option, LogManager, ZgsKeyValueDB};
|
||||
use anyhow::{anyhow, Result};
|
||||
use append_merkle::{AppendMerkleTree, MerkleTreeRead, Sha3Algorithm};
|
||||
@ -15,6 +16,7 @@ use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tracing::{error, instrument};
|
||||
|
||||
const LOG_SYNC_PROGRESS_KEY: &str = "log_sync_progress";
|
||||
@ -51,6 +53,8 @@ impl TransactionStore {
|
||||
#[instrument(skip(self))]
|
||||
/// Return `Ok(Some(tx_seq))` if a previous transaction has the same tx root.
|
||||
pub fn put_tx(&self, mut tx: Transaction) -> Result<Vec<u64>> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let old_tx_seq_list = self.get_tx_seq_list_by_data_root(&tx.data_merkle_root)?;
|
||||
if old_tx_seq_list.last().is_some_and(|seq| *seq == tx.seq) {
|
||||
// The last tx is inserted again, so no need to process it.
|
||||
@ -86,6 +90,7 @@ impl TransactionStore {
|
||||
);
|
||||
self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
|
||||
self.kvdb.write(db_tx)?;
|
||||
metrics::TX_STORE_PUT.update_since(start_time);
|
||||
Ok(old_tx_seq_list)
|
||||
}
|
||||
|
||||
@ -175,8 +180,12 @@ impl TransactionStore {
|
||||
}
|
||||
|
||||
pub fn check_tx_completed(&self, tx_seq: u64) -> Result<bool> {
|
||||
Ok(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
|
||||
== Some(vec![TX_STATUS_FINALIZED]))
|
||||
let start_time = Instant::now();
|
||||
let res = self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?
|
||||
== Some(vec![TX_STATUS_FINALIZED]);
|
||||
|
||||
metrics::CHECK_TX_COMPLETED.update_since(start_time);
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub fn check_tx_pruned(&self, tx_seq: u64) -> Result<bool> {
|
||||
|
@ -1,7 +1,10 @@
|
||||
use super::tx_store::TxStore;
|
||||
use anyhow::Result;
|
||||
use std::sync::Arc;
|
||||
use storage::log_store::config::{ConfigTx, ConfigurableExt};
|
||||
use storage::log_store::{
|
||||
config::{ConfigTx, ConfigurableExt},
|
||||
log_manager::DATA_DB_KEY,
|
||||
};
|
||||
use storage_async::Store;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
@ -66,10 +69,10 @@ impl SyncStore {
|
||||
let store = async_store.get_store();
|
||||
|
||||
// load next_tx_seq
|
||||
let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?;
|
||||
let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)?;
|
||||
|
||||
// load max_tx_seq
|
||||
let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ)?;
|
||||
let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ, DATA_DB_KEY)?;
|
||||
|
||||
Ok((next_tx_seq, max_tx_seq))
|
||||
}
|
||||
@ -77,13 +80,13 @@ impl SyncStore {
|
||||
pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
|
||||
let async_store = self.store.write().await;
|
||||
let store = async_store.get_store();
|
||||
store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq)
|
||||
store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq, DATA_DB_KEY)
|
||||
}
|
||||
|
||||
pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> {
|
||||
let async_store = self.store.write().await;
|
||||
let store = async_store.get_store();
|
||||
store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
|
||||
store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq, DATA_DB_KEY)
|
||||
}
|
||||
|
||||
pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> {
|
||||
@ -114,7 +117,7 @@ impl SyncStore {
|
||||
}
|
||||
|
||||
let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?;
|
||||
store.exec_configs(tx)?;
|
||||
store.exec_configs(tx, DATA_DB_KEY)?;
|
||||
|
||||
if removed {
|
||||
Ok(InsertResult::Upgraded)
|
||||
@ -128,7 +131,7 @@ impl SyncStore {
|
||||
}
|
||||
|
||||
let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
|
||||
store.exec_configs(tx)?;
|
||||
store.exec_configs(tx, DATA_DB_KEY)?;
|
||||
|
||||
if removed {
|
||||
Ok(InsertResult::Downgraded)
|
||||
@ -151,7 +154,7 @@ impl SyncStore {
|
||||
|
||||
let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
|
||||
|
||||
store.exec_configs(tx)?;
|
||||
store.exec_configs(tx, DATA_DB_KEY)?;
|
||||
|
||||
Ok(added)
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use rand::Rng;
|
||||
use storage::log_store::config::{ConfigTx, ConfigurableExt};
|
||||
use storage::log_store::log_manager::DATA_DB_KEY;
|
||||
use storage::log_store::Store;
|
||||
|
||||
/// TxStore is used to store pending transactions that to be synchronized in advance.
|
||||
@ -32,11 +33,11 @@ impl TxStore {
|
||||
}
|
||||
|
||||
fn index_of(&self, store: &dyn Store, tx_seq: u64) -> Result<Option<usize>> {
|
||||
store.get_config_decoded(&self.key_seq_to_index(tx_seq))
|
||||
store.get_config_decoded(&self.key_seq_to_index(tx_seq), DATA_DB_KEY)
|
||||
}
|
||||
|
||||
fn at(&self, store: &dyn Store, index: usize) -> Result<Option<u64>> {
|
||||
store.get_config_decoded(&self.key_index_to_seq(index))
|
||||
store.get_config_decoded(&self.key_index_to_seq(index), DATA_DB_KEY)
|
||||
}
|
||||
|
||||
pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> {
|
||||
@ -45,7 +46,7 @@ impl TxStore {
|
||||
|
||||
pub fn count(&self, store: &dyn Store) -> Result<usize> {
|
||||
store
|
||||
.get_config_decoded(&self.key_count)
|
||||
.get_config_decoded(&self.key_count, DATA_DB_KEY)
|
||||
.map(|x| x.unwrap_or(0))
|
||||
}
|
||||
|
||||
@ -70,7 +71,7 @@ impl TxStore {
|
||||
if let Some(db_tx) = db_tx {
|
||||
db_tx.append(&mut tx);
|
||||
} else {
|
||||
store.exec_configs(tx)?;
|
||||
store.exec_configs(tx, DATA_DB_KEY)?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
@ -130,7 +131,7 @@ impl TxStore {
|
||||
if let Some(db_tx) = db_tx {
|
||||
db_tx.append(&mut tx);
|
||||
} else {
|
||||
store.exec_configs(tx)?;
|
||||
store.exec_configs(tx, DATA_DB_KEY)?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
|
@ -1,12 +1,11 @@
|
||||
use network::{NetworkMessage, PeerAction, PeerId, PubsubMessage, ReportSource};
|
||||
use tokio::sync::mpsc;
|
||||
use network::{NetworkMessage, NetworkSender, PeerAction, PeerId, PubsubMessage, ReportSource};
|
||||
|
||||
pub struct SyncNetworkContext {
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
network_send: NetworkSender,
|
||||
}
|
||||
|
||||
impl SyncNetworkContext {
|
||||
pub fn new(network_send: mpsc::UnboundedSender<NetworkMessage>) -> Self {
|
||||
pub fn new(network_send: NetworkSender) -> Self {
|
||||
Self { network_send }
|
||||
}
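A small wiring sketch for the new typed channel: new_network_channel() and NetworkReceiver come from the test changes below, and the helper function itself is illustrative rather than code from this patch.

use std::sync::Arc;

use network::{new_network_channel, NetworkReceiver};

// Create the typed network channel, hand the sender to the sync context and
// keep the receiver for whatever component drains outgoing NetworkMessages.
fn build_sync_context() -> (Arc<SyncNetworkContext>, NetworkReceiver) {
    let (network_send, network_recv) = new_network_channel();
    let ctx = Arc::new(SyncNetworkContext::new(network_send));
    (ctx, network_recv)
}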
|
||||
|
||||
|
@ -750,13 +750,13 @@ mod tests {
|
||||
use crate::test_util::create_2_store;
|
||||
use crate::test_util::tests::create_file_location_cache;
|
||||
use libp2p::identity;
|
||||
use network::{new_network_channel, NetworkReceiver};
|
||||
use network::{ReportSource, Request};
|
||||
use storage::log_store::log_manager::LogConfig;
|
||||
use storage::log_store::log_manager::LogManager;
|
||||
use storage::log_store::LogStoreRead;
|
||||
use storage::H256;
|
||||
use task_executor::{test_utils::TestRuntime, TaskExecutor};
|
||||
use tokio::sync::mpsc::{self, UnboundedReceiver};
|
||||
|
||||
#[test]
|
||||
fn test_status() {
|
||||
@ -1649,7 +1649,7 @@ mod tests {
|
||||
fn create_default_controller(
|
||||
task_executor: TaskExecutor,
|
||||
peer_id: Option<PeerId>,
|
||||
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {
|
||||
) -> (SerialSyncController, NetworkReceiver) {
|
||||
let tx_id = TxID {
|
||||
seq: 0,
|
||||
hash: H256::random(),
|
||||
@ -1657,7 +1657,7 @@ mod tests {
|
||||
let num_chunks = 123;
|
||||
|
||||
let config = LogConfig::default();
|
||||
let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap());
|
||||
let store = Arc::new(LogManager::memorydb(config).unwrap());
|
||||
|
||||
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
|
||||
}
|
||||
@ -1668,8 +1668,8 @@ mod tests {
|
||||
store: Arc<LogManager>,
|
||||
tx_id: TxID,
|
||||
num_chunks: usize,
|
||||
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {
|
||||
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
||||
) -> (SerialSyncController, NetworkReceiver) {
|
||||
let (network_send, network_recv) = new_network_channel();
|
||||
let ctx = Arc::new(SyncNetworkContext::new(network_send));
|
||||
|
||||
let peer_id = match peer_id {
|
||||
|
@ -10,10 +10,9 @@ use libp2p::swarm::DialError;
|
||||
use log_entry_sync::LogSyncEvent;
|
||||
use network::rpc::methods::FileAnnouncement;
|
||||
use network::types::{AnnounceChunks, FindFile, NewFile};
|
||||
use network::PubsubMessage;
|
||||
use network::{
|
||||
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, PeerId,
|
||||
PeerRequestId, SyncId as RequestId,
|
||||
rpc::GetChunksRequest, rpc::RPCResponseErrorCode, Multiaddr, NetworkMessage, NetworkSender,
|
||||
PeerId, PeerRequestId, PubsubMessage, SyncId as RequestId,
|
||||
};
|
||||
use shared_types::{bytes_to_chunks, timestamp_now, ChunkArrayWithProof, Transaction, TxID};
|
||||
use std::sync::atomic::Ordering;
|
||||
@ -27,7 +26,7 @@ use storage::error::Result as StorageResult;
|
||||
use storage::log_store::log_manager::{sector_to_segment, segment_to_sector, PORA_CHUNK_SIZE};
|
||||
use storage::log_store::Store as LogStore;
|
||||
use storage_async::Store;
|
||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||
use tokio::sync::{broadcast, oneshot};
|
||||
|
||||
pub type SyncSender = channel::Sender<SyncMessage, SyncRequest, SyncResponse>;
|
||||
pub type SyncReceiver = channel::Receiver<SyncMessage, SyncRequest, SyncResponse>;
|
||||
@ -142,7 +141,7 @@ pub struct SyncService {
|
||||
impl SyncService {
|
||||
pub async fn spawn(
|
||||
executor: task_executor::TaskExecutor,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
network_send: NetworkSender,
|
||||
store: Arc<dyn LogStore>,
|
||||
file_location_cache: Arc<FileLocationCache>,
|
||||
event_recv: broadcast::Receiver<LogSyncEvent>,
|
||||
@ -163,7 +162,7 @@ impl SyncService {
|
||||
pub async fn spawn_with_config(
|
||||
config: Config,
|
||||
executor: task_executor::TaskExecutor,
|
||||
network_send: mpsc::UnboundedSender<NetworkMessage>,
|
||||
network_send: NetworkSender,
|
||||
store: Arc<dyn LogStore>,
|
||||
file_location_cache: Arc<FileLocationCache>,
|
||||
event_recv: broadcast::Receiver<LogSyncEvent>,
|
||||
@ -897,7 +896,9 @@ mod tests {
|
||||
use crate::test_util::tests::create_file_location_cache;
|
||||
use libp2p::identity;
|
||||
use network::discovery::ConnectionId;
|
||||
use network::new_network_channel;
|
||||
use network::rpc::SubstreamId;
|
||||
use network::NetworkReceiver;
|
||||
use network::ReportSource;
|
||||
use shared_types::ChunkArray;
|
||||
use shared_types::Transaction;
|
||||
@ -909,8 +910,6 @@ mod tests {
|
||||
use storage::log_store::LogStoreRead;
|
||||
use storage::H256;
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
struct TestSyncRuntime {
|
||||
runtime: TestRuntime,
|
||||
@ -924,8 +923,8 @@ mod tests {
|
||||
init_peer_id: PeerId,
|
||||
file_location_cache: Arc<FileLocationCache>,
|
||||
|
||||
network_send: UnboundedSender<NetworkMessage>,
|
||||
network_recv: UnboundedReceiver<NetworkMessage>,
|
||||
network_send: NetworkSender,
|
||||
network_recv: NetworkReceiver,
|
||||
event_send: broadcast::Sender<LogSyncEvent>,
|
||||
catch_up_end_recv: Option<oneshot::Receiver<()>>,
|
||||
}
|
||||
@ -942,7 +941,7 @@ mod tests {
|
||||
let (store, peer_store, txs, data) = create_2_store(chunk_counts);
|
||||
let init_data = data[0].clone();
|
||||
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
|
||||
let (network_send, network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
||||
let (network_send, network_recv) = new_network_channel();
|
||||
let (event_send, _) = broadcast::channel(16);
|
||||
let (_, catch_up_end_recv) = oneshot::channel();
|
||||
|
||||
@ -1006,7 +1005,7 @@ mod tests {
|
||||
let file_location_cache: Arc<FileLocationCache> =
|
||||
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
|
||||
|
||||
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
||||
let (network_send, mut network_recv) = new_network_channel();
|
||||
let (_, sync_recv) = channel::Channel::unbounded("test");
|
||||
|
||||
let mut sync = SyncService {
|
||||
@ -1035,7 +1034,7 @@ mod tests {
|
||||
let file_location_cache: Arc<FileLocationCache> =
|
||||
create_file_location_cache(init_peer_id, vec![txs[0].id()]);
|
||||
|
||||
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
||||
let (network_send, mut network_recv) = new_network_channel();
|
||||
let (_, sync_recv) = channel::Channel::unbounded("test");
|
||||
|
||||
let mut sync = SyncService {
|
||||
@ -1348,15 +1347,13 @@ mod tests {
|
||||
|
||||
let config = LogConfig::default();
|
||||
|
||||
let executor = runtime.task_executor.clone();
|
||||
|
||||
let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap());
|
||||
let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
|
||||
|
||||
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
|
||||
let file_location_cache: Arc<FileLocationCache> =
|
||||
create_file_location_cache(init_peer_id, vec![]);
|
||||
|
||||
let (network_send, mut network_recv) = mpsc::unbounded_channel::<NetworkMessage>();
|
||||
let (network_send, mut network_recv) = new_network_channel();
|
||||
let (_event_send, event_recv) = broadcast::channel(16);
|
||||
let (_, catch_up_end_recv) = oneshot::channel();
|
||||
let sync_send = SyncService::spawn_with_config(
|
||||
@ -1783,7 +1780,7 @@ mod tests {
|
||||
}
|
||||
|
||||
async fn receive_chunk_request(
|
||||
network_recv: &mut UnboundedReceiver<NetworkMessage>,
|
||||
network_recv: &mut NetworkReceiver,
|
||||
sync_send: &SyncSender,
|
||||
peer_store: Arc<LogManager>,
|
||||
init_peer_id: PeerId,
|
||||
|
@ -9,8 +9,6 @@ use storage::{
|
||||
LogManager,
|
||||
};
|
||||
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
|
||||
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
|
||||
/// The first store is for local node, and data not stored. The second store is for peers, and all
|
||||
/// transactions are finalized for file sync.
|
||||
@ -24,11 +22,8 @@ pub fn create_2_store(
|
||||
Vec<Vec<u8>>,
|
||||
) {
|
||||
let config = LogConfig::default();
|
||||
let runtime = TestRuntime::default();
|
||||
|
||||
let executor = runtime.task_executor.clone();
|
||||
let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap();
|
||||
let mut peer_store = LogManager::memorydb(config, executor).unwrap();
|
||||
let mut store = LogManager::memorydb(config.clone()).unwrap();
|
||||
let mut peer_store = LogManager::memorydb(config).unwrap();
|
||||
|
||||
let mut offset = 1;
|
||||
let mut txs = vec![];
|
||||
@ -120,10 +115,7 @@ pub mod tests {
|
||||
|
||||
impl TestStoreRuntime {
|
||||
pub fn new_store() -> impl LogStore {
|
||||
let runtime = TestRuntime::default();
|
||||
|
||||
let executor = runtime.task_executor.clone();
|
||||
LogManager::memorydb(LogConfig::default(), executor).unwrap()
|
||||
LogManager::memorydb(LogConfig::default()).unwrap()
|
||||
}
|
||||
|
||||
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {
|
||||
|
@ -1,3 +1,5 @@
|
||||
import os
|
||||
|
||||
from web3 import Web3
|
||||
|
||||
ZGS_CONFIG = {
|
||||
@ -22,6 +24,8 @@ ZGS_CONFIG = {
|
||||
}
|
||||
}
|
||||
|
||||
CONFIG_DIR = os.path.dirname(__file__)
|
||||
ZGS_KEY_FILE = os.path.join(CONFIG_DIR, "zgs", "network", "key")
|
||||
ZGS_NODEID = "16Uiu2HAmLkGFUbNFYdhuSbTQ5hmnPjFXx2zUDtwQ2uihHpN9YNNe"
|
||||
|
||||
BSC_CONFIG = dict(
|
||||
|
tests/config/zgs/network/key (new file, 1 line)
@ -0,0 +1 @@
(binary pre-defined network keypair; raw bytes not representable as text)
|
@ -3,7 +3,7 @@
|
||||
import os
|
||||
import time
|
||||
|
||||
from config.node_config import ZGS_NODEID
|
||||
from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.utils import p2p_port
|
||||
|
||||
@ -17,13 +17,9 @@ class NetworkDiscoveryTest(TestFramework):
|
||||
self.num_nodes = 3
|
||||
|
||||
# setup for node 0 as bootnode
|
||||
tests_dir = os.path.dirname(__file__)
|
||||
network_dir = os.path.join(tests_dir, "config", "zgs", "network")
|
||||
self.zgs_node_key_files = [ZGS_KEY_FILE]
|
||||
bootnode_port = p2p_port(0)
|
||||
self.zgs_node_configs[0] = {
|
||||
# load pre-defined keypair
|
||||
"network_dir": network_dir,
|
||||
|
||||
# enable UDP discovery relevant configs
|
||||
"network_enr_address": "127.0.0.1",
|
||||
"network_enr_tcp_port": bootnode_port,
|
||||
|
@ -3,7 +3,7 @@
|
||||
import os
|
||||
import time
|
||||
|
||||
from config.node_config import ZGS_NODEID
|
||||
from config.node_config import ZGS_KEY_FILE, ZGS_NODEID
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.utils import p2p_port
|
||||
|
||||
@ -17,13 +17,9 @@ class NetworkDiscoveryUpgradeTest(TestFramework):
|
||||
self.num_nodes = 2
|
||||
|
||||
# setup for node 0 as bootnode
|
||||
tests_dir = os.path.dirname(__file__)
|
||||
network_dir = os.path.join(tests_dir, "config", "zgs", "network")
|
||||
self.zgs_node_key_files = [ZGS_KEY_FILE]
|
||||
bootnode_port = p2p_port(0)
|
||||
self.zgs_node_configs[0] = {
|
||||
# load pre-defined keypair
|
||||
"network_dir": network_dir,
|
||||
|
||||
# enable UDP discovery relevant configs
|
||||
"network_enr_address": "127.0.0.1",
|
||||
"network_enr_tcp_port": bootnode_port,
|
||||
|
@ -40,14 +40,18 @@ class PrunerTest(TestFramework):
|
||||
for i in range(len(segments)):
|
||||
client_index = i % 2
|
||||
self.nodes[client_index].zgs_upload_segment(segments[i])
|
||||
wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root) is not None)
|
||||
wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"])
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root) is not None)
|
||||
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root)["finalized"])
|
||||
|
||||
self.nodes[2].admin_start_sync_file(0)
|
||||
self.nodes[3].admin_start_sync_file(0)
|
||||
wait_until(lambda: self.nodes[2].sync_status_is_completed_or_unknown(0))
|
||||
wait_until(lambda: self.nodes[2].zgs_get_file_info(data_root) is not None)
|
||||
wait_until(lambda: self.nodes[2].zgs_get_file_info(data_root)["finalized"])
|
||||
wait_until(lambda: self.nodes[3].sync_status_is_completed_or_unknown(0))
|
||||
wait_until(lambda: self.nodes[3].zgs_get_file_info(data_root) is not None)
|
||||
wait_until(lambda: self.nodes[3].zgs_get_file_info(data_root)["finalized"])
|
||||
|
||||
for i in range(len(segments)):
|
||||
|
@ -128,10 +128,14 @@ class TestNode:
|
||||
poll_per_s = 4
|
||||
for _ in range(poll_per_s * self.rpc_timeout):
|
||||
if self.process.poll() is not None:
|
||||
self.stderr.seek(0)
|
||||
self.stdout.seek(0)
|
||||
raise FailedToStartError(
|
||||
self._node_msg(
|
||||
"exited with status {} during initialization".format(
|
||||
self.process.returncode
|
||||
"exited with status {} during initialization \n\nstderr: {}\n\nstdout: {}\n\n".format(
|
||||
self.process.returncode,
|
||||
self.stderr.read(),
|
||||
self.stdout.read(),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -55,6 +55,7 @@ class TestFramework:
|
||||
self.lifetime_seconds = 3600
|
||||
self.launch_wait_seconds = 1
|
||||
self.num_deployed_contracts = 0
|
||||
self.zgs_node_key_files = []
|
||||
|
||||
# Set default binary path
|
||||
binary_ext = ".exe" if is_windows_platform() else ""
|
||||
@ -190,6 +191,10 @@ class TestFramework:
|
||||
else:
|
||||
updated_config = {}
|
||||
|
||||
zgs_node_key_file = None
|
||||
if i < len(self.zgs_node_key_files):
|
||||
zgs_node_key_file = self.zgs_node_key_files[i]
|
||||
|
||||
assert os.path.exists(self.zgs_binary), (
|
||||
"%s should be exist" % self.zgs_binary
|
||||
)
|
||||
@ -202,6 +207,7 @@ class TestFramework:
|
||||
self.mine_contract.address(),
|
||||
self.reward_contract.address(),
|
||||
self.log,
|
||||
key_file=zgs_node_key_file,
|
||||
)
|
||||
self.nodes.append(node)
|
||||
node.setup_config()
|
||||
|
@ -3,14 +3,9 @@ import subprocess
|
||||
import tempfile
|
||||
|
||||
from test_framework.blockchain_node import BlockChainNodeType, BlockchainNode
|
||||
from utility.utils import blockchain_rpc_port, arrange_port
|
||||
from utility.utils import blockchain_p2p_port, blockchain_rpc_port, blockchain_ws_port, blockchain_rpc_port_tendermint, pprof_port
|
||||
from utility.build_binary import build_zg
|
||||
|
||||
ZGNODE_PORT_CATEGORY_WS = 0
|
||||
ZGNODE_PORT_CATEGORY_P2P = 1
|
||||
ZGNODE_PORT_CATEGORY_RPC = 2
|
||||
ZGNODE_PORT_CATEGORY_PPROF = 3
|
||||
|
||||
def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
|
||||
assert num_nodes > 0, "Invalid number of blockchain nodes: %s" % num_nodes
|
||||
|
||||
@ -26,7 +21,7 @@ def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
|
||||
os.mkdir(zgchaind_dir)
|
||||
|
||||
log_file = tempfile.NamedTemporaryFile(dir=zgchaind_dir, delete=False, prefix="init_genesis_", suffix=".log")
|
||||
p2p_port_start = arrange_port(ZGNODE_PORT_CATEGORY_P2P, 0)
|
||||
p2p_port_start = blockchain_p2p_port(0)
|
||||
|
||||
ret = subprocess.run(
|
||||
args=["bash", shell_script, zgchaind_dir, str(num_nodes), str(p2p_port_start)],
|
||||
@ -71,13 +66,13 @@ class ZGNode(BlockchainNode):
|
||||
# overwrite json rpc http port: 8545
|
||||
"--json-rpc.address", "127.0.0.1:%s" % blockchain_rpc_port(index),
|
||||
# overwrite json rpc ws port: 8546
|
||||
"--json-rpc.ws-address", "127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_WS, index),
|
||||
"--json-rpc.ws-address", "127.0.0.1:%s" % blockchain_ws_port(index),
|
||||
# overwrite p2p port: 26656
|
||||
"--p2p.laddr", "tcp://127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_P2P, index),
|
||||
"--p2p.laddr", "tcp://127.0.0.1:%s" % blockchain_p2p_port(index),
|
||||
# overwrite rpc port: 26657
|
||||
"--rpc.laddr", "tcp://127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_RPC, index),
|
||||
"--rpc.laddr", "tcp://127.0.0.1:%s" % blockchain_rpc_port_tendermint(index),
|
||||
# overwrite pprof port: 6060
|
||||
"--rpc.pprof_laddr", "127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_PPROF, index),
|
||||
"--rpc.pprof_laddr", "127.0.0.1:%s" % pprof_port(index),
|
||||
"--log_level", "debug"
|
||||
]
|
||||
|
||||
|
@ -24,6 +24,7 @@ class ZgsNode(TestNode):
|
||||
log,
|
||||
rpc_timeout=10,
|
||||
libp2p_nodes=None,
|
||||
key_file=None,
|
||||
):
|
||||
local_conf = ZGS_CONFIG.copy()
|
||||
if libp2p_nodes is None:
|
||||
@ -54,6 +55,7 @@ class ZgsNode(TestNode):
|
||||
# Overwrite with personalized configs.
|
||||
update_config(local_conf, updated_config)
|
||||
data_dir = os.path.join(root_dir, "zgs_node" + str(index))
|
||||
self.key_file = key_file
|
||||
rpc_url = "http://" + rpc_listen_address
|
||||
super().__init__(
|
||||
NodeType.Zgs,
|
||||
@ -68,10 +70,16 @@ class ZgsNode(TestNode):
|
||||
|
||||
def setup_config(self):
|
||||
os.mkdir(self.data_dir)
|
||||
|
||||
log_config_path = os.path.join(self.data_dir, self.config["log_config_file"])
|
||||
with open(log_config_path, "w") as f:
|
||||
f.write("trace,hyper=info,h2=info")
|
||||
|
||||
if self.key_file is not None:
|
||||
network_dir = os.path.join(self.data_dir, "network")
|
||||
os.mkdir(network_dir)
|
||||
shutil.copy(self.key_file, network_dir)
|
||||
|
||||
initialize_toml_config(self.config_file, self.config)
|
||||
|
||||
def wait_for_rpc_connection(self):
|
||||
|
@ -11,6 +11,7 @@ class PortMin:
|
||||
|
||||
|
||||
MAX_NODES = 100
|
||||
MAX_BLOCKCHAIN_NODES = 50
|
||||
|
||||
|
||||
def p2p_port(n):
|
||||
@ -23,18 +24,25 @@ def rpc_port(n):
|
||||
|
||||
|
||||
def blockchain_p2p_port(n):
|
||||
return PortMin.n + 2 * MAX_NODES + n
|
||||
assert n <= MAX_BLOCKCHAIN_NODES
|
||||
return PortMin.n + MAX_NODES + MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
|
||||
def blockchain_rpc_port(n):
|
||||
return PortMin.n + 3 * MAX_NODES + n
|
||||
return PortMin.n + MAX_NODES + 2 * MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
|
||||
def blockchain_rpc_port_core(n):
|
||||
return PortMin.n + 4 * MAX_NODES + n
|
||||
return PortMin.n + MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
def arrange_port(category: int, node_index: int) -> int:
|
||||
return PortMin.n + (100 + category) * MAX_NODES + node_index
|
||||
def blockchain_ws_port(n):
|
||||
return PortMin.n + MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
def blockchain_rpc_port_tendermint(n):
|
||||
return PortMin.n + MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
def pprof_port(n):
|
||||
return PortMin.n + MAX_NODES + 6 * MAX_BLOCKCHAIN_NODES + n
|
||||
|
||||
def wait_until(predicate, *, attempts=float("inf"), timeout=float("inf"), lock=None):
|
||||
if attempts == float("inf") and timeout == float("inf"):
|
||||
|
@ -28,7 +28,7 @@ Different identity schemes can be used to define the node id and signatures. Cur
|
||||
|
||||
## Signing Algorithms
|
||||
|
||||
User's wishing to implement their own singing algorithms simply need to
|
||||
User's wishing to implement their own signing algorithms simply need to
|
||||
implement the `EnrKey` trait and apply it to an `Enr`.
|
||||
|
||||
By default, `k256::SigningKey` implement `EnrKey` and can be used to sign and
|
||||
|