mirror of
https://github.com/0glabs/0g-storage-node.git
synced 2024-11-20 15:05:19 +00:00
Add Python test for chunks sync by RPC
This commit is contained in:
parent
613c1a8eb5
commit
99ead6423b
@ -399,20 +399,20 @@ impl SyncService {
|
||||
}
|
||||
|
||||
// file may be removed, but remote peer still find one from the file location cache
|
||||
let finalized = self.store.check_tx_completed(request.tx_id.seq).await?;
|
||||
if !finalized {
|
||||
info!(%request.tx_id.seq, "Failed to handle chunks request due to tx not finalized");
|
||||
// FIXME(zz): If remote removes a file, we will also get failure here.
|
||||
// self.ctx
|
||||
// .report_peer(peer_id, PeerAction::HighToleranceError, "Tx not finalized");
|
||||
self.ctx.send(NetworkMessage::SendErrorResponse {
|
||||
peer_id,
|
||||
error: RPCResponseErrorCode::InvalidRequest,
|
||||
reason: "Tx not finalized".into(),
|
||||
id: request_id,
|
||||
});
|
||||
return Ok(());
|
||||
}
|
||||
// let finalized = self.store.check_tx_completed(request.tx_id.seq).await?;
|
||||
// if !finalized {
|
||||
// info!(%request.tx_id.seq, "Failed to handle chunks request due to tx not finalized");
|
||||
// // FIXME(zz): If remote removes a file, we will also get failure here.
|
||||
// // self.ctx
|
||||
// // .report_peer(peer_id, PeerAction::HighToleranceError, "Tx not finalized");
|
||||
// self.ctx.send(NetworkMessage::SendErrorResponse {
|
||||
// peer_id,
|
||||
// error: RPCResponseErrorCode::InvalidRequest,
|
||||
// reason: "Tx not finalized".into(),
|
||||
// id: request_id,
|
||||
// });
|
||||
// return Ok(());
|
||||
// }
|
||||
|
||||
let result = self
|
||||
.store
|
||||
|
@ -1,58 +1,109 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import random
|
||||
import time
|
||||
|
||||
from test_framework.test_framework import TestFramework
|
||||
from utility.submission import create_submission
|
||||
from utility.submission import submit_data
|
||||
from utility.submission import submit_data, data_to_segments
|
||||
from utility.utils import (
|
||||
assert_equal,
|
||||
wait_until,
|
||||
)
|
||||
|
||||
|
||||
class SyncTest(TestFramework):
    """End-to-end test for file and chunks sync triggered via admin RPC.

    Runs 2 blockchain nodes and 2 storage nodes. Node 0 receives the uploaded
    data directly; node 1 is expected NOT to sync automatically (auto sync is
    disabled by default) and only syncs when explicitly asked via the
    ``admin_startSyncFile`` / ``admin_startSyncChunks`` RPCs.
    """

    def setup_params(self):
        self.num_blockchain_nodes = 2
        self.num_nodes = 2
        # Count of submissions made so far; used to wait until the contract
        # reflects the latest submission in __create_submission.
        self.__deployed_contracts = 0

    def run_test(self):
        # By default, auto_sync_enabled and sync_file_on_announcement_enabled
        # are both false, so file or chunks sync must be triggered by rpc.
        self.__test_sync_file_by_rpc()
        self.__test_sync_chunks_by_rpc()

    def __test_sync_file_by_rpc(self):
        self.log.info("Begin to test file sync by rpc")

        client1 = self.nodes[0]
        client2 = self.nodes[1]

        # Stop node 1 so it cannot receive the file while node 0 is uploaded.
        self.stop_storage_node(1)

        # Create submission (tx_seq 0)
        chunk_data = random.randbytes(256 * 1024)
        data_root = self.__create_submission(chunk_data)

        # Ensure log entry sync from blockchain node
        wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
        assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False)

        # Upload file to storage node 0 and wait until it is finalized there.
        segments = submit_data(client1, chunk_data)
        self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
        wait_until(lambda: client1.zgs_get_file_info(data_root)["finalized"])

        self.start_storage_node(1)
        self.nodes[1].wait_for_rpc_connection()

        # File should not be auto synced on node 1; give it a moment to make
        # sure it stays unfinalized.
        wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
        time.sleep(3)
        assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)

        # Trigger file sync by rpc (tx_seq 0)
        assert client2.admin_start_sync_file(0) is None
        # NOTE(review): "sycn" is a pre-existing typo in the node helper's
        # name; keep in sync with the ZgsNode wrapper.
        wait_until(lambda: client2.sycn_status_is_completed_or_unknown(0))
        wait_until(lambda: client2.zgs_get_file_info(data_root)["finalized"])

        # Validate data: both nodes must serve the same first segment.
        assert_equal(
            client2.zgs_download_segment(data_root, 0, 1024),
            client1.zgs_download_segment(data_root, 0, 1024),
        )

    def __test_sync_chunks_by_rpc(self):
        self.log.info("Begin to test chunks sync by rpc")

        client1 = self.nodes[0]
        client2 = self.nodes[1]

        # Prepare 3 segments to upload (tx_seq 1)
        chunk_data = random.randbytes(256 * 1024 * 3)
        data_root = self.__create_submission(chunk_data)

        # Ensure log entry sync from blockchain node
        wait_until(lambda: client1.zgs_get_file_info(data_root) is not None)
        assert_equal(client1.zgs_get_file_info(data_root)["finalized"], False)

        # Upload only the 2nd segment to storage node 0.
        segments = data_to_segments(chunk_data)
        self.log.info("segments: %s", [(s["root"], s["index"], s["proof"]) for s in segments])
        assert client1.zgs_upload_segment(segments[1]) is None

        # segment 0 is not able to download
        assert client1.zgs_download_segment_decoded(data_root, 0, 1024) is None
        # segment 1 is available to download
        assert_equal(client1.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])
        # segment 2 is not able to download
        assert client1.zgs_download_segment_decoded(data_root, 2048, 3072) is None

        # Segment 1 should not be available on node 1 before syncing.
        wait_until(lambda: client2.zgs_get_file_info(data_root) is not None)
        assert_equal(client2.zgs_get_file_info(data_root)["finalized"], False)
        assert client2.zgs_download_segment_decoded(data_root, 1024, 2048) is None

        # Trigger chunks sync by rpc for chunk range [1024, 2048) of tx_seq 1.
        assert client2.admin_start_sync_chunks(1, 1024, 2048) is None
        wait_until(lambda: client2.sycn_status_is_completed_or_unknown(1))
        wait_until(lambda: client2.zgs_download_segment_decoded(data_root, 1024, 2048) is not None)

        # Validate data: synced chunks must match the uploaded bytes.
        assert_equal(client2.zgs_download_segment_decoded(data_root, 1024, 2048), chunk_data[1024*256:2048*256])

    def __create_submission(self, chunk_data: bytes) -> str:
        """Submit *chunk_data* to the log contract and return its data root.

        Waits until the contract reports the new submission before returning,
        so the caller can rely on the log entry existing on-chain.
        """
        submissions, data_root = create_submission(chunk_data)
        self.contract.submit(submissions)
        self.__deployed_contracts += 1
        wait_until(lambda: self.contract.num_submissions() == self.__deployed_contracts)
        self.log.info("Submission created, data root: %s, submissions(%s) = %s", data_root, len(submissions), submissions)
        return data_root
|
||||
|
||||
# Script entry point: run the sync test when executed directly.
if __name__ == "__main__":
    SyncTest().main()
|
||||
|
@ -16,6 +16,7 @@ PORT_RANGE = 500
|
||||
|
||||
__file_path__ = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
CONFLUX_BINARY = "conflux.exe" if is_windows_platform() else "conflux"
|
||||
|
||||
def run_single_test(py, script, test_dir, index, port_min, port_max):
|
||||
try:
|
||||
@ -61,7 +62,7 @@ def run():
|
||||
if not os.path.exists(dir_name):
|
||||
os.makedirs(dir_name, exist_ok=True)
|
||||
|
||||
conflux_path = os.path.join(dir_name, "conflux")
|
||||
conflux_path = os.path.join(dir_name, CONFLUX_BINARY)
|
||||
if not os.path.exists(conflux_path):
|
||||
build_conflux(conflux_path)
|
||||
|
||||
@ -155,7 +156,7 @@ def build_conflux(conflux_path):
|
||||
os.chdir(destination_path)
|
||||
os.system("cargo build --release --bin conflux")
|
||||
|
||||
path = os.path.join(destination_path, "target", "release", "conflux")
|
||||
path = os.path.join(destination_path, "target", "release", CONFLUX_BINARY)
|
||||
shutil.copyfile(path, conflux_path)
|
||||
|
||||
if not is_windows_platform():
|
||||
|
@ -1,6 +1,6 @@
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import base64
|
||||
|
||||
from config.node_config import ZGS_CONFIG
|
||||
from test_framework.blockchain_node import NodeType, TestNode
|
||||
@ -85,6 +85,10 @@ class ZgsNode(TestNode):
|
||||
|
||||
def zgs_download_segment(self, data_root, start_index, end_index):
    """Download a segment (base64-encoded string as returned by the node)
    for the given chunk index range of the file identified by *data_root*.
    """
    return self.rpc.zgs_downloadSegment([data_root, start_index, end_index])
|
||||
|
||||
def zgs_download_segment_decoded(self, data_root: str, start_chunk_index: int, end_chunk_index: int) -> bytes:
    """Download a segment and base64-decode it to raw bytes.

    Returns None when the node has no data for the requested chunk range
    (the RPC returns null), so callers can probe for availability.
    """
    encoded_segment = self.rpc.zgs_downloadSegment([data_root, start_chunk_index, end_chunk_index])
    return None if encoded_segment is None else base64.b64decode(encoded_segment)
|
||||
|
||||
def zgs_get_file_info(self, data_root):
    """Query the node for file info of *data_root*; returns the RPC result
    (a dict with at least a "finalized" flag per callers, or None).
    """
    return self.rpc.zgs_getFileInfo([data_root])
|
||||
@ -98,6 +102,9 @@ class ZgsNode(TestNode):
|
||||
|
||||
def admin_start_sync_file(self, tx_seq):
    """Ask the node to start syncing the whole file for *tx_seq*.

    Returns the RPC result (None on success per the test callers).
    """
    return self.rpc.admin_startSyncFile([tx_seq])
|
||||
|
||||
def admin_start_sync_chunks(self, tx_seq: int, start_chunk_index: int, end_chunk_index: int):
    """Ask the node to sync only the given chunk range of file *tx_seq*.

    Returns the RPC result (None on success per the test callers).
    """
    return self.rpc.admin_startSyncChunks([tx_seq, start_chunk_index, end_chunk_index])
|
||||
|
||||
def admin_get_sync_status(self, tx_seq):
    """Return the node's sync status for *tx_seq* as reported by the RPC."""
    return self.rpc.admin_getSyncStatus([tx_seq])
|
||||
|
Loading…
Reference in New Issue
Block a user