Use inner lock in storage and use async lock. (#92)

* Use inner lock in storage.

* Remove mut.

* Remove async lock for storage.

* Fix tests and warnings.

* Use spawn_blocking for storage task.

* Fix clippy.

* Finalize the new tx at last.

* Revert "Finalize the new tx at last."

This reverts commit b56ad5582d.

* Wait for old same-root txs to finalize.

* Use async storage in miner.

* Update rust version to 1.79.0.

* Use Vec to avoid stack overflow.

* Fix unused warning.

* Fix clippy.

* Fix test warning.

* Fix test.

* fmt.

* Use async storage in pruner.

* nit.
peilun-conflux 2024-06-29 17:08:02 +08:00 committed by GitHub
parent cc44c9db66
commit 4eb2a50b0e
44 changed files with 934 additions and 1008 deletions
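The core change visible throughout the diff below: the shared store goes from `Arc<tokio::sync::RwLock<dyn Store>>` (one async lock wrapping the whole store) to a plain `Arc<dyn Store>` whose methods take `&self` and guard their own mutable state internally. A minimal sketch of the before/after call shapes, with a hypothetical one-method trait standing in for the real `Store` surface:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Hypothetical minimal trait; the real Store trait is far larger.
trait Store: Send + Sync {
    fn next_tx_seq(&self) -> u64; // &self: mutable state is locked inside
}

struct LogManager {
    next: AtomicU64, // stand-in for the store's inner locks
}

impl Store for LogManager {
    fn next_tx_seq(&self) -> u64 {
        self.next.load(Ordering::SeqCst)
    }
}

#[tokio::main]
async fn main() {
    // Before: Arc<RwLock<dyn Store>> forced `store.read().await.next_tx_seq()`.
    // After: the Arc is shared directly and there is nothing to await.
    let store: Arc<dyn Store> = Arc::new(LogManager { next: AtomicU64::new(0) });
    assert_eq!(store.next_tx_seq(), 0);
}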


@ -67,4 +67,6 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
# blocks_in_conditions is triggered for tracing::instrument.
# This can be removed after the fix is released.
args: -- -D warnings

Cargo.lock (generated; 853 lines changed): file diff suppressed because it is too large.


@ -724,10 +724,10 @@ mod tests {
}
}
fn verify(data: &Vec<H256>, merkle: &mut AppendMerkleTree<H256, Sha3Algorithm>) {
for i in 0..data.len() {
fn verify(data: &[H256], merkle: &mut AppendMerkleTree<H256, Sha3Algorithm>) {
for (i, item) in data.iter().enumerate() {
let proof = merkle.gen_proof(i + 1).unwrap();
let r = merkle.validate(&proof, &data[i], i + 1);
let r = merkle.validate(&proof, item, i + 1);
assert!(matches!(r, Ok(true)), "{:?}", r);
}
for i in (0..data.len()).step_by(6) {


@ -148,7 +148,6 @@
unused_qualifications,
missing_debug_implementations,
missing_copy_implementations,
trivial_casts,
trivial_numeric_casts,
unsafe_code,
unstable_features,


@ -1,5 +1,3 @@
#![cfg(test)]
use crate::hash::{Algorithm, Hashable};
use crate::merkle::MerkleTree;
use crate::test_item::Item;
@ -9,15 +7,15 @@ use std::iter::FromIterator;
/// Custom merkle hash util test
#[derive(Debug, Clone, Default)]
struct CMH(DefaultHasher);
struct Cmh(DefaultHasher);
impl CMH {
pub fn new() -> CMH {
CMH(DefaultHasher::new())
impl Cmh {
pub fn new() -> Cmh {
Cmh(DefaultHasher::new())
}
}
impl Hasher for CMH {
impl Hasher for Cmh {
#[inline]
fn write(&mut self, msg: &[u8]) {
self.0.write(msg)
@ -29,7 +27,7 @@ impl Hasher for CMH {
}
}
impl Algorithm<Item> for CMH {
impl Algorithm<Item> for Cmh {
#[inline]
fn hash(&mut self) -> Item {
Item(self.finish())
@ -37,7 +35,7 @@ impl Algorithm<Item> for CMH {
#[inline]
fn reset(&mut self) {
*self = CMH::default()
*self = Cmh::default()
}
#[inline]
@ -57,8 +55,8 @@ impl Algorithm<Item> for CMH {
#[test]
fn test_custom_merkle_hasher() {
let mut a = CMH::new();
let mt: MerkleTree<Item, CMH> = MerkleTree::from_iter([1, 2, 3, 4, 5].iter().map(|x| {
let mut a = Cmh::new();
let mt: MerkleTree<Item, Cmh> = MerkleTree::from_iter([1, 2, 3, 4, 5].iter().map(|x| {
a.reset();
x.hash(&mut a);
a.hash()


@ -1,8 +1,6 @@
#![cfg(test)]
#![allow(unsafe_code)]
use crate::hash::{Algorithm, Hashable};
use std::mem;
use std::slice;
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Default, Debug)]
@ -10,7 +8,7 @@ pub struct Item(pub u64);
impl AsRef<[u8]> for Item {
fn as_ref(&self) -> &[u8] {
unsafe { slice::from_raw_parts(mem::transmute(&self.0), 8) }
unsafe { slice::from_raw_parts(&self.0 as *const u64 as *const u8, 8) }
}
}
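The untyped `mem::transmute` becomes an explicit pointer cast. For reference, the same byte view can be produced with no `unsafe` at all via the `bytemuck` crate (an assumption here; it is not a dependency shown in this diff):

// Hypothetical safe alternative, assuming a bytemuck dependency.
use bytemuck::bytes_of;

pub struct Item(pub u64);

impl AsRef<[u8]> for Item {
    fn as_ref(&self) -> &[u8] {
        bytes_of(&self.0) // the 8 native-endian bytes of the u64, no unsafe
    }
}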


@ -12,7 +12,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub struct ChunkPoolHandler {
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Store,
log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>,
}
@ -20,7 +20,7 @@ impl ChunkPoolHandler {
pub(crate) fn new(
receiver: UnboundedReceiver<ChunkPoolMessage>,
mem_pool: Arc<MemoryChunkPool>,
log_store: Store,
log_store: Arc<Store>,
sender: UnboundedSender<NetworkMessage>,
) -> Self {
ChunkPoolHandler {


@ -28,7 +28,7 @@ impl Config {
pub fn unbounded(
config: Config,
log_store: storage_async::Store,
log_store: Arc<storage_async::Store>,
network_send: tokio::sync::mpsc::UnboundedSender<network::NetworkMessage>,
) -> (Arc<MemoryChunkPool>, ChunkPoolHandler) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();


@ -82,14 +82,14 @@ impl From<SegmentInfo> for (ChunkArray, FileProof) {
/// and data root verified on blockchain.
pub struct MemoryChunkPool {
inner: Mutex<Inner>,
log_store: Store,
log_store: Arc<Store>,
sender: UnboundedSender<ChunkPoolMessage>,
}
impl MemoryChunkPool {
pub(crate) fn new(
config: Config,
log_store: Store,
log_store: Arc<Store>,
sender: UnboundedSender<ChunkPoolMessage>,
) -> Self {
MemoryChunkPool {
@ -99,7 +99,7 @@ impl MemoryChunkPool {
}
}
pub fn validate_segment_size(&self, segment: &Vec<u8>) -> Result<()> {
pub fn validate_segment_size(&self, segment: &[u8]) -> Result<()> {
if segment.is_empty() {
bail!("data is empty");
}
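This hunk, like the `verify` and `nodes_to_root` changes elsewhere in the diff, swaps a `&Vec<T>` parameter for `&[T]`, the standard fix for clippy's `ptr_arg` lint. The pattern in miniature (hypothetical function):

// clippy::ptr_arg: accept a slice so callers are not forced to hold a Vec.
fn sum(xs: &[u64]) -> u64 {
    xs.iter().sum()
}

fn main() {
    let v = vec![1, 2, 3];
    assert_eq!(sum(&v), 6); // &Vec<u64> coerces to &[u64]
    assert_eq!(sum(&[4, 5]), 9); // arrays and slices work with no allocation
}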


@ -1,10 +1,11 @@
extern crate core;
pub(crate) mod rpc_proxy;
mod sync_manager;
pub use rpc_proxy::ContractAddress;
use ethers::prelude::H160;
pub use sync_manager::{
config::{CacheConfig, LogSyncConfig},
LogSyncEvent, LogSyncManager,
};
pub type ContractAddress = H160;


@ -1,30 +0,0 @@
use crate::rpc_proxy::{ContractAddress, EvmRpcProxy, SubEvent, SubFilter};
use async_trait::async_trait;
use ethers::prelude::{Bytes, Middleware, Provider};
use ethers::providers::Ws;
use ethers::types::TransactionRequest;
use jsonrpsee::core::client::Subscription;
pub struct EthClient {
client: Provider<Ws>,
}
impl EthClient {
#[allow(unused)]
pub async fn new(url: &str) -> anyhow::Result<EthClient> {
let client = Provider::new(Ws::connect(url).await?);
Ok(Self { client })
}
}
#[async_trait]
impl EvmRpcProxy for EthClient {
async fn call(&self, to: ContractAddress, data: Bytes) -> anyhow::Result<Bytes> {
let request = TransactionRequest::new().to(to).data(data);
Ok(self.client.call(&request.into(), None).await?)
}
async fn sub_events(&self, _filter: SubFilter) -> Subscription<SubEvent> {
todo!()
}
}


@ -1,38 +0,0 @@
use anyhow::Result;
use async_trait::async_trait;
use ethereum_types::{H160, H256};
use ethers::prelude::Bytes;
use jsonrpsee::core::client::Subscription;
// TODO: Define accounts/filter/events as associated types?
// TODO: Define an abstraction suitable for other chains.
#[async_trait]
pub trait EvmRpcProxy {
async fn call(&self, to: ContractAddress, data: Bytes) -> Result<Bytes>;
async fn sub_events(&self, filter: SubFilter) -> Subscription<SubEvent>;
}
pub type ContractAddress = H160;
pub type Topic = H256;
#[allow(unused)]
pub struct SubFilter {
to: Option<ContractAddress>,
topics: Vec<Topic>,
}
#[allow(unused)]
pub struct SubEvent {
/// Address
pub address: ContractAddress,
/// Topics
pub topics: Vec<Topic>,
/// Data
pub data: Bytes,
}
pub(crate) mod eth;


@ -1,4 +1,4 @@
use crate::rpc_proxy::ContractAddress;
use crate::ContractAddress;
pub struct LogSyncConfig {
pub rpc_endpoint_url: String,


@ -1,6 +1,6 @@
use crate::rpc_proxy::ContractAddress;
use crate::sync_manager::log_query::LogQuery;
use crate::sync_manager::RETRY_WAIT_MS;
use crate::ContractAddress;
use anyhow::{anyhow, bail, Result};
use append_merkle::{Algorithm, Sha3Algorithm};
use contract_interface::{SubmissionNode, SubmitFilter, ZgsFlow};
@ -122,7 +122,7 @@ impl LogEntryFetcher {
pub fn start_remove_finalized_block_task(
&self,
executor: &TaskExecutor,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
block_hash_cache: Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
default_finalized_block_count: u64,
remove_finalized_block_interval_minutes: u64,
@ -133,7 +133,7 @@ impl LogEntryFetcher {
loop {
debug!("processing finalized block");
let processed_block_number = match store.read().await.get_sync_progress() {
let processed_block_number = match store.get_sync_progress() {
Ok(Some((processed_block_number, _))) => Some(processed_block_number),
Ok(None) => None,
Err(e) => {
@ -176,9 +176,7 @@ impl LogEntryFetcher {
}
for key in pending_keys.into_iter() {
if let Err(e) =
store.write().await.delete_block_hash_by_number(key)
{
if let Err(e) = store.delete_block_hash_by_number(key) {
error!(
"remove block tx for number {} error: e={:?}",
key, e
@ -599,7 +597,7 @@ fn submission_event_to_transaction(e: SubmitFilter) -> LogFetchProgress {
})
}
fn nodes_to_root(node_list: &Vec<SubmissionNode>) -> DataRoot {
fn nodes_to_root(node_list: &[SubmissionNode]) -> DataRoot {
let mut root: DataRoot = node_list.last().expect("not empty").root.into();
for next_node in node_list[..node_list.len() - 1].iter().rev() {
root = Sha3Algorithm::parent(&next_node.root.into(), &root);


@ -33,7 +33,7 @@ pub enum LogSyncEvent {
pub struct LogSyncManager {
config: LogSyncConfig,
log_fetcher: LogEntryFetcher,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
data_cache: DataCache,
next_tx_seq: u64,
@ -48,9 +48,9 @@ impl LogSyncManager {
pub async fn spawn(
config: LogSyncConfig,
executor: TaskExecutor,
store: Arc<RwLock<dyn Store>>,
store: Arc<dyn Store>,
) -> Result<broadcast::Sender<LogSyncEvent>> {
let next_tx_seq = store.read().await.next_tx_seq();
let next_tx_seq = store.next_tx_seq();
let executor_clone = executor.clone();
let mut shutdown_sender = executor.shutdown_sender();
@ -81,8 +81,6 @@ impl LogSyncManager {
let block_hash_cache = Arc::new(RwLock::new(
store
.read()
.await
.get_block_hashes()?
.into_iter()
.map(|(x, y)| (x, Some(y)))
@ -124,7 +122,7 @@ impl LogSyncManager {
// TODO(zz): Handle reorg instead of return.
let mut need_handle_reorg = false;
let (mut start_block_number, mut start_block_hash) =
match log_sync_manager.store.read().await.get_sync_progress()? {
match log_sync_manager.store.get_sync_progress()? {
// No previous progress, so just use config.
None => {
let block_number = log_sync_manager.config.start_block_number;
@ -176,7 +174,7 @@ impl LogSyncManager {
log_sync_manager.handle_data(reorg_rx).await?;
if let Some((block_number, block_hash)) =
log_sync_manager.store.read().await.get_sync_progress()?
log_sync_manager.store.get_sync_progress()?
{
start_block_number = block_number;
start_block_hash = block_hash;
@ -301,7 +299,7 @@ impl LogSyncManager {
async fn process_reverted(&mut self, tx_seq: u64) {
warn!("revert for chain reorg: seq={}", tx_seq);
{
let store = self.store.read().await;
let store = self.store.clone();
for seq in tx_seq..self.next_tx_seq {
if matches!(store.check_tx_completed(seq), Ok(true)) {
if let Ok(Some(tx)) = store.get_tx_by_seq_number(seq) {
@ -325,7 +323,7 @@ impl LogSyncManager {
let _ = self.event_send.send(LogSyncEvent::ReorgDetected { tx_seq });
// TODO(zz): `wrapping_sub` here is a hack to handle the case of tx_seq=0.
if let Err(e) = self.store.write().await.revert_to(tx_seq.wrapping_sub(1)) {
if let Err(e) = self.store.revert_to(tx_seq.wrapping_sub(1)) {
error!("revert_to fails: e={:?}", e);
return;
}
@ -353,7 +351,7 @@ impl LogSyncManager {
);
}
self.store.write().await.put_sync_progress((
self.store.put_sync_progress((
block_number,
block_hash,
first_submission_index,
@ -396,12 +394,12 @@ impl LogSyncManager {
}
async fn put_tx_inner(&mut self, tx: Transaction) -> bool {
if let Err(e) = self.store.write().await.put_tx(tx.clone()) {
if let Err(e) = self.store.put_tx(tx.clone()) {
error!("put_tx error: e={:?}", e);
false
} else {
if let Some(data) = self.data_cache.pop_data(&tx.data_merkle_root) {
let mut store = self.store.write().await;
let store = self.store.clone();
// We are holding a mutable reference to LogSyncManager, so no chain reorg is
// possible after put_tx.
if let Err(e) = store


@ -21,3 +21,4 @@ lazy_static = "1.4"
async-trait = "0.1.56"
shared_types = { path = "../shared_types" }
hex = "0.4"
storage-async = { path = "../storage-async" }


@ -1,7 +1,6 @@
use async_trait::async_trait;
use std::sync::Arc;
use storage::log_store::{MineLoadChunk, Store};
use tokio::sync::RwLock;
use storage::log_store::MineLoadChunk;
use storage_async::Store;
#[async_trait]
pub trait PoraLoader: Send + Sync {
@ -9,10 +8,9 @@ pub trait PoraLoader: Send + Sync {
}
#[async_trait]
impl PoraLoader for Arc<RwLock<dyn Store>> {
impl PoraLoader for Store {
async fn load_sealed_data(&self, chunk_index: u64) -> Option<MineLoadChunk> {
let store = &*self.read().await;
match store.flow().load_sealed_data(chunk_index) {
match self.load_sealed_data(chunk_index).await {
Ok(Some(chunk)) => Some(chunk),
_ => None,
}


@ -5,26 +5,26 @@ use ethereum_types::Address;
use ethers::contract::ContractCall;
use ethers::contract::EthEvent;
use std::sync::Arc;
use storage::log_store::{config::ConfigurableExt, Store};
use storage::H256;
use tokio::sync::RwLock;
use storage_async::Store;
const MINER_ID: &str = "mine.miner_id";
pub fn load_miner_id(store: &dyn Store) -> storage::error::Result<Option<H256>> {
store.get_config_decoded(&MINER_ID)
pub async fn load_miner_id(store: &Store) -> storage::error::Result<Option<H256>> {
store.get_config_decoded(&MINER_ID).await
}
fn set_miner_id(store: &dyn Store, miner_id: &H256) -> storage::error::Result<()> {
store.set_config_encoded(&MINER_ID, miner_id)
async fn set_miner_id(store: &Store, miner_id: &H256) -> storage::error::Result<()> {
store.set_config_encoded(&MINER_ID, miner_id).await
}
pub(crate) async fn check_and_request_miner_id(
config: &MinerConfig,
store: &RwLock<dyn Store>,
store: &Store,
provider: &Arc<MineServiceMiddleware>,
) -> Result<H256, String> {
let db_miner_id = load_miner_id(&*store.read().await)
let db_miner_id = load_miner_id(store)
.await
.map_err(|e| format!("miner_id on db corrupt: {:?}", e))?;
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
@ -42,7 +42,8 @@ pub(crate) async fn check_and_request_miner_id(
}
(None, Some(c_id)) => {
check_miner_id(&mine_contract, c_id).await?;
set_miner_id(&*store.write().await, &c_id)
set_miner_id(store, &c_id)
.await
.map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
Ok(c_id)
}
@ -53,7 +54,8 @@ pub(crate) async fn check_and_request_miner_id(
(None, None) => {
let beneficiary = provider.address();
let id = request_miner_id(&mine_contract, beneficiary).await?;
set_miner_id(&*store.write().await, &id)
set_miner_id(store, &id)
.await
.map_err(|e| format!("set miner id on db corrupt: {:?}", e))?;
Ok(id)
}


@ -1,16 +1,14 @@
use std::{collections::BTreeMap, sync::Arc};
use ethereum_types::H256;
use tokio::{
sync::RwLock,
time::{sleep, Duration, Instant},
};
use tokio::time::{sleep, Duration, Instant};
use contract_interface::{EpochRangeWithContextDigest, ZgsFlow};
use storage::{
error::Result,
log_store::{SealAnswer, SealTask, Store},
log_store::{SealAnswer, SealTask},
};
use storage_async::Store;
use task_executor::TaskExecutor;
use zgs_spec::SECTORS_PER_SEAL;
@ -22,7 +20,7 @@ const CHAIN_STATUS_QUERY_PERIOD: u64 = 5;
pub struct Sealer {
flow_contract: ZgsFlow<MineServiceMiddleware>,
store: Arc<RwLock<dyn Store>>,
store: Arc<Store>,
context_cache: BTreeMap<u128, EpochRangeWithContextDigest>,
last_context_flow_length: u64,
miner_id: H256,
@ -32,7 +30,7 @@ impl Sealer {
pub fn spawn(
executor: TaskExecutor,
provider: Arc<MineServiceMiddleware>,
store: Arc<RwLock<dyn Store>>,
store: Arc<Store>,
config: &MinerConfig,
miner_id: H256,
) {
@ -152,19 +150,11 @@ impl Sealer {
async fn fetch_task(&self) -> Result<Option<Vec<SealTask>>> {
let seal_index_max = self.last_context_flow_length as usize / SECTORS_PER_SEAL;
self.store
.read()
.await
.flow()
.pull_seal_chunk(seal_index_max)
self.store.pull_seal_chunk(seal_index_max).await
}
async fn submit_answer(&self, answers: Vec<SealAnswer>) -> Result<()> {
self.store
.write()
.await
.flow_mut()
.submit_seal_result(answers)
self.store.submit_seal_result(answers).await
}
async fn seal_iteration(&mut self) -> Result<bool> {


@ -5,9 +5,9 @@ use crate::{config::MinerConfig, mine::PoraService, watcher::MineContextWatcher}
use network::NetworkMessage;
use std::sync::Arc;
use storage::config::ShardConfig;
use storage::log_store::Store;
use storage_async::Store;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, RwLock};
#[derive(Clone, Debug)]
pub enum MinerMessage {
@ -29,13 +29,13 @@ impl MineService {
executor: task_executor::TaskExecutor,
_network_send: mpsc::UnboundedSender<NetworkMessage>,
config: MinerConfig,
store: Arc<RwLock<dyn Store>>,
store: Arc<Store>,
) -> Result<broadcast::Sender<MinerMessage>, String> {
let provider = Arc::new(config.make_provider().await?);
let (msg_send, msg_recv) = broadcast::channel(1024);
let miner_id = check_and_request_miner_id(&config, &store, &provider).await?;
let miner_id = check_and_request_miner_id(&config, store.as_ref(), &provider).await?;
debug!("miner id setting complete.");
let mine_context_receiver = MineContextWatcher::spawn(
@ -49,7 +49,7 @@ impl MineService {
executor.clone(),
msg_recv.resubscribe(),
mine_context_receiver,
Arc::new(store.clone()),
store.clone(),
&config,
miner_id,
);


@ -6,14 +6,14 @@ use ethers::providers::PendingTransaction;
use hex::ToHex;
use shared_types::FlowRangeProof;
use std::sync::Arc;
use storage::log_store::Store;
use storage_async::Store;
use task_executor::TaskExecutor;
use tokio::sync::{mpsc, RwLock};
use tokio::sync::mpsc;
use crate::config::{MineServiceMiddleware, MinerConfig};
use crate::pora::AnswerWithoutProof;
use zgs_spec::SECTORS_PER_SEAL;
use zgs_spec::{BYTES_PER_SEAL, SECTORS_PER_SEAL};
const SUBMISSION_RETIES: usize = 3;
@ -22,7 +22,7 @@ pub struct Submitter {
mine_contract: PoraMine<MineServiceMiddleware>,
flow_contract: ZgsFlow<MineServiceMiddleware>,
default_gas_limit: Option<U256>,
store: Arc<RwLock<dyn Store>>,
store: Arc<Store>,
}
impl Submitter {
@ -30,7 +30,7 @@ impl Submitter {
executor: TaskExecutor,
mine_answer_receiver: mpsc::UnboundedReceiver<AnswerWithoutProof>,
provider: Arc<MineServiceMiddleware>,
store: Arc<RwLock<dyn Store>>,
store: Arc<Store>,
config: &MinerConfig,
) {
let mine_contract = PoraMine::new(config.mine_address, provider.clone());
@ -80,13 +80,12 @@ impl Submitter {
let flow_proof = self
.store
.read()
.await
.get_proof_at_root(
&mine_answer.context_flow_root,
mine_answer.context_flow_root,
mine_answer.recall_position,
SECTORS_PER_SEAL as u64,
)
.await
.map_err(|e| e.to_string())?;
let answer = PoraAnswer {
@ -97,7 +96,11 @@ impl Submitter {
recall_position: mine_answer.recall_position.into(),
seal_offset: mine_answer.seal_offset.into(),
sealed_context_digest: sealed_context_digest.digest,
sealed_data: unsafe { std::mem::transmute(mine_answer.sealed_data) },
sealed_data: unsafe {
std::mem::transmute::<[u8; BYTES_PER_SEAL], [[u8; 32]; BYTES_PER_SEAL / 32]>(
mine_answer.sealed_data,
)
},
merkle_proof: flow_proof_to_pora_merkle_proof(flow_proof),
};
trace!("submit_answer: answer={:?}", answer);
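The bare `transmute` gains explicit source and target types, the form asked for by clippy's `missing_transmute_annotations` lint that shipped around the 1.79.0 toolchain this commit pins. Since `BYTES_PER_SEAL` must be a multiple of 32 for that transmute to compile, the same reshape could also be written without `unsafe`; a sketch with an illustrative constant, not the real value:

const BYTES_PER_SEAL: usize = 4 * 32; // illustrative only

fn to_words(sealed: [u8; BYTES_PER_SEAL]) -> [[u8; 32]; BYTES_PER_SEAL / 32] {
    let mut out = [[0u8; 32]; BYTES_PER_SEAL / 32];
    for (dst, src) in out.iter_mut().zip(sealed.chunks_exact(32)) {
        dst.copy_from_slice(src); // each 32-byte chunk becomes one word
    }
    out
}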


@ -24,6 +24,7 @@ type ReqId = usize;
use tempfile::Builder as TempBuilder;
use tokio::sync::mpsc::unbounded_channel;
#[allow(unused)]
pub struct Libp2pInstance(LibP2PService<ReqId>, exit_future::Signal);
impl std::ops::Deref for Libp2pInstance {


@ -5,6 +5,7 @@ edition = "2021"
[dependencies]
storage = { path = "../storage" }
storage-async = { path = "../storage-async" }
miner = { path = "../miner" }
anyhow = "1.0.86"
tokio = "1.37.0"


@ -5,11 +5,10 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use storage::config::{ShardConfig, SHARD_CONFIG_KEY};
use storage::log_store::config::ConfigurableExt;
use storage::log_store::Store;
use storage_async::Store;
use task_executor::TaskExecutor;
use tokio::sync::{broadcast, mpsc, RwLock};
use tracing::debug;
use tokio::sync::{broadcast, mpsc};
use tracing::{debug, info};
// Start pruning when the db directory size exceeds 0.9 * limit.
const PRUNE_THRESHOLD: f32 = 0.9;
@ -32,7 +31,7 @@ impl PrunerConfig {
pub struct Pruner {
config: PrunerConfig,
store: Arc<RwLock<dyn Store>>,
store: Arc<Store>,
sender: mpsc::UnboundedSender<PrunerMessage>,
miner_sender: Option<broadcast::Sender<MinerMessage>>,
@ -42,10 +41,10 @@ impl Pruner {
pub async fn spawn(
executor: TaskExecutor,
mut config: PrunerConfig,
store: Arc<RwLock<dyn Store>>,
store: Arc<Store>,
miner_sender: Option<broadcast::Sender<MinerMessage>>,
) -> Result<mpsc::UnboundedReceiver<PrunerMessage>> {
if let Some(shard_config) = get_shard_config(&store).await? {
if let Some(shard_config) = get_shard_config(store.as_ref()).await? {
config.shard_config = shard_config;
}
let (tx, rx) = mpsc::unbounded_channel();
@ -68,7 +67,7 @@ impl Pruner {
pub async fn start(mut self) -> Result<()> {
loop {
if let Some(delete_list) = self.maybe_update().await? {
debug!(new_config = ?self.config.shard_config, "new shard config");
info!(new_config = ?self.config.shard_config, "new shard config");
self.put_shard_config().await?;
let mut batch = Vec::with_capacity(self.config.batch_size);
let mut iter = delete_list.peekable();
@ -76,7 +75,7 @@ impl Pruner {
batch.push(index);
if batch.len() == self.config.batch_size || iter.peek().is_none() {
debug!(start = batch.first(), end = batch.last(), "prune batch");
self.store.write().await.remove_chunks_batch(&batch)?;
self.store.remove_chunks_batch(&batch).await?;
batch = Vec::with_capacity(self.config.batch_size);
tokio::time::sleep(self.config.batch_wait_time).await;
}
@ -87,7 +86,7 @@ impl Pruner {
}
async fn maybe_update(&mut self) -> Result<Option<Box<dyn Send + Iterator<Item = u64>>>> {
let current_size = self.store.read().await.flow().get_num_entries()?;
let current_size = self.store.get_num_entries().await?;
debug!(
current_size = current_size,
config = ?self.config.shard_config,
@ -109,7 +108,7 @@ impl Pruner {
config.num_shard *= 2;
// Generate delete list
let flow_len = self.store.read().await.get_context()?.1;
let flow_len = self.store.get_context().await?.1;
let start_index = old_shard_id + (!rand_bit) as usize * old_num_shard;
return Ok(Some(Box::new(
(start_index as u64..flow_len).step_by(config.num_shard),
@ -124,16 +123,17 @@ impl Pruner {
}
self.sender
.send(PrunerMessage::ChangeShardConfig(self.config.shard_config))?;
let mut store = self.store.write().await;
store
.flow_mut()
.update_shard_config(self.config.shard_config);
store.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
self.store
.update_shard_config(self.config.shard_config)
.await;
self.store
.set_config_encoded(&SHARD_CONFIG_KEY, &self.config.shard_config)
.await
}
}
async fn get_shard_config(store: &RwLock<dyn Store>) -> Result<Option<ShardConfig>> {
store.read().await.get_config_decoded(&SHARD_CONFIG_KEY)
async fn get_shard_config(store: &Store) -> Result<Option<ShardConfig>> {
store.get_config_decoded(&SHARD_CONFIG_KEY).await
}
#[derive(Debug)]


@ -269,13 +269,7 @@ impl Libp2pEventHandler {
};
let timestamp = timestamp_now();
let shard_config = self
.store
.get_store()
.read()
.await
.flow()
.get_shard_config();
let shard_config = self.store.get_store().flow().get_shard_config();
let msg = AnnounceFile {
tx_id,
@ -597,8 +591,8 @@ mod tests {
sync_send: SyncSender,
sync_recv: SyncReceiver,
chunk_pool_send: mpsc::UnboundedSender<ChunkPoolMessage>,
chunk_pool_recv: mpsc::UnboundedReceiver<ChunkPoolMessage>,
store: Arc<RwLock<dyn Store>>,
// chunk_pool_recv: mpsc::UnboundedReceiver<ChunkPoolMessage>,
store: Arc<dyn Store>,
file_location_cache: Arc<FileLocationCache>,
peers: Arc<RwLock<PeerManager>>,
}
@ -609,7 +603,7 @@ mod tests {
let (network_globals, keypair) = Context::new_network_globals();
let (network_send, network_recv) = mpsc::unbounded_channel();
let (sync_send, sync_recv) = channel::Channel::unbounded();
let (chunk_pool_send, chunk_pool_recv) = mpsc::unbounded_channel();
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
let store = LogManager::memorydb(LogConfig::default()).unwrap();
Self {
runtime,
@ -620,8 +614,8 @@ mod tests {
sync_send,
sync_recv,
chunk_pool_send,
chunk_pool_recv,
store: Arc::new(RwLock::new(store)),
// chunk_pool_recv,
store: Arc::new(store),
file_location_cache: Arc::new(FileLocationCache::default()),
peers: Arc::new(RwLock::new(PeerManager::new(Config::default()))),
}
@ -1003,7 +997,11 @@ mod tests {
assert_eq!(peer_id, *ctx.network_globals.peer_id.read());
assert_eq!(
addr,
*ctx.network_globals.listen_multiaddrs.read().get(0).unwrap()
*ctx.network_globals
.listen_multiaddrs
.read()
.first()
.unwrap()
);
}
Ok(_) => panic!("Unexpected sync message type received"),


@ -58,7 +58,7 @@ impl RouterService {
_miner_send: Option<broadcast::Sender<MinerMessage>>,
chunk_pool_send: UnboundedSender<ChunkPoolMessage>,
pruner_recv: Option<mpsc::UnboundedReceiver<PrunerMessage>>,
store: Arc<RwLock<dyn LogStore>>,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
local_keypair: Keypair,
config: Config,


@ -44,7 +44,7 @@ pub struct Context {
pub network_send: UnboundedSender<NetworkMessage>,
pub sync_send: SyncSender,
pub chunk_pool: Arc<MemoryChunkPool>,
pub log_store: Store,
pub log_store: Arc<Store>,
pub shutdown_sender: Sender<ShutdownReason>,
pub mine_service_sender: Option<broadcast::Sender<MinerMessage>>,
}


@ -23,8 +23,6 @@ impl RpcServer for RpcServerImpl {
.ctx
.log_store
.get_store()
.read()
.await
.get_sync_progress()?
.unwrap_or_default();
@ -156,14 +154,7 @@ impl RpcServer for RpcServerImpl {
async fn get_shard_config(&self) -> RpcResult<ShardConfig> {
debug!("zgs_getShardConfig");
let shard_config = self
.ctx
.log_store
.get_store()
.read()
.await
.flow()
.get_shard_config();
let shard_config = self.ctx.log_store.get_store().flow().get_shard_config();
Ok(shard_config)
}
}


@ -15,7 +15,7 @@ use storage::log_store::log_manager::LogConfig;
use storage::log_store::Store;
use storage::{LogManager, StorageConfig};
use sync::{SyncSender, SyncService};
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::sync::{broadcast, mpsc};
macro_rules! require {
($component:expr, $self:ident, $e:ident) => {
@ -68,8 +68,8 @@ struct ChunkPoolComponents {
#[derive(Default)]
pub struct ClientBuilder {
runtime_context: Option<RuntimeContext>,
store: Option<Arc<RwLock<dyn Store>>>,
async_store: Option<storage_async::Store>,
store: Option<Arc<dyn Store>>,
async_store: Option<Arc<storage_async::Store>>,
file_location_cache: Option<Arc<FileLocationCache>>,
network: Option<NetworkComponents>,
sync: Option<SyncComponents>,
@ -89,15 +89,18 @@ impl ClientBuilder {
/// Initializes in-memory storage.
pub fn with_memory_store(mut self) -> Result<Self, String> {
// TODO(zz): Set config.
let store = Arc::new(RwLock::new(
let store = Arc::new(
LogManager::memorydb(LogConfig::default())
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
));
);
self.store = Some(store.clone());
if let Some(ctx) = self.runtime_context.as_ref() {
self.async_store = Some(storage_async::Store::new(store, ctx.executor.clone()));
self.async_store = Some(Arc::new(storage_async::Store::new(
store,
ctx.executor.clone(),
)));
}
Ok(self)
@ -105,15 +108,18 @@ impl ClientBuilder {
/// Initializes RocksDB storage.
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
let store = Arc::new(RwLock::new(
let store = Arc::new(
LogManager::rocksdb(LogConfig::default(), &config.db_dir)
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
));
);
self.store = Some(store.clone());
if let Some(ctx) = self.runtime_context.as_ref() {
self.async_store = Some(storage_async::Store::new(store, ctx.executor.clone()));
self.async_store = Some(Arc::new(storage_async::Store::new(
store,
ctx.executor.clone(),
)));
}
Ok(self)
@ -177,7 +183,7 @@ impl ClientBuilder {
if let Some(config) = config {
let executor = require!("miner", self, runtime_context).clone().executor;
let network_send = require!("miner", self, network).send.clone();
let store = self.store.as_ref().unwrap().clone();
let store = self.async_store.as_ref().unwrap().clone();
let send = MineService::spawn(executor, network_send, config, store).await?;
self.miner = Some(MinerComponents { send });
@ -189,7 +195,7 @@ impl ClientBuilder {
pub async fn with_pruner(mut self, config: Option<PrunerConfig>) -> Result<Self, String> {
if let Some(config) = config {
let miner_send = self.miner.as_ref().map(|miner| miner.send.clone());
let store = require!("pruner", self, store).clone();
let store = require!("pruner", self, async_store).clone();
let executor = require!("pruner", self, runtime_context).clone().executor;
let recv = Pruner::spawn(executor, config, store, miner_send)
.await


@ -119,8 +119,7 @@ impl Environment {
let inner_shutdown =
async move { rx.next().await.ok_or("Internal shutdown channel exhausted") };
futures::pin_mut!(inner_shutdown);
match self.runtime().block_on(async {
let shutdown = async {
let mut handles = vec![];
// setup for handling SIGTERM
@ -151,7 +150,9 @@ impl Environment {
}
future::select(inner_shutdown, future::select_all(handles.into_iter())).await
}) {
};
match self.runtime().block_on(shutdown) {
future::Either::Left((Ok(reason), _)) => {
info!(reason = reason.message(), "Internal shutdown received");
Ok(reason)
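The `block_on(async { ... })` scrutinee is hoisted into a named `shutdown` future before the `match`, keeping the match head free of an inline block (the shape that clippy's `blocks_in_conditions` lint, noted in the CI change above, flags). The same pattern in miniature:

async fn pick() -> u32 {
    7
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();

    // Instead of `match rt.block_on(async { pick().await }) { ... }`,
    // name the future first, then match on the awaited result.
    let shutdown = async { pick().await };
    match rt.block_on(shutdown) {
        7 => println!("internal shutdown"),
        n => println!("other: {n}"),
    }
}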


@ -10,3 +10,4 @@ storage = { path = "../storage" }
task_executor = { path = "../../common/task_executor" }
tokio = { version = "1.19.2", features = ["sync"] }
tracing = "0.1.35"
eth2_ssz = "0.4.0"


@ -2,13 +2,18 @@
extern crate tracing;
use anyhow::bail;
use shared_types::{Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, Transaction};
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
};
use ssz::{Decode, Encode};
use std::sync::Arc;
use storage::{error, error::Result, log_store::Store as LogStore, H256};
use task_executor::TaskExecutor;
use tokio::sync::{oneshot, RwLock};
use tokio::sync::oneshot;
pub use storage::config::ShardConfig;
use storage::log_store::config::ConfigurableExt;
use storage::log_store::{MineLoadChunk, SealAnswer, SealTask};
/// The name of the worker tokio tasks.
const WORKER_TASK_NAME: &str = "async_storage_worker";
@ -28,14 +33,14 @@ macro_rules! delegate {
#[derive(Clone)]
pub struct Store {
/// Log and transaction storage.
store: Arc<RwLock<dyn LogStore>>,
store: Arc<dyn LogStore>,
/// Tokio executor for spawning worker tasks.
executor: TaskExecutor,
}
impl Store {
pub fn new(store: Arc<RwLock<dyn LogStore>>, executor: TaskExecutor) -> Self {
pub fn new(store: Arc<dyn LogStore>, executor: TaskExecutor) -> Self {
Store { store, executor }
}
@ -49,6 +54,8 @@ impl Store {
delegate!(fn get_chunk_by_flow_index(index: u64, length: u64) -> Result<Option<ChunkArray>>);
delegate!(fn finalize_tx(tx_seq: u64) -> Result<()>);
delegate!(fn finalize_tx_with_hash(tx_seq: u64, tx_hash: H256) -> Result<bool>);
delegate!(fn get_proof_at_root(root: DataRoot, index: u64, length: u64) -> Result<FlowRangeProof>);
delegate!(fn get_context() -> Result<(DataRoot, u64)>);
pub async fn get_tx_seq_by_data_root(&self, data_root: &DataRoot) -> Result<Option<u64>> {
let root = *data_root;
@ -62,18 +69,76 @@ impl Store {
.await
}
pub async fn get_config_decoded<K: AsRef<[u8]> + Send + Sync, T: Decode + Send + 'static>(
&self,
key: &K,
) -> Result<Option<T>> {
let key = key.as_ref().to_vec();
self.spawn(move |store| store.get_config_decoded(&key))
.await
}
pub async fn set_config_encoded<K: AsRef<[u8]> + Send + Sync, T: Encode + Send + Sync>(
&self,
key: &K,
value: &T,
) -> anyhow::Result<()> {
let key = key.as_ref().to_vec();
let value = value.as_ssz_bytes();
self.spawn(move |store| store.set_config(&key, &value))
.await
}
pub async fn pull_seal_chunk(
&self,
seal_index_max: usize,
) -> anyhow::Result<Option<Vec<SealTask>>> {
self.spawn(move |store| store.flow().pull_seal_chunk(seal_index_max))
.await
}
pub async fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> anyhow::Result<()> {
self.spawn(move |store| store.flow().submit_seal_result(answers))
.await
}
pub async fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
self.spawn(move |store| store.flow().load_sealed_data(chunk_index))
.await
}
pub async fn get_num_entries(&self) -> Result<u64> {
self.spawn(move |store| store.flow().get_num_entries())
.await
}
pub async fn remove_chunks_batch(&self, batch_list: &[u64]) -> Result<()> {
let batch_list = batch_list.to_vec();
self.spawn(move |store| store.remove_chunks_batch(&batch_list))
.await
}
pub async fn update_shard_config(&self, shard_config: ShardConfig) {
self.spawn(move |store| {
store.flow().update_shard_config(shard_config);
Ok(())
})
.await
.expect("always ok")
}
async fn spawn<T, F>(&self, f: F) -> Result<T>
where
F: FnOnce(&mut dyn LogStore) -> Result<T> + Send + 'static,
F: FnOnce(&dyn LogStore) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let store = self.store.clone();
let (tx, rx) = oneshot::channel();
self.executor.spawn(
async move {
self.executor.spawn_blocking(
move || {
// FIXME(zz): Not all functions need `write`. Refactor store usage.
let res = f(&mut *store.write().await);
let res = f(&*store);
if tx.send(res).is_err() {
error!("Unable to complete async storage operation: the receiver dropped");
@ -87,7 +152,7 @@ impl Store {
}
// FIXME(zz): Refactor the lock and async call here.
pub fn get_store(&self) -> &RwLock<dyn LogStore> {
pub fn get_store(&self) -> &dyn LogStore {
self.store.as_ref()
}
}
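The worker now dispatches storage closures with `spawn_blocking` rather than an async task that awaited `store.write()`, so blocking RocksDB work stays off the reactor threads and closures only need shared `&dyn LogStore` access. A self-contained sketch of the dispatch, with tokio's blocking pool standing in for the `TaskExecutor` and a one-method trait standing in for `LogStore`:

use std::sync::Arc;
use tokio::sync::oneshot;

trait LogStore: Send + Sync {
    fn next_tx_seq(&self) -> u64;
}

struct Store {
    store: Arc<dyn LogStore>,
}

impl Store {
    async fn spawn<T, F>(&self, f: F) -> anyhow::Result<T>
    where
        F: FnOnce(&dyn LogStore) -> anyhow::Result<T> + Send + 'static,
        T: Send + 'static,
    {
        let store = self.store.clone();
        let (tx, rx) = oneshot::channel();
        tokio::task::spawn_blocking(move || {
            // Shared access only; any mutation is behind the store's inner locks.
            let _ = tx.send(f(&*store));
        });
        rx.await? // fails only if the worker dropped the sender
    }

    async fn next_tx_seq(&self) -> anyhow::Result<u64> {
        self.spawn(|store| Ok(store.next_tx_seq())).await
    }
}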


@ -27,6 +27,7 @@ static_assertions = "1.1"
tiny-keccak = "*"
itertools = "0.13.0"
serde = { version = "1.0.197", features = ["derive"] }
parking_lot = "0.12.3"
[dev-dependencies]
tempdir = "0.3.7"


@ -37,8 +37,8 @@ fn write_performance(c: &mut Criterion) {
for _ in 0..5000 {
let mut data = vec![0; data_size];
for i in 0..data_size {
data[i] = random();
for item in data.iter_mut().take(data_size) {
*item = random();
}
let merkel_nodes = tx_subtree_root_list_padded(&data[..]);
@ -119,8 +119,8 @@ fn read_performance(c: &mut Criterion) {
for seq in 0..tx_size {
let mut data = vec![0; data_size];
for i in 0..data_size {
data[i] = random();
for item in data.iter_mut().take(data_size) {
*item = random();
}
let merkel_nodes = tx_subtree_root_list_padded(&data[..]);


@ -10,6 +10,7 @@ use crate::{try_option, ZgsKeyValueDB};
use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead};
use itertools::Itertools;
use parking_lot::RwLock;
use shared_types::{ChunkArray, DataRoot, FlowProof};
use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
@ -24,10 +25,10 @@ use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_S
pub struct FlowStore {
db: FlowDBStore,
// TODO(kevin): This is an in-memory cache for recording which chunks are ready for sealing. It should be persisted on disk.
to_seal_set: BTreeMap<usize, usize>,
to_seal_set: RwLock<BTreeMap<usize, usize>>,
// Data sealing is an asynchronous process.
// The sealing service uses the version number to distinguish if revert happens during sealing.
to_seal_version: usize,
to_seal_version: RwLock<usize>,
config: FlowConfig,
}
@ -36,7 +37,7 @@ impl FlowStore {
Self {
db: FlowDBStore::new(db),
to_seal_set: Default::default(),
to_seal_version: 0,
to_seal_version: RwLock::new(0),
config,
}
}
@ -86,7 +87,7 @@ impl FlowStore {
#[derive(Clone, Debug)]
pub struct FlowConfig {
pub batch_size: usize,
pub shard_config: ShardConfig,
pub shard_config: Arc<RwLock<ShardConfig>>,
}
impl Default for FlowConfig {
@ -207,14 +208,15 @@ impl FlowRead for FlowStore {
}
fn get_shard_config(&self) -> ShardConfig {
self.config.shard_config
*self.config.shard_config.read()
}
}
impl FlowWrite for FlowStore {
/// Return the roots of completed chunks. The order is guaranteed to be increasing
/// by chunk index.
fn append_entries(&mut self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>> {
let mut to_seal_set = self.to_seal_set.write();
trace!("append_entries: {} {}", data.start_index, data.data.len());
if data.data.len() % BYTES_PER_SECTOR != 0 {
bail!("append_entries: invalid data size, len={}", data.data.len());
@ -231,7 +233,7 @@ impl FlowWrite for FlowStore {
.expect("in range");
let chunk_index = chunk.start_index / self.config.batch_size as u64;
if !self.config.shard_config.in_range(chunk_index) {
if !self.config.shard_config.read().in_range(chunk_index) {
// The data are in a shard range that we are not storing.
continue;
}
@ -246,9 +248,9 @@ impl FlowWrite for FlowStore {
chunk.data,
)?;
completed_seals.into_iter().for_each(|x| {
self.to_seal_set.insert(
to_seal_set.insert(
chunk_index as usize * SEALS_PER_LOAD + x as usize,
self.to_seal_version,
*self.to_seal_version.read(),
);
});
@ -257,27 +259,29 @@ impl FlowWrite for FlowStore {
self.db.put_entry_batch_list(batch_list)
}
fn truncate(&mut self, start_index: u64) -> crate::error::Result<()> {
fn truncate(&self, start_index: u64) -> crate::error::Result<()> {
let mut to_seal_set = self.to_seal_set.write();
let mut to_seal_version = self.to_seal_version.write();
let to_reseal = self.db.truncate(start_index, self.config.batch_size)?;
self.to_seal_set
.split_off(&(start_index as usize / SECTORS_PER_SEAL));
self.to_seal_version += 1;
to_seal_set.split_off(&(start_index as usize / SECTORS_PER_SEAL));
*to_seal_version += 1;
to_reseal.into_iter().for_each(|x| {
self.to_seal_set.insert(x, self.to_seal_version);
to_seal_set.insert(x, *to_seal_version);
});
Ok(())
}
fn update_shard_config(&mut self, shard_config: ShardConfig) {
self.config.shard_config = shard_config;
fn update_shard_config(&self, shard_config: ShardConfig) {
*self.config.shard_config.write() = shard_config;
}
}
impl FlowSeal for FlowStore {
fn pull_seal_chunk(&self, seal_index_max: usize) -> Result<Option<Vec<SealTask>>> {
let mut to_seal_iter = self.to_seal_set.iter();
let to_seal_set = self.to_seal_set.read();
let mut to_seal_iter = to_seal_set.iter();
let (&first_index, &first_version) = try_option!(to_seal_iter.next());
if first_index >= seal_index_max {
return Ok(None);
@ -309,9 +313,10 @@ impl FlowSeal for FlowStore {
Ok(Some(tasks))
}
fn submit_seal_result(&mut self, answers: Vec<SealAnswer>) -> Result<()> {
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
let mut to_seal_set = self.to_seal_set.write();
let is_consistent = |answer: &SealAnswer| {
self.to_seal_set
to_seal_set
.get(&(answer.seal_index as usize))
.map_or(false, |cur_ver| cur_ver == &answer.version)
};
@ -337,7 +342,7 @@ impl FlowSeal for FlowStore {
debug!("Seal chunks: indices = {:?}", removed_seal_index);
for idx in removed_seal_index.into_iter() {
self.to_seal_set.remove(&idx);
to_seal_set.remove(&idx);
}
self.db.put_entry_raw(updated_chunk)?;
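`FlowStore`'s write methods drop `&mut self`: each piece of genuinely mutable state (`to_seal_set`, `to_seal_version`, the shard config) gets its own `parking_lot::RwLock`, so writers take `&self` and can be reached through `Arc<dyn Store>`. The shape in miniature:

use std::collections::BTreeMap;

use parking_lot::RwLock;

struct FlowStore {
    // Only the mutable in-memory state is locked; the DB handle needs no lock.
    to_seal_set: RwLock<BTreeMap<usize, usize>>,
    to_seal_version: RwLock<usize>,
}

impl FlowStore {
    // Was `&mut self`; now callable from any thread holding an Arc.
    fn truncate(&self, start_index: usize) {
        let mut set = self.to_seal_set.write();
        let mut version = self.to_seal_version.write();
        set.split_off(&start_index);
        *version += 1;
    }
}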


@ -10,6 +10,7 @@ use ethereum_types::H256;
use kvdb_rocksdb::{Database, DatabaseConfig};
use merkle_light::merkle::{log2_pow2, MerkleTree};
use merkle_tree::RawLeafSha3Algorithm;
use parking_lot::RwLock;
use rayon::iter::ParallelIterator;
use rayon::prelude::ParallelSlice;
use shared_types::{
@ -48,6 +49,10 @@ pub struct LogManager {
pub(crate) db: Arc<dyn ZgsKeyValueDB>,
tx_store: TransactionStore,
flow_store: FlowStore,
merkle: RwLock<MerkleManager>,
}
struct MerkleManager {
// TODO(zz): Refactor the in-memory merkle and in-disk storage together.
pora_chunks_merkle: Merkle,
/// The in-memory structure of the sub merkle tree of the last chunk.
@ -55,6 +60,80 @@ pub struct LogManager {
last_chunk_merkle: Merkle,
}
impl MerkleManager {
fn last_chunk_start_index(&self) -> u64 {
if self.pora_chunks_merkle.leaves() == 0 {
0
} else {
PORA_CHUNK_SIZE as u64
* if self.last_chunk_merkle.leaves() == 0 {
// The last chunk is empty and its root hash is not in `pora_chunks_merkle`,
// so all chunks in `pora_chunks_merkle` are complete.
self.pora_chunks_merkle.leaves()
} else {
// The last chunk has data, so we need to exclude it from `pora_chunks_merkle`.
self.pora_chunks_merkle.leaves() - 1
} as u64
}
}
#[instrument(skip(self))]
fn commit_merkle(&mut self, tx_seq: u64) -> Result<()> {
self.pora_chunks_merkle.commit(Some(tx_seq));
self.last_chunk_merkle.commit(Some(tx_seq));
Ok(())
}
fn revert_merkle_tree(&mut self, tx_seq: u64, tx_store: &TransactionStore) -> Result<()> {
// Special case for reverting tx_seq == 0
if tx_seq == u64::MAX {
self.pora_chunks_merkle.reset();
self.last_chunk_merkle.reset();
return Ok(());
}
let old_leaves = self.pora_chunks_merkle.leaves();
self.pora_chunks_merkle.revert_to(tx_seq)?;
if old_leaves == self.pora_chunks_merkle.leaves() {
self.last_chunk_merkle.revert_to(tx_seq)?;
} else {
// We are reverting to a position before the current last_chunk.
self.last_chunk_merkle =
tx_store.rebuild_last_chunk_merkle(self.pora_chunks_merkle.leaves() - 1, tx_seq)?;
}
Ok(())
}
fn try_initialize(&mut self, flow_store: &FlowStore) -> Result<()> {
if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
self.last_chunk_merkle.append(H256::zero());
self.pora_chunks_merkle
.update_last(*self.last_chunk_merkle.root());
} else if self.last_chunk_merkle.leaves() != 0 {
let last_chunk_start_index = self.last_chunk_start_index();
let last_chunk_data = flow_store.get_available_entries(
last_chunk_start_index,
last_chunk_start_index + PORA_CHUNK_SIZE as u64,
)?;
for e in last_chunk_data {
let start_index = e.start_index - last_chunk_start_index;
for i in 0..e.data.len() / ENTRY_SIZE {
let index = i + start_index as usize;
if index >= self.last_chunk_merkle.leaves() {
// We revert the merkle tree before truncating the flow store,
// so last_chunk_data may include data that should have been truncated.
break;
}
self.last_chunk_merkle.fill_leaf(
index,
Sha3Algorithm::leaf(&e.data[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE]),
);
}
}
}
Ok(())
}
}
#[derive(Clone, Default)]
pub struct LogConfig {
pub flow: FlowConfig,
@ -71,7 +150,8 @@ impl LogStoreInner for LogManager {
}
impl LogStoreChunkWrite for LogManager {
fn put_chunks(&mut self, tx_seq: u64, chunks: ChunkArray) -> Result<()> {
fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()> {
let mut merkle = self.merkle.write();
let tx = self
.tx_store
.get_tx_by_seq_number(tx_seq)?
@ -90,17 +170,18 @@ impl LogStoreChunkWrite for LogManager {
// TODO: Use another struct to avoid confusion.
let mut flow_entry_array = chunks;
flow_entry_array.start_index += tx.start_entry_index;
self.append_entries(flow_entry_array)?;
self.append_entries(flow_entry_array, &mut merkle)?;
Ok(())
}
fn put_chunks_with_tx_hash(
&mut self,
&self,
tx_seq: u64,
tx_hash: H256,
chunks: ChunkArray,
maybe_file_proof: Option<FlowProof>,
) -> Result<bool> {
let mut merkle = self.merkle.write();
let tx = self
.tx_store
.get_tx_by_seq_number(tx_seq)?
@ -122,10 +203,10 @@ impl LogStoreChunkWrite for LogManager {
// TODO: Use another struct to avoid confusion.
let mut flow_entry_array = chunks;
flow_entry_array.start_index += tx.start_entry_index;
self.append_entries(flow_entry_array)?;
self.append_entries(flow_entry_array, &mut merkle)?;
if let Some(file_proof) = maybe_file_proof {
let updated_node_list = self.pora_chunks_merkle.fill_with_file_proof(
let updated_node_list = merkle.pora_chunks_merkle.fill_with_file_proof(
file_proof,
tx.merkle_nodes,
tx.start_entry_index,
@ -161,7 +242,8 @@ impl LogStoreWrite for LogManager {
/// Only the last tx may have this case, so we rerun
/// `put_tx` for the last tx when we restart the node to ensure that it succeeds.
///
fn put_tx(&mut self, tx: Transaction) -> Result<()> {
fn put_tx(&self, tx: Transaction) -> Result<()> {
let mut merkle = self.merkle.write();
debug!("put_tx: tx={:?}", tx);
let expected_seq = self.next_tx_seq();
if tx.seq != expected_seq {
@ -176,12 +258,14 @@ impl LogStoreWrite for LogManager {
}
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
// TODO(zz): Should we validate received tx?
self.append_subtree_list(tx.merkle_nodes.clone())?;
self.commit_merkle(tx.seq)?;
self.append_subtree_list(tx.merkle_nodes.clone(), &mut merkle)?;
merkle.commit_merkle(tx.seq)?;
debug!(
"commit flow root: root={:?}",
self.pora_chunks_merkle.root()
merkle.pora_chunks_merkle.root()
);
// Drop the lock because `copy_tx_data` will lock again.
drop(merkle);
if let Some(old_tx_seq) = maybe_same_data_tx_seq {
if self.check_tx_completed(old_tx_seq)? {
@ -192,7 +276,7 @@ impl LogStoreWrite for LogManager {
Ok(())
}
fn finalize_tx(&mut self, tx_seq: u64) -> Result<()> {
fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
let tx = self
.tx_store
.get_tx_by_seq_number(tx_seq)?
@ -218,7 +302,7 @@ impl LogStoreWrite for LogManager {
}
}
fn finalize_tx_with_hash(&mut self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> crate::error::Result<bool> {
trace!(
"finalize_tx_with_hash: tx_seq={} tx_hash={:?}",
tx_seq,
@ -259,24 +343,34 @@ impl LogStoreWrite for LogManager {
/// Return the reverted Transactions in order.
/// `tx_seq == u64::MAX` is a special case for reverting all transactions.
fn revert_to(&mut self, tx_seq: u64) -> Result<Vec<Transaction>> {
fn revert_to(&self, tx_seq: u64) -> Result<Vec<Transaction>> {
// FIXME(zz): If this revert is triggered by chain reorg after restarts, this will fail.
self.revert_merkle_tree(tx_seq)?;
let start_index = self.last_chunk_start_index() * PORA_CHUNK_SIZE as u64
+ self.last_chunk_merkle.leaves() as u64;
let mut merkle = self.merkle.write();
merkle.revert_merkle_tree(tx_seq, &self.tx_store)?;
merkle.try_initialize(&self.flow_store)?;
assert_eq!(
Some(*merkle.last_chunk_merkle.root()),
merkle
.pora_chunks_merkle
.leaf_at(merkle.pora_chunks_merkle.leaves() - 1)?
);
let start_index = merkle.last_chunk_start_index() * PORA_CHUNK_SIZE as u64
+ merkle.last_chunk_merkle.leaves() as u64;
self.flow_store.truncate(start_index)?;
let start = if tx_seq != u64::MAX { tx_seq + 1 } else { 0 };
self.tx_store.remove_tx_after(start)
}
fn validate_and_insert_range_proof(
&mut self,
&self,
tx_seq: u64,
data: &ChunkArrayWithProof,
) -> Result<bool> {
let valid = self.validate_range_proof(tx_seq, data)?;
// `merkle` is used in `validate_range_proof`.
let mut merkle = self.merkle.write();
if valid {
let updated_nodes = self
let updated_nodes = merkle
.pora_chunks_merkle
.fill_with_range_proof(data.proof.clone())?;
self.flow_store.put_mpt_node_list(updated_nodes)?;
@ -421,7 +515,11 @@ impl LogStoreRead for LogManager {
&leaves,
(data.chunks.start_index + tx.start_entry_index) as usize,
)?;
Ok(self.pora_chunks_merkle.check_root(&data.proof.root()))
Ok(self
.merkle
.read_recursive()
.pora_chunks_merkle
.check_root(&data.proof.root()))
}
fn get_sync_progress(&self) -> Result<Option<(u64, H256)>> {
@ -442,12 +540,12 @@ impl LogStoreRead for LogManager {
fn get_proof_at_root(
&self,
root: &DataRoot,
root: DataRoot,
index: u64,
length: u64,
) -> crate::error::Result<FlowRangeProof> {
let left_proof = self.gen_proof(index, Some(*root))?;
let right_proof = self.gen_proof(index + length - 1, Some(*root))?;
let left_proof = self.gen_proof(index, Some(root))?;
let right_proof = self.gen_proof(index + length - 1, Some(root))?;
Ok(FlowRangeProof {
left_proof,
right_proof,
@ -455,9 +553,10 @@ impl LogStoreRead for LogManager {
}
fn get_context(&self) -> crate::error::Result<(DataRoot, u64)> {
let merkle = self.merkle.read_recursive();
Ok((
*self.pora_chunks_merkle.root(),
self.last_chunk_start_index() + self.last_chunk_merkle.leaves() as u64,
*merkle.pora_chunks_merkle.root(),
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64,
))
}
}
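`LogManager` now keeps both in-memory merkle trees behind a single `RwLock<MerkleManager>`: write paths lock once at the public entry point and pass `&mut MerkleManager` down through the helpers, and read paths use `read_recursive()` so a nested read taken while the same thread already holds a read guard cannot deadlock behind a queued writer (parking_lot's fair `read()` would block it). The read side in miniature:

use parking_lot::RwLock;

struct MerkleManager {
    leaves: usize,
}

struct LogManager {
    merkle: RwLock<MerkleManager>,
}

impl LogManager {
    fn leaves(&self) -> usize {
        // read_recursive is safe even if a read guard is already held higher
        // up this thread's call stack (e.g. validate_range_proof calling
        // back into another read accessor).
        self.merkle.read_recursive().leaves
    }
}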
@ -578,19 +677,23 @@ impl LogManager {
// update the merkle root
pora_chunks_merkle.commit(start_tx_seq);
}
let mut log_manager = Self {
let merkle = RwLock::new(MerkleManager {
pora_chunks_merkle,
last_chunk_merkle,
});
let log_manager = Self {
db,
tx_store,
flow_store,
pora_chunks_merkle,
last_chunk_merkle,
merkle,
};
if let Some(tx) = last_tx_to_insert {
log_manager.put_tx(tx)?;
let mut merkle = log_manager.merkle.write();
for (index, h) in extra_leaves {
if index < log_manager.pora_chunks_merkle.leaves() {
log_manager.pora_chunks_merkle.fill_leaf(index, h);
if index < merkle.pora_chunks_merkle.leaves() {
merkle.pora_chunks_merkle.fill_leaf(index, h);
} else {
error!("out of range extra leaf: index={} hash={:?}", index, h);
}
@ -598,45 +701,19 @@ impl LogManager {
} else {
assert!(extra_leaves.is_empty());
}
log_manager.try_initialize()?;
log_manager
.merkle
.write()
.try_initialize(&log_manager.flow_store)?;
Ok(log_manager)
}
fn try_initialize(&mut self) -> Result<()> {
if self.pora_chunks_merkle.leaves() == 0 && self.last_chunk_merkle.leaves() == 0 {
self.last_chunk_merkle.append(H256::zero());
self.pora_chunks_merkle
.update_last(*self.last_chunk_merkle.root());
} else if self.last_chunk_merkle.leaves() != 0 {
let last_chunk_start_index = self.last_chunk_start_index();
let last_chunk_data = self.flow_store.get_available_entries(
last_chunk_start_index,
last_chunk_start_index + PORA_CHUNK_SIZE as u64,
)?;
for e in last_chunk_data {
let start_index = e.start_index - last_chunk_start_index;
for i in 0..e.data.len() / ENTRY_SIZE {
let index = i + start_index as usize;
if index >= self.last_chunk_merkle.leaves() {
// We revert the merkle tree before truncate the flow store,
// so last_chunk_data may include data that should have been truncated.
break;
}
self.last_chunk_merkle.fill_leaf(
index,
Sha3Algorithm::leaf(&e.data[i * ENTRY_SIZE..(i + 1) * ENTRY_SIZE]),
);
}
}
}
Ok(())
}
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
let merkle = self.merkle.read_recursive();
let chunk_index = flow_index / PORA_CHUNK_SIZE as u64;
let top_proof = match maybe_root {
None => self.pora_chunks_merkle.gen_proof(chunk_index as usize)?,
Some(root) => self
None => merkle.pora_chunks_merkle.gen_proof(chunk_index as usize)?,
Some(root) => merkle
.pora_chunks_merkle
.at_root_version(&root)?
.gen_proof(chunk_index as usize)?,
@ -649,17 +726,17 @@ impl LogManager {
// and `flow_index` must be within a complete PoRA chunk. For possible future usages,
// we'll need to find the flow length at the given root and load a partial chunk
// if `flow_index` is in the last chunk.
let sub_proof = if chunk_index as usize != self.pora_chunks_merkle.leaves() - 1
|| self.last_chunk_merkle.leaves() == 0
let sub_proof = if chunk_index as usize != merkle.pora_chunks_merkle.leaves() - 1
|| merkle.last_chunk_merkle.leaves() == 0
{
self.flow_store
.gen_proof_in_batch(chunk_index as usize, flow_index as usize % PORA_CHUNK_SIZE)?
} else {
match maybe_root {
None => self
None => merkle
.last_chunk_merkle
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
Some(root) => self
Some(root) => merkle
.last_chunk_merkle
.at_root_version(&root)?
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
@ -668,45 +745,56 @@ impl LogManager {
entry_proof(&top_proof, &sub_proof)
}
#[instrument(skip(self))]
fn append_subtree_list(&mut self, merkle_list: Vec<(usize, DataRoot)>) -> Result<()> {
#[instrument(skip(self, merkle))]
fn append_subtree_list(
&self,
merkle_list: Vec<(usize, DataRoot)>,
merkle: &mut MerkleManager,
) -> Result<()> {
if merkle_list.is_empty() {
return Ok(());
}
self.pad_tx(1 << (merkle_list[0].0 - 1))?;
self.pad_tx(1 << (merkle_list[0].0 - 1), &mut *merkle)?;
let mut batch_root_map = BTreeMap::new();
for (subtree_depth, subtree_root) in merkle_list {
let subtree_size = 1 << (subtree_depth - 1);
if self.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {
self.last_chunk_merkle
if merkle.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {
merkle
.last_chunk_merkle
.append_subtree(subtree_depth, subtree_root)?;
if self.last_chunk_merkle.leaves() == subtree_size {
if merkle.last_chunk_merkle.leaves() == subtree_size {
// `last_chunk_merkle` was empty, so this is a new leaf in the top_tree.
self.pora_chunks_merkle
.append_subtree(1, *self.last_chunk_merkle.root())?;
merkle
.pora_chunks_merkle
.append_subtree(1, *merkle.last_chunk_merkle.root())?;
} else {
self.pora_chunks_merkle
.update_last(*self.last_chunk_merkle.root());
merkle
.pora_chunks_merkle
.update_last(*merkle.last_chunk_merkle.root());
}
if self.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
batch_root_map.insert(
self.pora_chunks_merkle.leaves() - 1,
(*self.last_chunk_merkle.root(), 1),
merkle.pora_chunks_merkle.leaves() - 1,
(*merkle.last_chunk_merkle.root(), 1),
);
self.complete_last_chunk_merkle(self.pora_chunks_merkle.leaves() - 1)?;
self.complete_last_chunk_merkle(
merkle.pora_chunks_merkle.leaves() - 1,
&mut *merkle,
)?;
}
} else {
// `last_chunk_merkle` has been padded here, so a subtree should not be across
// the chunks boundary.
assert_eq!(self.last_chunk_merkle.leaves(), 0);
assert_eq!(merkle.last_chunk_merkle.leaves(), 0);
assert!(subtree_size >= PORA_CHUNK_SIZE);
batch_root_map.insert(
self.pora_chunks_merkle.leaves(),
merkle.pora_chunks_merkle.leaves(),
(subtree_root, subtree_depth - log2_pow2(PORA_CHUNK_SIZE)),
);
self.pora_chunks_merkle
merkle
.pora_chunks_merkle
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
}
}
@ -714,46 +802,50 @@ impl LogManager {
Ok(())
}
#[instrument(skip(self))]
fn pad_tx(&mut self, first_subtree_size: u64) -> Result<()> {
#[instrument(skip(self, merkle))]
fn pad_tx(&self, first_subtree_size: u64, merkle: &mut MerkleManager) -> Result<()> {
// Check if we need to pad the flow.
let mut tx_start_flow_index =
self.last_chunk_start_index() + self.last_chunk_merkle.leaves() as u64;
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
let extra = tx_start_flow_index % first_subtree_size;
trace!(
"before pad_tx {} {}",
self.pora_chunks_merkle.leaves(),
self.last_chunk_merkle.leaves()
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves()
);
if extra != 0 {
for pad_data in Self::padding((first_subtree_size - extra) as usize) {
let mut root_map = BTreeMap::new();
// Update the in-memory merkle tree.
let last_chunk_pad = if self.last_chunk_merkle.leaves() == 0 {
let last_chunk_pad = if merkle.last_chunk_merkle.leaves() == 0 {
0
} else {
(PORA_CHUNK_SIZE - self.last_chunk_merkle.leaves()) * ENTRY_SIZE
(PORA_CHUNK_SIZE - merkle.last_chunk_merkle.leaves()) * ENTRY_SIZE
};
let mut completed_chunk_index = None;
if pad_data.len() < last_chunk_pad {
self.last_chunk_merkle
merkle
.last_chunk_merkle
.append_list(data_to_merkle_leaves(&pad_data)?);
self.pora_chunks_merkle
.update_last(*self.last_chunk_merkle.root());
merkle
.pora_chunks_merkle
.update_last(*merkle.last_chunk_merkle.root());
} else {
if last_chunk_pad != 0 {
// Pad the last chunk.
self.last_chunk_merkle
merkle
.last_chunk_merkle
.append_list(data_to_merkle_leaves(&pad_data[..last_chunk_pad])?);
self.pora_chunks_merkle
.update_last(*self.last_chunk_merkle.root());
merkle
.pora_chunks_merkle
.update_last(*merkle.last_chunk_merkle.root());
root_map.insert(
self.pora_chunks_merkle.leaves() - 1,
(*self.last_chunk_merkle.root(), 1),
merkle.pora_chunks_merkle.leaves() - 1,
(*merkle.last_chunk_merkle.root(), 1),
);
completed_chunk_index = Some(self.pora_chunks_merkle.leaves() - 1);
completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
}
// Pad with more complete chunks.
@ -763,8 +855,8 @@ impl LogManager {
..(start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE]
.to_vec();
let root = *Merkle::new(data_to_merkle_leaves(&data)?, 0, None).root();
self.pora_chunks_merkle.append(root);
root_map.insert(self.pora_chunks_merkle.leaves() - 1, (root, 1));
merkle.pora_chunks_merkle.append(root);
root_map.insert(merkle.pora_chunks_merkle.leaves() - 1, (root, 1));
start_index += PORA_CHUNK_SIZE;
}
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
@ -782,20 +874,24 @@ impl LogManager {
})?;
tx_start_flow_index += data_size as u64;
if let Some(index) = completed_chunk_index {
self.complete_last_chunk_merkle(index)?;
self.complete_last_chunk_merkle(index, &mut *merkle)?;
}
}
}
trace!(
"after pad_tx {} {}",
self.pora_chunks_merkle.leaves(),
self.last_chunk_merkle.leaves()
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves()
);
Ok(())
}
fn append_entries(&mut self, flow_entry_array: ChunkArray) -> Result<()> {
let last_chunk_start_index = self.last_chunk_start_index();
fn append_entries(
&self,
flow_entry_array: ChunkArray,
merkle: &mut MerkleManager,
) -> Result<()> {
let last_chunk_start_index = merkle.last_chunk_start_index();
if flow_entry_array.start_index + bytes_to_chunks(flow_entry_array.data.len()) as u64
> last_chunk_start_index
{
@ -822,14 +918,16 @@ impl LogManager {
.chunks_exact(ENTRY_SIZE)
.enumerate()
{
self.last_chunk_merkle
merkle
.last_chunk_merkle
.fill_leaf(chunk_start_index + local_index, Sha3Algorithm::leaf(entry));
}
}
let chunk_roots = self.flow_store.append_entries(flow_entry_array)?;
for (chunk_index, chunk_root) in chunk_roots {
if chunk_index < self.pora_chunks_merkle.leaves() as u64 {
self.pora_chunks_merkle
if chunk_index < merkle.pora_chunks_merkle.leaves() as u64 {
merkle
.pora_chunks_merkle
.fill_leaf(chunk_index as usize, chunk_root);
} else {
// TODO(zz): This assumption may be false in the future.
@ -857,62 +955,12 @@ impl LogManager {
vec![0; len * ENTRY_SIZE]
}
fn last_chunk_start_index(&self) -> u64 {
if self.pora_chunks_merkle.leaves() == 0 {
0
} else {
PORA_CHUNK_SIZE as u64
* if self.last_chunk_merkle.leaves() == 0 {
// The last chunk is empty and its root hash is not in `pora_chunks_merkle`,
// so all chunks in `pora_chunks_merkle` are complete.
self.pora_chunks_merkle.leaves()
} else {
// The last chunk has data, so we need to exclude it from `pora_chunks_merkle`.
self.pora_chunks_merkle.leaves() - 1
} as u64
}
}
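// A worked example of the exclusion rule in last_chunk_start_index (now on
// MerkleManager, per the `merkle.last_chunk_start_index()` call above),
// assuming PORA_CHUNK_SIZE = 1024 entries:
// - 3 complete chunks and an empty last chunk: the last chunk's root is not
//   yet a leaf of `pora_chunks_merkle`, so leaves() == 3 and the start index
//   is 3 * 1024.
// - 3 complete chunks plus a partial 4th: the partial chunk's root already
//   occupies the last leaf, so leaves() == 4 and one is subtracted, giving
//   the same start index of 3 * 1024.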
#[instrument(skip(self))]
fn commit_merkle(&mut self, tx_seq: u64) -> Result<()> {
self.pora_chunks_merkle.commit(Some(tx_seq));
self.last_chunk_merkle.commit(Some(tx_seq));
Ok(())
}
fn revert_merkle_tree(&mut self, tx_seq: u64) -> Result<()> {
// Special case for reverting tx_seq == 0
if tx_seq == u64::MAX {
self.pora_chunks_merkle.reset();
self.last_chunk_merkle.reset();
self.try_initialize()?;
return Ok(());
}
let old_leaves = self.pora_chunks_merkle.leaves();
self.pora_chunks_merkle.revert_to(tx_seq)?;
if old_leaves == self.pora_chunks_merkle.leaves() {
self.last_chunk_merkle.revert_to(tx_seq)?;
} else {
// We are reverting to a position before the current last_chunk.
self.last_chunk_merkle = self
.tx_store
.rebuild_last_chunk_merkle(self.pora_chunks_merkle.leaves() - 1, tx_seq)?;
self.try_initialize()?;
assert_eq!(
Some(*self.last_chunk_merkle.root()),
self.pora_chunks_merkle
.leaf_at(self.pora_chunks_merkle.leaves() - 1)?
);
}
Ok(())
}
#[cfg(test)]
pub fn flow_store(&self) -> &FlowStore {
&self.flow_store
}
fn padding_rear_data(&mut self, tx: &Transaction) -> Result<()> {
fn padding_rear_data(&self, tx: &Transaction) -> Result<()> {
let (chunks, _) = compute_padded_chunk_size(tx.size as usize);
let (segments_for_proof, last_segment_size_for_proof) =
compute_segment_size(chunks, PORA_CHUNK_SIZE);
@ -959,7 +1007,8 @@ impl LogManager {
Ok(())
}
fn copy_tx_data(&mut self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
fn copy_tx_data(&self, from_tx_seq: u64, to_tx_seq_list: Vec<u64>) -> Result<()> {
let mut merkle = self.merkle.write();
// We have all the data need for this tx, so just copy them.
let old_tx = self
.get_tx_by_seq_number(from_tx_seq)?
@ -992,7 +1041,7 @@ impl LogManager {
for (_, offset) in &to_tx_offset_list {
let mut data = batch_data.clone();
data.start_index += offset;
self.append_entries(data)?;
self.append_entries(data, &mut merkle)?;
}
}
// num_entries() includes the rear padding data, so no need for more padding.
@ -1007,9 +1056,9 @@ impl LogManager {
/// we can still provide proof for known data in it.
/// Another choice is to insert these subtrees earlier in `put_tx`. Inserting them here
/// batches the insertions and avoids inserting subtrees whose data is fully known.
fn complete_last_chunk_merkle(&mut self, index: usize) -> Result<()> {
let subtree_list = self.last_chunk_merkle.get_subtrees();
self.last_chunk_merkle =
fn complete_last_chunk_merkle(&self, index: usize, merkle: &mut MerkleManager) -> Result<()> {
let subtree_list = merkle.last_chunk_merkle.get_subtrees();
merkle.last_chunk_merkle =
Merkle::new_with_depth(vec![], log2_pow2(PORA_CHUNK_SIZE) + 1, None);
// Only insert non-leaf subtrees. The leaf data should already be available.

View File

@ -62,8 +62,7 @@ pub trait LogStoreRead: LogStoreChunkRead {
fn validate_range_proof(&self, tx_seq: u64, data: &ChunkArrayWithProof) -> Result<bool>;
fn get_proof_at_root(&self, root: &DataRoot, index: u64, length: u64)
-> Result<FlowRangeProof>;
fn get_proof_at_root(&self, root: DataRoot, index: u64, length: u64) -> Result<FlowRangeProof>;
/// Return flow root and length.
fn get_context(&self) -> Result<(DataRoot, u64)>;
@ -103,7 +102,7 @@ pub trait LogStoreChunkRead {
pub trait LogStoreWrite: LogStoreChunkWrite {
/// Store a data entry metadata.
fn put_tx(&mut self, tx: Transaction) -> Result<()>;
fn put_tx(&self, tx: Transaction) -> Result<()>;
/// Finalize a transaction storage.
/// This will compute the merkle tree, check the data root, and persist a part of the merkle
@ -111,8 +110,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
///
/// This will return an error if not all chunks are stored. But since this check can be expensive,
/// the caller is supposed to track chunk statuses and call this after storing all the chunks.
fn finalize_tx(&mut self, tx_seq: u64) -> Result<()>;
fn finalize_tx_with_hash(&mut self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
fn finalize_tx(&self, tx_seq: u64) -> Result<()>;
fn finalize_tx_with_hash(&self, tx_seq: u64, tx_hash: H256) -> Result<bool>;
/// Store the progress of synced block number and its hash.
fn put_sync_progress(&self, progress: (u64, H256, Option<Option<u64>>)) -> Result<()>;
@ -121,11 +120,11 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
/// This is needed when transactions are reverted because of chain reorg.
///
/// Reverted transactions are returned in order.
fn revert_to(&mut self, tx_seq: u64) -> Result<Vec<Transaction>>;
fn revert_to(&self, tx_seq: u64) -> Result<Vec<Transaction>>;
/// If the proof is valid, fill the tree nodes with the new data.
fn validate_and_insert_range_proof(
&mut self,
&self,
tx_seq: u64,
data: &ChunkArrayWithProof,
) -> Result<bool>;
@ -135,10 +134,10 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
pub trait LogStoreChunkWrite {
/// Store data chunks of a data entry.
fn put_chunks(&mut self, tx_seq: u64, chunks: ChunkArray) -> Result<()>;
fn put_chunks(&self, tx_seq: u64, chunks: ChunkArray) -> Result<()>;
fn put_chunks_with_tx_hash(
&mut self,
&self,
tx_seq: u64,
tx_hash: H256,
chunks: ChunkArray,
@ -169,14 +168,15 @@ pub trait LogStoreInner {
}
pub struct MineLoadChunk {
pub loaded_chunk: [[u8; BYTES_PER_SEAL]; SEALS_PER_LOAD],
// Use `Vec` instead of array to avoid thread stack overflow.
pub loaded_chunk: Vec<[u8; BYTES_PER_SEAL]>,
pub avalibilities: [bool; SEALS_PER_LOAD],
}
impl Default for MineLoadChunk {
fn default() -> Self {
Self {
loaded_chunk: [[0u8; BYTES_PER_SEAL]; SEALS_PER_LOAD],
loaded_chunk: vec![[0u8; BYTES_PER_SEAL]; SEALS_PER_LOAD],
avalibilities: [false; SEALS_PER_LOAD],
}
}
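// A rough sense of the sizes behind the comment above, with illustrative
// constants (the real SEALS_PER_LOAD and BYTES_PER_SEAL may differ): 2048
// seals of 4 KiB each make [[u8; BYTES_PER_SEAL]; SEALS_PER_LOAD] an 8 MiB
// by-value array, and moving it through deep call stacks or spawned threads
// can overflow a thread's stack. vec![[0u8; BYTES_PER_SEAL]; SEALS_PER_LOAD]
// keeps the same 8 MiB on the heap and moves only pointer, length, and
// capacity.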
@ -206,14 +206,14 @@ pub trait FlowWrite {
/// Append data to the flow. `start_index` is included in `ChunkArray`, so
/// it's possible to append arrays at any position.
/// Return the list of completed chunks.
fn append_entries(&mut self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>>;
fn append_entries(&self, data: ChunkArray) -> Result<Vec<(u64, DataRoot)>>;
/// Remove all the entries after `start_index`.
/// This is used to remove deprecated data in case of chain reorg.
fn truncate(&mut self, start_index: u64) -> Result<()>;
fn truncate(&self, start_index: u64) -> Result<()>;
/// Update the shard config.
fn update_shard_config(&mut self, shard_config: ShardConfig);
fn update_shard_config(&self, shard_config: ShardConfig);
}
pub struct SealTask {
@ -247,7 +247,7 @@ pub trait FlowSeal {
/// Submit sealing result
fn submit_seal_result(&mut self, answers: Vec<SealAnswer>) -> Result<()>;
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
}
pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
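// A hedged sketch of the calling pattern the `&self` signatures above enable
// (trait shape reduced to one method; `LogStore` stands in for the crate's
// `Store` trait): the store is shared as a plain `Arc` and locks internally,
// so the outer `tokio::sync::RwLock` is gone.
use std::sync::Arc;

trait LogStore: Send + Sync {
    fn finalize_tx(&self, tx_seq: u64) -> anyhow::Result<()>;
}

fn finalize(store: &Arc<dyn LogStore>, tx_seq: u64) -> anyhow::Result<()> {
    // Previously: store.write().await.finalize_tx(tx_seq)
    store.finalize_tx(tx_seq)
}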

View File

@ -12,7 +12,7 @@ use std::cmp;
#[test]
fn test_put_get() {
let config = LogConfig::default();
let mut store = LogManager::memorydb(config.clone()).unwrap();
let store = LogManager::memorydb(config.clone()).unwrap();
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
// Aligned with size.
let start_offset = 1024;

View File

@ -1,6 +1,6 @@
use super::tx_store::TxStore;
use anyhow::Result;
use std::ops::Deref;
use storage::log_store::config::{ConfigTx, ConfigurableExt};
use storage_async::Store;
@ -29,7 +29,7 @@ impl SyncStore {
}
pub async fn get_tx_seq_range(&self) -> Result<(Option<u64>, Option<u64>)> {
let store = self.store.get_store().read().await;
let store = self.store.get_store();
// load next_tx_seq
let next_tx_seq = store.get_config_decoded(&KEY_NEXT_TX_SEQ)?;
@ -43,8 +43,6 @@ impl SyncStore {
pub async fn set_next_tx_seq(&self, tx_seq: u64) -> Result<()> {
self.store
.get_store()
.write()
.await
.set_config_encoded(&KEY_NEXT_TX_SEQ, &tx_seq)
}
@ -52,38 +50,33 @@ impl SyncStore {
debug!(%tx_seq, "set_max_tx_seq");
self.store
.get_store()
.write()
.await
.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
}
pub async fn add_pending_tx(&self, tx_seq: u64) -> Result<bool> {
let store = self.store.get_store().write().await;
let store = self.store.get_store();
// already in ready queue
if self.ready_txs.has(store.deref(), tx_seq)? {
if self.ready_txs.has(store, tx_seq)? {
return Ok(false);
}
// always add to the pending queue
self.pending_txs.add(store.deref(), None, tx_seq)
self.pending_txs.add(store, None, tx_seq)
}
pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result<bool> {
let store = self.store.get_store().write().await;
let store = self.store.get_store();
let mut tx = ConfigTx::default();
// not in pending queue
if !self
.pending_txs
.remove(store.deref(), Some(&mut tx), tx_seq)?
{
if !self.pending_txs.remove(store, Some(&mut tx), tx_seq)? {
return Ok(false);
}
// move from pending to ready queue
let added = self.ready_txs.add(store.deref(), Some(&mut tx), tx_seq)?;
let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
@ -91,20 +84,17 @@ impl SyncStore {
}
pub async fn downgrade_tx_to_pending(&self, tx_seq: u64) -> Result<bool> {
let store = self.store.get_store().write().await;
let store = self.store.get_store();
let mut tx = ConfigTx::default();
// not in ready queue
if !self
.ready_txs
.remove(store.deref(), Some(&mut tx), tx_seq)?
{
if !self.ready_txs.remove(store, Some(&mut tx), tx_seq)? {
return Ok(false);
}
// move from ready to pending queue
let added = self.pending_txs.add(store.deref(), Some(&mut tx), tx_seq)?;
let added = self.pending_txs.add(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
@ -112,27 +102,27 @@ impl SyncStore {
}
pub async fn random_tx(&self) -> Result<Option<u64>> {
let store = self.store.get_store().read().await;
let store = self.store.get_store();
// try to find a tx in ready queue with high priority
if let Some(val) = self.ready_txs.random(store.deref())? {
if let Some(val) = self.ready_txs.random(store)? {
return Ok(Some(val));
}
// otherwise, find tx in pending queue
self.pending_txs.random(store.deref())
self.pending_txs.random(store)
}
pub async fn remove_tx(&self, tx_seq: u64) -> Result<bool> {
let store = self.store.get_store().write().await;
let store = self.store.get_store();
// try to remove from the ready queue first
if self.ready_txs.remove(store.deref(), None, tx_seq)? {
if self.ready_txs.remove(store, None, tx_seq)? {
return Ok(true);
}
// otherwise, try to remove from the pending queue
self.pending_txs.remove(store.deref(), None, tx_seq)
self.pending_txs.remove(store, None, tx_seq)
}
}
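// The transitions implied by the methods above, as a sketch: a tx lives in
// at most one queue, and each move bundles its remove and add into one
// ConfigTx applied by exec_configs, so both updates are committed together.
//
//   add_pending_tx:           (none)  -> pending   (no-op if already ready)
//   upgrade_tx_to_ready:      pending -> ready
//   downgrade_tx_to_pending:  ready   -> pending
//   remove_tx:                either queue -> (none)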

View File

@ -420,8 +420,6 @@ impl SerialSyncController {
let validation_result = self
.store
.get_store()
.write()
.await
.validate_and_insert_range_proof(self.tx_seq, &response);
match validation_result {
@ -443,13 +441,7 @@ impl SerialSyncController {
self.failures = 0;
let shard_config = self
.store
.get_store()
.read()
.await
.flow()
.get_shard_config();
let shard_config = self.store.get_store().flow().get_shard_config();
let next_chunk = shard_config.next_segment_index(
(from_chunk / PORA_CHUNK_SIZE as u64) as usize,
(self.tx_start_chunk_in_flow / PORA_CHUNK_SIZE as u64) as usize,
@ -664,7 +656,6 @@ mod tests {
use storage::H256;
use task_executor::{test_utils::TestRuntime, TaskExecutor};
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::RwLock;
#[test]
fn test_status() {
@ -1112,8 +1103,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1146,8 +1135,6 @@ mod tests {
);
let mut chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1215,8 +1202,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1238,7 +1223,6 @@ mod tests {
source,
msg,
} => {
assert_eq!(peer_id, peer_id);
match action {
PeerAction::Fatal => {}
_ => {
@ -1281,8 +1265,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1355,8 +1337,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1400,8 +1380,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, 1024)
.unwrap()
.unwrap();
@ -1448,8 +1426,6 @@ mod tests {
);
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(tx_seq, 0, chunk_count)
.unwrap()
.unwrap();
@ -1572,7 +1548,7 @@ mod tests {
let num_chunks = 123;
let config = LogConfig::default();
let store = Arc::new(RwLock::new(LogManager::memorydb(config).unwrap()));
let store = Arc::new(LogManager::memorydb(config).unwrap());
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
}
@ -1580,7 +1556,7 @@ mod tests {
fn create_controller(
task_executor: TaskExecutor,
peer_id: Option<PeerId>,
store: Arc<RwLock<LogManager>>,
store: Arc<LogManager>,
tx_id: TxID,
num_chunks: usize,
) -> (SerialSyncController, UnboundedReceiver<NetworkMessage>) {

View File

@ -23,7 +23,7 @@ use storage::config::ShardConfig;
use storage::error::Result as StorageResult;
use storage::log_store::Store as LogStore;
use storage_async::Store;
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::sync::{broadcast, mpsc};
const HEARTBEAT_INTERVAL_SEC: u64 = 5;
@ -129,7 +129,7 @@ impl SyncService {
pub async fn spawn(
executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>,
store: Arc<RwLock<dyn LogStore>>,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>,
) -> Result<SyncSender> {
@ -148,7 +148,7 @@ impl SyncService {
config: Config,
executor: task_executor::TaskExecutor,
network_send: mpsc::UnboundedSender<NetworkMessage>,
store: Arc<RwLock<dyn LogStore>>,
store: Arc<dyn LogStore>,
file_location_cache: Arc<FileLocationCache>,
event_recv: broadcast::Receiver<LogSyncEvent>,
) -> Result<SyncSender> {
@ -740,8 +740,8 @@ mod tests {
runtime: TestRuntime,
chunk_count: usize,
store: Arc<RwLock<LogManager>>,
peer_store: Arc<RwLock<LogManager>>,
store: Arc<LogManager>,
peer_store: Arc<LogManager>,
txs: Vec<Transaction>,
init_data: Vec<u8>,
@ -930,8 +930,6 @@ mod tests {
runtime
.peer_store
.read()
.await
.validate_range_proof(0, &response)
.expect("validate proof");
}
@ -1177,7 +1175,7 @@ mod tests {
let config = LogConfig::default();
let store = Arc::new(RwLock::new(LogManager::memorydb(config.clone()).unwrap()));
let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
let file_location_cache: Arc<FileLocationCache> =
@ -1203,10 +1201,7 @@ mod tests {
.unwrap();
thread::sleep(Duration::from_millis(1000));
assert_eq!(
store.read().await.get_tx_by_seq_number(tx_seq).unwrap(),
None
);
assert_eq!(store.get_tx_by_seq_number(tx_seq).unwrap(), None);
assert!(network_recv.try_recv().is_err());
}
@ -1222,18 +1217,13 @@ mod tests {
.unwrap();
thread::sleep(Duration::from_millis(1000));
assert!(runtime
.peer_store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(runtime.peer_store.check_tx_completed(tx_seq).unwrap());
assert!(runtime.network_recv.try_recv().is_err());
}
async fn wait_for_tx_finalized(store: Arc<RwLock<LogManager>>, tx_seq: u64) {
async fn wait_for_tx_finalized(store: Arc<LogManager>, tx_seq: u64) {
let deadline = Instant::now() + Duration::from_millis(5000);
while !store.read().await.check_tx_completed(tx_seq).unwrap() {
while !store.check_tx_completed(tx_seq).unwrap() {
if Instant::now() >= deadline {
panic!("Failed to wait tx completed");
}
@ -1255,12 +1245,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
assert!(!matches!(
sync_send
@ -1330,12 +1315,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
receive_chunk_request(
&mut runtime.network_recv,
@ -1385,13 +1365,8 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.read().await.check_tx_completed(0).unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
assert!(!runtime.store.check_tx_completed(0).unwrap());
receive_chunk_request(
&mut runtime.network_recv,
@ -1406,7 +1381,7 @@ mod tests {
wait_for_tx_finalized(runtime.store.clone(), tx_seq).await;
assert!(!runtime.store.read().await.check_tx_completed(0).unwrap());
assert!(!runtime.store.check_tx_completed(0).unwrap());
// first file
let tx_seq = 0u64;
@ -1461,8 +1436,10 @@ mod tests {
#[tokio::test]
async fn test_announce_file() {
let mut runtime = TestSyncRuntime::new(vec![1023], 0);
let mut config = Config::default();
config.sync_file_on_announcement_enabled = true;
let config = Config {
sync_file_on_announcement_enabled: true,
..Default::default()
};
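// Note: the struct-update form above also keeps clippy's
// field_reassign_with_default lint quiet, which fires when fields are
// mutated one by one on a Default::default() binding; that is presumably
// why this form was chosen.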
let sync_send = runtime.spawn_sync_service_with_config(false, config).await;
let tx_seq = 0u64;
@ -1477,12 +1454,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
receive_chunk_request(
&mut runtime.network_recv,
@ -1520,12 +1492,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
receive_chunk_request(
&mut runtime.network_recv,
@ -1606,12 +1573,7 @@ mod tests {
receive_dial(&mut runtime, &sync_send).await;
assert!(!runtime
.store
.read()
.await
.check_tx_completed(tx_seq)
.unwrap());
assert!(!runtime.store.check_tx_completed(tx_seq).unwrap());
assert!(!matches!(
sync_send
@ -1637,7 +1599,7 @@ mod tests {
async fn receive_chunk_request(
network_recv: &mut UnboundedReceiver<NetworkMessage>,
sync_send: &SyncSender,
peer_store: Arc<RwLock<LogManager>>,
peer_store: Arc<LogManager>,
init_peer_id: PeerId,
tx_seq: u64,
index_start: u64,
@ -1671,8 +1633,6 @@ mod tests {
};
let chunks = peer_store
.read()
.await
.get_chunks_with_proof_by_tx_and_index_range(
tx_seq,
req.index_start as usize,

View File

@ -8,7 +8,6 @@ use storage::{
},
LogManager,
};
use tokio::sync::RwLock;
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
/// The first store is for local node, and data not stored. The second store is for peers, and all
@ -17,8 +16,8 @@ use tokio::sync::RwLock;
pub fn create_2_store(
chunk_count: Vec<usize>,
) -> (
Arc<RwLock<LogManager>>,
Arc<RwLock<LogManager>>,
Arc<LogManager>,
Arc<LogManager>,
Vec<Transaction>,
Vec<Vec<u8>>,
) {
@ -37,12 +36,7 @@ pub fn create_2_store(
offset = ret.2;
}
(
Arc::new(RwLock::new(store)),
Arc::new(RwLock::new(peer_store)),
txs,
data,
)
(Arc::new(store), Arc::new(peer_store), txs, data)
}
fn generate_data(
@ -106,7 +100,6 @@ pub mod tests {
};
use storage_async::Store;
use task_executor::test_utils::TestRuntime;
use tokio::sync::RwLock;
pub struct TestStoreRuntime {
pub runtime: TestRuntime,
@ -115,7 +108,7 @@ pub mod tests {
impl Default for TestStoreRuntime {
fn default() -> Self {
let store = Arc::new(RwLock::new(Self::new_store()));
let store = Arc::new(Self::new_store());
Self::new(store)
}
}
@ -125,7 +118,7 @@ pub mod tests {
LogManager::memorydb(LogConfig::default()).unwrap()
}
pub fn new(store: Arc<RwLock<dyn LogStore>>) -> TestStoreRuntime {
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {
let runtime = TestRuntime::default();
let executor = runtime.task_executor.clone();
Self {

View File

@ -40,8 +40,8 @@ class SubmissionTest(TestFramework):
# Check if all transactions are finalized
for tx_offset in range(same_root_tx_count + 1):
tx_seq = next_tx_seq - 1 - tx_offset
status = self.nodes[0].zgs_get_file_info_by_tx_seq(tx_seq)
assert status["finalized"]
# Old txs are finalized after the new tx is finalized, so we may need to wait here.
wait_until(lambda: self.nodes[0].zgs_get_file_info_by_tx_seq(tx_seq)["finalized"])
# Send tx after uploading data
for _ in range(same_root_tx_count):
@ -57,7 +57,7 @@ class SubmissionTest(TestFramework):
client = self.nodes[node_idx]
wait_until(lambda: client.zgs_get_file_info_by_tx_seq(tx_seq) is not None)
assert_equal(client.zgs_get_file_info_by_tx_seq(tx_seq)["finalized"], data_finalized)
wait_until(lambda: client.zgs_get_file_info_by_tx_seq(tx_seq)["finalized"] == data_finalized)
def submit_data(self, chunk_data, node_idx=0):
_, data_root = create_submission(chunk_data)

View File

@ -23,6 +23,7 @@ use std::{
pub const ENFORCED_SIZE_TIME: u64 = 1;
pub struct ReceivedPacket<T> {
#[allow(unused)]
/// The content of the packet.
pub content: T,
/// The time the packet was received.