Adjust default value for testnet configs (#180)
Some checks are pending
abi-consistent-check / build-and-compare (push) Waiting to run
code-coverage / unittest-cov (push) Waiting to run
rust / check (push) Waiting to run
rust / test (push) Waiting to run
rust / lints (push) Waiting to run
functional-test / test (push) Waiting to run

* Adjust default value for testnet configs

* Supports to disable sequential auto sync

* Add py tests for auto sync

* fix py file name

* rm dummy ;py code

* change default block confirm count
This commit is contained in:
Bo QIU 2024-09-05 10:09:29 +08:00 committed by GitHub
parent e20be63026
commit b6972b97af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 305 additions and 179 deletions

View File

@ -16,9 +16,9 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Config {
max_entries_total: 256000,
max_entries_total: 1000000,
max_entries_per_file: 4,
entry_expiration_time_secs: 3600,
entry_expiration_time_secs: 86400,
}
}
}

View File

@ -33,7 +33,7 @@ build_config! {
(blockchain_rpc_endpoint, (String), "http://127.0.0.1:8545".to_string())
(log_contract_address, (String), "".to_string())
(log_sync_start_block_number, (u64), 0)
(confirmation_block_count, (u64), 12)
(confirmation_block_count, (u64), 3)
(log_page_size, (u64), 999)
(max_cache_data_size, (usize), 100 * 1024 * 1024) // 100 MB
(cache_tx_seq_ttl, (usize), 500)

View File

@ -1,6 +1,6 @@
use super::{batcher::Batcher, sync_store::SyncStore};
use crate::{
auto_sync::{batcher::SyncResult, metrics},
auto_sync::{batcher::SyncResult, metrics, sync_store::Queue},
Config, SyncSender,
};
use anyhow::Result;
@ -108,16 +108,17 @@ impl RandomBatcher {
SyncResult::Timeout => metrics::RANDOM_SYNC_RESULT_TIMEOUT.inc(1),
}
match sync_result {
SyncResult::Completed => self.sync_store.remove_tx(tx_seq).await?,
_ => self.sync_store.downgrade_tx_to_pending(tx_seq).await?,
};
if matches!(sync_result, SyncResult::Completed) {
self.sync_store.remove(tx_seq).await?;
} else {
self.sync_store.insert(tx_seq, Queue::Pending).await?;
}
Ok(true)
}
async fn schedule(&mut self) -> Result<bool> {
let tx_seq = match self.sync_store.random_tx().await? {
let tx_seq = match self.sync_store.random().await? {
Some(v) => v,
None => return Ok(false),
};

View File

@ -2,7 +2,10 @@ use super::{
batcher::{Batcher, SyncResult},
sync_store::SyncStore,
};
use crate::{auto_sync::metrics, Config, SyncSender};
use crate::{
auto_sync::{metrics, sync_store::Queue},
Config, SyncSender,
};
use anyhow::Result;
use log_entry_sync::LogSyncEvent;
use serde::{Deserialize, Serialize};
@ -200,7 +203,7 @@ impl SerialBatcher {
}
// otherwise, mark tx as ready for sync
if let Err(err) = self.sync_store.upgrade_tx_to_ready(announced_tx_seq).await {
if let Err(err) = self.sync_store.upgrade(announced_tx_seq).await {
error!(%err, %announced_tx_seq, "Failed to promote announced tx to ready, state = {:?}", self.get_state().await);
}
@ -280,11 +283,28 @@ impl SerialBatcher {
/// Schedule file sync in sequence.
async fn schedule_next(&mut self) -> Result<bool> {
let next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
if next_tx_seq > self.max_tx_seq.load(Ordering::Relaxed) {
let max_tx_seq = self.max_tx_seq.load(Ordering::Relaxed);
if max_tx_seq == u64::MAX {
return Ok(false);
}
let mut next_tx_seq = self.next_tx_seq.load(Ordering::Relaxed);
if next_tx_seq > max_tx_seq {
return Ok(false);
}
// If sequential sync disabled, delegate to random sync.
if self.config.max_sequential_workers == 0 {
self.sync_store.insert(next_tx_seq, Queue::Ready).await?;
next_tx_seq += 1;
self.sync_store.set_next_tx_seq(next_tx_seq).await?;
self.next_tx_seq.store(next_tx_seq, Ordering::Relaxed);
self.next_tx_seq_in_db.store(next_tx_seq, Ordering::Relaxed);
return Ok(true);
}
if !self.batcher.add(next_tx_seq).await? {
return Ok(false);
}
@ -309,7 +329,7 @@ impl SerialBatcher {
// downgrade to random sync if file sync failed or timeout
if matches!(sync_result, SyncResult::Failed | SyncResult::Timeout) {
self.sync_store.add_pending_tx(current).await?;
self.sync_store.insert(current, Queue::Pending).await?;
}
// always move forward in db

View File

@ -8,6 +8,20 @@ use tokio::sync::RwLock;
const KEY_NEXT_TX_SEQ: &str = "sync.manager.next_tx_seq";
const KEY_MAX_TX_SEQ: &str = "sync.manager.max_tx_seq";
#[derive(Debug, PartialEq, Eq)]
pub enum Queue {
Ready,
Pending,
}
#[derive(Debug, PartialEq, Eq)]
pub enum InsertResult {
NewAdded, // new added in target queue
AlreadyExists, // already exists in target queue
Upgraded, // upgraded from pending queue to ready queue
Downgraded, // downgraged from ready queue to pending queue
}
pub struct SyncStore {
store: Arc<RwLock<Store>>,
@ -64,31 +78,69 @@ impl SyncStore {
store.set_config_encoded(&KEY_MAX_TX_SEQ, &tx_seq)
}
pub async fn add_pending_tx(&self, tx_seq: u64) -> Result<bool> {
let async_store = self.store.write().await;
pub async fn contains(&self, tx_seq: u64) -> Result<Option<Queue>> {
let async_store = self.store.read().await;
let store = async_store.get_store();
// already in ready queue
if self.ready_txs.has(store, tx_seq)? {
return Ok(false);
return Ok(Some(Queue::Ready));
}
// always add in pending queue
self.pending_txs.add(store, None, tx_seq)
if self.pending_txs.has(store, tx_seq)? {
return Ok(Some(Queue::Pending));
}
Ok(None)
}
pub async fn upgrade_tx_to_ready(&self, tx_seq: u64) -> Result<bool> {
pub async fn insert(&self, tx_seq: u64, queue: Queue) -> Result<InsertResult> {
let async_store = self.store.write().await;
let store = async_store.get_store();
let mut tx = ConfigTx::default();
match queue {
Queue::Ready => {
if !self.ready_txs.add(store, Some(&mut tx), tx_seq)? {
return Ok(InsertResult::AlreadyExists);
}
let removed = self.pending_txs.remove(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
if removed {
Ok(InsertResult::Upgraded)
} else {
Ok(InsertResult::NewAdded)
}
}
Queue::Pending => {
if !self.pending_txs.add(store, Some(&mut tx), tx_seq)? {
return Ok(InsertResult::AlreadyExists);
}
let removed = self.ready_txs.remove(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
if removed {
Ok(InsertResult::Downgraded)
} else {
Ok(InsertResult::NewAdded)
}
}
}
}
pub async fn upgrade(&self, tx_seq: u64) -> Result<bool> {
let async_store = self.store.write().await;
let store = async_store.get_store();
let mut tx = ConfigTx::default();
// not in pending queue
if !self.pending_txs.remove(store, Some(&mut tx), tx_seq)? {
return Ok(false);
}
// move from pending to ready queue
let added = self.ready_txs.add(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
@ -96,26 +148,7 @@ impl SyncStore {
Ok(added)
}
pub async fn downgrade_tx_to_pending(&self, tx_seq: u64) -> Result<bool> {
let async_store = self.store.write().await;
let store = async_store.get_store();
let mut tx = ConfigTx::default();
// not in ready queue
if !self.ready_txs.remove(store, Some(&mut tx), tx_seq)? {
return Ok(false);
}
// move from ready to pending queue
let added = self.pending_txs.add(store, Some(&mut tx), tx_seq)?;
store.exec_configs(tx)?;
Ok(added)
}
pub async fn random_tx(&self) -> Result<Option<u64>> {
pub async fn random(&self) -> Result<Option<u64>> {
let async_store = self.store.read().await;
let store = async_store.get_store();
@ -128,17 +161,21 @@ impl SyncStore {
self.pending_txs.random(store)
}
pub async fn remove_tx(&self, tx_seq: u64) -> Result<bool> {
pub async fn remove(&self, tx_seq: u64) -> Result<Option<Queue>> {
let async_store = self.store.write().await;
let store = async_store.get_store();
// removed in ready queue
if self.ready_txs.remove(store, None, tx_seq)? {
return Ok(true);
return Ok(Some(Queue::Ready));
}
// otherwise, try to remove in pending queue
self.pending_txs.remove(store, None, tx_seq)
// removed in pending queue
if self.pending_txs.remove(store, None, tx_seq)? {
return Ok(Some(Queue::Pending));
}
Ok(None)
}
}
@ -146,7 +183,7 @@ impl SyncStore {
mod tests {
use crate::test_util::tests::TestStoreRuntime;
use super::SyncStore;
use super::{InsertResult::*, Queue::*, SyncStore};
#[tokio::test]
async fn test_tx_seq_range() {
@ -165,106 +202,79 @@ mod tests {
}
#[tokio::test]
async fn test_add_pending_tx() {
async fn test_insert() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
// add pending tx 3
assert!(store.add_pending_tx(3).await.unwrap());
assert_eq!(store.contains(1).await.unwrap(), None);
assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded);
assert_eq!(store.contains(1).await.unwrap(), Some(Pending));
assert_eq!(store.insert(1, Pending).await.unwrap(), AlreadyExists);
assert_eq!(store.insert(1, Ready).await.unwrap(), Upgraded);
assert_eq!(store.contains(1).await.unwrap(), Some(Ready));
// cannot add pending tx 3 again
assert!(!store.add_pending_tx(3).await.unwrap());
assert_eq!(store.insert(2, Ready).await.unwrap(), NewAdded);
assert_eq!(store.contains(2).await.unwrap(), Some(Ready));
assert_eq!(store.insert(2, Ready).await.unwrap(), AlreadyExists);
assert_eq!(store.insert(2, Pending).await.unwrap(), Downgraded);
assert_eq!(store.contains(2).await.unwrap(), Some(Pending));
}
#[tokio::test]
async fn test_upgrade_tx() {
async fn test_upgrade() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
// cannot upgrade by default
assert!(!store.upgrade_tx_to_ready(3).await.unwrap());
assert!(!store.upgrade(3).await.unwrap());
// add pending tx 3
assert!(store.add_pending_tx(3).await.unwrap());
assert_eq!(store.insert(3, Pending).await.unwrap(), NewAdded);
// can upgrade to ready
assert!(store.upgrade_tx_to_ready(3).await.unwrap());
// cannot add pending tx 3 again event upgraded to ready
assert!(!store.add_pending_tx(3).await.unwrap());
assert!(store.upgrade(3).await.unwrap());
assert_eq!(store.contains(3).await.unwrap(), Some(Ready));
// cannot upgrade again
assert!(!store.upgrade_tx_to_ready(3).await.unwrap());
assert!(!store.upgrade(3).await.unwrap());
}
#[tokio::test]
async fn test_downgrade_tx() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
// cannot downgrade by default
assert!(!store.downgrade_tx_to_pending(3).await.unwrap());
// add pending tx 3
assert!(store.add_pending_tx(3).await.unwrap());
// cannot downgrade for non-ready tx
assert!(!store.downgrade_tx_to_pending(3).await.unwrap());
// upgrade tx 3 to ready
assert!(store.upgrade_tx_to_ready(3).await.unwrap());
// can downgrade now
assert!(store.downgrade_tx_to_pending(3).await.unwrap());
// cannot downgrade now
assert!(!store.downgrade_tx_to_pending(3).await.unwrap());
}
#[tokio::test]
async fn test_random_tx() {
async fn test_random() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
// no tx by default
assert_eq!(store.random_tx().await.unwrap(), None);
assert_eq!(store.random().await.unwrap(), None);
// add pending txs 1, 2, 3
assert!(store.add_pending_tx(1).await.unwrap());
assert!(store.add_pending_tx(2).await.unwrap());
assert!(store.add_pending_tx(3).await.unwrap());
let tx = store.random_tx().await.unwrap().unwrap();
assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded);
assert_eq!(store.insert(2, Pending).await.unwrap(), NewAdded);
assert_eq!(store.insert(3, Pending).await.unwrap(), NewAdded);
let tx = store.random().await.unwrap().unwrap();
assert!((1..=3).contains(&tx));
// upgrade tx 1 to ready
assert!(store.upgrade_tx_to_ready(2).await.unwrap());
assert_eq!(store.random_tx().await.unwrap(), Some(2));
// upgrade tx 2 to ready
assert_eq!(store.insert(2, Ready).await.unwrap(), Upgraded);
assert_eq!(store.random().await.unwrap(), Some(2));
}
#[tokio::test]
async fn test_remove_tx() {
async fn test_remove() {
let runtime = TestStoreRuntime::default();
let store = SyncStore::new(runtime.store.clone());
// cannot remove by default
assert!(!store.remove_tx(1).await.unwrap());
assert_eq!(store.remove(1).await.unwrap(), None);
// add pending tx 1, 2
assert!(store.add_pending_tx(1).await.unwrap());
assert!(store.add_pending_tx(2).await.unwrap());
// add tx 1, 2
assert_eq!(store.insert(1, Pending).await.unwrap(), NewAdded);
assert_eq!(store.insert(2, Ready).await.unwrap(), NewAdded);
// upgrade tx 1 to ready
assert!(store.upgrade_tx_to_ready(1).await.unwrap());
assert_eq!(store.random_tx().await.unwrap(), Some(1));
// remove tx 1
assert!(store.remove_tx(1).await.unwrap());
assert_eq!(store.random_tx().await.unwrap(), Some(2));
assert!(!store.remove_tx(1).await.unwrap());
// remove tx 2
assert!(store.remove_tx(2).await.unwrap());
assert_eq!(store.random_tx().await.unwrap(), None);
assert!(!store.remove_tx(2).await.unwrap());
// remove txs
assert_eq!(store.remove(1).await.unwrap(), Some(Pending));
assert_eq!(store.remove(1).await.unwrap(), None);
assert_eq!(store.remove(2).await.unwrap(), Some(Ready));
assert_eq!(store.remove(2).await.unwrap(), None);
}
}

View File

@ -80,8 +80,8 @@ impl Default for Config {
// auto sync config
auto_sync_idle_interval: Duration::from_secs(3),
auto_sync_error_interval: Duration::from_secs(10),
max_sequential_workers: 24,
max_random_workers: 8,
max_sequential_workers: 0,
max_random_workers: 30,
sequential_find_peer_timeout: Duration::from_secs(60),
random_find_peer_timeout: Duration::from_secs(500),
}

View File

@ -87,7 +87,7 @@ log_contract_address = "0x0460aA47b41a66694c0a73f667a1b795A5ED3556"
log_sync_start_block_number = 595059
# Number of blocks to confirm a transaction.
confirmation_block_count = 6
# confirmation_block_count = 3
# Maximum number of event logs to poll at a time.
# log_page_size = 999
@ -264,10 +264,10 @@ auto_sync_enabled = true
# peer_chunks_download_timeout = "15s"
# Maximum threads to sync files in sequence.
# max_sequential_workers = 24
# max_sequential_workers = 0
# Maximum threads to sync files randomly.
# max_random_workers = 8
# max_random_workers = 30
# Timeout to terminate a file sync in sequence.
# sequential_find_peer_timeout = "60s"
@ -287,14 +287,14 @@ auto_sync_enabled = true
# When the cache is full, the storage position information with oldest timestamp will be replaced.
# Global cache capacity.
# max_entries_total = 256000
# max_entries_total = 1000000
# Location information capacity for each file.
# max_entries_per_file = 4
# Validity period of location information.
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 3600
# entry_expiration_time_secs = 86400
#######################################################################
### Metrics Options ###

View File

@ -87,7 +87,7 @@ log_contract_address = "0xbD2C3F0E65eDF5582141C35969d66e34629cC768"
log_sync_start_block_number = 595059
# Number of blocks to confirm a transaction.
confirmation_block_count = 6
# confirmation_block_count = 3
# Maximum number of event logs to poll at a time.
# log_page_size = 999
@ -276,10 +276,10 @@ auto_sync_enabled = true
# peer_chunks_download_timeout = "15s"
# Maximum threads to sync files in sequence.
# max_sequential_workers = 24
# max_sequential_workers = 0
# Maximum threads to sync files randomly.
# max_random_workers = 8
# max_random_workers = 30
# Timeout to terminate a file sync in sequence.
# sequential_find_peer_timeout = "60s"
@ -299,14 +299,14 @@ auto_sync_enabled = true
# When the cache is full, the storage position information with oldest timestamp will be replaced.
# Global cache capacity.
# max_entries_total = 256000
# max_entries_total = 1000000
# Location information capacity for each file.
# max_entries_per_file = 4
# Validity period of location information.
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 3600
# entry_expiration_time_secs = 86400
#######################################################################
### Metrics Options ###

View File

@ -87,7 +87,7 @@
# log_sync_start_block_number = 0
# Number of blocks to confirm a transaction.
# confirmation_block_count = 12
# confirmation_block_count = 3
# Maximum number of event logs to poll at a time.
# log_page_size = 999
@ -278,10 +278,10 @@
# peer_chunks_download_timeout = "15s"
# Maximum threads to sync files in sequence.
# max_sequential_workers = 24
# max_sequential_workers = 0
# Maximum threads to sync files randomly.
# max_random_workers = 8
# max_random_workers = 30
# Timeout to terminate a file sync in sequence.
# sequential_find_peer_timeout = "60s"
@ -301,14 +301,14 @@
# When the cache is full, the storage position information with oldest timestamp will be replaced.
# Global cache capacity.
# max_entries_total = 256000
# max_entries_total = 1000000
# Location information capacity for each file.
# max_entries_per_file = 4
# Validity period of location information.
# If the timestamp in the storage location information exceeds this duration from the current time, it will be removed from the cache.
# entry_expiration_time_secs = 3600
# entry_expiration_time_secs = 86400
#######################################################################
### Metrics Options ###

View File

@ -63,3 +63,13 @@ TX_PARAMS1 = {
NO_SEAL_FLAG = 0x1
NO_MERKLE_PROOF_FLAG = 0x2
def update_config(default: dict, custom: dict):
"""
Supports to update configurations with dict value.
"""
for (key, value) in custom.items():
if default.get(key) is None or type(value) != dict:
default[key] = value
else:
update_config(default[key], value)

View File

@ -5,18 +5,14 @@ import random
from test_framework.test_framework import TestFramework
from utility.submission import create_submission
from utility.submission import submit_data
from utility.utils import (
assert_equal,
wait_until,
)
from utility.utils import wait_until
class RandomTest(TestFramework):
def setup_params(self):
self.num_blockchain_nodes = 1
self.num_nodes = 4
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {"find_peer_timeout_secs": 1, "confirmation_block_count": 1, "sync": {"auto_sync_enabled": True}}
self.zgs_node_configs[i] = {"sync": {"auto_sync_enabled": True}}
def run_test(self):
max_size = 256 * 1024 * 64

View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoRandomSyncTest(TestFramework):
def setup_params(self):
self.num_nodes = 2
# Enable random auto sync only
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"sync": {
"auto_sync_enabled": True,
"max_sequential_workers": 0,
"max_random_workers": 3,
}
}
def run_test(self):
# Submit and upload files on node 0
data_root_1 = self.__upload_file__(0, 256 * 1024)
data_root_2 = self.__upload_file__(0, 256 * 1024)
# Files should be available on node 1 via auto sync
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"])
if __name__ == "__main__":
AutoRandomSyncTest().main()

View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoSequentialSyncTest(TestFramework):
def setup_params(self):
self.num_nodes = 2
# Enable sequential auto sync only
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"sync": {
"auto_sync_enabled": True,
"max_sequential_workers": 3,
"max_random_workers": 0,
}
}
def run_test(self):
# Submit and upload files on node 0
data_root_1 = self.__upload_file__(0, 256 * 1024)
data_root_2 = self.__upload_file__(0, 256 * 1024)
# Files should be available on node 1 via auto sync
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"])
if __name__ == "__main__":
AutoSequentialSyncTest().main()

32
tests/sync_auto_test.py Normal file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
from test_framework.test_framework import TestFramework
from utility.utils import wait_until
class AutoSyncTest(TestFramework):
def setup_params(self):
self.num_nodes = 2
# Enable auto sync
for i in range(self.num_nodes):
self.zgs_node_configs[i] = {
"sync": {
"auto_sync_enabled": True,
"max_sequential_workers": 3,
"max_random_workers": 3,
}
}
def run_test(self):
# Submit and upload files on node 0
data_root_1 = self.__upload_file__(0, 256 * 1024)
data_root_2 = self.__upload_file__(0, 256 * 1024)
# Files should be available on node 1 via auto sync
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_1)["finalized"])
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2) is not None)
wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root_2)["finalized"])
if __name__ == "__main__":
AutoSyncTest().main()

View File

@ -4,8 +4,7 @@ import random
import time
from test_framework.test_framework import TestFramework
from utility.submission import create_submission
from utility.submission import submit_data, data_to_segments
from utility.submission import data_to_segments
from utility.utils import (
assert_equal,
wait_until,
@ -13,9 +12,7 @@ from utility.utils import (
class SyncTest(TestFramework):
def setup_params(self):
self.num_blockchain_nodes = 2
self.num_nodes = 2
self.__deployed_contracts = 0
def run_test(self):
# By default, auto_sync_enabled and sync_file_on_announcement_enabled are both false,
@ -32,19 +29,8 @@ class SyncTest(TestFramework):
# stop client2, preventing it from receiving AnnounceFile
client2.shutdown()
# Create submission
chunk_data = random.randbytes(256 * 1024)
data_root = self.__create_submission(chunk_data)
data_root = self.__upload_file__(0, 256 * 1024)
# Ensure log entry sync from blockchain node
wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False)
# Upload file to storage node
segments = submit_data(client1, chunk_data)
self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"])
# restart client2
client2.start()
client2.wait_for_rpc_connection()
@ -75,7 +61,7 @@ class SyncTest(TestFramework):
# Prepare 3 segments to upload
chunk_data = random.randbytes(256 * 1024 * 3)
data_root = self.__create_submission(chunk_data)
data_root = self.__submit_file__(chunk_data)
# Ensure log entry sync from blockchain node
wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
@ -111,13 +97,5 @@ class SyncTest(TestFramework):
# Validate data
assert_equal(client2.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])
def __create_submission(self, chunk_data: bytes) -> str:
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
self.__deployed_contracts += 1
wait_until(lambda: self.contract.num_submissions() == self.__deployed_contracts)
self.log.info("Submission created, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions)
return data_root
if __name__ == "__main__":
SyncTest().main()

View File

@ -2,9 +2,7 @@ import os
import subprocess
import tempfile
import time
import rlp
from eth_utils import decode_hex, keccak
from web3 import Web3, HTTPProvider
from web3.middleware import construct_sign_and_send_raw_middleware
from enum import Enum, unique
@ -12,7 +10,6 @@ from config.node_config import (
GENESIS_PRIV_KEY,
GENESIS_PRIV_KEY1,
TX_PARAMS,
MINER_ID,
)
from utility.simple_rpc_proxy import SimpleRpcProxy
from utility.utils import (

View File

@ -1,7 +1,5 @@
from pathlib import Path
import json
from web3 import Web3
def load_contract_metadata(path: str, name: str):
path = Path(path)

View File

@ -20,8 +20,9 @@ from test_framework.zgs_node import ZgsNode
from test_framework.blockchain_node import BlockChainNodeType
from test_framework.conflux_node import ConfluxNode, connect_sample_nodes
from test_framework.zg_node import ZGNode, zg_node_init_genesis
from utility.utils import PortMin, is_windows_platform, wait_until
from utility.utils import PortMin, is_windows_platform, wait_until, assert_equal
from utility.build_binary import build_cli
from utility.submission import create_submission, submit_data
__file_path__ = os.path.dirname(os.path.realpath(__file__))
@ -40,8 +41,8 @@ class TestFramework:
if "http_proxy" in os.environ:
del os.environ["http_proxy"]
self.num_blockchain_nodes = None
self.num_nodes = None
self.num_blockchain_nodes = 1
self.num_nodes = 1
self.blockchain_nodes = []
self.nodes = []
self.contract = None
@ -53,6 +54,7 @@ class TestFramework:
self.mine_period = 100
self.lifetime_seconds = 3600
self.launch_wait_seconds = 1
self.num_deployed_contracts = 0
# Set default binary path
binary_ext = ".exe" if is_windows_platform() else ""
@ -398,6 +400,31 @@ class TestFramework:
return root
def __submit_file__(self, chunk_data: bytes) -> str:
submissions, data_root = create_submission(chunk_data)
self.contract.submit(submissions)
self.num_deployed_contracts += 1
wait_until(lambda: self.contract.num_submissions() == self.num_deployed_contracts)
self.log.info("Submission completed, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions)
return data_root
def __upload_file__(self, node_index: int, random_data_size: int) -> str:
# Create submission
chunk_data = random.randbytes(random_data_size)
data_root = self.__submit_file__(chunk_data)
# Ensure log entry sync from blockchain node
client = self.nodes[node_index]
wait_until(lambda: client.zgs_get_file_info(data_root) is not None)
assert_equal(client.zgs_get_file_info(data_root)["finalized"], False)
# Upload file to storage node
segments = submit_data(client, chunk_data)
self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
wait_until(lambda: client.zgs_get_file_info(data_root)["finalized"])
return data_root
def setup_params(self):
self.num_blockchain_nodes = 1
self.num_nodes = 1

View File

@ -2,9 +2,8 @@ import os
import shutil
import base64
from config.node_config import ZGS_CONFIG
from config.node_config import ZGS_CONFIG, update_config
from test_framework.blockchain_node import NodeType, TestNode
from config.node_config import MINER_ID
from utility.utils import (
initialize_toml_config,
p2p_port,
@ -12,7 +11,6 @@ from utility.utils import (
blockchain_rpc_port,
)
class ZgsNode(TestNode):
def __init__(
self,
@ -48,9 +46,9 @@ class ZgsNode(TestNode):
"blockchain_rpc_endpoint": f"http://127.0.0.1:{blockchain_rpc_port(0)}",
}
# Set configs for this specific node.
local_conf.update(indexed_config)
update_config(local_conf, indexed_config)
# Overwrite with personalized configs.
local_conf.update(updated_config)
update_config(local_conf, updated_config)
data_dir = os.path.join(root_dir, "zgs_node" + str(index))
rpc_url = "http://" + local_conf["rpc_listen_address"]
super().__init__(

View File

@ -1,6 +1,6 @@
import base64
from eth_utils import encode_hex, decode_hex
from eth_utils import decode_hex
from math import log2
from utility.merkle_tree import add_0x_prefix, Leaf, MerkleTree
from utility.spec import ENTRY_SIZE, PORA_CHUNK_SIZE

View File

@ -1,15 +1,10 @@
import base64
import inspect
import os
import platform
import rtoml
import time
import sha3
from config.node_config import ZGS_CONFIG
from eth_utils import encode_hex
class PortMin:
# Must be initialized with a unique integer for each process
n = 11000