mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 00:27:39 +00:00 
			
		
		
		
	
							parent
							
								
									395aeabde7
								
							
						
					
					
						commit
						20266e0a6c
					
				
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							@ -7280,7 +7280,9 @@ dependencies = [
 | 
			
		||||
 "serde_json",
 | 
			
		||||
 "shared_types",
 | 
			
		||||
 "static_assertions",
 | 
			
		||||
 "task_executor",
 | 
			
		||||
 "tiny-keccak",
 | 
			
		||||
 "tokio",
 | 
			
		||||
 "tracing",
 | 
			
		||||
 "typenum",
 | 
			
		||||
 "zgs_seal",
 | 
			
		||||
 | 
			
		||||
@ -481,7 +481,7 @@ impl LogSyncManager {
 | 
			
		||||
            } else {
 | 
			
		||||
                // check if current node need to save at least one segment
 | 
			
		||||
                let store = self.store.clone();
 | 
			
		||||
                let shard_config = store.flow().get_shard_config();
 | 
			
		||||
                let shard_config = store.get_shard_config();
 | 
			
		||||
                let start_segment_index = tx.start_entry_index as usize / PORA_CHUNK_SIZE;
 | 
			
		||||
                let end_segment_index =
 | 
			
		||||
                    (tx.start_entry_index as usize + bytes_to_chunks(tx.size as usize) - 1)
 | 
			
		||||
 | 
			
		||||
@ -423,7 +423,7 @@ impl Libp2pEventHandler {
 | 
			
		||||
        let addr = self.get_listen_addr_or_add().await?;
 | 
			
		||||
 | 
			
		||||
        let timestamp = timestamp_now();
 | 
			
		||||
        let shard_config = self.store.get_store().flow().get_shard_config();
 | 
			
		||||
        let shard_config = self.store.get_store().get_shard_config();
 | 
			
		||||
 | 
			
		||||
        let msg = AnnounceFile {
 | 
			
		||||
            tx_ids,
 | 
			
		||||
@ -699,7 +699,7 @@ impl Libp2pEventHandler {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // notify sync layer if shard config matches
 | 
			
		||||
        let my_shard_config = self.store.get_store().flow().get_shard_config();
 | 
			
		||||
        let my_shard_config = self.store.get_store().get_shard_config();
 | 
			
		||||
        if my_shard_config.intersect(&announced_shard_config) {
 | 
			
		||||
            for tx_id in msg.tx_ids.iter() {
 | 
			
		||||
                self.send_to_sync(SyncMessage::AnnounceFileGossip {
 | 
			
		||||
@ -911,7 +911,9 @@ mod tests {
 | 
			
		||||
            let (network_send, network_recv) = mpsc::unbounded_channel();
 | 
			
		||||
            let (sync_send, sync_recv) = channel::Channel::unbounded("test");
 | 
			
		||||
            let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
 | 
			
		||||
            let store = LogManager::memorydb(LogConfig::default()).unwrap();
 | 
			
		||||
 | 
			
		||||
            let executor = runtime.task_executor.clone();
 | 
			
		||||
            let store = LogManager::memorydb(LogConfig::default(), executor).unwrap();
 | 
			
		||||
            Self {
 | 
			
		||||
                runtime,
 | 
			
		||||
                network_globals: Arc::new(network_globals),
 | 
			
		||||
 | 
			
		||||
@ -183,7 +183,7 @@ impl RpcServer for RpcServerImpl {
 | 
			
		||||
 | 
			
		||||
    async fn get_shard_config(&self) -> RpcResult<ShardConfig> {
 | 
			
		||||
        debug!("zgs_getShardConfig");
 | 
			
		||||
        let shard_config = self.ctx.log_store.get_store().flow().get_shard_config();
 | 
			
		||||
        let shard_config = self.ctx.log_store.get_store().get_shard_config();
 | 
			
		||||
        Ok(shard_config)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -89,9 +89,10 @@ impl ClientBuilder {
 | 
			
		||||
 | 
			
		||||
    /// Initializes in-memory storage.
 | 
			
		||||
    pub fn with_memory_store(mut self) -> Result<Self, String> {
 | 
			
		||||
        let executor = require!("sync", self, runtime_context).clone().executor;
 | 
			
		||||
        // TODO(zz): Set config.
 | 
			
		||||
        let store = Arc::new(
 | 
			
		||||
            LogManager::memorydb(LogConfig::default())
 | 
			
		||||
            LogManager::memorydb(LogConfig::default(), executor)
 | 
			
		||||
                .map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
@ -109,8 +110,9 @@ impl ClientBuilder {
 | 
			
		||||
 | 
			
		||||
    /// Initializes RocksDB storage.
 | 
			
		||||
    pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
 | 
			
		||||
        let executor = require!("sync", self, runtime_context).clone().executor;
 | 
			
		||||
        let store = Arc::new(
 | 
			
		||||
            LogManager::rocksdb(LogConfig::default(), &config.db_dir)
 | 
			
		||||
            LogManager::rocksdb(LogConfig::default(), &config.db_dir, executor)
 | 
			
		||||
                .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -95,23 +95,22 @@ impl Store {
 | 
			
		||||
        &self,
 | 
			
		||||
        seal_index_max: usize,
 | 
			
		||||
    ) -> anyhow::Result<Option<Vec<SealTask>>> {
 | 
			
		||||
        self.spawn(move |store| store.flow().pull_seal_chunk(seal_index_max))
 | 
			
		||||
        self.spawn(move |store| store.pull_seal_chunk(seal_index_max))
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> anyhow::Result<()> {
 | 
			
		||||
        self.spawn(move |store| store.flow().submit_seal_result(answers))
 | 
			
		||||
        self.spawn(move |store| store.submit_seal_result(answers))
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
 | 
			
		||||
        self.spawn(move |store| store.flow().load_sealed_data(chunk_index))
 | 
			
		||||
        self.spawn(move |store| store.load_sealed_data(chunk_index))
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn get_num_entries(&self) -> Result<u64> {
 | 
			
		||||
        self.spawn(move |store| store.flow().get_num_entries())
 | 
			
		||||
            .await
 | 
			
		||||
        self.spawn(move |store| store.get_num_entries()).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> {
 | 
			
		||||
@ -122,7 +121,7 @@ impl Store {
 | 
			
		||||
 | 
			
		||||
    pub async fn update_shard_config(&self, shard_config: ShardConfig) {
 | 
			
		||||
        self.spawn(move |store| {
 | 
			
		||||
            store.flow().update_shard_config(shard_config);
 | 
			
		||||
            store.update_shard_config(shard_config);
 | 
			
		||||
            Ok(())
 | 
			
		||||
        })
 | 
			
		||||
        .await
 | 
			
		||||
 | 
			
		||||
@ -29,6 +29,8 @@ itertools = "0.13.0"
 | 
			
		||||
serde = { version = "1.0.197", features = ["derive"] }
 | 
			
		||||
parking_lot = "0.12.3"
 | 
			
		||||
serde_json = "1.0.127"
 | 
			
		||||
tokio = { version = "1.10.0", features = ["sync"] }
 | 
			
		||||
task_executor = { path = "../../common/task_executor" }
 | 
			
		||||
 | 
			
		||||
[dev-dependencies]
 | 
			
		||||
rand = "0.8.5"
 | 
			
		||||
 | 
			
		||||
@ -14,14 +14,18 @@ use storage::{
 | 
			
		||||
    },
 | 
			
		||||
    LogManager,
 | 
			
		||||
};
 | 
			
		||||
use task_executor::test_utils::TestRuntime;
 | 
			
		||||
 | 
			
		||||
fn write_performance(c: &mut Criterion) {
 | 
			
		||||
    if Path::new("db_write").exists() {
 | 
			
		||||
        fs::remove_dir_all("db_write").unwrap();
 | 
			
		||||
    }
 | 
			
		||||
    let runtime = TestRuntime::default();
 | 
			
		||||
 | 
			
		||||
    let executor = runtime.task_executor.clone();
 | 
			
		||||
 | 
			
		||||
    let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
 | 
			
		||||
        LogManager::rocksdb(LogConfig::default(), "db_write")
 | 
			
		||||
        LogManager::rocksdb(LogConfig::default(), "db_write", executor)
 | 
			
		||||
            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
 | 
			
		||||
            .unwrap(),
 | 
			
		||||
    ));
 | 
			
		||||
@ -105,8 +109,12 @@ fn read_performance(c: &mut Criterion) {
 | 
			
		||||
        fs::remove_dir_all("db_read").unwrap();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let runtime = TestRuntime::default();
 | 
			
		||||
 | 
			
		||||
    let executor = runtime.task_executor.clone();
 | 
			
		||||
 | 
			
		||||
    let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
 | 
			
		||||
        LogManager::rocksdb(LogConfig::default(), "db_read")
 | 
			
		||||
        LogManager::rocksdb(LogConfig::default(), "db_read", executor)
 | 
			
		||||
            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
 | 
			
		||||
            .unwrap(),
 | 
			
		||||
    ));
 | 
			
		||||
 | 
			
		||||
@ -3,7 +3,8 @@ use super::{MineLoadChunk, SealAnswer, SealTask};
 | 
			
		||||
use crate::config::ShardConfig;
 | 
			
		||||
use crate::error::Error;
 | 
			
		||||
use crate::log_store::log_manager::{
 | 
			
		||||
    bytes_to_entries, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
 | 
			
		||||
    bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
 | 
			
		||||
    COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
 | 
			
		||||
};
 | 
			
		||||
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
 | 
			
		||||
use crate::{try_option, ZgsKeyValueDB};
 | 
			
		||||
@ -11,7 +12,7 @@ use anyhow::{anyhow, bail, Result};
 | 
			
		||||
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
 | 
			
		||||
use itertools::Itertools;
 | 
			
		||||
use parking_lot::RwLock;
 | 
			
		||||
use shared_types::{ChunkArray, DataRoot, FlowProof};
 | 
			
		||||
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
 | 
			
		||||
use ssz::{Decode, Encode};
 | 
			
		||||
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
 | 
			
		||||
use std::cmp::Ordering;
 | 
			
		||||
@ -441,6 +442,10 @@ impl FlowDBStore {
 | 
			
		||||
        // and they will be updated in the merkle tree with `fill_leaf` by the caller.
 | 
			
		||||
        let mut leaf_list = Vec::new();
 | 
			
		||||
        let mut expected_index = 0;
 | 
			
		||||
 | 
			
		||||
        let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
 | 
			
		||||
        let empty_root = *Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
 | 
			
		||||
 | 
			
		||||
        for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
 | 
			
		||||
            let (index_bytes, root_bytes) = r?;
 | 
			
		||||
            let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
 | 
			
		||||
@ -475,25 +480,26 @@ impl FlowDBStore {
 | 
			
		||||
                            expected_index += 1;
 | 
			
		||||
                        }
 | 
			
		||||
                        Ordering::Greater => {
 | 
			
		||||
                            bail!(
 | 
			
		||||
                                "unexpected chunk leaf in range, expected={}, get={}, range={:?}",
 | 
			
		||||
                                expected_index,
 | 
			
		||||
                                batch_index,
 | 
			
		||||
                                range_root,
 | 
			
		||||
                            );
 | 
			
		||||
                            while batch_index > expected_index {
 | 
			
		||||
                                // Fill the gap with empty leaves.
 | 
			
		||||
                                root_list.push((1, empty_root));
 | 
			
		||||
                                expected_index += 1;
 | 
			
		||||
                            }
 | 
			
		||||
                            range_root = None;
 | 
			
		||||
                            root_list.push((1, root));
 | 
			
		||||
                            expected_index += 1;
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            } else if expected_index == batch_index {
 | 
			
		||||
            } else {
 | 
			
		||||
                while batch_index > expected_index {
 | 
			
		||||
                    // Fill the gap with empty leaves.
 | 
			
		||||
                    root_list.push((1, empty_root));
 | 
			
		||||
                    expected_index += 1;
 | 
			
		||||
                }
 | 
			
		||||
                range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
 | 
			
		||||
                root_list.push((subtree_depth, root));
 | 
			
		||||
                expected_index += 1 << (subtree_depth - 1);
 | 
			
		||||
            } else {
 | 
			
		||||
                bail!(
 | 
			
		||||
                    "unexpected range root: expected={} get={}",
 | 
			
		||||
                    expected_index,
 | 
			
		||||
                    batch_index
 | 
			
		||||
                );
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        let extra_node_list = self.get_mpt_node_list()?;
 | 
			
		||||
 | 
			
		||||
@ -1,3 +1,4 @@
 | 
			
		||||
use crate::config::ShardConfig;
 | 
			
		||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowStore};
 | 
			
		||||
use crate::log_store::tx_store::TransactionStore;
 | 
			
		||||
use crate::log_store::{
 | 
			
		||||
@ -20,11 +21,12 @@ use shared_types::{
 | 
			
		||||
use std::cmp::Ordering;
 | 
			
		||||
use std::collections::BTreeMap;
 | 
			
		||||
use std::path::Path;
 | 
			
		||||
use std::sync::mpsc;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tracing::{debug, error, info, instrument, trace, warn};
 | 
			
		||||
 | 
			
		||||
use super::tx_store::BlockHashAndSubmissionIndex;
 | 
			
		||||
use super::LogStoreInner;
 | 
			
		||||
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
 | 
			
		||||
 | 
			
		||||
/// 256 Bytes
 | 
			
		||||
pub const ENTRY_SIZE: usize = 256;
 | 
			
		||||
@ -45,11 +47,18 @@ pub const COL_NUM: u32 = 9;
 | 
			
		||||
// Process at most 1M entries (256MB) pad data at a time.
 | 
			
		||||
const PAD_MAX_SIZE: usize = 1 << 20;
 | 
			
		||||
 | 
			
		||||
pub struct UpdateFlowMessage {
 | 
			
		||||
    pub root_map: BTreeMap<usize, (H256, usize)>,
 | 
			
		||||
    pub pad_data: usize,
 | 
			
		||||
    pub tx_start_flow_index: u64,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct LogManager {
 | 
			
		||||
    pub(crate) db: Arc<dyn ZgsKeyValueDB>,
 | 
			
		||||
    tx_store: TransactionStore,
 | 
			
		||||
    flow_store: FlowStore,
 | 
			
		||||
    flow_store: Arc<FlowStore>,
 | 
			
		||||
    merkle: RwLock<MerkleManager>,
 | 
			
		||||
    sender: mpsc::Sender<UpdateFlowMessage>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct MerkleManager {
 | 
			
		||||
@ -139,16 +148,6 @@ pub struct LogConfig {
 | 
			
		||||
    pub flow: FlowConfig,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl LogStoreInner for LogManager {
 | 
			
		||||
    fn flow(&self) -> &dyn super::Flow {
 | 
			
		||||
        &self.flow_store
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn flow_mut(&mut self) -> &mut dyn super::Flow {
 | 
			
		||||
        &mut self.flow_store
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl LogStoreChunkWrite for LogManager {
 | 
			
		||||
    fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()> {
 | 
			
		||||
        let mut merkle = self.merkle.write();
 | 
			
		||||
@ -388,6 +387,14 @@ impl LogStoreWrite for LogManager {
 | 
			
		||||
    fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
 | 
			
		||||
        self.tx_store.delete_block_hash_by_number(block_number)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn update_shard_config(&self, shard_config: ShardConfig) {
 | 
			
		||||
        self.flow_store.update_shard_config(shard_config)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
 | 
			
		||||
        self.flow_store.submit_seal_result(answers)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl LogStoreChunkRead for LogManager {
 | 
			
		||||
@ -578,24 +585,48 @@ impl LogStoreRead for LogManager {
 | 
			
		||||
    fn check_tx_pruned(&self, tx_seq: u64) -> crate::error::Result<bool> {
 | 
			
		||||
        self.tx_store.check_tx_pruned(tx_seq)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
 | 
			
		||||
        self.flow_store.pull_seal_chunk(seal_index_max)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn get_num_entries(&self) -> Result<u64> {
 | 
			
		||||
        self.flow_store.get_num_entries()
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
 | 
			
		||||
        self.flow_store.load_sealed_data(chunk_index)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn get_shard_config(&self) -> ShardConfig {
 | 
			
		||||
        self.flow_store.get_shard_config()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl LogManager {
 | 
			
		||||
    pub fn rocksdb(config: LogConfig, path: impl AsRef<Path>) -> Result<Self> {
 | 
			
		||||
    pub fn rocksdb(
 | 
			
		||||
        config: LogConfig,
 | 
			
		||||
        path: impl AsRef<Path>,
 | 
			
		||||
        executor: task_executor::TaskExecutor,
 | 
			
		||||
    ) -> Result<Self> {
 | 
			
		||||
        let mut db_config = DatabaseConfig::with_columns(COL_NUM);
 | 
			
		||||
        db_config.enable_statistics = true;
 | 
			
		||||
        let db = Arc::new(Database::open(&db_config, path)?);
 | 
			
		||||
        Self::new(db, config)
 | 
			
		||||
        Self::new(db, config, executor)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub fn memorydb(config: LogConfig) -> Result<Self> {
 | 
			
		||||
    pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
 | 
			
		||||
        let db = Arc::new(kvdb_memorydb::create(COL_NUM));
 | 
			
		||||
        Self::new(db, config)
 | 
			
		||||
        Self::new(db, config, executor)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn new(db: Arc<dyn ZgsKeyValueDB>, config: LogConfig) -> Result<Self> {
 | 
			
		||||
    fn new(
 | 
			
		||||
        db: Arc<dyn ZgsKeyValueDB>,
 | 
			
		||||
        config: LogConfig,
 | 
			
		||||
        executor: task_executor::TaskExecutor,
 | 
			
		||||
    ) -> Result<Self> {
 | 
			
		||||
        let tx_store = TransactionStore::new(db.clone())?;
 | 
			
		||||
        let flow_store = FlowStore::new(db.clone(), config.flow);
 | 
			
		||||
        let flow_store = Arc::new(FlowStore::new(db.clone(), config.flow));
 | 
			
		||||
        let mut initial_data = flow_store.get_chunk_root_list()?;
 | 
			
		||||
        // If the last tx `put_tx` does not complete, we will revert it in `initial_data.subtree_list`
 | 
			
		||||
        // first and call `put_tx` later. The known leaves in its data will be saved in `extra_leaves`
 | 
			
		||||
@ -699,13 +730,19 @@ impl LogManager {
 | 
			
		||||
            pora_chunks_merkle,
 | 
			
		||||
            last_chunk_merkle,
 | 
			
		||||
        });
 | 
			
		||||
        let log_manager = Self {
 | 
			
		||||
 | 
			
		||||
        let (sender, receiver) = mpsc::channel();
 | 
			
		||||
 | 
			
		||||
        let mut log_manager = Self {
 | 
			
		||||
            db,
 | 
			
		||||
            tx_store,
 | 
			
		||||
            flow_store,
 | 
			
		||||
            merkle,
 | 
			
		||||
            sender,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        log_manager.start_receiver(receiver, executor);
 | 
			
		||||
 | 
			
		||||
        if let Some(tx) = last_tx_to_insert {
 | 
			
		||||
            log_manager.revert_to(tx.seq - 1)?;
 | 
			
		||||
            log_manager.put_tx(tx)?;
 | 
			
		||||
@ -727,6 +764,41 @@ impl LogManager {
 | 
			
		||||
        Ok(log_manager)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn start_receiver(
 | 
			
		||||
        &mut self,
 | 
			
		||||
        rx: mpsc::Receiver<UpdateFlowMessage>,
 | 
			
		||||
        executor: task_executor::TaskExecutor,
 | 
			
		||||
    ) {
 | 
			
		||||
        let flow_store = self.flow_store.clone();
 | 
			
		||||
        executor.spawn(
 | 
			
		||||
            async move {
 | 
			
		||||
                loop {
 | 
			
		||||
                    match rx.recv() {
 | 
			
		||||
                        std::result::Result::Ok(data) => {
 | 
			
		||||
                            // Update the root index.
 | 
			
		||||
                            flow_store.put_batch_root_list(data.root_map).unwrap();
 | 
			
		||||
                            // Update the flow database.
 | 
			
		||||
                            // This should be called before `complete_last_chunk_merkle` so that we do not save
 | 
			
		||||
                            // subtrees with data known.
 | 
			
		||||
                            flow_store
 | 
			
		||||
                                .append_entries(ChunkArray {
 | 
			
		||||
                                    data: vec![0; data.pad_data],
 | 
			
		||||
                                    start_index: data.tx_start_flow_index,
 | 
			
		||||
                                })
 | 
			
		||||
                                .unwrap();
 | 
			
		||||
                        }
 | 
			
		||||
                        std::result::Result::Err(_) => {
 | 
			
		||||
                            error!("Receiver error");
 | 
			
		||||
                        }
 | 
			
		||||
                    };
 | 
			
		||||
                }
 | 
			
		||||
            },
 | 
			
		||||
            "pad_tx",
 | 
			
		||||
        );
 | 
			
		||||
        // Wait for the spawned thread to finish
 | 
			
		||||
        // let _ = handle.join().expect("Thread panicked");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
 | 
			
		||||
        match maybe_root {
 | 
			
		||||
            None => self.gen_proof_at_version(flow_index, None),
 | 
			
		||||
@ -863,6 +935,7 @@ impl LogManager {
 | 
			
		||||
        );
 | 
			
		||||
        if extra != 0 {
 | 
			
		||||
            for pad_data in Self::padding((first_subtree_size - extra) as usize) {
 | 
			
		||||
                let mut is_full_empty = true;
 | 
			
		||||
                let mut root_map = BTreeMap::new();
 | 
			
		||||
 | 
			
		||||
                // Update the in-memory merkle tree.
 | 
			
		||||
@ -874,6 +947,7 @@ impl LogManager {
 | 
			
		||||
 | 
			
		||||
                let mut completed_chunk_index = None;
 | 
			
		||||
                if pad_data.len() < last_chunk_pad {
 | 
			
		||||
                    is_full_empty = false;
 | 
			
		||||
                    merkle
 | 
			
		||||
                        .last_chunk_merkle
 | 
			
		||||
                        .append_list(data_to_merkle_leaves(&pad_data)?);
 | 
			
		||||
@ -882,6 +956,7 @@ impl LogManager {
 | 
			
		||||
                        .update_last(*merkle.last_chunk_merkle.root());
 | 
			
		||||
                } else {
 | 
			
		||||
                    if last_chunk_pad != 0 {
 | 
			
		||||
                        is_full_empty = false;
 | 
			
		||||
                        // Pad the last chunk.
 | 
			
		||||
                        merkle
 | 
			
		||||
                            .last_chunk_merkle
 | 
			
		||||
@ -910,16 +985,26 @@ impl LogManager {
 | 
			
		||||
                    assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                // Update the root index.
 | 
			
		||||
                self.flow_store.put_batch_root_list(root_map)?;
 | 
			
		||||
                // Update the flow database.
 | 
			
		||||
                // This should be called before `complete_last_chunk_merkle` so that we do not save
 | 
			
		||||
                // subtrees with data known.
 | 
			
		||||
                let data_size = pad_data.len() / ENTRY_SIZE;
 | 
			
		||||
                self.flow_store.append_entries(ChunkArray {
 | 
			
		||||
                    data: pad_data,
 | 
			
		||||
                    start_index: tx_start_flow_index,
 | 
			
		||||
                })?;
 | 
			
		||||
                if is_full_empty {
 | 
			
		||||
                    self.sender.send(UpdateFlowMessage {
 | 
			
		||||
                        root_map,
 | 
			
		||||
                        pad_data: pad_data.len(),
 | 
			
		||||
                        tx_start_flow_index,
 | 
			
		||||
                    })?;
 | 
			
		||||
                } else {
 | 
			
		||||
                    self.flow_store.put_batch_root_list(root_map).unwrap();
 | 
			
		||||
                    // Update the flow database.
 | 
			
		||||
                    // This should be called before `complete_last_chunk_merkle` so that we do not save
 | 
			
		||||
                    // subtrees with data known.
 | 
			
		||||
                    self.flow_store
 | 
			
		||||
                        .append_entries(ChunkArray {
 | 
			
		||||
                            data: pad_data.to_vec(),
 | 
			
		||||
                            start_index: tx_start_flow_index,
 | 
			
		||||
                        })
 | 
			
		||||
                        .unwrap();
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                tx_start_flow_index += data_size as u64;
 | 
			
		||||
                if let Some(index) = completed_chunk_index {
 | 
			
		||||
                    self.complete_last_chunk_merkle(index, &mut *merkle)?;
 | 
			
		||||
 | 
			
		||||
@ -76,6 +76,14 @@ pub trait LogStoreRead: LogStoreChunkRead {
 | 
			
		||||
 | 
			
		||||
    /// Return flow root and length.
 | 
			
		||||
    fn get_context(&self) -> Result<(DataRoot, u64)>;
 | 
			
		||||
 | 
			
		||||
    fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>>;
 | 
			
		||||
 | 
			
		||||
    fn get_num_entries(&self) -> Result<u64>;
 | 
			
		||||
 | 
			
		||||
    fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
 | 
			
		||||
 | 
			
		||||
    fn get_shard_config(&self) -> ShardConfig;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub trait LogStoreChunkRead {
 | 
			
		||||
@ -145,6 +153,10 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
 | 
			
		||||
    ) -> Result<bool>;
 | 
			
		||||
 | 
			
		||||
    fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()>;
 | 
			
		||||
 | 
			
		||||
    fn update_shard_config(&self, shard_config: ShardConfig);
 | 
			
		||||
 | 
			
		||||
    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub trait LogStoreChunkWrite {
 | 
			
		||||
@ -168,19 +180,10 @@ pub trait LogChunkStore: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync +
 | 
			
		||||
impl<T: LogStoreChunkRead + LogStoreChunkWrite + Send + Sync + 'static> LogChunkStore for T {}
 | 
			
		||||
 | 
			
		||||
pub trait Store:
 | 
			
		||||
    LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static
 | 
			
		||||
    LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
impl<
 | 
			
		||||
        T: LogStoreRead + LogStoreWrite + LogStoreInner + config::Configurable + Send + Sync + 'static,
 | 
			
		||||
    > Store for T
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub trait LogStoreInner {
 | 
			
		||||
    fn flow(&self) -> &dyn Flow;
 | 
			
		||||
    fn flow_mut(&mut self) -> &mut dyn Flow;
 | 
			
		||||
}
 | 
			
		||||
impl<T: LogStoreRead + LogStoreWrite + config::Configurable + Send + Sync + 'static> Store for T {}
 | 
			
		||||
 | 
			
		||||
pub struct MineLoadChunk {
 | 
			
		||||
    // Use `Vec` instead of array to avoid thread stack overflow.
 | 
			
		||||
 | 
			
		||||
@ -8,11 +8,15 @@ use ethereum_types::H256;
 | 
			
		||||
use rand::random;
 | 
			
		||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
 | 
			
		||||
use std::cmp;
 | 
			
		||||
use task_executor::test_utils::TestRuntime;
 | 
			
		||||
 | 
			
		||||
#[test]
 | 
			
		||||
fn test_put_get() {
 | 
			
		||||
    let config = LogConfig::default();
 | 
			
		||||
    let store = LogManager::memorydb(config.clone()).unwrap();
 | 
			
		||||
    let runtime = TestRuntime::default();
 | 
			
		||||
 | 
			
		||||
    let executor = runtime.task_executor.clone();
 | 
			
		||||
    let store = LogManager::memorydb(config.clone(), executor).unwrap();
 | 
			
		||||
    let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
 | 
			
		||||
    // Aligned with size.
 | 
			
		||||
    let start_offset = 1024;
 | 
			
		||||
@ -169,8 +173,10 @@ fn test_put_tx() {
 | 
			
		||||
 | 
			
		||||
fn create_store() -> LogManager {
 | 
			
		||||
    let config = LogConfig::default();
 | 
			
		||||
    let runtime = TestRuntime::default();
 | 
			
		||||
    let executor = runtime.task_executor.clone();
 | 
			
		||||
 | 
			
		||||
    LogManager::memorydb(config).unwrap()
 | 
			
		||||
    LogManager::memorydb(config, executor).unwrap()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {
 | 
			
		||||
 | 
			
		||||
@ -492,7 +492,7 @@ impl SerialSyncController {
 | 
			
		||||
 | 
			
		||||
        metrics::SERIAL_SYNC_SEGMENT_LATENCY.update_since(since.0);
 | 
			
		||||
 | 
			
		||||
        let shard_config = self.store.get_store().flow().get_shard_config();
 | 
			
		||||
        let shard_config = self.store.get_store().get_shard_config();
 | 
			
		||||
        let next_chunk = segment_to_sector(shard_config.next_segment_index(
 | 
			
		||||
            sector_to_segment(from_chunk),
 | 
			
		||||
            sector_to_segment(self.tx_start_chunk_in_flow),
 | 
			
		||||
@ -1622,7 +1622,7 @@ mod tests {
 | 
			
		||||
        let num_chunks = 123;
 | 
			
		||||
 | 
			
		||||
        let config = LogConfig::default();
 | 
			
		||||
        let store = Arc::new(LogManager::memorydb(config).unwrap());
 | 
			
		||||
        let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap());
 | 
			
		||||
 | 
			
		||||
        create_controller(task_executor, peer_id, store, tx_id, num_chunks)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -807,7 +807,7 @@ impl SyncService {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn tx_sync_start_index(store: &Store, tx: &Transaction) -> Result<Option<u64>> {
 | 
			
		||||
        let shard_config = store.get_store().flow().get_shard_config();
 | 
			
		||||
        let shard_config = store.get_store().get_shard_config();
 | 
			
		||||
        let start_segment = sector_to_segment(tx.start_entry_index());
 | 
			
		||||
        let end =
 | 
			
		||||
            bytes_to_chunks(usize::try_from(tx.size).map_err(|e| anyhow!("tx size e={}", e))?);
 | 
			
		||||
@ -1294,7 +1294,9 @@ mod tests {
 | 
			
		||||
 | 
			
		||||
        let config = LogConfig::default();
 | 
			
		||||
 | 
			
		||||
        let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
 | 
			
		||||
        let executor = runtime.task_executor.clone();
 | 
			
		||||
 | 
			
		||||
        let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap());
 | 
			
		||||
 | 
			
		||||
        let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
 | 
			
		||||
        let file_location_cache: Arc<FileLocationCache> =
 | 
			
		||||
 | 
			
		||||
@ -9,6 +9,8 @@ use storage::{
 | 
			
		||||
    LogManager,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
use task_executor::test_utils::TestRuntime;
 | 
			
		||||
 | 
			
		||||
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
 | 
			
		||||
/// The first store is for local node, and data not stored. The second store is for peers, and all
 | 
			
		||||
/// transactions are finalized for file sync.
 | 
			
		||||
@ -22,8 +24,11 @@ pub fn create_2_store(
 | 
			
		||||
    Vec<Vec<u8>>,
 | 
			
		||||
) {
 | 
			
		||||
    let config = LogConfig::default();
 | 
			
		||||
    let mut store = LogManager::memorydb(config.clone()).unwrap();
 | 
			
		||||
    let mut peer_store = LogManager::memorydb(config).unwrap();
 | 
			
		||||
    let runtime = TestRuntime::default();
 | 
			
		||||
 | 
			
		||||
    let executor = runtime.task_executor.clone();
 | 
			
		||||
    let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap();
 | 
			
		||||
    let mut peer_store = LogManager::memorydb(config, executor).unwrap();
 | 
			
		||||
 | 
			
		||||
    let mut offset = 1;
 | 
			
		||||
    let mut txs = vec![];
 | 
			
		||||
@ -115,7 +120,10 @@ pub mod tests {
 | 
			
		||||
 | 
			
		||||
    impl TestStoreRuntime {
 | 
			
		||||
        pub fn new_store() -> impl LogStore {
 | 
			
		||||
            LogManager::memorydb(LogConfig::default()).unwrap()
 | 
			
		||||
            let runtime = TestRuntime::default();
 | 
			
		||||
 | 
			
		||||
            let executor = runtime.task_executor.clone();
 | 
			
		||||
            LogManager::memorydb(LogConfig::default(), executor).unwrap()
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user