Use LRU for cache.

This commit is contained in:
Peilun Li 2024-10-10 20:38:02 +08:00
parent b16abe2303
commit fd96737c6d
12 changed files with 80 additions and 68 deletions

33
Cargo.lock generated
View File

@ -225,6 +225,7 @@ dependencies = [
"ethereum-types 0.14.1",
"itertools 0.13.0",
"lazy_static",
"lru 0.12.5",
"once_cell",
"serde",
"tiny-keccak",
@ -1674,7 +1675,7 @@ dependencies = [
"hkdf",
"lazy_static",
"libp2p-core 0.30.2",
"lru",
"lru 0.7.8",
"parking_lot 0.11.2",
"rand 0.8.5",
"rlp",
@ -2515,6 +2516,12 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foldhash"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"
[[package]]
name = "foreign-types"
version = "0.3.2"
@ -2947,6 +2954,17 @@ dependencies = [
"allocator-api2",
]
[[package]]
name = "hashbrown"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
]
[[package]]
name = "hashers"
version = "1.0.1"
@ -4118,7 +4136,7 @@ dependencies = [
"libp2p-core 0.33.0",
"libp2p-swarm",
"log",
"lru",
"lru 0.7.8",
"prost 0.10.4",
"prost-build 0.10.4",
"prost-codec",
@ -4651,6 +4669,15 @@ dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "lru"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
dependencies = [
"hashbrown 0.15.0",
]
[[package]]
name = "lru-cache"
version = "0.1.2"
@ -5019,7 +5046,7 @@ dependencies = [
"lazy_static",
"libp2p",
"lighthouse_metrics",
"lru",
"lru 0.7.8",
"parking_lot 0.12.3",
"rand 0.8.5",
"regex",

View File

@ -13,4 +13,5 @@ serde = { version = "1.0.137", features = ["derive"] }
lazy_static = "1.4.0"
tracing = "0.1.36"
once_cell = "1.19.0"
itertools = "0.13.0"
itertools = "0.13.0"
lru = "0.12.5"

View File

@ -38,14 +38,9 @@ pub struct AppendMerkleTree<E: HashElement, A: Algorithm<E>> {
}
impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
pub fn new(
node_db: Arc<dyn NodeDatabase<E>>,
leaves: Vec<E>,
leaf_height: usize,
start_tx_seq: Option<u64>,
) -> Self {
pub fn new(leaves: Vec<E>, leaf_height: usize, start_tx_seq: Option<u64>) -> Self {
let mut merkle = Self {
node_manager: NodeManager::new(node_db),
node_manager: NodeManager::new_dummy(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: None,
@ -75,12 +70,13 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
pub fn new_with_subtrees(
node_db: Arc<dyn NodeDatabase<E>>,
node_cache_capacity: usize,
initial_data: MerkleTreeInitialData<E>,
leaf_height: usize,
start_tx_seq: Option<u64>,
) -> Result<Self> {
let mut merkle = Self {
node_manager: NodeManager::new(node_db),
node_manager: NodeManager::new(node_db, node_cache_capacity),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: None,
@ -117,7 +113,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
// Create an empty merkle tree with `depth`.
let mut merkle = Self {
// dummy node manager for the last chunk.
node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})),
node_manager: NodeManager::new_dummy(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: Some(depth),
@ -139,7 +135,7 @@ impl<E: HashElement, A: Algorithm<E>> AppendMerkleTree<E, A> {
} else {
let mut merkle = Self {
// dummy node manager for the last chunk.
node_manager: NodeManager::new(Arc::new(EmptyNodeDatabase {})),
node_manager: NodeManager::new_dummy(),
delta_nodes_map: BTreeMap::new(),
root_to_tx_seq_map: HashMap::new(),
min_depth: Some(depth),
@ -775,12 +771,8 @@ mod tests {
for _ in 0..entry_len {
data.push(H256::random());
}
let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(
Arc::new(EmptyNodeDatabase {}),
vec![H256::zero()],
0,
None,
);
let mut merkle =
AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
merkle.append_list(data.clone());
merkle.commit(Some(0));
verify(&data, &mut merkle);
@ -807,12 +799,8 @@ mod tests {
for _ in 0..entry_len {
data.push(H256::random());
}
let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(
Arc::new(EmptyNodeDatabase {}),
vec![H256::zero()],
0,
None,
);
let mut merkle =
AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
merkle.append_list(data.clone());
merkle.commit(Some(0));
@ -836,12 +824,7 @@ mod tests {
#[test]
fn test_proof_at_version() {
let n = [2, 255, 256, 257];
let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(
Arc::new(EmptyNodeDatabase {}),
vec![H256::zero()],
0,
None,
);
let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
let mut start_pos = 0;
for (tx_seq, &entry_len) in n.iter().enumerate() {

View File

@ -1,24 +1,33 @@
use crate::HashElement;
use anyhow::Result;
use std::collections::BTreeMap;
use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::error;
pub struct NodeManager<E: HashElement> {
cache: BTreeMap<(usize, usize), E>,
cache: LruCache<(usize, usize), E>,
layer_size: Vec<usize>,
db: Arc<dyn NodeDatabase<E>>,
}
impl<E: HashElement> NodeManager<E> {
pub fn new(db: Arc<dyn NodeDatabase<E>>) -> Self {
pub fn new(db: Arc<dyn NodeDatabase<E>>, capacity: usize) -> Self {
Self {
cache: BTreeMap::new(),
cache: LruCache::new(NonZeroUsize::new(capacity).expect("capacity should be non-zero")),
layer_size: vec![],
db,
}
}
pub fn new_dummy() -> Self {
Self {
cache: LruCache::unbounded(),
layer_size: vec![],
db: Arc::new(EmptyNodeDatabase {}),
}
}
pub fn push_node(&mut self, layer: usize, node: E) {
self.add_node(layer, self.layer_size[layer], node);
self.layer_size[layer] += 1;
@ -28,7 +37,7 @@ impl<E: HashElement> NodeManager<E> {
let pos = &mut self.layer_size[layer];
let mut saved_nodes = Vec::with_capacity(nodes.len());
for node in nodes {
self.cache.insert((layer, *pos), node.clone());
self.cache.put((layer, *pos), node.clone());
saved_nodes.push((layer, *pos, node));
*pos += 1;
}
@ -38,7 +47,7 @@ impl<E: HashElement> NodeManager<E> {
}
pub fn get_node(&self, layer: usize, pos: usize) -> Option<E> {
match self.cache.get(&(layer, pos)) {
match self.cache.peek(&(layer, pos)) {
Some(node) => Some(node.clone()),
None => self.db.get_node(layer, pos).unwrap_or_else(|e| {
error!("Failed to get node: {}", e);
@ -60,7 +69,7 @@ impl<E: HashElement> NodeManager<E> {
if let Err(e) = self.db.save_node(layer, pos, &node) {
error!("Failed to save node: {}", e);
}
self.cache.insert((layer, pos), node);
self.cache.put((layer, pos), node);
}
pub fn add_layer(&mut self) {
@ -78,7 +87,7 @@ impl<E: HashElement> NodeManager<E> {
pub fn truncate_nodes(&mut self, layer: usize, pos_end: usize) {
let mut removed_nodes = Vec::new();
for pos in pos_end..self.layer_size[layer] {
self.cache.remove(&(layer, pos));
self.cache.pop(&(layer, pos));
removed_nodes.push((layer, pos));
}
if let Err(e) = self.db.remove_node_list(&removed_nodes) {

View File

@ -112,7 +112,7 @@ impl ClientBuilder {
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let executor = require!("sync", self, runtime_context).clone().executor;
let store = Arc::new(
LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor)
LogManager::rocksdb(config.log_config.clone(), &config.db_dir, executor)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
);

View File

@ -11,6 +11,7 @@ use shared_types::{NetworkIdentity, ProtocolVersion};
use std::net::IpAddr;
use std::time::Duration;
use storage::config::ShardConfig;
use storage::log_store::log_manager::LogConfig;
use storage::StorageConfig;
impl ZgsConfig {
@ -101,8 +102,11 @@ impl ZgsConfig {
}
pub fn storage_config(&self) -> Result<StorageConfig, String> {
let mut log_config = LogConfig::default();
log_config.flow.merkle_node_cache_capacity = self.merkle_node_cache_capacity;
Ok(StorageConfig {
db_dir: self.db_dir.clone().into(),
log_config,
})
}

View File

@ -60,6 +60,7 @@ build_config! {
(prune_check_time_s, (u64), 60)
(prune_batch_size, (usize), 16 * 1024)
(prune_batch_wait_time_ms, (u64), 1000)
(merkle_node_cache_capacity, (usize), 32 * 1024 * 1024)
// misc
(log_config_file, (String), "log_config".to_string())

View File

@ -1,3 +1,4 @@
use crate::log_store::log_manager::LogConfig;
use serde::{Deserialize, Serialize};
use ssz_derive::{Decode, Encode};
use std::{cell::RefCell, path::PathBuf, rc::Rc, str::FromStr};
@ -7,6 +8,7 @@ pub const SHARD_CONFIG_KEY: &str = "shard_config";
#[derive(Clone)]
pub struct Config {
pub db_dir: PathBuf,
pub log_config: LogConfig,
}
#[derive(Clone, Copy, Debug, Decode, Encode, Serialize, Deserialize, Eq, PartialEq)]

View File

@ -10,7 +10,7 @@ use crate::log_store::log_manager::{
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::{try_option, ZgsKeyValueDB};
use anyhow::{anyhow, bail, Result};
use append_merkle::{EmptyNodeDatabase, MerkleTreeInitialData, MerkleTreeRead, NodeDatabase};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase};
use itertools::Itertools;
use parking_lot::RwLock;
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
@ -93,6 +93,7 @@ impl FlowStore {
#[derive(Clone, Debug)]
pub struct FlowConfig {
pub batch_size: usize,
pub merkle_node_cache_capacity: usize,
pub shard_config: Arc<RwLock<ShardConfig>>,
}
@ -100,6 +101,8 @@ impl Default for FlowConfig {
fn default() -> Self {
Self {
batch_size: SECTORS_PER_LOAD,
// Each node takes (8+8+32=)48 Bytes, so the default value is 1.5 GB memory size.
merkle_node_cache_capacity: 32 * 1024 * 1024,
shard_config: Default::default(),
}
}
@ -436,13 +439,7 @@ impl FlowDBStore {
let mut expected_index = 0;
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
let empty_root = Merkle::new(
Arc::new(EmptyNodeDatabase {}),
data_to_merkle_leaves(&empty_data)?,
0,
None,
)
.root();
let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
let (index_bytes, root_bytes) = r?;

View File

@ -8,11 +8,10 @@ use anyhow::Result;
use ethereum_types::H256;
use ssz_derive::{Decode, Encode};
use std::cmp::min;
use std::sync::Arc;
use crate::log_store::log_manager::data_to_merkle_leaves;
use crate::try_option;
use append_merkle::{Algorithm, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm};
use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm};
use shared_types::{ChunkArray, DataRoot, Merkle};
use tracing::trace;
use zgs_spec::{
@ -248,7 +247,7 @@ impl EntryBatch {
} else {
vec![]
};
let mut merkle = Merkle::new(Arc::new(EmptyNodeDatabase {}), initial_leaves, 0, None);
let mut merkle = Merkle::new(initial_leaves, 0, None);
for subtree in self.data.get_subtree_list() {
trace!(?subtree, "get subtree, leaves={}", merkle.leaves());
if subtree.start_sector != merkle.leaves() {

View File

@ -6,7 +6,7 @@ use crate::log_store::{
};
use crate::{try_option, ZgsKeyValueDB};
use anyhow::{anyhow, bail, Result};
use append_merkle::{Algorithm, EmptyNodeDatabase, MerkleTreeRead, Sha3Algorithm};
use append_merkle::{Algorithm, MerkleTreeRead, Sha3Algorithm};
use ethereum_types::H256;
use kvdb_rocksdb::{Database, DatabaseConfig};
use merkle_light::merkle::{log2_pow2, MerkleTree};
@ -628,7 +628,7 @@ impl LogManager {
) -> Result<Self> {
let tx_store = TransactionStore::new(db.clone())?;
let flow_db = Arc::new(FlowDBStore::new(db.clone()));
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow));
let flow_store = Arc::new(FlowStore::new(flow_db.clone(), config.flow.clone()));
let mut initial_data = flow_store.get_chunk_root_list()?;
// If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
// first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
@ -709,6 +709,7 @@ impl LogManager {
let mut pora_chunks_merkle = Merkle::new_with_subtrees(
flow_db,
config.flow.merkle_node_cache_capacity,
initial_data,
log2_pow2(PORA_CHUNK_SIZE),
start_tx_seq,
@ -983,13 +984,7 @@ impl LogManager {
let data = pad_data[start_index * ENTRY_SIZE
..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
.to_vec();
let root = Merkle::new(
Arc::new(EmptyNodeDatabase {}),
data_to_merkle_leaves(&data)?,
0,
None,
)
.root();
let root = Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
merkle.pora_chunks_merkle.append(root);
root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1));
start_index += PORA_CHUNK_SIZE;

View File

@ -29,12 +29,7 @@ fn test_put_get() {
data[i * CHUNK_SIZE] = random();
}
let (padded_chunks, _) = compute_padded_chunk_size(data_size);
let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(
Arc::new(EmptyNodeDatabase {}),
vec![H256::zero()],
0,
None,
);
let mut merkle = AppendMerkleTree::<H256, Sha3Algorithm>::new(vec![H256::zero()], 0, None);
merkle.append_list(data_to_merkle_leaves(&LogManager::padding_raw(start_offset - 1)).unwrap());
let mut data_padded = data.clone();
data_padded.append(&mut vec![0u8; CHUNK_SIZE]);
@ -132,7 +127,6 @@ fn test_root() {
let mt = sub_merkle_tree(&data).unwrap();
println!("{:?} {}", mt.root(), hex::encode(mt.root()));
let append_mt = AppendMerkleTree::<H256, Sha3Algorithm>::new(
Arc::new(EmptyNodeDatabase {}),
data_to_merkle_leaves(&data).unwrap(),
0,
None,