diff --git a/Cargo.lock b/Cargo.lock index ed02af5..0bdf505 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,6 +225,7 @@ dependencies = [ "ethereum-types 0.14.1", "itertools 0.13.0", "lazy_static", + "lru 0.12.5", "once_cell", "serde", "tiny-keccak", @@ -1674,7 +1675,7 @@ dependencies = [ "hkdf", "lazy_static", "libp2p-core 0.30.2", - "lru", + "lru 0.7.8", "parking_lot 0.11.2", "rand 0.8.5", "rlp", @@ -2515,6 +2516,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -2947,6 +2954,17 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] + [[package]] name = "hashers" version = "1.0.1" @@ -4118,7 +4136,7 @@ dependencies = [ "libp2p-core 0.33.0", "libp2p-swarm", "log", - "lru", + "lru 0.7.8", "prost 0.10.4", "prost-build 0.10.4", "prost-codec", @@ -4651,6 +4669,15 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.0", +] + [[package]] name = "lru-cache" version = "0.1.2" @@ -5019,7 +5046,7 @@ dependencies = [ "lazy_static", "libp2p", "lighthouse_metrics", - "lru", + "lru 0.7.8", "parking_lot 0.12.3", "rand 0.8.5", "regex", diff --git a/common/append_merkle/Cargo.toml b/common/append_merkle/Cargo.toml index 19f4750..2b1b8d6 100644 --- a/common/append_merkle/Cargo.toml +++ b/common/append_merkle/Cargo.toml @@ -13,4 +13,5 @@ serde = { version = "1.0.137", features = ["derive"] } lazy_static = "1.4.0" tracing = "0.1.36" once_cell = "1.19.0" -itertools = "0.13.0" \ No newline at end of file +itertools = "0.13.0" +lru = "0.12.5" \ No newline at end of file diff --git a/common/append_merkle/src/lib.rs b/common/append_merkle/src/lib.rs index bfbf1b5..eb02147 100644 --- a/common/append_merkle/src/lib.rs +++ b/common/append_merkle/src/lib.rs @@ -38,14 +38,9 @@ pub struct AppendMerkleTree> { } impl> AppendMerkleTree { - pub fn new( - node_db: Arc>, - leaves: Vec, - leaf_height: usize, - start_tx_seq: Option, - ) -> Self { + pub fn new(leaves: Vec, leaf_height: usize, start_tx_seq: Option) -> Self { let mut merkle = Self { - node_manager: NodeManager::new(node_db), + node_manager: NodeManager::new_dummy(), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: None, @@ -75,12 +70,13 @@ impl> AppendMerkleTree { pub fn new_with_subtrees( node_db: Arc>, + node_cache_capacity: usize, initial_data: MerkleTreeInitialData, leaf_height: usize, start_tx_seq: Option, ) -> Result { let mut merkle = Self { - node_manager: NodeManager::new(node_db), + node_manager: NodeManager::new(node_db, node_cache_capacity), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: None, @@ -117,7 +113,7 @@ impl> AppendMerkleTree { // Create an empty merkle tree with `depth`. let mut merkle = Self { // dummy node manager for the last chunk. - node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), + node_manager: NodeManager::new_dummy(), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -139,7 +135,7 @@ impl> AppendMerkleTree { } else { let mut merkle = Self { // dummy node manager for the last chunk. - node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})), + node_manager: NodeManager::new_dummy(), delta_nodes_map: BTreeMap::new(), root_to_tx_seq_map: HashMap::new(), min_depth: Some(depth), @@ -775,12 +771,8 @@ mod tests { for _ in 0..entry_len { data.push(H256::random()); } - let mut merkle = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), - vec![H256::zero()], - 0, - None, - ); + let mut merkle = + AppendMerkleTree::::new(vec![H256::zero()], 0, None); merkle.append_list(data.clone()); merkle.commit(Some(0)); verify(&data, &mut merkle); @@ -807,12 +799,8 @@ mod tests { for _ in 0..entry_len { data.push(H256::random()); } - let mut merkle = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), - vec![H256::zero()], - 0, - None, - ); + let mut merkle = + AppendMerkleTree::::new(vec![H256::zero()], 0, None); merkle.append_list(data.clone()); merkle.commit(Some(0)); @@ -836,12 +824,7 @@ mod tests { #[test] fn test_proof_at_version() { let n = [2, 255, 256, 257]; - let mut merkle = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), - vec![H256::zero()], - 0, - None, - ); + let mut merkle = AppendMerkleTree::::new(vec![H256::zero()], 0, None); let mut start_pos = 0; for (tx_seq, &entry_len) in n.iter().enumerate() { diff --git a/common/append_merkle/src/node_manager.rs b/common/append_merkle/src/node_manager.rs index 8fa87ed..369fc2f 100644 --- a/common/append_merkle/src/node_manager.rs +++ b/common/append_merkle/src/node_manager.rs @@ -1,24 +1,33 @@ use crate::HashElement; use anyhow::Result; -use std::collections::BTreeMap; +use lru::LruCache; +use std::num::NonZeroUsize; use std::sync::Arc; use tracing::error; pub struct NodeManager { - cache: BTreeMap<(usize, usize), E>, + cache: LruCache<(usize, usize), E>, layer_size: Vec, db: Arc>, } impl NodeManager { - pub fn new(db: Arc>) -> Self { + pub fn new(db: Arc>, capacity: usize) -> Self { Self { - cache: BTreeMap::new(), + cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")), layer_size: vec![], db, } } + pub fn new_dummy() -> Self { + Self { + cache: LruCache::unbounded(), + layer_size: vec![], + db: Arc::new(EmptyNodeDatabase {}), + } + } + pub fn push_node(&mut self, layer: usize, node: E) { self.add_node(layer, self.layer_size[layer], node); self.layer_size[layer] += 1; @@ -28,7 +37,7 @@ impl NodeManager { let pos = &mut self.layer_size[layer]; let mut saved_nodes = Vec::with_capacity(nodes.len()); for node in nodes { - self.cache.insert((layer, *pos), node.clone()); + self.cache.put((layer, *pos), node.clone()); saved_nodes.push((layer, *pos, node)); *pos += 1; } @@ -38,7 +47,7 @@ impl NodeManager { } pub fn get_node(&self, layer: usize, pos: usize) -> Option { - match self.cache.get(&(layer, pos)) { + match self.cache.peek(&(layer, pos)) { Some(node) => Some(node.clone()), None => self.db.get_node(layer, pos).unwrap_or_else(|e| { error!("Failed to get node: {}", e); @@ -60,7 +69,7 @@ impl NodeManager { if let Err(e) = self.db.save_node(layer, pos, &node) { error!("Failed to save node: {}", e); } - self.cache.insert((layer, pos), node); + self.cache.put((layer, pos), node); } pub fn add_layer(&mut self) { @@ -78,7 +87,7 @@ impl NodeManager { pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) { let mut removed_nodes = Vec::new(); for pos in pos_end..self.layer_size[layer] { - self.cache.remove(&(layer, pos)); + self.cache.pop(&(layer, pos)); removed_nodes.push((layer, pos)); } if let Err(e) = self.db.remove_node_list(&removed_nodes) { diff --git a/node/src/client/builder.rs b/node/src/client/builder.rs index 4d742e1..0cb284d 100644 --- a/node/src/client/builder.rs +++ b/node/src/client/builder.rs @@ -112,7 +112,7 @@ impl ClientBuilder { pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result { let executor = require!("sync", self, runtime_context).clone().executor; let store = Arc::new( - LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor) + LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor) .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?, ); diff --git a/node/src/config/convert.rs b/node/src/config/convert.rs index e871b84..43ca653 100644 --- a/node/src/config/convert.rs +++ b/node/src/config/convert.rs @@ -11,6 +11,7 @@ use shared_types::{NetworkIdentity, ProtocolVersion}; use std::net::IpAddr; use std::time::Duration; use storage::config::ShardConfig; +use storage::log_store::log_manager::LogConfig; use storage::StorageConfig; impl ZgsConfig { @@ -101,8 +102,11 @@ impl ZgsConfig { } pub fn storage_config(&self) -> Result { + let mut log_config = LogConfig::default(); + log_config.flow.merkle_node_cache_capacity = self.merkle_node_cache_capacity; Ok(StorageConfig { db_dir: self.db_dir.clone().into(), + log_config, }) } diff --git a/node/src/config/mod.rs b/node/src/config/mod.rs index bad72b9..6d30745 100644 --- a/node/src/config/mod.rs +++ b/node/src/config/mod.rs @@ -60,6 +60,7 @@ build_config! { (prune_check_time_s, (u64), 60) (prune_batch_size, (usize), 16 * 1024) (prune_batch_wait_time_ms, (u64), 1000) + (merkle_node_cache_capacity, (usize), 32 * 1024 * 1024) // misc (log_config_file, (String), "log_config".to_string()) diff --git a/node/storage/src/config.rs b/node/storage/src/config.rs index b5fde0c..2b7160e 100644 --- a/node/storage/src/config.rs +++ b/node/storage/src/config.rs @@ -1,3 +1,4 @@ +use crate::log_store::log_manager::LogConfig; use serde::{Deserialize, Serialize}; use ssz_derive::{Decode, Encode}; use std::{cell::RefCell, path::PathBuf, rc::Rc, str::FromStr}; @@ -7,6 +8,7 @@ pub const SHARD_CONFIG_KEY: &str = "shard_config"; #[derive(Clone)] pub struct Config { pub db_dir: PathBuf, + pub log_config: LogConfig, } #[derive(Clone, Copy, Debug, Decode, Encode, Serialize, Deserialize, Eq, PartialEq)] diff --git a/node/storage/src/log_store/flow_store.rs b/node/storage/src/log_store/flow_store.rs index 204f595..e6c454a 100644 --- a/node/storage/src/log_store/flow_store.rs +++ b/node/storage/src/log_store/flow_store.rs @@ -10,7 +10,7 @@ use crate::log_store::log_manager::{ use crate::log_store::{FlowRead, FlowSeal, FlowWrite}; use crate::{try_option, ZgsKeyValueDB}; use anyhow::{anyhow, bail, Result}; -use append_merkle::{EmptyNodeDatabase, MerkleTreeInitialData, MerkleTreeRead, NodeDatabase}; +use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase}; use itertools::Itertools; use parking_lot::RwLock; use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle}; @@ -93,6 +93,7 @@ impl FlowStore { #[derive(Clone, Debug)] pub struct FlowConfig { pub batch_size: usize, + pub merkle_node_cache_capacity: usize, pub shard_config: Arc>, } @@ -100,6 +101,8 @@ impl Default for FlowConfig { fn default() -> Self { Self { batch_size: SECTORS_PER_LOAD, + // Each node takes (8+8+32=)48 Bytes, so the default value is 1.5 GB memory size. + merkle_node_cache_capacity: 32 * 1024 * 1024, shard_config: Default::default(), } } @@ -436,13 +439,7 @@ impl FlowDBStore { let mut expected_index = 0; let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE]; - let empty_root = Merkle::new( - Arc::new(EmptyNodeDatabase {}), - data_to_merkle_leaves(&empty_data)?, - 0, - None, - ) - .root(); + let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root(); for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) { let (index_bytes, root_bytes) = r?; diff --git a/node/storage/src/log_store/load_chunk/mod.rs b/node/storage/src/log_store/load_chunk/mod.rs index 5fa88ee..0a68dbc 100644 --- a/node/storage/src/log_store/load_chunk/mod.rs +++ b/node/storage/src/log_store/load_chunk/mod.rs @@ -8,11 +8,10 @@ use anyhow::Result; use ethereum_types::H256; use ssz_derive::{Decode, Encode}; use std::cmp::min; -use std::sync::Arc; use crate::log_store::log_manager::data_to_merkle_leaves; use crate::try_option; -use append_merkle::{Algorithm, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm}; +use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm}; use shared_types::{ChunkArray, DataRoot, Merkle}; use tracing::trace; use zgs_spec::{ @@ -248,7 +247,7 @@ impl EntryBatch { } else { vec![] }; - let mut merkle = Merkle::new(Arc::new(EmptyNodeDatabase {}), initial_leaves, 0, None); + let mut merkle = Merkle::new(initial_leaves, 0, None); for subtree in self.data.get_subtree_list() { trace!(?subtree, "get subtree, leaves={}", merkle.leaves()); if subtree.start_sector != merkle.leaves() { diff --git a/node/storage/src/log_store/log_manager.rs b/node/storage/src/log_store/log_manager.rs index 2d07ef9..d81f500 100644 --- a/node/storage/src/log_store/log_manager.rs +++ b/node/storage/src/log_store/log_manager.rs @@ -6,7 +6,7 @@ use crate::log_store::{ }; use crate::{try_option, ZgsKeyValueDB}; use anyhow::{anyhow, bail, Result}; -use append_merkle::{Algorithm, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm}; +use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm}; use ethereum_types::H256; use kvdb_rocksdb::{Database, DatabaseConfig}; use merkle_light::merkle::{log2_pow2, MerkleTree}; @@ -628,7 +628,7 @@ impl LogManager { ) -> Result { let tx_store = TransactionStore::new(db.clone())?; let flow_db = Arc::new(FlowDBStore::new(db.clone())); - let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow)); + let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone())); let mut initial_data = flow_store.get_chunk_root_list()?; // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list` // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves` @@ -709,6 +709,7 @@ impl LogManager { let mut pora_chunks_merkle = Merkle::new_with_subtrees( flow_db, + config.flow.merkle_node_cache_capacity, initial_data, log2_pow2(PORA_CHUNK_SIZE), start_tx_seq, @@ -983,13 +984,7 @@ impl LogManager { let data = pad_data[start_index * ENTRY_SIZE ..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE] .to_vec(); - let root = Merkle::new( - Arc::new(EmptyNodeDatabase {}), - data_to_merkle_leaves(&data)?, - 0, - None, - ) - .root(); + let root = Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root(); merkle.pora_chunks_merkle.append(root); root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1)); start_index += PORA_CHUNK_SIZE; diff --git a/node/storage/src/log_store/tests.rs b/node/storage/src/log_store/tests.rs index aef7d55..d77b5f5 100644 --- a/node/storage/src/log_store/tests.rs +++ b/node/storage/src/log_store/tests.rs @@ -29,12 +29,7 @@ fn test_put_get() { data[i * CHUNK_SIZE] = random(); } let (padded_chunks, _) = compute_padded_chunk_size(data_size); - let mut merkle = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), - vec![H256::zero()], - 0, - None, - ); + let mut merkle = AppendMerkleTree::::new(vec![H256::zero()], 0, None); merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(start_offset - 1)).unwrap()); let mut data_padded = data.clone(); data_padded.append(&mut vec![0u8; CHUNK_SIZE]); @@ -132,7 +127,6 @@ fn test_root() { let mt = sub_merkle_tree(&data).unwrap(); println!("{:?} {}", mt.root(), hex::encode(mt.root())); let append_mt = AppendMerkleTree::::new( - Arc::new(EmptyNodeDatabase {}), data_to_merkle_leaves(&data).unwrap(), 0, None,