feat: write cached segments to storage on log synced (#7)

This commit is contained in:
MiniFrenchBread 2024-01-23 18:47:14 +08:00 committed by GitHub
parent 77bf2e122f
commit 7479f56474
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 41 additions and 38 deletions

View File

@ -11,6 +11,7 @@ use std::time::{Duration, Instant};
/// retrieved from blockchain. /// retrieved from blockchain.
pub struct MemoryCachedFile { pub struct MemoryCachedFile {
pub id: FileID, pub id: FileID,
pub chunks_per_segment: usize,
/// Window to control the cache of each file /// Window to control the cache of each file
pub segments: HashMap<usize, ChunkArray>, pub segments: HashMap<usize, ChunkArray>,
/// Total number of chunks for the cache file, which is updated from log entry. /// Total number of chunks for the cache file, which is updated from log entry.
@ -22,12 +23,13 @@ pub struct MemoryCachedFile {
} }
impl MemoryCachedFile { impl MemoryCachedFile {
fn new(root: DataRoot, timeout: Duration) -> Self { fn new(root: DataRoot, timeout: Duration, chunks_per_segment: usize) -> Self {
MemoryCachedFile { MemoryCachedFile {
id: FileID { id: FileID {
root, root,
tx_id: Default::default(), tx_id: Default::default(),
}, },
chunks_per_segment,
segments: HashMap::default(), segments: HashMap::default(),
total_chunks: 0, total_chunks: 0,
expired_at: Instant::now().add(timeout), expired_at: Instant::now().add(timeout),
@ -81,6 +83,7 @@ impl ChunkPoolCache {
self.files.get(root) self.files.get(root)
} }
#[allow(unused)]
pub fn get_file_mut(&mut self, root: &DataRoot) -> Option<&mut MemoryCachedFile> { pub fn get_file_mut(&mut self, root: &DataRoot) -> Option<&mut MemoryCachedFile> {
self.files.get_mut(root) self.files.get_mut(root)
} }
@ -126,10 +129,13 @@ impl ChunkPoolCache {
// always GC at first // always GC at first
self.garbage_collect(); self.garbage_collect();
let file = self let file = self.files.entry(seg_info.root).or_insert_with(|| {
.files MemoryCachedFile::new(
.entry(seg_info.root) seg_info.root,
.or_insert_with(|| MemoryCachedFile::new(seg_info.root, self.config.expiration_time())); self.config.expiration_time(),
seg_info.chunks_per_segment,
)
});
// Segment already cached in memory. Directly return OK // Segment already cached in memory. Directly return OK
if file.segments.contains_key(&seg_info.seg_index) { if file.segments.contains_key(&seg_info.seg_index) {

View File

@ -204,33 +204,28 @@ impl MemoryChunkPool {
/// Updates the cached file info when log entry retrieved from blockchain. /// Updates the cached file info when log entry retrieved from blockchain.
pub async fn update_file_info(&self, tx: &Transaction) -> Result<bool> { pub async fn update_file_info(&self, tx: &Transaction) -> Result<bool> {
let mut inner = self.inner.lock().await; let maybe_file = self
.inner
// Do nothing if file not uploaded yet. .lock()
let file = match inner.segment_cache.get_file_mut(&tx.data_merkle_root) { .await
Some(f) => f, .segment_cache
None => return Ok(false), .remove_file(&tx.data_merkle_root);
}; if let Some(mut file) = maybe_file {
// Update the file info with transaction.
file.update_with_tx(tx); file.update_with_tx(tx);
for (seg_index, seg) in file.segments.into_iter() {
// File partially uploaded and it's up to user thread self.write_chunks(
// to write chunks into store and finalize transaction. SegmentInfo {
if file.cached_chunk_num < file.total_chunks {
return Ok(true);
}
// Otherwise, notify to write all memory cached chunks and finalize transaction.
let file_id = FileID {
root: tx.data_merkle_root, root: tx.data_merkle_root,
tx_id: tx.id(), seg_data: seg.data.clone(),
}; seg_index,
if let Err(e) = self.sender.send(file_id) { chunks_per_segment: file.chunks_per_segment,
// Channel receiver will not be dropped until program exit. },
bail!("channel send error: {}", e); file.id,
file.total_chunks * CHUNK_SIZE,
)
.await?
}
} }
Ok(true) Ok(true)
} }
@ -242,10 +237,10 @@ impl MemoryChunkPool {
Ok(LogSyncEvent::ReorgDetected { .. }) => {} Ok(LogSyncEvent::ReorgDetected { .. }) => {}
Ok(LogSyncEvent::Reverted { .. }) => {} Ok(LogSyncEvent::Reverted { .. }) => {}
Ok(LogSyncEvent::TxSynced { tx }) => { Ok(LogSyncEvent::TxSynced { tx }) => {
if let Err(_e) = chunk_pool.update_file_info(&tx).await { if let Err(e) = chunk_pool.update_file_info(&tx).await {
error!( error!(
"Failed to update file info. tx seq={}, tx_root={}", "Failed to update file info. tx seq={}, tx_root={}, error={}",
tx.seq, tx.data_merkle_root tx.seq, tx.data_merkle_root, e
); );
} }
} }

View File

@ -16,6 +16,7 @@ enum SlotStatus {
/// limit on writing threads per file. Meanwhile, the left_boundary field records /// limit on writing threads per file. Meanwhile, the left_boundary field records
/// how many segments have been uploaded. /// how many segments have been uploaded.
struct CtrlWindow { struct CtrlWindow {
#[allow(unused)]
size: usize, size: usize,
left_boundary: usize, left_boundary: usize,
slots: HashMap<usize, SlotStatus>, slots: HashMap<usize, SlotStatus>,

View File

@ -380,7 +380,7 @@ impl Libp2pEventHandler {
} }
// TODO(qhz): check if there is a better way to check existence of requested chunks.
let _ = match self match self
.store .store
.get_chunks_by_tx_and_index_range( .get_chunks_by_tx_and_index_range(
msg.tx_id.seq, msg.tx_id.seq,

View File

@ -154,7 +154,7 @@ impl SyncService {
let store = Store::new(store, executor.clone()); let store = Store::new(store, executor.clone());
let manager = let manager =
AutoSyncManager::new(store.clone(), sync_send.clone(), config.clone()).await?; AutoSyncManager::new(store.clone(), sync_send.clone(), config).await?;
if config.auto_sync_enabled { if config.auto_sync_enabled {
manager.spwn(&executor, event_recv); manager.spwn(&executor, event_recv);
} }

View File

@ -17,7 +17,7 @@ class ExampleTest(TestFramework):
self.contract.submit(submissions) self.contract.submit(submissions)
wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: self.contract.num_submissions() == 1)
wait_until(lambda: client.zgs_get_file_info(data_root) is not None) wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
wait_until(lambda: client.zgs_get_file_info(data_root)["isCached"]) wait_until(lambda: not client.zgs_get_file_info(data_root)["isCached"] and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1)
client.zgs_upload_segment(segments[1]) client.zgs_upload_segment(segments[1])
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])

View File

@ -22,6 +22,7 @@ class CrashTest(TestFramework):
self.log.info("segment: %s", segment) self.log.info("segment: %s", segment)
for i in range(self.num_nodes): for i in range(self.num_nodes):
self.nodes[i].admin_start_sync_file(0)
self.log.info("wait for node: %s", i) self.log.info("wait for node: %s", i)
wait_until( wait_until(
lambda: self.nodes[i].zgs_get_file_info(data_root) is not None lambda: self.nodes[i].zgs_get_file_info(data_root) is not None

View File

@ -17,7 +17,7 @@ class RandomTest(TestFramework):
self.num_blockchain_nodes = 1 self.num_blockchain_nodes = 1
self.num_nodes = 4 self.num_nodes = 4
for i in range(self.num_nodes): for i in range(self.num_nodes):
self.zgs_node_configs[i] = {"find_peer_timeout_secs": 1, "confirmation_block_count": 1} self.zgs_node_configs[i] = {"find_peer_timeout_secs": 1, "confirmation_block_count": 1, "sync": {"auto_sync_enabled": True}}
def run_test(self): def run_test(self):
max_size = 256 * 1024 * 64 max_size = 256 * 1024 * 64