mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 00:27:39 +00:00 
			
		
		
		
	Compare commits
	
		
			2 Commits
		
	
	
		
			ae08773df5
			...
			c956b28eb7
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					c956b28eb7 | ||
| 
						 | 
					fae2d5efb6 | 
@ -15,19 +15,13 @@ abigen!(
 | 
				
			|||||||
);
 | 
					);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[cfg(feature = "dev")]
 | 
					#[cfg(feature = "dev")]
 | 
				
			||||||
abigen!(
 | 
					abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json");
 | 
				
			||||||
    ZgsFlow,
 | 
					 | 
				
			||||||
    "../../0g-storage-contracts-dev/artifacts/contracts/dataFlow/Flow.sol/Flow.json"
 | 
					 | 
				
			||||||
);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[cfg(feature = "dev")]
 | 
					#[cfg(feature = "dev")]
 | 
				
			||||||
abigen!(
 | 
					abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json");
 | 
				
			||||||
    PoraMine,
 | 
					 | 
				
			||||||
    "../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json"
 | 
					 | 
				
			||||||
);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[cfg(feature = "dev")]
 | 
					#[cfg(feature = "dev")]
 | 
				
			||||||
abigen!(
 | 
					abigen!(
 | 
				
			||||||
    ChunkLinearReward,
 | 
					    ChunkLinearReward,
 | 
				
			||||||
    "../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
 | 
					    "../../storage-contracts-abis/ChunkLinearReward.json"
 | 
				
			||||||
);
 | 
					);
 | 
				
			||||||
 | 
				
			|||||||
@ -277,6 +277,9 @@ impl LogSyncManager {
 | 
				
			|||||||
                                .remove_finalized_block_interval_minutes,
 | 
					                                .remove_finalized_block_interval_minutes,
 | 
				
			||||||
                        );
 | 
					                        );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    // start the pad data store
 | 
				
			||||||
 | 
					                    log_sync_manager.store.start_padding(&executor_clone);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    let (watch_progress_tx, watch_progress_rx) =
 | 
					                    let (watch_progress_tx, watch_progress_rx) =
 | 
				
			||||||
                        tokio::sync::mpsc::unbounded_channel();
 | 
					                        tokio::sync::mpsc::unbounded_channel();
 | 
				
			||||||
                    let watch_rx = log_sync_manager.log_fetcher.start_watch(
 | 
					                    let watch_rx = log_sync_manager.log_fetcher.start_watch(
 | 
				
			||||||
@ -509,6 +512,7 @@ impl LogSyncManager {
 | 
				
			|||||||
                }
 | 
					                }
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            self.data_cache.garbage_collect(self.next_tx_seq);
 | 
					            self.data_cache.garbage_collect(self.next_tx_seq);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            self.next_tx_seq += 1;
 | 
					            self.next_tx_seq += 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            // Check if the computed data root matches on-chain state.
 | 
					            // Check if the computed data root matches on-chain state.
 | 
				
			||||||
 | 
				
			|||||||
@ -5,17 +5,20 @@ use ethereum_types::Address;
 | 
				
			|||||||
use ethers::contract::ContractCall;
 | 
					use ethers::contract::ContractCall;
 | 
				
			||||||
use ethers::contract::EthEvent;
 | 
					use ethers::contract::EthEvent;
 | 
				
			||||||
use std::sync::Arc;
 | 
					use std::sync::Arc;
 | 
				
			||||||
 | 
					use storage::log_store::log_manager::DATA_DB_KEY;
 | 
				
			||||||
use storage::H256;
 | 
					use storage::H256;
 | 
				
			||||||
use storage_async::Store;
 | 
					use storage_async::Store;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const MINER_ID: &str = "mine.miner_id";
 | 
					const MINER_ID: &str = "mine.miner_id";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub async fn load_miner_id(store: &Store) -> storage::error::Result<Option<H256>> {
 | 
					pub async fn load_miner_id(store: &Store) -> storage::error::Result<Option<H256>> {
 | 
				
			||||||
    store.get_config_decoded(&MINER_ID).await
 | 
					    store.get_config_decoded(&MINER_ID, DATA_DB_KEY).await
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> {
 | 
					async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> {
 | 
				
			||||||
    store.set_config_encoded(&MINER_ID, miner_id).await
 | 
					    store
 | 
				
			||||||
 | 
					        .set_config_encoded(&MINER_ID, miner_id, DATA_DB_KEY)
 | 
				
			||||||
 | 
					        .await
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub(crate) async fn check_and_request_miner_id(
 | 
					pub(crate) async fn check_and_request_miner_id(
 | 
				
			||||||
 | 
				
			|||||||
@ -11,7 +11,7 @@ use std::str::FromStr;
 | 
				
			|||||||
use std::sync::Arc;
 | 
					use std::sync::Arc;
 | 
				
			||||||
use std::time::Duration;
 | 
					use std::time::Duration;
 | 
				
			||||||
use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
 | 
					use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
 | 
				
			||||||
use storage::log_store::log_manager::PORA_CHUNK_SIZE;
 | 
					use storage::log_store::log_manager::{DATA_DB_KEY, FLOW_DB_KEY, PORA_CHUNK_SIZE};
 | 
				
			||||||
use storage_async::Store;
 | 
					use storage_async::Store;
 | 
				
			||||||
use task_executor::TaskExecutor;
 | 
					use task_executor::TaskExecutor;
 | 
				
			||||||
use tokio::sync::{broadcast, mpsc};
 | 
					use tokio::sync::{broadcast, mpsc};
 | 
				
			||||||
@ -252,7 +252,7 @@ impl Pruner {
 | 
				
			|||||||
            .update_shard_config(self.config.shard_config)
 | 
					            .update_shard_config(self.config.shard_config)
 | 
				
			||||||
            .await;
 | 
					            .await;
 | 
				
			||||||
        self.store
 | 
					        self.store
 | 
				
			||||||
            .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
 | 
					            .set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config, DATA_DB_KEY)
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -265,17 +265,22 @@ impl Pruner {
 | 
				
			|||||||
            .set_config_encoded(
 | 
					            .set_config_encoded(
 | 
				
			||||||
                &FIRST_REWARDABLE_CHUNK_KEY,
 | 
					                &FIRST_REWARDABLE_CHUNK_KEY,
 | 
				
			||||||
                &(new_first_rewardable_chunk, new_first_tx_seq),
 | 
					                &(new_first_rewardable_chunk, new_first_tx_seq),
 | 
				
			||||||
 | 
					                FLOW_DB_KEY,
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
 | 
					async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
 | 
				
			||||||
    store.get_config_decoded(&SHARD_CONFIG_KEY).await
 | 
					    store
 | 
				
			||||||
 | 
					        .get_config_decoded(&SHARD_CONFIG_KEY, DATA_DB_KEY)
 | 
				
			||||||
 | 
					        .await
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
 | 
					async fn get_first_rewardable_chunk(store: &Store) -> Result<Option<(u64, u64)>> {
 | 
				
			||||||
    store.get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY).await
 | 
					    store
 | 
				
			||||||
 | 
					        .get_config_decoded(&FIRST_REWARDABLE_CHUNK_KEY, FLOW_DB_KEY)
 | 
				
			||||||
 | 
					        .await
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Debug)]
 | 
					#[derive(Debug)]
 | 
				
			||||||
 | 
				
			|||||||
@ -1054,8 +1054,7 @@ mod tests {
 | 
				
			|||||||
            let (sync_send, sync_recv) = channel::Channel::unbounded("test");
 | 
					            let (sync_send, sync_recv) = channel::Channel::unbounded("test");
 | 
				
			||||||
            let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
 | 
					            let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            let executor = runtime.task_executor.clone();
 | 
					            let store = LogManager::memorydb(LogConfig::default()).unwrap();
 | 
				
			||||||
            let store = LogManager::memorydb(LogConfig::default(), executor).unwrap();
 | 
					 | 
				
			||||||
            Self {
 | 
					            Self {
 | 
				
			||||||
                runtime,
 | 
					                runtime,
 | 
				
			||||||
                network_globals: Arc::new(network_globals),
 | 
					                network_globals: Arc::new(network_globals),
 | 
				
			||||||
 | 
				
			|||||||
@ -89,10 +89,9 @@ impl ClientBuilder {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    /// Initializes in-memory storage.
 | 
					    /// Initializes in-memory storage.
 | 
				
			||||||
    pub fn with_memory_store(mut self) -> Result<Self, String> {
 | 
					    pub fn with_memory_store(mut self) -> Result<Self, String> {
 | 
				
			||||||
        let executor = require!("sync", self, runtime_context).clone().executor;
 | 
					 | 
				
			||||||
        // TODO(zz): Set config.
 | 
					        // TODO(zz): Set config.
 | 
				
			||||||
        let store = Arc::new(
 | 
					        let store = Arc::new(
 | 
				
			||||||
            LogManager::memorydb(LogConfig::default(), executor)
 | 
					            LogManager::memorydb(LogConfig::default())
 | 
				
			||||||
                .map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
 | 
					                .map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -110,13 +109,11 @@ impl ClientBuilder {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    /// Initializes RocksDB storage.
 | 
					    /// Initializes RocksDB storage.
 | 
				
			||||||
    pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
 | 
					    pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
 | 
				
			||||||
        let executor = require!("sync", self, runtime_context).clone().executor;
 | 
					 | 
				
			||||||
        let store = Arc::new(
 | 
					        let store = Arc::new(
 | 
				
			||||||
            LogManager::rocksdb(
 | 
					            LogManager::rocksdb(
 | 
				
			||||||
                config.log_config.clone(),
 | 
					                config.log_config.clone(),
 | 
				
			||||||
                config.db_dir.join("flow_db"),
 | 
					                config.db_dir.join("flow_db"),
 | 
				
			||||||
                config.db_dir.join("data_db"),
 | 
					                config.db_dir.join("data_db"),
 | 
				
			||||||
                executor,
 | 
					 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
 | 
					            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
 | 
				
			|||||||
@ -74,9 +74,11 @@ impl Store {
 | 
				
			|||||||
    pub async fn get_config_decoded<K: AsRef<[u8]> + Send + Sync, T: Decode + Send + 'static>(
 | 
					    pub async fn get_config_decoded<K: AsRef<[u8]> + Send + Sync, T: Decode + Send + 'static>(
 | 
				
			||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        key: &K,
 | 
					        key: &K,
 | 
				
			||||||
 | 
					        dest: &str,
 | 
				
			||||||
    ) -> Result<Option<T>> {
 | 
					    ) -> Result<Option<T>> {
 | 
				
			||||||
        let key = key.as_ref().to_vec();
 | 
					        let key = key.as_ref().to_vec();
 | 
				
			||||||
        self.spawn(move |store| store.get_config_decoded(&key))
 | 
					        let dest = dest.to_string();
 | 
				
			||||||
 | 
					        self.spawn(move |store| store.get_config_decoded(&key, &dest))
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -84,10 +86,12 @@ impl Store {
 | 
				
			|||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        key: &K,
 | 
					        key: &K,
 | 
				
			||||||
        value: &T,
 | 
					        value: &T,
 | 
				
			||||||
 | 
					        dest: &str,
 | 
				
			||||||
    ) -> anyhow::Result<()> {
 | 
					    ) -> anyhow::Result<()> {
 | 
				
			||||||
        let key = key.as_ref().to_vec();
 | 
					        let key = key.as_ref().to_vec();
 | 
				
			||||||
        let value = value.as_ssz_bytes();
 | 
					        let value = value.as_ssz_bytes();
 | 
				
			||||||
        self.spawn(move |store| store.set_config(&key, &value))
 | 
					        let dest = dest.to_string();
 | 
				
			||||||
 | 
					        self.spawn(move |store| store.set_config(&key, &value, &dest))
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -14,25 +14,16 @@ use storage::{
 | 
				
			|||||||
    },
 | 
					    },
 | 
				
			||||||
    LogManager,
 | 
					    LogManager,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use task_executor::test_utils::TestRuntime;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn write_performance(c: &mut Criterion) {
 | 
					fn write_performance(c: &mut Criterion) {
 | 
				
			||||||
    if Path::new("db_write").exists() {
 | 
					    if Path::new("db_write").exists() {
 | 
				
			||||||
        fs::remove_dir_all("db_write").unwrap();
 | 
					        fs::remove_dir_all("db_write").unwrap();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    let runtime = TestRuntime::default();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let executor = runtime.task_executor.clone();
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
 | 
					    let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
 | 
				
			||||||
        LogManager::rocksdb(
 | 
					        LogManager::rocksdb(LogConfig::default(), "db_flow_write", "db_data_write")
 | 
				
			||||||
            LogConfig::default(),
 | 
					            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
 | 
				
			||||||
            "db_flow_write",
 | 
					            .unwrap(),
 | 
				
			||||||
            "db_data_write",
 | 
					 | 
				
			||||||
            executor,
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
 | 
					 | 
				
			||||||
        .unwrap(),
 | 
					 | 
				
			||||||
    ));
 | 
					    ));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let chunk_count = 2048;
 | 
					    let chunk_count = 2048;
 | 
				
			||||||
@ -114,19 +105,10 @@ fn read_performance(c: &mut Criterion) {
 | 
				
			|||||||
        fs::remove_dir_all("db_read").unwrap();
 | 
					        fs::remove_dir_all("db_read").unwrap();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let runtime = TestRuntime::default();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let executor = runtime.task_executor.clone();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
 | 
					    let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
 | 
				
			||||||
        LogManager::rocksdb(
 | 
					        LogManager::rocksdb(LogConfig::default(), "db_flow_read", "db_data_read")
 | 
				
			||||||
            LogConfig::default(),
 | 
					            .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
 | 
				
			||||||
            "db_flow_read",
 | 
					            .unwrap(),
 | 
				
			||||||
            "db_data_read",
 | 
					 | 
				
			||||||
            executor,
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        .map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
 | 
					 | 
				
			||||||
        .unwrap(),
 | 
					 | 
				
			||||||
    ));
 | 
					    ));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let tx_size = 1000;
 | 
					    let tx_size = 1000;
 | 
				
			||||||
 | 
				
			|||||||
@ -2,16 +2,55 @@ use anyhow::{anyhow, Result};
 | 
				
			|||||||
use kvdb::{DBKey, DBOp};
 | 
					use kvdb::{DBKey, DBOp};
 | 
				
			||||||
use ssz::{Decode, Encode};
 | 
					use ssz::{Decode, Encode};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					use crate::log_store::log_manager::{COL_MISC, DATA_DB_KEY, FLOW_DB_KEY};
 | 
				
			||||||
use crate::LogManager;
 | 
					use crate::LogManager;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use super::log_manager::COL_MISC;
 | 
					macro_rules! db_operation {
 | 
				
			||||||
 | 
					    ($self:expr, $dest:expr, get, $key:expr) => {{
 | 
				
			||||||
 | 
					        let db = match $dest {
 | 
				
			||||||
 | 
					            DATA_DB_KEY => &$self.data_db,
 | 
				
			||||||
 | 
					            FLOW_DB_KEY => &$self.flow_db,
 | 
				
			||||||
 | 
					            _ => return Err(anyhow!("Invalid destination")),
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        Ok(db.get(COL_MISC, $key)?)
 | 
				
			||||||
 | 
					    }};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ($self:expr, $dest:expr, put, $key:expr, $value:expr) => {{
 | 
				
			||||||
 | 
					        let db = match $dest {
 | 
				
			||||||
 | 
					            DATA_DB_KEY => &$self.data_db,
 | 
				
			||||||
 | 
					            FLOW_DB_KEY => &$self.flow_db,
 | 
				
			||||||
 | 
					            _ => return Err(anyhow!("Invalid destination")),
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        Ok(db.put(COL_MISC, $key, $value)?)
 | 
				
			||||||
 | 
					    }};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ($self:expr, $dest:expr, delete, $key:expr) => {{
 | 
				
			||||||
 | 
					        let db = match $dest {
 | 
				
			||||||
 | 
					            DATA_DB_KEY => &$self.data_db,
 | 
				
			||||||
 | 
					            FLOW_DB_KEY => &$self.flow_db,
 | 
				
			||||||
 | 
					            _ => return Err(anyhow!("Invalid destination")),
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        Ok(db.delete(COL_MISC, $key)?)
 | 
				
			||||||
 | 
					    }};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ($self:expr, $dest:expr, transaction, $tx:expr) => {{
 | 
				
			||||||
 | 
					        let db = match $dest {
 | 
				
			||||||
 | 
					            DATA_DB_KEY => &$self.data_db,
 | 
				
			||||||
 | 
					            FLOW_DB_KEY => &$self.flow_db,
 | 
				
			||||||
 | 
					            _ => return Err(anyhow!("Invalid destination")),
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        let mut db_tx = db.transaction();
 | 
				
			||||||
 | 
					        db_tx.ops = $tx.ops;
 | 
				
			||||||
 | 
					        Ok(db.write(db_tx)?)
 | 
				
			||||||
 | 
					    }};
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub trait Configurable {
 | 
					pub trait Configurable {
 | 
				
			||||||
    fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;
 | 
					    fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>>;
 | 
				
			||||||
    fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()>;
 | 
					    fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()>;
 | 
				
			||||||
    fn remove_config(&self, key: &[u8]) -> Result<()>;
 | 
					    fn remove_config(&self, key: &[u8], dest: &str) -> Result<()>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn exec_configs(&self, tx: ConfigTx) -> Result<()>;
 | 
					    fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()>;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(Default)]
 | 
					#[derive(Default)]
 | 
				
			||||||
@ -41,8 +80,12 @@ impl ConfigTx {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub trait ConfigurableExt: Configurable {
 | 
					pub trait ConfigurableExt: Configurable {
 | 
				
			||||||
    fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(&self, key: &K) -> Result<Option<T>> {
 | 
					    fn get_config_decoded<K: AsRef<[u8]>, T: Decode>(
 | 
				
			||||||
        match self.get_config(key.as_ref())? {
 | 
					        &self,
 | 
				
			||||||
 | 
					        key: &K,
 | 
				
			||||||
 | 
					        dest: &str,
 | 
				
			||||||
 | 
					    ) -> Result<Option<T>> {
 | 
				
			||||||
 | 
					        match self.get_config(key.as_ref(), dest)? {
 | 
				
			||||||
            Some(val) => Ok(Some(
 | 
					            Some(val) => Ok(Some(
 | 
				
			||||||
                T::from_ssz_bytes(&val).map_err(|e| anyhow!("SSZ decode error: {:?}", e))?,
 | 
					                T::from_ssz_bytes(&val).map_err(|e| anyhow!("SSZ decode error: {:?}", e))?,
 | 
				
			||||||
            )),
 | 
					            )),
 | 
				
			||||||
@ -50,36 +93,36 @@ pub trait ConfigurableExt: Configurable {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(&self, key: &K, value: &T) -> Result<()> {
 | 
					    fn set_config_encoded<K: AsRef<[u8]>, T: Encode>(
 | 
				
			||||||
        self.set_config(key.as_ref(), &value.as_ssz_bytes())
 | 
					        &self,
 | 
				
			||||||
 | 
					        key: &K,
 | 
				
			||||||
 | 
					        value: &T,
 | 
				
			||||||
 | 
					        dest: &str,
 | 
				
			||||||
 | 
					    ) -> Result<()> {
 | 
				
			||||||
 | 
					        self.set_config(key.as_ref(), &value.as_ssz_bytes(), dest)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K) -> Result<()> {
 | 
					    fn remove_config_by_key<K: AsRef<[u8]>>(&self, key: &K, dest: &str) -> Result<()> {
 | 
				
			||||||
        self.remove_config(key.as_ref())
 | 
					        self.remove_config(key.as_ref(), dest)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl<T: ?Sized + Configurable> ConfigurableExt for T {}
 | 
					impl<T: ?Sized + Configurable> ConfigurableExt for T {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl Configurable for LogManager {
 | 
					impl Configurable for LogManager {
 | 
				
			||||||
    fn get_config(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
 | 
					    fn get_config(&self, key: &[u8], dest: &str) -> Result<Option<Vec<u8>>> {
 | 
				
			||||||
        Ok(self.flow_db.get(COL_MISC, key)?)
 | 
					        db_operation!(self, dest, get, key)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn set_config(&self, key: &[u8], value: &[u8]) -> Result<()> {
 | 
					    fn set_config(&self, key: &[u8], value: &[u8], dest: &str) -> Result<()> {
 | 
				
			||||||
        self.flow_db.put(COL_MISC, key, value)?;
 | 
					        db_operation!(self, dest, put, key, value)
 | 
				
			||||||
        Ok(())
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn remove_config(&self, key: &[u8]) -> Result<()> {
 | 
					    fn remove_config(&self, key: &[u8], dest: &str) -> Result<()> {
 | 
				
			||||||
        Ok(self.flow_db.delete(COL_MISC, key)?)
 | 
					        db_operation!(self, dest, delete, key)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn exec_configs(&self, tx: ConfigTx) -> Result<()> {
 | 
					    fn exec_configs(&self, tx: ConfigTx, dest: &str) -> Result<()> {
 | 
				
			||||||
        let mut db_tx = self.flow_db.transaction();
 | 
					        db_operation!(self, dest, transaction, tx)
 | 
				
			||||||
        db_tx.ops = tx.ops;
 | 
					 | 
				
			||||||
        self.flow_db.write(db_tx)?;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        Ok(())
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -1,4 +1,5 @@
 | 
				
			|||||||
use super::load_chunk::EntryBatch;
 | 
					use super::load_chunk::EntryBatch;
 | 
				
			||||||
 | 
					use super::log_manager::{COL_PAD_DATA_LIST, COL_PAD_DATA_SYNC_HEIGH};
 | 
				
			||||||
use super::seal_task_manager::SealTaskManager;
 | 
					use super::seal_task_manager::SealTaskManager;
 | 
				
			||||||
use super::{MineLoadChunk, SealAnswer, SealTask};
 | 
					use super::{MineLoadChunk, SealAnswer, SealTask};
 | 
				
			||||||
use crate::config::ShardConfig;
 | 
					use crate::config::ShardConfig;
 | 
				
			||||||
@ -25,14 +26,16 @@ use tracing::{debug, error, trace};
 | 
				
			|||||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
 | 
					use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct FlowStore {
 | 
					pub struct FlowStore {
 | 
				
			||||||
 | 
					    flow_db: Arc<FlowDBStore>,
 | 
				
			||||||
    data_db: Arc<FlowDBStore>,
 | 
					    data_db: Arc<FlowDBStore>,
 | 
				
			||||||
    seal_manager: SealTaskManager,
 | 
					    seal_manager: SealTaskManager,
 | 
				
			||||||
    config: FlowConfig,
 | 
					    config: FlowConfig,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl FlowStore {
 | 
					impl FlowStore {
 | 
				
			||||||
    pub fn new(data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
 | 
					    pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
 | 
				
			||||||
        Self {
 | 
					        Self {
 | 
				
			||||||
 | 
					            flow_db,
 | 
				
			||||||
            data_db,
 | 
					            data_db,
 | 
				
			||||||
            seal_manager: Default::default(),
 | 
					            seal_manager: Default::default(),
 | 
				
			||||||
            config,
 | 
					            config,
 | 
				
			||||||
@ -199,6 +202,14 @@ impl FlowRead for FlowStore {
 | 
				
			|||||||
    fn get_shard_config(&self) -> ShardConfig {
 | 
					    fn get_shard_config(&self) -> ShardConfig {
 | 
				
			||||||
        *self.config.shard_config.read()
 | 
					        *self.config.shard_config.read()
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn get_pad_data(&self, start_index: u64) -> crate::error::Result<Option<Vec<PadPair>>> {
 | 
				
			||||||
 | 
					        self.flow_db.get_pad_data(start_index)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
 | 
				
			||||||
 | 
					        self.data_db.get_pad_data_sync_height()
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl FlowWrite for FlowStore {
 | 
					impl FlowWrite for FlowStore {
 | 
				
			||||||
@ -266,6 +277,14 @@ impl FlowWrite for FlowStore {
 | 
				
			|||||||
    fn update_shard_config(&self, shard_config: ShardConfig) {
 | 
					    fn update_shard_config(&self, shard_config: ShardConfig) {
 | 
				
			||||||
        *self.config.shard_config.write() = shard_config;
 | 
					        *self.config.shard_config.write() = shard_config;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> crate::error::Result<()> {
 | 
				
			||||||
 | 
					        self.flow_db.put_pad_data(data_sizes, tx_seq)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn put_pad_data_sync_height(&self, sync_index: u64) -> crate::error::Result<()> {
 | 
				
			||||||
 | 
					        self.data_db.put_pad_data_sync_height(sync_index)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl FlowSeal for FlowStore {
 | 
					impl FlowSeal for FlowStore {
 | 
				
			||||||
@ -343,6 +362,12 @@ impl FlowSeal for FlowStore {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#[derive(Debug, PartialEq, DeriveEncode, DeriveDecode)]
 | 
				
			||||||
 | 
					pub struct PadPair {
 | 
				
			||||||
 | 
					    pub start_index: u64,
 | 
				
			||||||
 | 
					    pub data_size: u64,
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct FlowDBStore {
 | 
					pub struct FlowDBStore {
 | 
				
			||||||
    kvdb: Arc<dyn ZgsKeyValueDB>,
 | 
					    kvdb: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@ -443,6 +468,48 @@ impl FlowDBStore {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        Ok(self.kvdb.write(tx)?)
 | 
					        Ok(self.kvdb.write(tx)?)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()> {
 | 
				
			||||||
 | 
					        let mut tx = self.kvdb.transaction();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        let mut buffer = Vec::new();
 | 
				
			||||||
 | 
					        for item in data_sizes {
 | 
				
			||||||
 | 
					            buffer.extend(item.as_ssz_bytes());
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        tx.put(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes(), &buffer);
 | 
				
			||||||
 | 
					        self.kvdb.write(tx)?;
 | 
				
			||||||
 | 
					        Ok(())
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()> {
 | 
				
			||||||
 | 
					        let mut tx = self.kvdb.transaction();
 | 
				
			||||||
 | 
					        tx.put(
 | 
				
			||||||
 | 
					            COL_PAD_DATA_SYNC_HEIGH,
 | 
				
			||||||
 | 
					            b"sync_height",
 | 
				
			||||||
 | 
					            &tx_seq.to_be_bytes(),
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
 | 
					        self.kvdb.write(tx)?;
 | 
				
			||||||
 | 
					        Ok(())
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
 | 
				
			||||||
 | 
					        match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? {
 | 
				
			||||||
 | 
					            Some(v) => Ok(Some(u64::from_be_bytes(
 | 
				
			||||||
 | 
					                v.try_into().map_err(|e| anyhow!("{:?}", e))?,
 | 
				
			||||||
 | 
					            ))),
 | 
				
			||||||
 | 
					            None => Ok(None),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn get_pad_data(&self, tx_seq: u64) -> Result<Option<Vec<PadPair>>> {
 | 
				
			||||||
 | 
					        match self.kvdb.get(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes())? {
 | 
				
			||||||
 | 
					            Some(v) => Ok(Some(
 | 
				
			||||||
 | 
					                Vec::<PadPair>::from_ssz_bytes(&v).map_err(Error::from)?,
 | 
				
			||||||
 | 
					            )),
 | 
				
			||||||
 | 
					            None => Ok(None),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
 | 
					#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
 | 
				
			||||||
 | 
				
			|||||||
@ -1,10 +1,11 @@
 | 
				
			|||||||
use super::tx_store::BlockHashAndSubmissionIndex;
 | 
					 | 
				
			||||||
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
 | 
					 | 
				
			||||||
use crate::config::ShardConfig;
 | 
					use crate::config::ShardConfig;
 | 
				
			||||||
use crate::log_store::flow_store::{batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore};
 | 
					use crate::log_store::flow_store::{
 | 
				
			||||||
use crate::log_store::tx_store::TransactionStore;
 | 
					    batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore};
 | 
				
			||||||
use crate::log_store::{
 | 
					use crate::log_store::{
 | 
				
			||||||
    FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
 | 
					    FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
 | 
				
			||||||
 | 
					    LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use crate::{try_option, ZgsKeyValueDB};
 | 
					use crate::{try_option, ZgsKeyValueDB};
 | 
				
			||||||
use anyhow::{anyhow, bail, Result};
 | 
					use anyhow::{anyhow, bail, Result};
 | 
				
			||||||
@ -24,8 +25,8 @@ use shared_types::{
 | 
				
			|||||||
use std::cmp::Ordering;
 | 
					use std::cmp::Ordering;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use std::path::Path;
 | 
					use std::path::Path;
 | 
				
			||||||
use std::sync::mpsc;
 | 
					 | 
				
			||||||
use std::sync::Arc;
 | 
					use std::sync::Arc;
 | 
				
			||||||
 | 
					use std::time::Duration;
 | 
				
			||||||
use tracing::{debug, error, info, instrument, trace, warn};
 | 
					use tracing::{debug, error, info, instrument, trace, warn};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// 256 Bytes
 | 
					/// 256 Bytes
 | 
				
			||||||
@ -33,17 +34,21 @@ pub const ENTRY_SIZE: usize = 256;
 | 
				
			|||||||
/// 1024 Entries.
 | 
					/// 1024 Entries.
 | 
				
			||||||
pub const PORA_CHUNK_SIZE: usize = 1024;
 | 
					pub const PORA_CHUNK_SIZE: usize = 1024;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub const COL_TX: u32 = 0;
 | 
					pub const COL_TX: u32 = 0; // flow db
 | 
				
			||||||
pub const COL_ENTRY_BATCH: u32 = 1;
 | 
					pub const COL_ENTRY_BATCH: u32 = 1; // data db
 | 
				
			||||||
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2;
 | 
					pub const COL_TX_DATA_ROOT_INDEX: u32 = 2; // flow db
 | 
				
			||||||
pub const COL_ENTRY_BATCH_ROOT: u32 = 3;
 | 
					pub const COL_TX_COMPLETED: u32 = 3; // data db
 | 
				
			||||||
pub const COL_TX_COMPLETED: u32 = 4;
 | 
					pub const COL_MISC: u32 = 4; // flow db & data db
 | 
				
			||||||
pub const COL_MISC: u32 = 5;
 | 
					pub const COL_FLOW_MPT_NODES: u32 = 5; // flow db
 | 
				
			||||||
pub const COL_SEAL_CONTEXT: u32 = 6;
 | 
					pub const COL_BLOCK_PROGRESS: u32 = 6; // flow db
 | 
				
			||||||
pub const COL_FLOW_MPT_NODES: u32 = 7;
 | 
					pub const COL_PAD_DATA_LIST: u32 = 7; // flow db
 | 
				
			||||||
pub const COL_BLOCK_PROGRESS: u32 = 8;
 | 
					pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 8; // data db
 | 
				
			||||||
pub const COL_NUM: u32 = 9;
 | 
					pub const COL_NUM: u32 = 9;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub const DATA_DB_KEY: &str = "data_db";
 | 
				
			||||||
 | 
					pub const FLOW_DB_KEY: &str = "flow_db";
 | 
				
			||||||
 | 
					const PAD_DELAY: Duration = Duration::from_secs(2);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Process at most 1M entries (256MB) pad data at a time.
 | 
					// Process at most 1M entries (256MB) pad data at a time.
 | 
				
			||||||
const PAD_MAX_SIZE: usize = 1 << 20;
 | 
					const PAD_MAX_SIZE: usize = 1 << 20;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -62,10 +67,10 @@ pub struct UpdateFlowMessage {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
pub struct LogManager {
 | 
					pub struct LogManager {
 | 
				
			||||||
    pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
 | 
					    pub(crate) flow_db: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
 | 
					    pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
    tx_store: TransactionStore,
 | 
					    tx_store: TransactionStore,
 | 
				
			||||||
    flow_store: Arc<FlowStore>,
 | 
					    flow_store: Arc<FlowStore>,
 | 
				
			||||||
    merkle: RwLock<MerkleManager>,
 | 
					    merkle: RwLock<MerkleManager>,
 | 
				
			||||||
    sender: mpsc::Sender<UpdateFlowMessage>,
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct MerkleManager {
 | 
					struct MerkleManager {
 | 
				
			||||||
@ -264,7 +269,12 @@ impl LogStoreWrite for LogManager {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
 | 
					        let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
 | 
				
			||||||
        // TODO(zz): Should we validate received tx?
 | 
					        // TODO(zz): Should we validate received tx?
 | 
				
			||||||
        self.append_subtree_list(tx.start_entry_index, tx.merkle_nodes.clone(), &mut merkle)?;
 | 
					        self.append_subtree_list(
 | 
				
			||||||
 | 
					            tx.seq,
 | 
				
			||||||
 | 
					            tx.start_entry_index,
 | 
				
			||||||
 | 
					            tx.merkle_nodes.clone(),
 | 
				
			||||||
 | 
					            &mut merkle,
 | 
				
			||||||
 | 
					        )?;
 | 
				
			||||||
        merkle.commit_merkle(tx.seq)?;
 | 
					        merkle.commit_merkle(tx.seq)?;
 | 
				
			||||||
        debug!(
 | 
					        debug!(
 | 
				
			||||||
            "commit flow root: root={:?}",
 | 
					            "commit flow root: root={:?}",
 | 
				
			||||||
@ -401,6 +411,42 @@ impl LogStoreWrite for LogManager {
 | 
				
			|||||||
    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
 | 
					    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
 | 
				
			||||||
        self.flow_store.submit_seal_result(answers)
 | 
					        self.flow_store.submit_seal_result(answers)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn start_padding(&self, executor: &task_executor::TaskExecutor) {
 | 
				
			||||||
 | 
					        let store = self.flow_store.clone();
 | 
				
			||||||
 | 
					        executor.spawn(
 | 
				
			||||||
 | 
					            async move {
 | 
				
			||||||
 | 
					                let current_height = store.get_pad_data_sync_height().unwrap();
 | 
				
			||||||
 | 
					                let mut start_index = current_height.unwrap_or(0);
 | 
				
			||||||
 | 
					                loop {
 | 
				
			||||||
 | 
					                    match store.get_pad_data(start_index) {
 | 
				
			||||||
 | 
					                        std::result::Result::Ok(data) => {
 | 
				
			||||||
 | 
					                            // Update the flow database.
 | 
				
			||||||
 | 
					                            // This should be called before `complete_last_chunk_merkle` so that we do not save
 | 
				
			||||||
 | 
					                            // subtrees with data known.
 | 
				
			||||||
 | 
					                            if let Some(data) = data {
 | 
				
			||||||
 | 
					                                for pad in data {
 | 
				
			||||||
 | 
					                                    store
 | 
				
			||||||
 | 
					                                        .append_entries(ChunkArray {
 | 
				
			||||||
 | 
					                                            data: vec![0; pad.data_size as usize],
 | 
				
			||||||
 | 
					                                            start_index: pad.start_index,
 | 
				
			||||||
 | 
					                                        })
 | 
				
			||||||
 | 
					                                        .unwrap();
 | 
				
			||||||
 | 
					                                }
 | 
				
			||||||
 | 
					                            };
 | 
				
			||||||
 | 
					                            store.put_pad_data_sync_height(start_index).unwrap();
 | 
				
			||||||
 | 
					                            start_index += 1;
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                        std::result::Result::Err(_) => {
 | 
				
			||||||
 | 
					                            debug!("Unable to get pad data, start_index={}", start_index);
 | 
				
			||||||
 | 
					                            tokio::time::sleep(PAD_DELAY).await;
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                    };
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					            },
 | 
				
			||||||
 | 
					            "pad_tx",
 | 
				
			||||||
 | 
					        );
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl LogStoreChunkRead for LogManager {
 | 
					impl LogStoreChunkRead for LogManager {
 | 
				
			||||||
@ -614,31 +660,33 @@ impl LogManager {
 | 
				
			|||||||
        config: LogConfig,
 | 
					        config: LogConfig,
 | 
				
			||||||
        flow_path: impl AsRef<Path>,
 | 
					        flow_path: impl AsRef<Path>,
 | 
				
			||||||
        data_path: impl AsRef<Path>,
 | 
					        data_path: impl AsRef<Path>,
 | 
				
			||||||
        executor: task_executor::TaskExecutor,
 | 
					 | 
				
			||||||
    ) -> Result<Self> {
 | 
					    ) -> Result<Self> {
 | 
				
			||||||
        let mut db_config = DatabaseConfig::with_columns(COL_NUM);
 | 
					        let mut db_config = DatabaseConfig::with_columns(COL_NUM);
 | 
				
			||||||
        db_config.enable_statistics = true;
 | 
					        db_config.enable_statistics = true;
 | 
				
			||||||
        let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
 | 
					        let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
 | 
				
			||||||
        let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
 | 
					        let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
 | 
				
			||||||
        Self::new(flow_db_source, data_db_source, config, executor)
 | 
					        Self::new(flow_db_source, data_db_source, config)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
 | 
					    pub fn memorydb(config: LogConfig) -> Result<Self> {
 | 
				
			||||||
        let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
 | 
					        let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
 | 
				
			||||||
        let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
 | 
					        let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
 | 
				
			||||||
        Self::new(flow_db, data_db, config, executor)
 | 
					        Self::new(flow_db, data_db, config)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn new(
 | 
					    fn new(
 | 
				
			||||||
        flow_db_source: Arc<dyn ZgsKeyValueDB>,
 | 
					        flow_db_source: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
        data_db_source: Arc<dyn ZgsKeyValueDB>,
 | 
					        data_db_source: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
        config: LogConfig,
 | 
					        config: LogConfig,
 | 
				
			||||||
        executor: task_executor::TaskExecutor,
 | 
					 | 
				
			||||||
    ) -> Result<Self> {
 | 
					    ) -> Result<Self> {
 | 
				
			||||||
        let tx_store = TransactionStore::new(flow_db_source.clone())?;
 | 
					        let tx_store = TransactionStore::new(data_db_source.clone())?;
 | 
				
			||||||
        let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
 | 
					        let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
 | 
				
			||||||
        let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
 | 
					        let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
 | 
				
			||||||
        let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
 | 
					        let flow_store = Arc::new(FlowStore::new(
 | 
				
			||||||
 | 
					            flow_db.clone(),
 | 
				
			||||||
 | 
					            data_db.clone(),
 | 
				
			||||||
 | 
					            config.flow.clone(),
 | 
				
			||||||
 | 
					        ));
 | 
				
			||||||
        // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
 | 
					        // If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
 | 
				
			||||||
        // first and call `put_tx` later.
 | 
					        // first and call `put_tx` later.
 | 
				
			||||||
        let next_tx_seq = tx_store.next_tx_seq();
 | 
					        let next_tx_seq = tx_store.next_tx_seq();
 | 
				
			||||||
@ -739,18 +787,14 @@ impl LogManager {
 | 
				
			|||||||
            last_chunk_merkle,
 | 
					            last_chunk_merkle,
 | 
				
			||||||
        });
 | 
					        });
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let (sender, receiver) = mpsc::channel();
 | 
					        let log_manager = Self {
 | 
				
			||||||
 | 
					 | 
				
			||||||
        let mut log_manager = Self {
 | 
					 | 
				
			||||||
            flow_db: flow_db_source,
 | 
					            flow_db: flow_db_source,
 | 
				
			||||||
 | 
					            data_db: data_db_source,
 | 
				
			||||||
            tx_store,
 | 
					            tx_store,
 | 
				
			||||||
            flow_store,
 | 
					            flow_store,
 | 
				
			||||||
            merkle,
 | 
					            merkle,
 | 
				
			||||||
            sender,
 | 
					 | 
				
			||||||
        };
 | 
					        };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        log_manager.start_receiver(receiver, executor);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        if let Some(tx) = last_tx_to_insert {
 | 
					        if let Some(tx) = last_tx_to_insert {
 | 
				
			||||||
            log_manager.put_tx(tx)?;
 | 
					            log_manager.put_tx(tx)?;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@ -765,40 +809,6 @@ impl LogManager {
 | 
				
			|||||||
        Ok(log_manager)
 | 
					        Ok(log_manager)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn start_receiver(
 | 
					 | 
				
			||||||
        &mut self,
 | 
					 | 
				
			||||||
        rx: mpsc::Receiver<UpdateFlowMessage>,
 | 
					 | 
				
			||||||
        executor: task_executor::TaskExecutor,
 | 
					 | 
				
			||||||
    ) {
 | 
					 | 
				
			||||||
        let flow_store = self.flow_store.clone();
 | 
					 | 
				
			||||||
        executor.spawn(
 | 
					 | 
				
			||||||
            async move {
 | 
					 | 
				
			||||||
                loop {
 | 
					 | 
				
			||||||
                    match rx.recv() {
 | 
					 | 
				
			||||||
                        std::result::Result::Ok(data) => {
 | 
					 | 
				
			||||||
                            // Update the flow database.
 | 
					 | 
				
			||||||
                            // This should be called before `complete_last_chunk_merkle` so that we do not save
 | 
					 | 
				
			||||||
                            // subtrees with data known.
 | 
					 | 
				
			||||||
                            flow_store
 | 
					 | 
				
			||||||
                                .append_entries(ChunkArray {
 | 
					 | 
				
			||||||
                                    data: vec![0; data.pad_data],
 | 
					 | 
				
			||||||
                                    start_index: data.tx_start_flow_index,
 | 
					 | 
				
			||||||
                                })
 | 
					 | 
				
			||||||
                                .unwrap();
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                        std::result::Result::Err(_) => {
 | 
					 | 
				
			||||||
                            debug!("Log manager inner channel closed");
 | 
					 | 
				
			||||||
                            break;
 | 
					 | 
				
			||||||
                        }
 | 
					 | 
				
			||||||
                    };
 | 
					 | 
				
			||||||
                }
 | 
					 | 
				
			||||||
            },
 | 
					 | 
				
			||||||
            "pad_tx",
 | 
					 | 
				
			||||||
        );
 | 
					 | 
				
			||||||
        // Wait for the spawned thread to finish
 | 
					 | 
				
			||||||
        // let _ = handle.join().expect("Thread panicked");
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
 | 
					    fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
 | 
				
			||||||
        match maybe_root {
 | 
					        match maybe_root {
 | 
				
			||||||
            None => self.gen_proof_at_version(flow_index, None),
 | 
					            None => self.gen_proof_at_version(flow_index, None),
 | 
				
			||||||
@ -854,6 +864,7 @@ impl LogManager {
 | 
				
			|||||||
    #[instrument(skip(self, merkle))]
 | 
					    #[instrument(skip(self, merkle))]
 | 
				
			||||||
    fn append_subtree_list(
 | 
					    fn append_subtree_list(
 | 
				
			||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
 | 
					        tx_seq: u64,
 | 
				
			||||||
        tx_start_index: u64,
 | 
					        tx_start_index: u64,
 | 
				
			||||||
        merkle_list: Vec<(usize, DataRoot)>,
 | 
					        merkle_list: Vec<(usize, DataRoot)>,
 | 
				
			||||||
        merkle: &mut MerkleManager,
 | 
					        merkle: &mut MerkleManager,
 | 
				
			||||||
@ -862,7 +873,7 @@ impl LogManager {
 | 
				
			|||||||
            return Ok(());
 | 
					            return Ok(());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.pad_tx(tx_start_index, &mut *merkle)?;
 | 
					        self.pad_tx(tx_seq, tx_start_index, &mut *merkle)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for (subtree_depth, subtree_root) in merkle_list {
 | 
					        for (subtree_depth, subtree_root) in merkle_list {
 | 
				
			||||||
            let subtree_size = 1 << (subtree_depth - 1);
 | 
					            let subtree_size = 1 << (subtree_depth - 1);
 | 
				
			||||||
@ -900,7 +911,7 @@ impl LogManager {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[instrument(skip(self, merkle))]
 | 
					    #[instrument(skip(self, merkle))]
 | 
				
			||||||
    fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
 | 
					    fn pad_tx(&self, tx_seq: u64, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
 | 
				
			||||||
        // Check if we need to pad the flow.
 | 
					        // Check if we need to pad the flow.
 | 
				
			||||||
        let mut tx_start_flow_index =
 | 
					        let mut tx_start_flow_index =
 | 
				
			||||||
            merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
 | 
					            merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
 | 
				
			||||||
@ -910,6 +921,7 @@ impl LogManager {
 | 
				
			|||||||
            merkle.pora_chunks_merkle.leaves(),
 | 
					            merkle.pora_chunks_merkle.leaves(),
 | 
				
			||||||
            merkle.last_chunk_merkle.leaves()
 | 
					            merkle.last_chunk_merkle.leaves()
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
 | 
					        let mut pad_list = vec![];
 | 
				
			||||||
        if pad_size != 0 {
 | 
					        if pad_size != 0 {
 | 
				
			||||||
            for pad_data in Self::padding(pad_size as usize) {
 | 
					            for pad_data in Self::padding(pad_size as usize) {
 | 
				
			||||||
                let mut is_full_empty = true;
 | 
					                let mut is_full_empty = true;
 | 
				
			||||||
@ -954,10 +966,10 @@ impl LogManager {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
                let data_size = pad_data.len() / ENTRY_SIZE;
 | 
					                let data_size = pad_data.len() / ENTRY_SIZE;
 | 
				
			||||||
                if is_full_empty {
 | 
					                if is_full_empty {
 | 
				
			||||||
                    self.sender.send(UpdateFlowMessage {
 | 
					                    pad_list.push(PadPair {
 | 
				
			||||||
                        pad_data: pad_data.len(),
 | 
					                        data_size: pad_data.len() as u64,
 | 
				
			||||||
                        tx_start_flow_index,
 | 
					                        start_index: tx_start_flow_index,
 | 
				
			||||||
                    })?;
 | 
					                    });
 | 
				
			||||||
                } else {
 | 
					                } else {
 | 
				
			||||||
                    // Update the flow database.
 | 
					                    // Update the flow database.
 | 
				
			||||||
                    // This should be called before `complete_last_chunk_merkle` so that we do not save
 | 
					                    // This should be called before `complete_last_chunk_merkle` so that we do not save
 | 
				
			||||||
@ -979,6 +991,8 @@ impl LogManager {
 | 
				
			|||||||
            merkle.pora_chunks_merkle.leaves(),
 | 
					            merkle.pora_chunks_merkle.leaves(),
 | 
				
			||||||
            merkle.last_chunk_merkle.leaves()
 | 
					            merkle.last_chunk_merkle.leaves()
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.flow_store.put_pad_data(&pad_list, tx_seq)?;
 | 
				
			||||||
        Ok(())
 | 
					        Ok(())
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -1,6 +1,7 @@
 | 
				
			|||||||
use crate::config::ShardConfig;
 | 
					use crate::config::ShardConfig;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use ethereum_types::H256;
 | 
					use ethereum_types::H256;
 | 
				
			||||||
 | 
					use flow_store::PadPair;
 | 
				
			||||||
use shared_types::{
 | 
					use shared_types::{
 | 
				
			||||||
    Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
 | 
					    Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
 | 
				
			||||||
    Transaction,
 | 
					    Transaction,
 | 
				
			||||||
@ -158,6 +159,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
 | 
				
			|||||||
    fn update_shard_config(&self, shard_config: ShardConfig);
 | 
					    fn update_shard_config(&self, shard_config: ShardConfig);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
 | 
					    fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn start_padding(&self, executor: &task_executor::TaskExecutor);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub trait LogStoreChunkWrite {
 | 
					pub trait LogStoreChunkWrite {
 | 
				
			||||||
@ -217,6 +220,10 @@ pub trait FlowRead {
 | 
				
			|||||||
    fn get_num_entries(&self) -> Result<u64>;
 | 
					    fn get_num_entries(&self) -> Result<u64>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn get_shard_config(&self) -> ShardConfig;
 | 
					    fn get_shard_config(&self) -> ShardConfig;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub trait FlowWrite {
 | 
					pub trait FlowWrite {
 | 
				
			||||||
@ -231,6 +238,10 @@ pub trait FlowWrite {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    /// Update the shard config.
 | 
					    /// Update the shard config.
 | 
				
			||||||
    fn update_shard_config(&self, shard_config: ShardConfig);
 | 
					    fn update_shard_config(&self, shard_config: ShardConfig);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct SealTask {
 | 
					pub struct SealTask {
 | 
				
			||||||
@ -269,3 +280,23 @@ pub trait FlowSeal {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
 | 
					pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
 | 
				
			||||||
impl<T: FlowRead + FlowWrite + FlowSeal> Flow for T {}
 | 
					impl<T: FlowRead + FlowWrite + FlowSeal> Flow for T {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub trait PadDataStoreRead {
 | 
				
			||||||
 | 
					    fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
 | 
				
			||||||
 | 
					    fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub trait PadDataStoreWrite {
 | 
				
			||||||
 | 
					    fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
 | 
				
			||||||
 | 
					    fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
 | 
				
			||||||
 | 
					    fn start_padding(&mut self, executor: &task_executor::TaskExecutor);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub trait PadDataStore:
 | 
				
			||||||
 | 
					    PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					impl<T: PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static>
 | 
				
			||||||
 | 
					    PadDataStore for T
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -9,15 +9,10 @@ use rand::random;
 | 
				
			|||||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
 | 
					use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
 | 
				
			||||||
use std::cmp;
 | 
					use std::cmp;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use task_executor::test_utils::TestRuntime;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
#[test]
 | 
					#[test]
 | 
				
			||||||
fn test_put_get() {
 | 
					fn test_put_get() {
 | 
				
			||||||
    let config = LogConfig::default();
 | 
					    let config = LogConfig::default();
 | 
				
			||||||
    let runtime = TestRuntime::default();
 | 
					    let store = LogManager::memorydb(config.clone()).unwrap();
 | 
				
			||||||
 | 
					 | 
				
			||||||
    let executor = runtime.task_executor.clone();
 | 
					 | 
				
			||||||
    let store = LogManager::memorydb(config.clone(), executor).unwrap();
 | 
					 | 
				
			||||||
    let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
 | 
					    let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
 | 
				
			||||||
    // Aligned with size.
 | 
					    // Aligned with size.
 | 
				
			||||||
    let start_offset = 1024;
 | 
					    let start_offset = 1024;
 | 
				
			||||||
@ -174,10 +169,7 @@ fn test_put_tx() {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
fn create_store() -> LogManager {
 | 
					fn create_store() -> LogManager {
 | 
				
			||||||
    let config = LogConfig::default();
 | 
					    let config = LogConfig::default();
 | 
				
			||||||
    let runtime = TestRuntime::default();
 | 
					    LogManager::memorydb(config).unwrap()
 | 
				
			||||||
    let executor = runtime.task_executor.clone();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    LogManager::memorydb(config, executor).unwrap()
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {
 | 
					fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {
 | 
				
			||||||
 | 
				
			|||||||
@ -1,7 +1,10 @@
 | 
				
			|||||||
use super::tx_store::TxStore;
 | 
					use super::tx_store::TxStore;
 | 
				
			||||||
use anyhow::Result;
 | 
					use anyhow::Result;
 | 
				
			||||||
use std::sync::Arc;
 | 
					use std::sync::Arc;
 | 
				
			||||||
use storage::log_store::config::{ConfigTx, ConfigurableExt};
 | 
					use storage::log_store::{
 | 
				
			||||||
 | 
					    config::{ConfigTx, ConfigurableExt},
 | 
				
			||||||
 | 
					    log_manager::DATA_DB_KEY,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
use storage_async::Store;
 | 
					use storage_async::Store;
 | 
				
			||||||
use tokio::sync::RwLock;
 | 
					use tokio::sync::RwLock;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -66,10 +69,10 @@ impl SyncStore {
 | 
				
			|||||||
        let store = async_store.get_store();
 | 
					        let store = async_store.get_store();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // load next_tx_seq
 | 
					        // load next_tx_seq
 | 
				
			||||||
        let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?;
 | 
					        let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ, DATA_DB_KEY)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        // load max_tx_seq
 | 
					        // load max_tx_seq
 | 
				
			||||||
        let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ)?;
 | 
					        let max_tx_seq = store.get_config_decoded(&KEY_MAX_TX_SEQ, DATA_DB_KEY)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Ok((next_tx_seq, max_tx_seq))
 | 
					        Ok((next_tx_seq, max_tx_seq))
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -77,13 +80,13 @@ impl SyncStore {
 | 
				
			|||||||
    pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
 | 
					    pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
 | 
				
			||||||
        let async_store = self.store.write().await;
 | 
					        let async_store = self.store.write().await;
 | 
				
			||||||
        let store = async_store.get_store();
 | 
					        let store = async_store.get_store();
 | 
				
			||||||
        store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq)
 | 
					        store.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq, DATA_DB_KEY)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> {
 | 
					    pub async fn set_max_tx_seq(&self, tx_seq: u64) -> Result<()> {
 | 
				
			||||||
        let async_store = self.store.write().await;
 | 
					        let async_store = self.store.write().await;
 | 
				
			||||||
        let store = async_store.get_store();
 | 
					        let store = async_store.get_store();
 | 
				
			||||||
        store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
 | 
					        store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq, DATA_DB_KEY)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> {
 | 
					    pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> {
 | 
				
			||||||
@ -114,7 +117,7 @@ impl SyncStore {
 | 
				
			|||||||
                }
 | 
					                }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?;
 | 
					                let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?;
 | 
				
			||||||
                store.exec_configs(tx)?;
 | 
					                store.exec_configs(tx, DATA_DB_KEY)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if removed {
 | 
					                if removed {
 | 
				
			||||||
                    Ok(InsertResult::Upgraded)
 | 
					                    Ok(InsertResult::Upgraded)
 | 
				
			||||||
@ -128,7 +131,7 @@ impl SyncStore {
 | 
				
			|||||||
                }
 | 
					                }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
 | 
					                let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
 | 
				
			||||||
                store.exec_configs(tx)?;
 | 
					                store.exec_configs(tx, DATA_DB_KEY)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if removed {
 | 
					                if removed {
 | 
				
			||||||
                    Ok(InsertResult::Downgraded)
 | 
					                    Ok(InsertResult::Downgraded)
 | 
				
			||||||
@ -151,7 +154,7 @@ impl SyncStore {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
 | 
					        let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        store.exec_configs(tx)?;
 | 
					        store.exec_configs(tx, DATA_DB_KEY)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Ok(added)
 | 
					        Ok(added)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -1,6 +1,7 @@
 | 
				
			|||||||
use anyhow::Result;
 | 
					use anyhow::Result;
 | 
				
			||||||
use rand::Rng;
 | 
					use rand::Rng;
 | 
				
			||||||
use storage::log_store::config::{ConfigTx, ConfigurableExt};
 | 
					use storage::log_store::config::{ConfigTx, ConfigurableExt};
 | 
				
			||||||
 | 
					use storage::log_store::log_manager::DATA_DB_KEY;
 | 
				
			||||||
use storage::log_store::Store;
 | 
					use storage::log_store::Store;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/// TxStore is used to store pending transactions that to be synchronized in advance.
 | 
					/// TxStore is used to store pending transactions that to be synchronized in advance.
 | 
				
			||||||
@ -32,11 +33,11 @@ impl TxStore {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn index_of(&self, store: &dyn Store, tx_seq: u64) -> Result<Option<usize>> {
 | 
					    fn index_of(&self, store: &dyn Store, tx_seq: u64) -> Result<Option<usize>> {
 | 
				
			||||||
        store.get_config_decoded(&self.key_seq_to_index(tx_seq))
 | 
					        store.get_config_decoded(&self.key_seq_to_index(tx_seq), DATA_DB_KEY)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fn at(&self, store: &dyn Store, index: usize) -> Result<Option<u64>> {
 | 
					    fn at(&self, store: &dyn Store, index: usize) -> Result<Option<u64>> {
 | 
				
			||||||
        store.get_config_decoded(&self.key_index_to_seq(index))
 | 
					        store.get_config_decoded(&self.key_index_to_seq(index), DATA_DB_KEY)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> {
 | 
					    pub fn has(&self, store: &dyn Store, tx_seq: u64) -> Result<bool> {
 | 
				
			||||||
@ -45,7 +46,7 @@ impl TxStore {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    pub fn count(&self, store: &dyn Store) -> Result<usize> {
 | 
					    pub fn count(&self, store: &dyn Store) -> Result<usize> {
 | 
				
			||||||
        store
 | 
					        store
 | 
				
			||||||
            .get_config_decoded(&self.key_count)
 | 
					            .get_config_decoded(&self.key_count, DATA_DB_KEY)
 | 
				
			||||||
            .map(|x| x.unwrap_or(0))
 | 
					            .map(|x| x.unwrap_or(0))
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -70,7 +71,7 @@ impl TxStore {
 | 
				
			|||||||
        if let Some(db_tx) = db_tx {
 | 
					        if let Some(db_tx) = db_tx {
 | 
				
			||||||
            db_tx.append(&mut tx);
 | 
					            db_tx.append(&mut tx);
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            store.exec_configs(tx)?;
 | 
					            store.exec_configs(tx, DATA_DB_KEY)?;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Ok(true)
 | 
					        Ok(true)
 | 
				
			||||||
@ -130,7 +131,7 @@ impl TxStore {
 | 
				
			|||||||
        if let Some(db_tx) = db_tx {
 | 
					        if let Some(db_tx) = db_tx {
 | 
				
			||||||
            db_tx.append(&mut tx);
 | 
					            db_tx.append(&mut tx);
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            store.exec_configs(tx)?;
 | 
					            store.exec_configs(tx, DATA_DB_KEY)?;
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Ok(true)
 | 
					        Ok(true)
 | 
				
			||||||
 | 
				
			|||||||
@ -1657,7 +1657,7 @@ mod tests {
 | 
				
			|||||||
        let num_chunks = 123;
 | 
					        let num_chunks = 123;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let config = LogConfig::default();
 | 
					        let config = LogConfig::default();
 | 
				
			||||||
        let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap());
 | 
					        let store = Arc::new(LogManager::memorydb(config).unwrap());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        create_controller(task_executor, peer_id, store, tx_id, num_chunks)
 | 
					        create_controller(task_executor, peer_id, store, tx_id, num_chunks)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
				
			|||||||
@ -1348,9 +1348,7 @@ mod tests {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        let config = LogConfig::default();
 | 
					        let config = LogConfig::default();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let executor = runtime.task_executor.clone();
 | 
					        let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
 | 
				
			||||||
 | 
					 | 
				
			||||||
        let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap());
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
 | 
					        let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
 | 
				
			||||||
        let file_location_cache: Arc<FileLocationCache> =
 | 
					        let file_location_cache: Arc<FileLocationCache> =
 | 
				
			||||||
 | 
				
			|||||||
@ -9,8 +9,6 @@ use storage::{
 | 
				
			|||||||
    LogManager,
 | 
					    LogManager,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use task_executor::test_utils::TestRuntime;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
 | 
					/// Creates stores for local node and peers with initialized transaction of specified chunk count.
 | 
				
			||||||
/// The first store is for local node, and data not stored. The second store is for peers, and all
 | 
					/// The first store is for local node, and data not stored. The second store is for peers, and all
 | 
				
			||||||
/// transactions are finalized for file sync.
 | 
					/// transactions are finalized for file sync.
 | 
				
			||||||
@ -24,11 +22,8 @@ pub fn create_2_store(
 | 
				
			|||||||
    Vec<Vec<u8>>,
 | 
					    Vec<Vec<u8>>,
 | 
				
			||||||
) {
 | 
					) {
 | 
				
			||||||
    let config = LogConfig::default();
 | 
					    let config = LogConfig::default();
 | 
				
			||||||
    let runtime = TestRuntime::default();
 | 
					    let mut store = LogManager::memorydb(config.clone()).unwrap();
 | 
				
			||||||
 | 
					    let mut peer_store = LogManager::memorydb(config).unwrap();
 | 
				
			||||||
    let executor = runtime.task_executor.clone();
 | 
					 | 
				
			||||||
    let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap();
 | 
					 | 
				
			||||||
    let mut peer_store = LogManager::memorydb(config, executor).unwrap();
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    let mut offset = 1;
 | 
					    let mut offset = 1;
 | 
				
			||||||
    let mut txs = vec![];
 | 
					    let mut txs = vec![];
 | 
				
			||||||
@ -120,10 +115,7 @@ pub mod tests {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    impl TestStoreRuntime {
 | 
					    impl TestStoreRuntime {
 | 
				
			||||||
        pub fn new_store() -> impl LogStore {
 | 
					        pub fn new_store() -> impl LogStore {
 | 
				
			||||||
            let runtime = TestRuntime::default();
 | 
					            LogManager::memorydb(LogConfig::default()).unwrap()
 | 
				
			||||||
 | 
					 | 
				
			||||||
            let executor = runtime.task_executor.clone();
 | 
					 | 
				
			||||||
            LogManager::memorydb(LogConfig::default(), executor).unwrap()
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {
 | 
					        pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user