mirror of https://github.com/0glabs/0g-storage-node.git
synced 2025-04-04 15:35:18 +00:00

Compare commits: d762f50000 ... 6a6f16191f (4 commits)
Commits in this compare:
- 6a6f16191f
- bcbd8b3baa
- cae5b62440
- 07704fedc9
.gitignore (vendored), 1 line changed:

@@ -4,6 +4,5 @@
 /.idea
 tests/**/__pycache__
 tests/tmp/**
-tests/config/zgs
 .vscode/*.json
 /0g-storage-contracts-dev
Rust network layer, gossipsub message handling (2 hunks):

@@ -592,7 +592,7 @@ where
         // peer that originally published the message.
         match PubsubMessage::decode(&gs_msg.topic, &gs_msg.data) {
             Err(e) => {
-                debug!(topic = ?gs_msg.topic, error = ?e, "Could not decode gossipsub message");
+                debug!(topic = ?gs_msg.topic, %propagation_source, error = ?e, "Could not decode gossipsub message");
                 //reject the message
                 if let Err(e) = self.gossipsub.report_message_validation_result(
                     &id,

@@ -601,6 +601,24 @@ where
                 ) {
                     warn!(message_id = %id, peer_id = %propagation_source, error = ?e, "Failed to report message validation");
                 }
+
+                self.peer_manager.report_peer(
+                    &propagation_source,
+                    PeerAction::Fatal,
+                    ReportSource::Gossipsub,
+                    None,
+                    "gossipsub message decode error",
+                );
+
+                if let Some(source) = &gs_msg.source {
+                    self.peer_manager.report_peer(
+                        source,
+                        PeerAction::Fatal,
+                        ReportSource::Gossipsub,
+                        None,
+                        "gossipsub message decode error",
+                    );
+                }
             }
             Ok(msg) => {
                 // Notify the network
Rust async storage wrapper (Store), 2 hunks:

@@ -1,7 +1,7 @@
 #[macro_use]
 extern crate tracing;
 
-use anyhow::bail;
+use anyhow::{anyhow, bail};
 use shared_types::{
     Chunk, ChunkArray, ChunkArrayWithProof, DataRoot, FlowProof, FlowRangeProof, Transaction,
 };

@@ -136,17 +136,21 @@ impl Store {
         let store = self.store.clone();
         let (tx, rx) = oneshot::channel();
 
-        self.executor.spawn_blocking(
-            move || {
-                // FIXME(zz): Not all functions need `write`. Refactor store usage.
-                let res = f(&*store);
-
-                if tx.send(res).is_err() {
-                    error!("Unable to complete async storage operation: the receiver dropped");
-                }
-            },
-            WORKER_TASK_NAME,
-        );
+        self.executor
+            .spawn_blocking_handle(
+                move || {
+                    // FIXME(zz): Not all functions need `write`. Refactor store usage.
+                    let res = f(&*store);
+
+                    if tx.send(res).is_err() {
+                        error!("Unable to complete async storage operation: the receiver dropped");
+                    }
+                },
+                WORKER_TASK_NAME,
+            )
+            .ok_or(anyhow!("Unable to spawn async storage work"))?
+            .await
+            .map_err(|e| anyhow!("join error: e={:?}", e))?;
 
         rx.await
             .unwrap_or_else(|_| bail!(error::Error::Custom("Receiver error".to_string())))
tests/config/zgs/network/enr.dat (new file, 1 line):

@@ -0,0 +1 @@
+enr:-Ly4QJZwz9htAorBIx_otqoaRFPohX7NQJ31iBB6mcEhBiuPWsOnigc1ABQsg6tLU1OirQdLR6aEvv8SlkkfIbV72T8CgmlkgnY0gmlwhH8AAAGQbmV0d29ya19pZGVudGl0eZ8oIwAAAAAAADPyz8cpvYcPpUtQMmYOBrTPKn-UAAIAiXNlY3AyNTZrMaEDeDdgnDgLPkxNxB39jKb9f1Na30t6R9vVolpTk5zu-hODdGNwgir4g3VkcIIq-A
tests/config/zgs/network/key (new file, 1 line):

@@ -0,0 +1 @@
+(binary network key material; not representable as text in this mirror)
Python integration test, PrunerTest (1 hunk):

@@ -40,14 +40,18 @@ class PrunerTest(TestFramework):
         for i in range(len(segments)):
             client_index = i % 2
             self.nodes[client_index].zgs_upload_segment(segments[i])
+        wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root) is not None)
         wait_until(lambda: self.nodes[0].zgs_get_file_info(data_root)["finalized"])
+        wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root) is not None)
         wait_until(lambda: self.nodes[1].zgs_get_file_info(data_root)["finalized"])
 
         self.nodes[2].admin_start_sync_file(0)
         self.nodes[3].admin_start_sync_file(0)
         wait_until(lambda: self.nodes[2].sync_status_is_completed_or_unknown(0))
+        wait_until(lambda: self.nodes[2].zgs_get_file_info(data_root) is not None)
         wait_until(lambda: self.nodes[2].zgs_get_file_info(data_root)["finalized"])
         wait_until(lambda: self.nodes[3].sync_status_is_completed_or_unknown(0))
+        wait_until(lambda: self.nodes[3].zgs_get_file_info(data_root) is not None)
         wait_until(lambda: self.nodes[3].zgs_get_file_info(data_root)["finalized"])
 
         for i in range(len(segments)):
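The added "is not None" checks close a small race: immediately after upload, zgs_get_file_info can still return null, and subscripting that result with ["finalized"] would make the wait_until lambda raise instead of retrying. A minimal sketch of the same pattern factored into a helper; the helper name is hypothetical and not part of this change set:

    from utility.utils import wait_until

    def wait_for_file_finalized(node, data_root):
        # Wait for the file entry to exist first (the RPC returns None until
        # the node has seen the file), then wait for finalization. Indexing a
        # None result inside wait_until would raise and abort the test.
        wait_until(lambda: node.zgs_get_file_info(data_root) is not None)
        wait_until(lambda: node.zgs_get_file_info(data_root)["finalized"])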
Python test framework, TestNode startup check (1 hunk):

@@ -128,10 +128,14 @@ class TestNode:
         poll_per_s = 4
         for _ in range(poll_per_s * self.rpc_timeout):
             if self.process.poll() is not None:
+                self.stderr.seek(0)
+                self.stdout.seek(0)
                 raise FailedToStartError(
                     self._node_msg(
-                        "exited with status {} during initialization".format(
-                            self.process.returncode
+                        "exited with status {} during initialization \n\nstderr: {}\n\nstdout: {}\n\n".format(
+                            self.process.returncode,
+                            self.stderr.read(),
+                            self.stdout.read(),
                         )
                     )
                 )
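Seeking the captured stdout/stderr back to the start and reading them into the exception message makes a crash during node startup diagnosable from the test log alone. A rough standalone sketch of the same idea, assuming the output was redirected to seekable temp files; the command and file prefixes are illustrative only:

    import subprocess
    import tempfile

    # Redirect the child's output to seekable temp files so it can be replayed
    # into the error message if the process exits during initialization.
    stdout = tempfile.NamedTemporaryFile(prefix="node_stdout_", suffix=".log")
    stderr = tempfile.NamedTemporaryFile(prefix="node_stderr_", suffix=".log")
    process = subprocess.Popen(["false"], stdout=stdout, stderr=stderr)

    if process.wait() != 0:
        stdout.seek(0)
        stderr.seek(0)
        raise RuntimeError(
            "exited with status {} during initialization\n\nstderr: {}\n\nstdout: {}".format(
                process.returncode, stderr.read(), stdout.read()
            )
        )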
Python test framework, ZGNode blockchain node (3 hunks):

@@ -3,14 +3,9 @@ import subprocess
 import tempfile
 
 from test_framework.blockchain_node import BlockChainNodeType, BlockchainNode
-from utility.utils import blockchain_rpc_port, arrange_port
+from utility.utils import blockchain_p2p_port, blockchain_rpc_port, blockchain_ws_port, blockchain_rpc_port_tendermint, pprof_port
 from utility.build_binary import build_zg
 
-ZGNODE_PORT_CATEGORY_WS = 0
-ZGNODE_PORT_CATEGORY_P2P = 1
-ZGNODE_PORT_CATEGORY_RPC = 2
-ZGNODE_PORT_CATEGORY_PPROF = 3
-
 
 def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
     assert num_nodes > 0, "Invalid number of blockchain nodes: %s" % num_nodes

@@ -26,7 +21,7 @@ def zg_node_init_genesis(binary: str, root_dir: str, num_nodes: int):
     os.mkdir(zgchaind_dir)
 
     log_file = tempfile.NamedTemporaryFile(dir=zgchaind_dir, delete=False, prefix="init_genesis_", suffix=".log")
-    p2p_port_start = arrange_port(ZGNODE_PORT_CATEGORY_P2P, 0)
+    p2p_port_start = blockchain_p2p_port(0)
 
     ret = subprocess.run(
         args=["bash", shell_script, zgchaind_dir, str(num_nodes), str(p2p_port_start)],

@@ -71,13 +66,13 @@ class ZGNode(BlockchainNode):
             # overwrite json rpc http port: 8545
             "--json-rpc.address", "127.0.0.1:%s" % blockchain_rpc_port(index),
             # overwrite json rpc ws port: 8546
-            "--json-rpc.ws-address", "127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_WS, index),
+            "--json-rpc.ws-address", "127.0.0.1:%s" % blockchain_ws_port(index),
             # overwrite p2p port: 26656
-            "--p2p.laddr", "tcp://127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_P2P, index),
+            "--p2p.laddr", "tcp://127.0.0.1:%s" % blockchain_p2p_port(index),
             # overwrite rpc port: 26657
-            "--rpc.laddr", "tcp://127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_RPC, index),
+            "--rpc.laddr", "tcp://127.0.0.1:%s" % blockchain_rpc_port_tendermint(index),
             # overwrite pprof port: 6060
-            "--rpc.pprof_laddr", "127.0.0.1:%s" % arrange_port(ZGNODE_PORT_CATEGORY_PPROF, index),
+            "--rpc.pprof_laddr", "127.0.0.1:%s" % pprof_port(index),
             "--log_level", "debug"
         ]
 
Python test utilities, port helpers (2 hunks):

@@ -11,6 +11,7 @@ class PortMin:
 
 
 MAX_NODES = 100
+MAX_BLOCKCHAIN_NODES = 50
 
 
 def p2p_port(n):

@@ -23,18 +24,25 @@ def rpc_port(n):
 
 
 def blockchain_p2p_port(n):
-    return PortMin.n + 2 * MAX_NODES + n
+    assert n <= MAX_BLOCKCHAIN_NODES
+    return PortMin.n + MAX_NODES + MAX_BLOCKCHAIN_NODES + n
 
 
 def blockchain_rpc_port(n):
-    return PortMin.n + 3 * MAX_NODES + n
+    return PortMin.n + MAX_NODES + 2 * MAX_BLOCKCHAIN_NODES + n
 
 
 def blockchain_rpc_port_core(n):
-    return PortMin.n + 4 * MAX_NODES + n
+    return PortMin.n + MAX_NODES + 3 * MAX_BLOCKCHAIN_NODES + n
 
-def arrange_port(category: int, node_index: int) -> int:
-    return PortMin.n + (100 + category) * MAX_NODES + node_index
+def blockchain_ws_port(n):
+    return PortMin.n + MAX_NODES + 4 * MAX_BLOCKCHAIN_NODES + n
+
+def blockchain_rpc_port_tendermint(n):
+    return PortMin.n + MAX_NODES + 5 * MAX_BLOCKCHAIN_NODES + n
+
+def pprof_port(n):
+    return PortMin.n + MAX_NODES + 6 * MAX_BLOCKCHAIN_NODES + n
 
 def wait_until(predicate, *, attempts=float("inf"), timeout=float("inf"), lock=None):
     if attempts == float("inf") and timeout == float("inf"):
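With MAX_NODES = 100 and MAX_BLOCKCHAIN_NODES = 50, each blockchain port family now occupies its own block of 50 ports directly above the storage-node range, replacing the old mix of 100-wide stripes and the (100 + category) * MAX_NODES ranges used by arrange_port; the assert keeps an index from spilling into the neighbouring block. A quick sketch of the resulting layout; the base value 11000 for PortMin.n is assumed purely for illustration:

    MAX_NODES = 100
    MAX_BLOCKCHAIN_NODES = 50

    class PortMin:
        n = 11000  # illustrative base; the framework picks the real value per test run

    def blockchain_p2p_port(n):
        assert n <= MAX_BLOCKCHAIN_NODES
        return PortMin.n + MAX_NODES + MAX_BLOCKCHAIN_NODES + n

    def blockchain_rpc_port(n):
        return PortMin.n + MAX_NODES + 2 * MAX_BLOCKCHAIN_NODES + n

    def pprof_port(n):
        return PortMin.n + MAX_NODES + 6 * MAX_BLOCKCHAIN_NODES + n

    # Each family lands in a disjoint 50-port block:
    print(blockchain_p2p_port(0), blockchain_p2p_port(1))  # 11150 11151
    print(blockchain_rpc_port(0), blockchain_rpc_port(1))  # 11200 11201
    print(pprof_port(0), pprof_port(1))                    # 11400 11401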