From 7479f5647451f04e13422f582c8c964cf864f7a0 Mon Sep 17 00:00:00 2001 From: MiniFrenchBread <103425574+MiniFrenchBread@users.noreply.github.com> Date: Tue, 23 Jan 2024 18:47:14 +0800 Subject: [PATCH] feat: write cached segments to storage on log synced (#7) --- node/chunk_pool/src/mem_pool/chunk_cache.rs | 16 ++++-- .../src/mem_pool/chunk_pool_inner.rs | 53 +++++++++---------- .../src/mem_pool/chunk_write_control.rs | 1 + node/router/src/libp2p_event_handler.rs | 2 +- node/sync/src/service.rs | 2 +- tests/cache_test.py | 2 +- tests/crash_test.py | 1 + tests/random_test.py | 2 +- 8 files changed, 41 insertions(+), 38 deletions(-) diff --git a/node/chunk_pool/src/mem_pool/chunk_cache.rs b/node/chunk_pool/src/mem_pool/chunk_cache.rs index 5e525e9..6b3d6de 100644 --- a/node/chunk_pool/src/mem_pool/chunk_cache.rs +++ b/node/chunk_pool/src/mem_pool/chunk_cache.rs @@ -11,6 +11,7 @@ use std::time::{Duration, Instant}; /// retrieved from blockchain. pub struct MemoryCachedFile { pub id: FileID, + pub chunks_per_segment: usize, /// Window to control the cache of each file pub segments: HashMap, /// Total number of chunks for the cache file, which is updated from log entry. @@ -22,12 +23,13 @@ pub struct MemoryCachedFile { } impl MemoryCachedFile { - fn new(root: DataRoot, timeout: Duration) -> Self { + fn new(root: DataRoot, timeout: Duration, chunks_per_segment: usize) -> Self { MemoryCachedFile { id: FileID { root, tx_id: Default::default(), }, + chunks_per_segment, segments: HashMap::default(), total_chunks: 0, expired_at: Instant::now().add(timeout), @@ -81,6 +83,7 @@ impl ChunkPoolCache { self.files.get(root) } + #[allow(unused)] pub fn get_file_mut(&mut self, root: &DataRoot) -> Option<&mut MemoryCachedFile> { self.files.get_mut(root) } @@ -126,10 +129,13 @@ impl ChunkPoolCache { // always GC at first self.garbage_collect(); - let file = self - .files - .entry(seg_info.root) - .or_insert_with(|| MemoryCachedFile::new(seg_info.root, self.config.expiration_time())); + let file = self.files.entry(seg_info.root).or_insert_with(|| { + MemoryCachedFile::new( + seg_info.root, + self.config.expiration_time(), + seg_info.chunks_per_segment, + ) + }); // Segment already cached in memory. Directly return OK if file.segments.contains_key(&seg_info.seg_index) { diff --git a/node/chunk_pool/src/mem_pool/chunk_pool_inner.rs b/node/chunk_pool/src/mem_pool/chunk_pool_inner.rs index de8c37b..fe6d512 100644 --- a/node/chunk_pool/src/mem_pool/chunk_pool_inner.rs +++ b/node/chunk_pool/src/mem_pool/chunk_pool_inner.rs @@ -204,33 +204,28 @@ impl MemoryChunkPool { /// Updates the cached file info when log entry retrieved from blockchain. pub async fn update_file_info(&self, tx: &Transaction) -> Result { - let mut inner = self.inner.lock().await; - - // Do nothing if file not uploaded yet. - let file = match inner.segment_cache.get_file_mut(&tx.data_merkle_root) { - Some(f) => f, - None => return Ok(false), - }; - - // Update the file info with transaction. - file.update_with_tx(tx); - - // File partially uploaded and it's up to user thread - // to write chunks into store and finalize transaction. 
- if file.cached_chunk_num < file.total_chunks { - return Ok(true); + let maybe_file = self + .inner + .lock() + .await + .segment_cache + .remove_file(&tx.data_merkle_root); + if let Some(mut file) = maybe_file { + file.update_with_tx(tx); + for (seg_index, seg) in file.segments.into_iter() { + self.write_chunks( + SegmentInfo { + root: tx.data_merkle_root, + seg_data: seg.data.clone(), + seg_index, + chunks_per_segment: file.chunks_per_segment, + }, + file.id, + file.total_chunks * CHUNK_SIZE, + ) + .await? + } } - - // Otherwise, notify to write all memory cached chunks and finalize transaction. - let file_id = FileID { - root: tx.data_merkle_root, - tx_id: tx.id(), - }; - if let Err(e) = self.sender.send(file_id) { - // Channel receiver will not be dropped until program exit. - bail!("channel send error: {}", e); - } - Ok(true) } @@ -242,10 +237,10 @@ impl MemoryChunkPool { Ok(LogSyncEvent::ReorgDetected { .. }) => {} Ok(LogSyncEvent::Reverted { .. }) => {} Ok(LogSyncEvent::TxSynced { tx }) => { - if let Err(_e) = chunk_pool.update_file_info(&tx).await { + if let Err(e) = chunk_pool.update_file_info(&tx).await { error!( - "Failed to update file info. tx seq={}, tx_root={}", - tx.seq, tx.data_merkle_root + "Failed to update file info. tx seq={}, tx_root={}, error={}", + tx.seq, tx.data_merkle_root, e ); } } diff --git a/node/chunk_pool/src/mem_pool/chunk_write_control.rs b/node/chunk_pool/src/mem_pool/chunk_write_control.rs index 1c05837..cbe41b2 100644 --- a/node/chunk_pool/src/mem_pool/chunk_write_control.rs +++ b/node/chunk_pool/src/mem_pool/chunk_write_control.rs @@ -16,6 +16,7 @@ enum SlotStatus { /// limit on writing threads per file. Meanwhile, the left_boundary field records /// how many segments have been uploaded. struct CtrlWindow { + #[allow(unused)] size: usize, left_boundary: usize, slots: HashMap, diff --git a/node/router/src/libp2p_event_handler.rs b/node/router/src/libp2p_event_handler.rs index 7081d94..3f71ba3 100644 --- a/node/router/src/libp2p_event_handler.rs +++ b/node/router/src/libp2p_event_handler.rs @@ -380,7 +380,7 @@ impl Libp2pEventHandler { } // TODO(qhz): check if there is better way to check existence of requested chunks. 
- let _ = match self + match self .store .get_chunks_by_tx_and_index_range( msg.tx_id.seq, diff --git a/node/sync/src/service.rs b/node/sync/src/service.rs index bc407c4..b31676a 100644 --- a/node/sync/src/service.rs +++ b/node/sync/src/service.rs @@ -154,7 +154,7 @@ impl SyncService { let store = Store::new(store, executor.clone()); let manager = - AutoSyncManager::new(store.clone(), sync_send.clone(), config.clone()).await?; + AutoSyncManager::new(store.clone(), sync_send.clone(), config).await?; if config.auto_sync_enabled { manager.spwn(&executor, event_recv); } diff --git a/tests/cache_test.py b/tests/cache_test.py index c56fa14..79df003 100755 --- a/tests/cache_test.py +++ b/tests/cache_test.py @@ -17,7 +17,7 @@ class ExampleTest(TestFramework): self.contract.submit(submissions) wait_until(lambda: self.contract.num_submissions() == 1) wait_until(lambda: client.zgs_get_file_info(data_root) is not None) - wait_until(lambda: client.zgs_get_file_info(data_root)["isCached"]) + wait_until(lambda: not client.zgs_get_file_info(data_root)["isCached"] and client.zgs_get_file_info(data_root)["uploadedSegNum"] == 1) client.zgs_upload_segment(segments[1]) wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"]) diff --git a/tests/crash_test.py b/tests/crash_test.py index ed0bfc1..2eaf29b 100644 --- a/tests/crash_test.py +++ b/tests/crash_test.py @@ -22,6 +22,7 @@ class CrashTest(TestFramework): self.log.info("segment: %s", segment) for i in range(self.num_nodes): + self.nodes[i].admin_start_sync_file(0) self.log.info("wait for node: %s", i) wait_until( lambda: self.nodes[i].zgs_get_file_info(data_root) is not None diff --git a/tests/random_test.py b/tests/random_test.py index c3eea3c..17dad23 100755 --- a/tests/random_test.py +++ b/tests/random_test.py @@ -17,7 +17,7 @@ class RandomTest(TestFramework): self.num_blockchain_nodes = 1 self.num_nodes = 4 for i in range(self.num_nodes): - self.zgs_node_configs[i] = {"find_peer_timeout_secs": 1, "confirmation_block_count": 1} + self.zgs_node_configs[i] = {"find_peer_timeout_secs": 1, "confirmation_block_count": 1, "sync": {"auto_sync_enabled": True}} def run_test(self): max_size = 256 * 1024 * 64
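
The core of this patch is the new `update_file_info` flow in chunk_pool_inner.rs: when `LogSyncEvent::TxSynced` delivers the log entry, the file is removed from the memory cache (`remove_file`) and every segment cached so far is written to storage through `write_chunks`, instead of sending the file's ID over a channel to be handled elsewhere. Below is a minimal, self-contained sketch of that drain-and-write pattern. It is synchronous and uses assumed stand-in types and names (LogEntry, CachedFile, SegmentCache, write_segment, and a fixed CHUNK_SIZE) in place of the repo's async Transaction / SegmentInfo / FileID / write_chunks APIs; it only illustrates the shape of the change, not the actual implementation.

use std::collections::HashMap;

// Assumption for this sketch: a fixed chunk size so the file size can be derived
// as total_chunks * CHUNK_SIZE, mirroring the patch's use of CHUNK_SIZE.
const CHUNK_SIZE: usize = 256;

// Stand-in for the on-chain log entry (the repo's `Transaction`).
struct LogEntry {
    data_merkle_root: [u8; 32],
    total_chunks: usize,
}

// Stand-in for `MemoryCachedFile`: segments cached before the log entry arrived,
// keyed by segment index.
struct CachedFile {
    segments: HashMap<usize, Vec<u8>>,
}

// Stand-in for the chunk pool's in-memory segment cache (`ChunkPoolCache`).
#[derive(Default)]
struct SegmentCache {
    files: HashMap<[u8; 32], CachedFile>,
}

impl SegmentCache {
    // Mirrors `remove_file`: take the whole cached file out of the pool.
    fn remove_file(&mut self, root: &[u8; 32]) -> Option<CachedFile> {
        self.files.remove(root)
    }
}

// Stand-in for `write_chunks`: persist one segment of the file to storage.
fn write_segment(
    root: &[u8; 32],
    seg_index: usize,
    data: &[u8],
    file_size: usize,
) -> Result<(), String> {
    println!(
        "write root={:02x?} seg_index={} seg_len={} file_size={}",
        &root[..4],
        seg_index,
        data.len(),
        file_size
    );
    Ok(())
}

// The pattern the patch introduces: once the log entry is synced, drain every
// cached segment of the file and write it to storage right away.
fn on_tx_synced(cache: &mut SegmentCache, tx: &LogEntry) -> Result<bool, String> {
    if let Some(file) = cache.remove_file(&tx.data_merkle_root) {
        let file_size = tx.total_chunks * CHUNK_SIZE;
        for (seg_index, data) in file.segments {
            write_segment(&tx.data_merkle_root, seg_index, &data, file_size)?;
        }
    }
    // As in the patch, success is reported whether or not anything was cached.
    Ok(true)
}

fn main() -> Result<(), String> {
    let root = [7u8; 32];
    let mut cache = SegmentCache::default();
    cache.files.insert(
        root,
        CachedFile {
            segments: HashMap::from([(0usize, vec![0u8; 1024 * CHUNK_SIZE])]),
        },
    );
    let tx = LogEntry {
        data_merkle_root: root,
        total_chunks: 1024,
    };
    on_tx_synced(&mut cache, &tx)?;
    Ok(())
}

The design choice this models: draining the file with `remove_file` releases the cache entry as soon as the log entry is synced, and the previous `cached_chunk_num < total_chunks` check and channel notification are gone, so a partially uploaded file also has its cached segments flushed immediately. The new `chunks_per_segment` field on `MemoryCachedFile` exists so each cached segment can be turned back into a `SegmentInfo` at write time. The updated cache_test.py asserts exactly this behavior: after the submission is confirmed, `isCached` becomes false and `uploadedSegNum` reaches 1 before the second segment is uploaded.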