mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-12-27 00:35:18 +00:00
support async padding after sync phase
This commit is contained in:
parent
e946400b82
commit
3b4ed1502b
@ -15,19 +15,13 @@ abigen!(
|
|||||||
);
|
);
|
||||||
|
|
||||||
#[cfg(feature = "dev")]
|
#[cfg(feature = "dev")]
|
||||||
abigen!(
|
abigen!(ZgsFlow, "../../storage-contracts-abis/Flow.json");
|
||||||
ZgsFlow,
|
|
||||||
"../../0g-storage-contracts-dev/artifacts/contracts/dataFlow/Flow.sol/Flow.json"
|
|
||||||
);
|
|
||||||
|
|
||||||
#[cfg(feature = "dev")]
|
#[cfg(feature = "dev")]
|
||||||
abigen!(
|
abigen!(PoraMine, "../../storage-contracts-abis/PoraMine.json");
|
||||||
PoraMine,
|
|
||||||
"../../0g-storage-contracts-dev/artifacts/contracts/miner/Mine.sol/PoraMine.json"
|
|
||||||
);
|
|
||||||
|
|
||||||
#[cfg(feature = "dev")]
|
#[cfg(feature = "dev")]
|
||||||
abigen!(
|
abigen!(
|
||||||
ChunkLinearReward,
|
ChunkLinearReward,
|
||||||
"../../0g-storage-contracts-dev/artifacts/contracts/reward/ChunkLinearReward.sol/ChunkLinearReward.json"
|
"../../storage-contracts-abis/ChunkLinearReward.json"
|
||||||
);
|
);
|
||||||
|
@ -278,6 +278,7 @@ impl LogSyncManager {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// start the pad data store
|
// start the pad data store
|
||||||
|
log_sync_manager.store.start_padding(&executor_clone);
|
||||||
|
|
||||||
let (watch_progress_tx, watch_progress_rx) =
|
let (watch_progress_tx, watch_progress_rx) =
|
||||||
tokio::sync::mpsc::unbounded_channel();
|
tokio::sync::mpsc::unbounded_channel();
|
||||||
@ -511,6 +512,7 @@ impl LogSyncManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.data_cache.garbage_collect(self.next_tx_seq);
|
self.data_cache.garbage_collect(self.next_tx_seq);
|
||||||
|
|
||||||
self.next_tx_seq += 1;
|
self.next_tx_seq += 1;
|
||||||
|
|
||||||
// Check if the computed data root matches on-chain state.
|
// Check if the computed data root matches on-chain state.
|
||||||
|
@ -1054,8 +1054,7 @@ mod tests {
|
|||||||
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
|
let (sync_send, sync_recv) = channel::Channel::unbounded("test");
|
||||||
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
|
let (chunk_pool_send, _chunk_pool_recv) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let executor = runtime.task_executor.clone();
|
let store = LogManager::memorydb(LogConfig::default()).unwrap();
|
||||||
let store = LogManager::memorydb(LogConfig::default(), executor).unwrap();
|
|
||||||
Self {
|
Self {
|
||||||
runtime,
|
runtime,
|
||||||
network_globals: Arc::new(network_globals),
|
network_globals: Arc::new(network_globals),
|
||||||
|
@ -89,10 +89,9 @@ impl ClientBuilder {
|
|||||||
|
|
||||||
/// Initializes in-memory storage.
|
/// Initializes in-memory storage.
|
||||||
pub fn with_memory_store(mut self) -> Result<Self, String> {
|
pub fn with_memory_store(mut self) -> Result<Self, String> {
|
||||||
let executor = require!("sync", self, runtime_context).clone().executor;
|
|
||||||
// TODO(zz): Set config.
|
// TODO(zz): Set config.
|
||||||
let store = Arc::new(
|
let store = Arc::new(
|
||||||
LogManager::memorydb(LogConfig::default(), executor)
|
LogManager::memorydb(LogConfig::default())
|
||||||
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
|
.map_err(|e| format!("Unable to start in-memory store: {:?}", e))?,
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -110,13 +109,11 @@ impl ClientBuilder {
|
|||||||
|
|
||||||
/// Initializes RocksDB storage.
|
/// Initializes RocksDB storage.
|
||||||
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
|
pub fn with_rocksdb_store(mut self, config: &StorageConfig) -> Result<Self, String> {
|
||||||
let executor = require!("sync", self, runtime_context).clone().executor;
|
|
||||||
let store = Arc::new(
|
let store = Arc::new(
|
||||||
LogManager::rocksdb(
|
LogManager::rocksdb(
|
||||||
config.log_config.clone(),
|
config.log_config.clone(),
|
||||||
config.db_dir.join("flow_db"),
|
config.db_dir.join("flow_db"),
|
||||||
config.db_dir.join("data_db"),
|
config.db_dir.join("data_db"),
|
||||||
executor,
|
|
||||||
)
|
)
|
||||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
|
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))?,
|
||||||
);
|
);
|
||||||
|
@ -14,23 +14,14 @@ use storage::{
|
|||||||
},
|
},
|
||||||
LogManager,
|
LogManager,
|
||||||
};
|
};
|
||||||
use task_executor::test_utils::TestRuntime;
|
|
||||||
|
|
||||||
fn write_performance(c: &mut Criterion) {
|
fn write_performance(c: &mut Criterion) {
|
||||||
if Path::new("db_write").exists() {
|
if Path::new("db_write").exists() {
|
||||||
fs::remove_dir_all("db_write").unwrap();
|
fs::remove_dir_all("db_write").unwrap();
|
||||||
}
|
}
|
||||||
let runtime = TestRuntime::default();
|
|
||||||
|
|
||||||
let executor = runtime.task_executor.clone();
|
|
||||||
|
|
||||||
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
|
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
|
||||||
LogManager::rocksdb(
|
LogManager::rocksdb(LogConfig::default(), "db_flow_write", "db_data_write")
|
||||||
LogConfig::default(),
|
|
||||||
"db_flow_write",
|
|
||||||
"db_data_write",
|
|
||||||
executor,
|
|
||||||
)
|
|
||||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
));
|
));
|
||||||
@ -114,17 +105,8 @@ fn read_performance(c: &mut Criterion) {
|
|||||||
fs::remove_dir_all("db_read").unwrap();
|
fs::remove_dir_all("db_read").unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let runtime = TestRuntime::default();
|
|
||||||
|
|
||||||
let executor = runtime.task_executor.clone();
|
|
||||||
|
|
||||||
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
|
let store: Arc<RwLock<dyn Store>> = Arc::new(RwLock::new(
|
||||||
LogManager::rocksdb(
|
LogManager::rocksdb(LogConfig::default(), "db_flow_read", "db_data_read")
|
||||||
LogConfig::default(),
|
|
||||||
"db_flow_read",
|
|
||||||
"db_data_read",
|
|
||||||
executor,
|
|
||||||
)
|
|
||||||
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
.map_err(|e| format!("Unable to start RocksDB store: {:?}", e))
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
));
|
));
|
||||||
|
@ -26,14 +26,16 @@ use tracing::{debug, error, trace};
|
|||||||
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
|
||||||
|
|
||||||
pub struct FlowStore {
|
pub struct FlowStore {
|
||||||
|
flow_db: Arc<FlowDBStore>,
|
||||||
data_db: Arc<FlowDBStore>,
|
data_db: Arc<FlowDBStore>,
|
||||||
seal_manager: SealTaskManager,
|
seal_manager: SealTaskManager,
|
||||||
config: FlowConfig,
|
config: FlowConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowStore {
|
impl FlowStore {
|
||||||
pub fn new(data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
|
pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>, config: FlowConfig) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
flow_db,
|
||||||
data_db,
|
data_db,
|
||||||
seal_manager: Default::default(),
|
seal_manager: Default::default(),
|
||||||
config,
|
config,
|
||||||
@ -200,6 +202,14 @@ impl FlowRead for FlowStore {
|
|||||||
fn get_shard_config(&self) -> ShardConfig {
|
fn get_shard_config(&self) -> ShardConfig {
|
||||||
*self.config.shard_config.read()
|
*self.config.shard_config.read()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_pad_data(&self, start_index: u64) -> crate::error::Result<Option<Vec<PadPair>>> {
|
||||||
|
self.flow_db.get_pad_data(start_index)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
|
||||||
|
self.data_db.get_pad_data_sync_height()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowWrite for FlowStore {
|
impl FlowWrite for FlowStore {
|
||||||
@ -267,6 +277,14 @@ impl FlowWrite for FlowStore {
|
|||||||
fn update_shard_config(&self, shard_config: ShardConfig) {
|
fn update_shard_config(&self, shard_config: ShardConfig) {
|
||||||
*self.config.shard_config.write() = shard_config;
|
*self.config.shard_config.write() = shard_config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> crate::error::Result<()> {
|
||||||
|
self.flow_db.put_pad_data(data_sizes, tx_seq)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn put_pad_data_sync_height(&self, sync_index: u64) -> crate::error::Result<()> {
|
||||||
|
self.data_db.put_pad_data_sync_height(sync_index)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FlowSeal for FlowStore {
|
impl FlowSeal for FlowStore {
|
||||||
@ -344,33 +362,10 @@ impl FlowSeal for FlowStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct PadDataStore {
|
#[derive(Debug, PartialEq, DeriveEncode, DeriveDecode)]
|
||||||
flow_db: Arc<FlowDBStore>,
|
pub struct PadPair {
|
||||||
data_db: Arc<FlowDBStore>,
|
pub start_index: u64,
|
||||||
}
|
pub data_size: u64,
|
||||||
|
|
||||||
impl PadDataStore {
|
|
||||||
pub fn new(flow_db: Arc<FlowDBStore>, data_db: Arc<FlowDBStore>) -> Self {
|
|
||||||
Self { flow_db, data_db }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> {
|
|
||||||
self.flow_db.put_pad_data(data_size, start_index)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> {
|
|
||||||
self.data_db.put_pad_data_sync_height(sync_index)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
|
|
||||||
self.data_db.get_pad_data_sync_heigh()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_pad_data(&self, start_index: u64) -> Result<Option<usize>> {
|
|
||||||
self.flow_db.get_pad_data(start_index)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FlowDBStore {
|
pub struct FlowDBStore {
|
||||||
@ -474,29 +469,31 @@ impl FlowDBStore {
|
|||||||
Ok(self.kvdb.write(tx)?)
|
Ok(self.kvdb.write(tx)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn put_pad_data(&self, data_size: usize, start_index: u64) -> Result<()> {
|
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()> {
|
||||||
let mut tx = self.kvdb.transaction();
|
let mut tx = self.kvdb.transaction();
|
||||||
tx.put(
|
|
||||||
COL_PAD_DATA_LIST,
|
let mut buffer = Vec::new();
|
||||||
&start_index.to_be_bytes(),
|
for item in data_sizes {
|
||||||
&data_size.to_be_bytes(),
|
buffer.extend(item.as_ssz_bytes());
|
||||||
);
|
}
|
||||||
|
|
||||||
|
tx.put(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes(), &buffer);
|
||||||
self.kvdb.write(tx)?;
|
self.kvdb.write(tx)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn put_pad_data_sync_height(&self, sync_index: u64) -> Result<()> {
|
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()> {
|
||||||
let mut tx = self.kvdb.transaction();
|
let mut tx = self.kvdb.transaction();
|
||||||
tx.put(
|
tx.put(
|
||||||
COL_PAD_DATA_SYNC_HEIGH,
|
COL_PAD_DATA_SYNC_HEIGH,
|
||||||
b"sync_height",
|
b"sync_height",
|
||||||
&sync_index.to_be_bytes(),
|
&tx_seq.to_be_bytes(),
|
||||||
);
|
);
|
||||||
self.kvdb.write(tx)?;
|
self.kvdb.write(tx)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_pad_data_sync_heigh(&self) -> Result<Option<u64>> {
|
fn get_pad_data_sync_height(&self) -> Result<Option<u64>> {
|
||||||
match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? {
|
match self.kvdb.get(COL_PAD_DATA_SYNC_HEIGH, b"sync_height")? {
|
||||||
Some(v) => Ok(Some(u64::from_be_bytes(
|
Some(v) => Ok(Some(u64::from_be_bytes(
|
||||||
v.try_into().map_err(|e| anyhow!("{:?}", e))?,
|
v.try_into().map_err(|e| anyhow!("{:?}", e))?,
|
||||||
@ -505,14 +502,11 @@ impl FlowDBStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_pad_data(&self, start_index: u64) -> Result<Option<usize>> {
|
fn get_pad_data(&self, tx_seq: u64) -> Result<Option<Vec<PadPair>>> {
|
||||||
match self
|
match self.kvdb.get(COL_PAD_DATA_LIST, &tx_seq.to_be_bytes())? {
|
||||||
.kvdb
|
Some(v) => Ok(Some(
|
||||||
.get(COL_PAD_DATA_LIST, &start_index.to_be_bytes())?
|
Vec::<PadPair>::from_ssz_bytes(&v).map_err(Error::from)?,
|
||||||
{
|
)),
|
||||||
Some(v) => Ok(Some(usize::from_be_bytes(
|
|
||||||
v.try_into().map_err(|e| anyhow!("{:?}", e))?,
|
|
||||||
))),
|
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,11 @@
|
|||||||
use super::tx_store::BlockHashAndSubmissionIndex;
|
|
||||||
use super::{FlowSeal, MineLoadChunk, SealAnswer, SealTask};
|
|
||||||
use crate::config::ShardConfig;
|
use crate::config::ShardConfig;
|
||||||
use crate::log_store::flow_store::{
|
use crate::log_store::flow_store::{
|
||||||
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadDataStore,
|
batch_iter_sharded, FlowConfig, FlowDBStore, FlowStore, PadPair,
|
||||||
};
|
};
|
||||||
use crate::log_store::tx_store::TransactionStore;
|
use crate::log_store::tx_store::{BlockHashAndSubmissionIndex, TransactionStore};
|
||||||
use crate::log_store::{
|
use crate::log_store::{
|
||||||
FlowRead, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead, LogStoreWrite,
|
FlowRead, FlowSeal, FlowWrite, LogStoreChunkRead, LogStoreChunkWrite, LogStoreRead,
|
||||||
|
LogStoreWrite, MineLoadChunk, SealAnswer, SealTask,
|
||||||
};
|
};
|
||||||
use crate::{try_option, ZgsKeyValueDB};
|
use crate::{try_option, ZgsKeyValueDB};
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
@ -26,7 +25,6 @@ use shared_types::{
|
|||||||
use std::cmp::Ordering;
|
use std::cmp::Ordering;
|
||||||
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::mpsc;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::{debug, error, info, instrument, trace, warn};
|
use tracing::{debug, error, info, instrument, trace, warn};
|
||||||
|
|
||||||
@ -35,18 +33,16 @@ pub const ENTRY_SIZE: usize = 256;
|
|||||||
/// 1024 Entries.
|
/// 1024 Entries.
|
||||||
pub const PORA_CHUNK_SIZE: usize = 1024;
|
pub const PORA_CHUNK_SIZE: usize = 1024;
|
||||||
|
|
||||||
pub const COL_TX: u32 = 0;
|
pub const COL_TX: u32 = 0; // flow db
|
||||||
pub const COL_ENTRY_BATCH: u32 = 1;
|
pub const COL_ENTRY_BATCH: u32 = 1; // data db
|
||||||
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2;
|
pub const COL_TX_DATA_ROOT_INDEX: u32 = 2; // flow db
|
||||||
pub const COL_ENTRY_BATCH_ROOT: u32 = 3;
|
pub const COL_TX_COMPLETED: u32 = 3; // data db
|
||||||
pub const COL_TX_COMPLETED: u32 = 4;
|
pub const COL_MISC: u32 = 4; // flow db & data db
|
||||||
pub const COL_MISC: u32 = 5;
|
pub const COL_FLOW_MPT_NODES: u32 = 5; // flow db
|
||||||
pub const COL_SEAL_CONTEXT: u32 = 6;
|
pub const COL_BLOCK_PROGRESS: u32 = 6; // flow db
|
||||||
pub const COL_FLOW_MPT_NODES: u32 = 7;
|
pub const COL_PAD_DATA_LIST: u32 = 7; // flow db
|
||||||
pub const COL_BLOCK_PROGRESS: u32 = 8;
|
pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 8; // data db
|
||||||
pub const COL_NUM: u32 = 9;
|
pub const COL_NUM: u32 = 9;
|
||||||
pub const COL_PAD_DATA_LIST: u32 = 10;
|
|
||||||
pub const COL_PAD_DATA_SYNC_HEIGH: u32 = 11;
|
|
||||||
|
|
||||||
// Process at most 1M entries (256MB) pad data at a time.
|
// Process at most 1M entries (256MB) pad data at a time.
|
||||||
const PAD_MAX_SIZE: usize = 1 << 20;
|
const PAD_MAX_SIZE: usize = 1 << 20;
|
||||||
@ -65,12 +61,10 @@ pub struct UpdateFlowMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct LogManager {
|
pub struct LogManager {
|
||||||
pub(crate) pad_db: Arc<PadDataStore>,
|
|
||||||
pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
|
pub(crate) data_db: Arc<dyn ZgsKeyValueDB>,
|
||||||
tx_store: TransactionStore,
|
tx_store: TransactionStore,
|
||||||
flow_store: Arc<FlowStore>,
|
flow_store: Arc<FlowStore>,
|
||||||
merkle: RwLock<MerkleManager>,
|
merkle: RwLock<MerkleManager>,
|
||||||
sender: mpsc::Sender<UpdateFlowMessage>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct MerkleManager {
|
struct MerkleManager {
|
||||||
@ -269,7 +263,12 @@ impl LogStoreWrite for LogManager {
|
|||||||
}
|
}
|
||||||
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
|
let maybe_same_data_tx_seq = self.tx_store.put_tx(tx.clone())?.first().cloned();
|
||||||
// TODO(zz): Should we validate received tx?
|
// TODO(zz): Should we validate received tx?
|
||||||
self.append_subtree_list(tx.start_entry_index, tx.merkle_nodes.clone(), &mut merkle)?;
|
self.append_subtree_list(
|
||||||
|
tx.seq,
|
||||||
|
tx.start_entry_index,
|
||||||
|
tx.merkle_nodes.clone(),
|
||||||
|
&mut merkle,
|
||||||
|
)?;
|
||||||
merkle.commit_merkle(tx.seq)?;
|
merkle.commit_merkle(tx.seq)?;
|
||||||
debug!(
|
debug!(
|
||||||
"commit flow root: root={:?}",
|
"commit flow root: root={:?}",
|
||||||
@ -406,6 +405,41 @@ impl LogStoreWrite for LogManager {
|
|||||||
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
|
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()> {
|
||||||
self.flow_store.submit_seal_result(answers)
|
self.flow_store.submit_seal_result(answers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn start_padding(&self, executor: &task_executor::TaskExecutor) {
|
||||||
|
let store = self.flow_store.clone();
|
||||||
|
executor.spawn(
|
||||||
|
async move {
|
||||||
|
let current_height = store.get_pad_data_sync_height().unwrap();
|
||||||
|
let mut start_index = current_height.unwrap_or(0);
|
||||||
|
loop {
|
||||||
|
match store.get_pad_data(start_index) {
|
||||||
|
std::result::Result::Ok(data) => {
|
||||||
|
// Update the flow database.
|
||||||
|
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||||
|
// subtrees with data known.
|
||||||
|
if let Some(data) = data {
|
||||||
|
for pad in data {
|
||||||
|
store
|
||||||
|
.append_entries(ChunkArray {
|
||||||
|
data: vec![0; pad.data_size as usize],
|
||||||
|
start_index: pad.start_index,
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
store.put_pad_data_sync_height(start_index).unwrap();
|
||||||
|
start_index += 1;
|
||||||
|
}
|
||||||
|
std::result::Result::Err(_) => {
|
||||||
|
debug!("Unable to get pad data, start_index={}", start_index);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"pad_tx",
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LogStoreChunkRead for LogManager {
|
impl LogStoreChunkRead for LogManager {
|
||||||
@ -619,32 +653,33 @@ impl LogManager {
|
|||||||
config: LogConfig,
|
config: LogConfig,
|
||||||
flow_path: impl AsRef<Path>,
|
flow_path: impl AsRef<Path>,
|
||||||
data_path: impl AsRef<Path>,
|
data_path: impl AsRef<Path>,
|
||||||
executor: task_executor::TaskExecutor,
|
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let mut db_config = DatabaseConfig::with_columns(COL_NUM);
|
let mut db_config = DatabaseConfig::with_columns(COL_NUM);
|
||||||
db_config.enable_statistics = true;
|
db_config.enable_statistics = true;
|
||||||
let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
|
let flow_db_source = Arc::new(Database::open(&db_config, flow_path)?);
|
||||||
let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
|
let data_db_source = Arc::new(Database::open(&db_config, data_path)?);
|
||||||
Self::new(flow_db_source, data_db_source, config, executor)
|
Self::new(flow_db_source, data_db_source, config)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn memorydb(config: LogConfig, executor: task_executor::TaskExecutor) -> Result<Self> {
|
pub fn memorydb(config: LogConfig) -> Result<Self> {
|
||||||
let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
|
let flow_db = Arc::new(kvdb_memorydb::create(COL_NUM));
|
||||||
let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
|
let data_db = Arc::new(kvdb_memorydb::create(COL_NUM));
|
||||||
Self::new(flow_db, data_db, config, executor)
|
Self::new(flow_db, data_db, config)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new(
|
fn new(
|
||||||
flow_db_source: Arc<dyn ZgsKeyValueDB>,
|
flow_db_source: Arc<dyn ZgsKeyValueDB>,
|
||||||
data_db_source: Arc<dyn ZgsKeyValueDB>,
|
data_db_source: Arc<dyn ZgsKeyValueDB>,
|
||||||
config: LogConfig,
|
config: LogConfig,
|
||||||
executor: task_executor::TaskExecutor,
|
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
let tx_store = TransactionStore::new(data_db_source.clone())?;
|
let tx_store = TransactionStore::new(data_db_source.clone())?;
|
||||||
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
|
let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
|
||||||
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
|
let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
|
||||||
let flow_store = Arc::new(FlowStore::new(data_db.clone(), config.flow.clone()));
|
let flow_store = Arc::new(FlowStore::new(
|
||||||
let pad_db = Arc::new(PadDataStore::new(flow_db.clone(), data_db.clone()));
|
flow_db.clone(),
|
||||||
|
data_db.clone(),
|
||||||
|
config.flow.clone(),
|
||||||
|
));
|
||||||
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
|
// If the last tx `put_tx` does not complete, we will revert it in `pora_chunks_merkle`
|
||||||
// first and call `put_tx` later.
|
// first and call `put_tx` later.
|
||||||
let next_tx_seq = tx_store.next_tx_seq();
|
let next_tx_seq = tx_store.next_tx_seq();
|
||||||
@ -745,19 +780,13 @@ impl LogManager {
|
|||||||
last_chunk_merkle,
|
last_chunk_merkle,
|
||||||
});
|
});
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::channel();
|
let log_manager = Self {
|
||||||
|
|
||||||
let mut log_manager = Self {
|
|
||||||
pad_db,
|
|
||||||
data_db: data_db_source,
|
data_db: data_db_source,
|
||||||
tx_store,
|
tx_store,
|
||||||
flow_store,
|
flow_store,
|
||||||
merkle,
|
merkle,
|
||||||
sender,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
log_manager.start_receiver(receiver, executor);
|
|
||||||
|
|
||||||
if let Some(tx) = last_tx_to_insert {
|
if let Some(tx) = last_tx_to_insert {
|
||||||
log_manager.put_tx(tx)?;
|
log_manager.put_tx(tx)?;
|
||||||
}
|
}
|
||||||
@ -772,40 +801,6 @@ impl LogManager {
|
|||||||
Ok(log_manager)
|
Ok(log_manager)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_receiver(
|
|
||||||
&mut self,
|
|
||||||
rx: mpsc::Receiver<UpdateFlowMessage>,
|
|
||||||
executor: task_executor::TaskExecutor,
|
|
||||||
) {
|
|
||||||
let flow_store = self.flow_store.clone();
|
|
||||||
executor.spawn(
|
|
||||||
async move {
|
|
||||||
loop {
|
|
||||||
match rx.recv() {
|
|
||||||
std::result::Result::Ok(data) => {
|
|
||||||
// Update the flow database.
|
|
||||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
|
||||||
// subtrees with data known.
|
|
||||||
flow_store
|
|
||||||
.append_entries(ChunkArray {
|
|
||||||
data: vec![0; data.pad_data],
|
|
||||||
start_index: data.tx_start_flow_index,
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
std::result::Result::Err(_) => {
|
|
||||||
debug!("Log manager inner channel closed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"pad_tx",
|
|
||||||
);
|
|
||||||
// Wait for the spawned thread to finish
|
|
||||||
// let _ = handle.join().expect("Thread panicked");
|
|
||||||
}
|
|
||||||
|
|
||||||
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
|
fn gen_proof(&self, flow_index: u64, maybe_root: Option<DataRoot>) -> Result<FlowProof> {
|
||||||
match maybe_root {
|
match maybe_root {
|
||||||
None => self.gen_proof_at_version(flow_index, None),
|
None => self.gen_proof_at_version(flow_index, None),
|
||||||
@ -861,6 +856,7 @@ impl LogManager {
|
|||||||
#[instrument(skip(self, merkle))]
|
#[instrument(skip(self, merkle))]
|
||||||
fn append_subtree_list(
|
fn append_subtree_list(
|
||||||
&self,
|
&self,
|
||||||
|
tx_seq: u64,
|
||||||
tx_start_index: u64,
|
tx_start_index: u64,
|
||||||
merkle_list: Vec<(usize, DataRoot)>,
|
merkle_list: Vec<(usize, DataRoot)>,
|
||||||
merkle: &mut MerkleManager,
|
merkle: &mut MerkleManager,
|
||||||
@ -869,7 +865,7 @@ impl LogManager {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
self.pad_tx(tx_start_index, &mut *merkle)?;
|
self.pad_tx(tx_seq, tx_start_index, &mut *merkle)?;
|
||||||
|
|
||||||
for (subtree_depth, subtree_root) in merkle_list {
|
for (subtree_depth, subtree_root) in merkle_list {
|
||||||
let subtree_size = 1 << (subtree_depth - 1);
|
let subtree_size = 1 << (subtree_depth - 1);
|
||||||
@ -907,7 +903,7 @@ impl LogManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip(self, merkle))]
|
#[instrument(skip(self, merkle))]
|
||||||
fn pad_tx(&self, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
|
fn pad_tx(&self, tx_seq: u64, tx_start_index: u64, merkle: &mut MerkleManager) -> Result<()> {
|
||||||
// Check if we need to pad the flow.
|
// Check if we need to pad the flow.
|
||||||
let mut tx_start_flow_index =
|
let mut tx_start_flow_index =
|
||||||
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
|
merkle.last_chunk_start_index() + merkle.last_chunk_merkle.leaves() as u64;
|
||||||
@ -917,6 +913,7 @@ impl LogManager {
|
|||||||
merkle.pora_chunks_merkle.leaves(),
|
merkle.pora_chunks_merkle.leaves(),
|
||||||
merkle.last_chunk_merkle.leaves()
|
merkle.last_chunk_merkle.leaves()
|
||||||
);
|
);
|
||||||
|
let mut pad_list = vec![];
|
||||||
if pad_size != 0 {
|
if pad_size != 0 {
|
||||||
for pad_data in Self::padding(pad_size as usize) {
|
for pad_data in Self::padding(pad_size as usize) {
|
||||||
let mut is_full_empty = true;
|
let mut is_full_empty = true;
|
||||||
@ -961,6 +958,10 @@ impl LogManager {
|
|||||||
|
|
||||||
let data_size = pad_data.len() / ENTRY_SIZE;
|
let data_size = pad_data.len() / ENTRY_SIZE;
|
||||||
if is_full_empty {
|
if is_full_empty {
|
||||||
|
pad_list.push(PadPair {
|
||||||
|
data_size: pad_data.len() as u64,
|
||||||
|
start_index: tx_start_flow_index,
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
// Update the flow database.
|
// Update the flow database.
|
||||||
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
// This should be called before `complete_last_chunk_merkle` so that we do not save
|
||||||
@ -982,6 +983,8 @@ impl LogManager {
|
|||||||
merkle.pora_chunks_merkle.leaves(),
|
merkle.pora_chunks_merkle.leaves(),
|
||||||
merkle.last_chunk_merkle.leaves()
|
merkle.last_chunk_merkle.leaves()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
self.flow_store.put_pad_data(&pad_list, tx_seq)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use crate::config::ShardConfig;
|
use crate::config::ShardConfig;
|
||||||
|
|
||||||
use ethereum_types::H256;
|
use ethereum_types::H256;
|
||||||
|
use flow_store::PadPair;
|
||||||
use shared_types::{
|
use shared_types::{
|
||||||
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
|
||||||
Transaction,
|
Transaction,
|
||||||
@ -15,7 +16,6 @@ pub mod config;
|
|||||||
mod flow_store;
|
mod flow_store;
|
||||||
mod load_chunk;
|
mod load_chunk;
|
||||||
pub mod log_manager;
|
pub mod log_manager;
|
||||||
mod pad_data_store;
|
|
||||||
mod seal_task_manager;
|
mod seal_task_manager;
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests;
|
mod tests;
|
||||||
@ -159,6 +159,8 @@ pub trait LogStoreWrite: LogStoreChunkWrite {
|
|||||||
fn update_shard_config(&self, shard_config: ShardConfig);
|
fn update_shard_config(&self, shard_config: ShardConfig);
|
||||||
|
|
||||||
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
|
fn submit_seal_result(&self, answers: Vec<SealAnswer>) -> Result<()>;
|
||||||
|
|
||||||
|
fn start_padding(&self, executor: &task_executor::TaskExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait LogStoreChunkWrite {
|
pub trait LogStoreChunkWrite {
|
||||||
@ -218,6 +220,10 @@ pub trait FlowRead {
|
|||||||
fn get_num_entries(&self) -> Result<u64>;
|
fn get_num_entries(&self) -> Result<u64>;
|
||||||
|
|
||||||
fn get_shard_config(&self) -> ShardConfig;
|
fn get_shard_config(&self) -> ShardConfig;
|
||||||
|
|
||||||
|
fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
|
||||||
|
|
||||||
|
fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait FlowWrite {
|
pub trait FlowWrite {
|
||||||
@ -232,6 +238,10 @@ pub trait FlowWrite {
|
|||||||
|
|
||||||
/// Update the shard config.
|
/// Update the shard config.
|
||||||
fn update_shard_config(&self, shard_config: ShardConfig);
|
fn update_shard_config(&self, shard_config: ShardConfig);
|
||||||
|
|
||||||
|
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
|
||||||
|
|
||||||
|
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct SealTask {
|
pub struct SealTask {
|
||||||
@ -270,3 +280,23 @@ pub trait FlowSeal {
|
|||||||
|
|
||||||
pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
|
pub trait Flow: FlowRead + FlowWrite + FlowSeal {}
|
||||||
impl<T: FlowRead + FlowWrite + FlowSeal> Flow for T {}
|
impl<T: FlowRead + FlowWrite + FlowSeal> Flow for T {}
|
||||||
|
|
||||||
|
pub trait PadDataStoreRead {
|
||||||
|
fn get_pad_data(&self, start_index: u64) -> Result<Option<Vec<PadPair>>>;
|
||||||
|
fn get_pad_data_sync_height(&self) -> Result<Option<u64>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait PadDataStoreWrite {
|
||||||
|
fn put_pad_data(&self, data_sizes: &[PadPair], tx_seq: u64) -> Result<()>;
|
||||||
|
fn put_pad_data_sync_height(&self, tx_seq: u64) -> Result<()>;
|
||||||
|
fn start_padding(&mut self, executor: &task_executor::TaskExecutor);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait PadDataStore:
|
||||||
|
PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static
|
||||||
|
{
|
||||||
|
}
|
||||||
|
impl<T: PadDataStoreRead + PadDataStoreWrite + config::Configurable + Send + Sync + 'static>
|
||||||
|
PadDataStore for T
|
||||||
|
{
|
||||||
|
}
|
||||||
|
@ -9,15 +9,10 @@ use rand::random;
|
|||||||
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
use shared_types::{compute_padded_chunk_size, ChunkArray, Transaction, CHUNK_SIZE};
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
|
|
||||||
use task_executor::test_utils::TestRuntime;
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_put_get() {
|
fn test_put_get() {
|
||||||
let config = LogConfig::default();
|
let config = LogConfig::default();
|
||||||
let runtime = TestRuntime::default();
|
let store = LogManager::memorydb(config.clone()).unwrap();
|
||||||
|
|
||||||
let executor = runtime.task_executor.clone();
|
|
||||||
let store = LogManager::memorydb(config.clone(), executor).unwrap();
|
|
||||||
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
|
let chunk_count = config.flow.batch_size + config.flow.batch_size / 2 - 1;
|
||||||
// Aligned with size.
|
// Aligned with size.
|
||||||
let start_offset = 1024;
|
let start_offset = 1024;
|
||||||
@ -174,10 +169,7 @@ fn test_put_tx() {
|
|||||||
|
|
||||||
fn create_store() -> LogManager {
|
fn create_store() -> LogManager {
|
||||||
let config = LogConfig::default();
|
let config = LogConfig::default();
|
||||||
let runtime = TestRuntime::default();
|
LogManager::memorydb(config).unwrap()
|
||||||
let executor = runtime.task_executor.clone();
|
|
||||||
|
|
||||||
LogManager::memorydb(config, executor).unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {
|
fn put_tx(store: &mut LogManager, chunk_count: usize, seq: u64) {
|
||||||
|
@ -1657,7 +1657,7 @@ mod tests {
|
|||||||
let num_chunks = 123;
|
let num_chunks = 123;
|
||||||
|
|
||||||
let config = LogConfig::default();
|
let config = LogConfig::default();
|
||||||
let store = Arc::new(LogManager::memorydb(config, task_executor.clone()).unwrap());
|
let store = Arc::new(LogManager::memorydb(config).unwrap());
|
||||||
|
|
||||||
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
|
create_controller(task_executor, peer_id, store, tx_id, num_chunks)
|
||||||
}
|
}
|
||||||
|
@ -1348,9 +1348,7 @@ mod tests {
|
|||||||
|
|
||||||
let config = LogConfig::default();
|
let config = LogConfig::default();
|
||||||
|
|
||||||
let executor = runtime.task_executor.clone();
|
let store = Arc::new(LogManager::memorydb(config.clone()).unwrap());
|
||||||
|
|
||||||
let store = Arc::new(LogManager::memorydb(config.clone(), executor).unwrap());
|
|
||||||
|
|
||||||
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
|
let init_peer_id = identity::Keypair::generate_ed25519().public().to_peer_id();
|
||||||
let file_location_cache: Arc<FileLocationCache> =
|
let file_location_cache: Arc<FileLocationCache> =
|
||||||
|
@ -9,8 +9,6 @@ use storage::{
|
|||||||
LogManager,
|
LogManager,
|
||||||
};
|
};
|
||||||
|
|
||||||
use task_executor::test_utils::TestRuntime;
|
|
||||||
|
|
||||||
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
|
/// Creates stores for local node and peers with initialized transaction of specified chunk count.
|
||||||
/// The first store is for local node, and data not stored. The second store is for peers, and all
|
/// The first store is for local node, and data not stored. The second store is for peers, and all
|
||||||
/// transactions are finalized for file sync.
|
/// transactions are finalized for file sync.
|
||||||
@ -24,11 +22,8 @@ pub fn create_2_store(
|
|||||||
Vec<Vec<u8>>,
|
Vec<Vec<u8>>,
|
||||||
) {
|
) {
|
||||||
let config = LogConfig::default();
|
let config = LogConfig::default();
|
||||||
let runtime = TestRuntime::default();
|
let mut store = LogManager::memorydb(config.clone()).unwrap();
|
||||||
|
let mut peer_store = LogManager::memorydb(config).unwrap();
|
||||||
let executor = runtime.task_executor.clone();
|
|
||||||
let mut store = LogManager::memorydb(config.clone(), executor.clone()).unwrap();
|
|
||||||
let mut peer_store = LogManager::memorydb(config, executor).unwrap();
|
|
||||||
|
|
||||||
let mut offset = 1;
|
let mut offset = 1;
|
||||||
let mut txs = vec![];
|
let mut txs = vec![];
|
||||||
@ -120,10 +115,7 @@ pub mod tests {
|
|||||||
|
|
||||||
impl TestStoreRuntime {
|
impl TestStoreRuntime {
|
||||||
pub fn new_store() -> impl LogStore {
|
pub fn new_store() -> impl LogStore {
|
||||||
let runtime = TestRuntime::default();
|
LogManager::memorydb(LogConfig::default()).unwrap()
|
||||||
|
|
||||||
let executor = runtime.task_executor.clone();
|
|
||||||
LogManager::memorydb(LogConfig::default(), executor).unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {
|
pub fn new(store: Arc<dyn LogStore>) -> TestStoreRuntime {
|
||||||
|
Loading…
Reference in New Issue
Block a user