Compare commits

...

3 Commits

Author SHA1 Message Date
0xroy 81e82a0a89 Merge fd9c033176 into da2cdec8a1 2024-10-29 23:24:22 +08:00
peilun-conflux da2cdec8a1 Remove unused. (#251) 2024-10-29 23:23:53 +08:00
Roy Lu fd9c033176 Updated README 2024-10-23 08:52:56 -07:00
4 changed files with 28 additions and 286 deletions

View File

@ -2,69 +2,32 @@
## Overview
0G Storage is the storage layer for the ZeroGravity data availability (DA) system. The 0G Storage layer offers three key features:
0G Storage is a decentralized data storage system designed to address the challenges of high-throughput and low-latency data storage and retrieval, in areas such as AI and gaming.
* Built-in - It is natively built into the ZeroGravity DA system for data storage and retrieval.
* General purpose - It is designed to support atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
* Incentive - Instead of being just a decentralized database, 0G Storage introduces the PoRA mining algorithm to incentivize storage network participants.
In addition, it forms the storage layer for the 0G data availability (DA) system, with the cross-layer integration abstracted away from Rollup and AppChain builders.
To dive deep into the technical details, continue reading the [0G Storage Spec](docs/).
## System Architecture
## Integration
0G Storage consists of two main components:
We provide an [SDK](https://github.com/0glabs/0g-js-storage-sdk) so users can easily integrate 0G Storage into their applications, with the following features:
1. **Data Publishing Lane**: Ensures quick data availability and verification through the 0G Consensus network.
2. **Data Storage Lane**: Manages large data transfers and storage using an erasure-coding mechanism for redundancy and reliability.
* File Merkle Tree Class
* Flow Contract Types
* RPC methods support
* File upload
* Support browser environment
* Tests for different environments (In Progress)
* File download (In Progress)
Across the two lanes, 0G Storage supports the following features:
## Deployment
* **General Purpose Design**: Supports atomic transactions, mutable key-value stores, and archive log systems, enabling a wide range of applications with various data types.
* **Incentivized Participation**: Utilizes the PoRA (Proof of Random Access) mining algorithm to incentivize storage network participants.
Please refer to [Deployment](docs/run.md) page for detailed steps to compile and start a 0G Storage node.
For in-depth technical details about 0G Storage, please read our [Intro to 0G Storage](https://docs.0g.ai/og-storage).
## Test
## Documentation
### Prerequisites
- If you want to run a node, please refer to the [Running a Node](https://docs.0g.ai/run-a-node/storage-node) guide.
- If you want to build a project using 0G Storage, please refer to the [0G Storage SDK](https://docs.0g.ai/build-with-0g/storage-sdk) guide.
* Required Python version: 3.8, 3.9, or 3.10; higher versions are not guaranteed to work (e.g., `pysha3` may fail to install).
* Install dependencies under the root folder: `pip3 install -r requirements.txt`
## Support and Additional Resources
We want to do everything we can to help you be successful while working on your contribution and projects. Here you'll find various resources and communities that may help you complete a project or contribute to 0G.
### Dependencies
The Python test framework launches local blockchain fullnodes for the storage node to interact with. Two kinds of fullnodes are supported:
* Conflux eSpace node (by default).
* BSC node (geth).
For a Conflux eSpace node, the test framework automatically compiles the binary at runtime and copies it to the `tests/tmp` folder. For a BSC node, it automatically downloads the latest release binary from [github](https://github.com/bnb-chain/bsc/releases) to the same folder.
Alternatively, you can manually copy binaries of a specific version (conflux or geth) into the `tests/tmp` folder. Note: do **NOT** copy a released conflux binary from GitHub, since the block heights of some CIPs are hardcoded.
Testing also depends on the following repos:
* [0G Storage Contract](https://github.com/0glabs/0g-storage-contracts): It provides two ABI interfaces for the 0G Storage Node to interact with the on-chain contracts:
  * ZgsFlow: It contains APIs to submit chunk data.
  * PoraMine: It contains APIs to submit PoRA answers.
* [0G Storage Client](https://github.com/0glabs/0g-storage-client): It is used to interact with certain 0G Storage Nodes to upload/download files.
### Run Tests
Go to the `tests` folder and run the following command to run all tests:
```
python test_all.py
```
or run any single test, e.g.
```
python sync_test.py
```
## Contributing
To make contributions to the project, please follow the guidelines [here](contributing.md).
### Communities
- [0G Telegram](https://t.me/web3_0glabs)
- [0G Discord](https://discord.com/invite/0glabs)
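
The rewritten overview above credits the Data Storage Lane's redundancy and reliability to an erasure-coding mechanism. The sketch below shows the simplest possible instance of that idea, a single XOR parity shard that can rebuild any one lost data shard; the function names are hypothetical, and this is not 0G Storage's actual coding scheme.

```
// Toy erasure coding: one XOR parity shard over equal-length data shards.
// Illustrates the redundancy idea only; not 0G Storage's real scheme.
fn xor_parity(shards: &[Vec<u8>]) -> Vec<u8> {
    let mut parity = vec![0u8; shards[0].len()];
    for shard in shards {
        for (p, b) in parity.iter_mut().zip(shard) {
            *p ^= b;
        }
    }
    parity
}

// Rebuild a single missing shard from the survivors plus the parity shard.
fn recover_lost(survivors: &[Vec<u8>], parity: &[u8]) -> Vec<u8> {
    let mut lost = parity.to_vec();
    for shard in survivors {
        for (l, b) in lost.iter_mut().zip(shard) {
            *l ^= b;
        }
    }
    lost
}

fn main() {
    let shards = vec![vec![1u8, 2, 3], vec![4, 5, 6], vec![7, 8, 9]];
    let parity = xor_parity(&shards);
    // Lose shard 1, then reconstruct it from the other shards plus parity.
    let recovered = recover_lost(&[shards[0].clone(), shards[2].clone()], &parity);
    assert_eq!(recovered, shards[1]);
}
```

Production schemes such as Reed-Solomon generalize this to tolerate multiple lost shards at once.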

View File

@ -4,25 +4,23 @@ use super::{MineLoadChunk, SealAnswer, SealTask};
use crate::config::ShardConfig;
use crate::error::Error;
use crate::log_store::log_manager::{
bytes_to_entries, data_to_merkle_leaves, COL_ENTRY_BATCH, COL_ENTRY_BATCH_ROOT,
COL_FLOW_MPT_NODES, ENTRY_SIZE, PORA_CHUNK_SIZE,
bytes_to_entries, COL_ENTRY_BATCH, COL_FLOW_MPT_NODES, PORA_CHUNK_SIZE,
};
use crate::log_store::{FlowRead, FlowSeal, FlowWrite};
use crate::{try_option, ZgsKeyValueDB};
use any::Any;
use anyhow::{anyhow, bail, Result};
use append_merkle::{MerkleTreeInitialData, MerkleTreeRead, NodeDatabase, NodeTransaction};
use append_merkle::{MerkleTreeRead, NodeDatabase, NodeTransaction};
use itertools::Itertools;
use kvdb::DBTransaction;
use parking_lot::RwLock;
use shared_types::{ChunkArray, DataRoot, FlowProof, Merkle};
use shared_types::{ChunkArray, DataRoot, FlowProof};
use ssz::{Decode, Encode};
use ssz_derive::{Decode as DeriveDecode, Encode as DeriveEncode};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::{any, cmp, mem};
use std::{any, cmp};
use tracing::{debug, error, trace};
use zgs_spec::{BYTES_PER_SECTOR, SEALS_PER_LOAD, SECTORS_PER_LOAD, SECTORS_PER_SEAL};
@ -41,10 +39,6 @@ impl FlowStore {
}
}
pub fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
self.db.put_batch_root_list(root_map)
}
pub fn insert_subtree_list_for_batch(
&self,
batch_index: usize,
@ -74,22 +68,10 @@ impl FlowStore {
merkle.gen_proof(sector_index)
}
pub fn put_mpt_node_list(&self, node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
self.db.put_mpt_node_list(node_list)
}
pub fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
self.seal_manager.delete_batch_list(batch_list);
self.db.delete_batch_list(batch_list)
}
pub fn get_raw_batch(&self, batch_index: u64) -> Result<Option<EntryBatch>> {
self.db.get_entry_batch(batch_index)
}
pub fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
self.db.get_batch_root(batch_index)
}
}
#[derive(Clone, Debug)]
@ -187,11 +169,6 @@ impl FlowRead for FlowStore {
Ok(entry_list)
}
/// Return the list of all stored chunk roots.
fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
self.db.get_batch_root_list()
}
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>> {
let batch = try_option!(self.db.get_entry_batch(chunk_index)?);
let mut mine_chunk = MineLoadChunk::default();
@ -388,12 +365,6 @@ impl FlowDBStore {
);
if let Some(root) = batch.build_root(batch_index == 0)? {
trace!("complete batch: index={}", batch_index);
tx.put(
COL_ENTRY_BATCH_ROOT,
// (batch_index, subtree_depth)
&encode_batch_root_key(batch_index as usize, 1),
root.as_bytes(),
);
completed_batches.push((batch_index, root));
}
}
@ -419,94 +390,6 @@ impl FlowDBStore {
Ok(Some(EntryBatch::from_ssz_bytes(&raw).map_err(Error::from)?))
}
fn put_batch_root_list(&self, root_map: BTreeMap<usize, (DataRoot, usize)>) -> Result<()> {
let mut tx = self.kvdb.transaction();
for (batch_index, (root, subtree_depth)) in root_map {
tx.put(
COL_ENTRY_BATCH_ROOT,
&encode_batch_root_key(batch_index, subtree_depth),
root.as_bytes(),
);
}
Ok(self.kvdb.write(tx)?)
}
fn get_batch_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>> {
let mut range_root = None;
// A list of `BatchRoot` that can reconstruct the whole merkle tree structure.
let mut root_list = Vec::new();
// A list of leaf `(index, root_hash)` in the subtrees of some nodes in `root_list`,
// and they will be updated in the merkle tree with `fill_leaf` by the caller.
let mut leaf_list = Vec::new();
let mut expected_index = 0;
let empty_data = vec![0; PORA_CHUNK_SIZE * ENTRY_SIZE];
let empty_root = Merkle::new(data_to_merkle_leaves(&empty_data)?, 0, None).root();
for r in self.kvdb.iter(COL_ENTRY_BATCH_ROOT) {
let (index_bytes, root_bytes) = r?;
let (batch_index, subtree_depth) = decode_batch_root_key(index_bytes.as_ref())?;
let root = DataRoot::from_slice(root_bytes.as_ref());
debug!(
"load root depth={}, index expected={} get={} root={:?}",
subtree_depth, expected_index, batch_index, root,
);
if subtree_depth == 1 {
if range_root.is_none() {
// This is expected to be the next leaf.
if batch_index == expected_index {
root_list.push((1, root));
expected_index += 1;
} else {
bail!(
"unexpected chunk leaf, expected={}, get={}",
expected_index,
batch_index
);
}
} else {
match batch_index.cmp(&expected_index) {
Ordering::Less => {
// This leaf is within a subtree whose root is known.
leaf_list.push((batch_index, root));
}
Ordering::Equal => {
// A subtree range ends.
range_root = None;
root_list.push((1, root));
expected_index += 1;
}
Ordering::Greater => {
while batch_index > expected_index {
// Fill the gap with empty leaves.
root_list.push((1, empty_root));
expected_index += 1;
}
range_root = None;
root_list.push((1, root));
expected_index += 1;
}
}
}
} else {
while batch_index > expected_index {
// Fill the gap with empty leaves.
root_list.push((1, empty_root));
expected_index += 1;
}
range_root = Some(BatchRoot::Multiple((subtree_depth, root)));
root_list.push((subtree_depth, root));
expected_index += 1 << (subtree_depth - 1);
}
}
let extra_node_list = self.get_mpt_node_list()?;
Ok(MerkleTreeInitialData {
subtree_list: root_list,
known_leaves: leaf_list,
extra_mpt_nodes: extra_node_list,
})
}
fn truncate(&self, start_index: u64, batch_size: usize) -> crate::error::Result<Vec<usize>> {
let mut tx = self.kvdb.transaction();
let mut start_batch_index = start_index / batch_size as u64;
@ -547,38 +430,11 @@ impl FlowDBStore {
};
for batch_index in start_batch_index as usize..=end {
tx.delete(COL_ENTRY_BATCH, &batch_index.to_be_bytes());
tx.delete_prefix(COL_ENTRY_BATCH_ROOT, &batch_index.to_be_bytes());
}
self.kvdb.write(tx)?;
Ok(index_to_reseal)
}
fn put_mpt_node_list(&self, mpt_node_list: Vec<(usize, usize, DataRoot)>) -> Result<()> {
let mut tx = self.kvdb.transaction();
for (layer_index, position, data) in mpt_node_list {
tx.put(
COL_FLOW_MPT_NODES,
&encode_mpt_node_key(layer_index, position),
data.as_bytes(),
);
}
Ok(self.kvdb.write(tx)?)
}
fn get_mpt_node_list(&self) -> Result<Vec<(usize, usize, DataRoot)>> {
let mut node_list = Vec::new();
for r in self.kvdb.iter(COL_FLOW_MPT_NODES) {
let (index_bytes, node_bytes) = r?;
let (layer_index, position) = decode_mpt_node_key(index_bytes.as_ref())?;
node_list.push((
layer_index,
position,
DataRoot::from_slice(node_bytes.as_ref()),
));
}
Ok(node_list)
}
fn delete_batch_list(&self, batch_list: &[u64]) -> Result<()> {
let mut tx = self.kvdb.transaction();
for i in batch_list {
@ -586,16 +442,6 @@ impl FlowDBStore {
}
Ok(self.kvdb.write(tx)?)
}
fn get_batch_root(&self, batch_index: u64) -> Result<Option<DataRoot>> {
Ok(self
.kvdb
.get(
COL_ENTRY_BATCH_ROOT,
&encode_batch_root_key(batch_index as usize, 1),
)?
.map(|v| DataRoot::from_slice(&v)))
}
}
#[derive(DeriveEncode, DeriveDecode, Clone, Debug)]
@ -641,37 +487,12 @@ fn decode_batch_index(data: &[u8]) -> Result<usize> {
try_decode_usize(data)
}
/// For the same batch_index, we want to process the larger subtree_depth first in iteration.
fn encode_batch_root_key(batch_index: usize, subtree_depth: usize) -> Vec<u8> {
let mut key = batch_index.to_be_bytes().to_vec();
key.extend_from_slice(&(usize::MAX - subtree_depth).to_be_bytes());
key
}
fn decode_batch_root_key(data: &[u8]) -> Result<(usize, usize)> {
if data.len() != mem::size_of::<usize>() * 2 {
bail!("invalid data length");
}
let batch_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
let subtree_depth = usize::MAX - try_decode_usize(&data[mem::size_of::<u64>()..])?;
Ok((batch_index, subtree_depth))
}
fn encode_mpt_node_key(layer_index: usize, position: usize) -> Vec<u8> {
let mut key = layer_index.to_be_bytes().to_vec();
key.extend_from_slice(&position.to_be_bytes());
key
}
fn decode_mpt_node_key(data: &[u8]) -> Result<(usize, usize)> {
if data.len() != mem::size_of::<usize>() * 2 {
bail!("invalid data length");
}
let layer_index = try_decode_usize(&data[..mem::size_of::<u64>()])?;
let position = try_decode_usize(&data[mem::size_of::<u64>()..])?;
Ok((layer_index, position))
}
fn layer_size_key(layer: usize) -> Vec<u8> {
let mut key = "layer_size".as_bytes().to_vec();
key.extend_from_slice(&layer.to_be_bytes());
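
One detail worth noting in the removed code: as the doc comment on `encode_batch_root_key` says, for the same `batch_index` the larger `subtree_depth` should be processed first in iteration. The function achieves this by appending `usize::MAX - subtree_depth` in big-endian form, so lexicographic key order puts deeper subtrees first. Below is a minimal sketch of that ordering property, using a `BTreeMap` as a stand-in for the byte-ordered kvdb column; the `main` harness is illustrative only.

```
use std::collections::BTreeMap;

// Same scheme as the removed encode_batch_root_key: a big-endian batch_index
// followed by the complemented depth, so that for equal indices a larger
// subtree_depth sorts (and is iterated) first.
fn encode_key(batch_index: usize, subtree_depth: usize) -> Vec<u8> {
    let mut key = batch_index.to_be_bytes().to_vec();
    key.extend_from_slice(&(usize::MAX - subtree_depth).to_be_bytes());
    key
}

fn main() {
    // A BTreeMap over Vec<u8> keys orders them lexicographically, just like
    // iterating a byte-ordered kvdb column.
    let mut col = BTreeMap::new();
    for (index, depth) in [(0usize, 1usize), (0, 3), (1, 2), (1, 1)] {
        col.insert(encode_key(index, depth), (index, depth));
    }
    let order: Vec<_> = col.values().copied().collect();
    // For each batch_index, the deeper subtree root is visited first.
    assert_eq!(order, vec![(0, 3), (0, 1), (1, 2), (1, 1)]);
}
```

Iterating in this order let the removed `get_batch_root_list` encounter a whole-subtree root before any single-batch roots covered by it.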

View File

@ -22,7 +22,7 @@ use shared_types::{
ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof, Merkle, Transaction,
};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::path::Path;
use std::sync::mpsc;
use std::sync::Arc;
@ -56,7 +56,6 @@ static PAD_SEGMENT_ROOT: Lazy<H256> = Lazy::new(|| {
.root()
});
pub struct UpdateFlowMessage {
pub root_map: BTreeMap<usize, (H256, usize)>,
pub pad_data: usize,
pub tx_start_flow_index: u64,
}
@ -214,12 +213,11 @@ impl LogStoreChunkWrite for LogManager {
self.append_entries(flow_entry_array, &mut merkle)?;
if let Some(file_proof) = maybe_file_proof {
let updated_node_list = merkle.pora_chunks_merkle.fill_with_file_proof(
merkle.pora_chunks_merkle.fill_with_file_proof(
file_proof,
tx.merkle_nodes,
tx.start_entry_index,
)?;
self.flow_store.put_mpt_node_list(updated_node_list)?;
}
Ok(true)
}
@ -385,10 +383,9 @@ impl LogStoreWrite for LogManager {
// `merkle` is used in `validate_range_proof`.
let mut merkle = self.merkle.write();
if valid {
let updated_nodes = merkle
merkle
.pora_chunks_merkle
.fill_with_range_proof(data.proof.clone())?;
self.flow_store.put_mpt_node_list(updated_nodes)?;
}
Ok(valid)
}
@ -774,8 +771,6 @@ impl LogManager {
loop {
match rx.recv() {
std::result::Result::Ok(data) => {
// Update the root index.
flow_store.put_batch_root_list(data.root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
@ -848,21 +843,7 @@ impl LogManager {
.gen_proof(flow_index as usize % PORA_CHUNK_SIZE)?,
}
};
let r = entry_proof(&top_proof, &sub_proof);
if r.is_err() {
let raw_batch = self.flow_store.get_raw_batch(seg_index as u64)?.unwrap();
let db_root = self.flow_store.get_batch_root(seg_index as u64)?;
error!(
?r,
?db_root,
?seg_index,
"gen proof error: top_leaves={}, last={}, raw_batch={}",
merkle.pora_chunks_merkle.leaves(),
merkle.last_chunk_merkle.leaves(),
serde_json::to_string(&raw_batch).unwrap(),
);
}
r
entry_proof(&top_proof, &sub_proof)
}
#[instrument(skip(self, merkle))]
@ -878,7 +859,6 @@ impl LogManager {
self.pad_tx(tx_start_index, &mut *merkle)?;
let mut batch_root_map = BTreeMap::new();
for (subtree_depth, subtree_root) in merkle_list {
let subtree_size = 1 << (subtree_depth - 1);
if merkle.last_chunk_merkle.leaves() + subtree_size <= PORA_CHUNK_SIZE {
@ -896,10 +876,6 @@ impl LogManager {
.update_last(merkle.last_chunk_merkle.root());
}
if merkle.last_chunk_merkle.leaves() == PORA_CHUNK_SIZE {
batch_root_map.insert(
merkle.pora_chunks_merkle.leaves() - 1,
(merkle.last_chunk_merkle.root(), 1),
);
self.complete_last_chunk_merkle(
merkle.pora_chunks_merkle.leaves() - 1,
&mut *merkle,
@ -910,16 +886,11 @@ impl LogManager {
// the chunks boundary.
assert_eq!(merkle.last_chunk_merkle.leaves(), 0);
assert!(subtree_size >= PORA_CHUNK_SIZE);
batch_root_map.insert(
merkle.pora_chunks_merkle.leaves(),
(subtree_root, subtree_depth - log2_pow2(PORA_CHUNK_SIZE)),
);
merkle
.pora_chunks_merkle
.append_subtree(subtree_depth - log2_pow2(PORA_CHUNK_SIZE), subtree_root)?;
}
}
self.flow_store.put_batch_root_list(batch_root_map)?;
Ok(())
}
@ -937,7 +908,6 @@ impl LogManager {
if pad_size != 0 {
for pad_data in Self::padding(pad_size as usize) {
let mut is_full_empty = true;
let mut root_map = BTreeMap::new();
// Update the in-memory merkle tree.
let last_chunk_pad = if merkle.last_chunk_merkle.leaves() == 0 {
@ -965,10 +935,6 @@ impl LogManager {
merkle
.pora_chunks_merkle
.update_last(merkle.last_chunk_merkle.root());
root_map.insert(
merkle.pora_chunks_merkle.leaves() - 1,
(merkle.last_chunk_merkle.root(), 1),
);
completed_chunk_index = Some(merkle.pora_chunks_merkle.leaves() - 1);
}
@ -976,10 +942,6 @@ impl LogManager {
let mut start_index = last_chunk_pad / ENTRY_SIZE;
while pad_data.len() >= (start_index + PORA_CHUNK_SIZE) * ENTRY_SIZE {
merkle.pora_chunks_merkle.append(*PAD_SEGMENT_ROOT);
root_map.insert(
merkle.pora_chunks_merkle.leaves() - 1,
(*PAD_SEGMENT_ROOT, 1),
);
start_index += PORA_CHUNK_SIZE;
}
assert_eq!(pad_data.len(), start_index * ENTRY_SIZE);
@ -988,12 +950,10 @@ impl LogManager {
let data_size = pad_data.len() / ENTRY_SIZE;
if is_full_empty {
self.sender.send(UpdateFlowMessage {
root_map,
pad_data: pad_data.len(),
tx_start_flow_index,
})?;
} else {
self.flow_store.put_batch_root_list(root_map).unwrap();
// Update the flow database.
// This should be called before `complete_last_chunk_merkle` so that we do not save
// subtrees with data known.
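
For context on the hunks above: after this change, `UpdateFlowMessage` carries only the padding length and the start flow index, since batch roots are no longer persisted via `put_batch_root_list`. The message still drives the background writer loop built around `rx.recv()`. Below is a simplified sketch of that channel-plus-writer-thread pattern, with stand-in logic rather than the real `flow_store` calls.

```
use std::sync::mpsc;
use std::thread;

// Simplified stand-in for the post-change UpdateFlowMessage.
struct UpdateFlowMessage {
    pad_data: usize,
    tx_start_flow_index: u64,
}

fn main() {
    let (tx, rx) = mpsc::channel::<UpdateFlowMessage>();
    // Background writer: drains messages until all senders are dropped,
    // mirroring how LogManager's writer loops on rx.recv().
    let handle = thread::spawn(move || {
        while let Ok(msg) = rx.recv() {
            // The real code pads the flow database here; we just log the item.
            println!(
                "padding {} bytes starting at flow index {}",
                msg.pad_data, msg.tx_start_flow_index
            );
        }
    });
    tx.send(UpdateFlowMessage {
        pad_data: 1024,
        tx_start_flow_index: 0,
    })
    .unwrap();
    drop(tx); // closing the channel lets the writer thread exit
    handle.join().unwrap();
}
```

Offloading the padding write to a channel-fed thread keeps it off the hot path of appending new entries, which is the design choice the original code was making as well.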

View File

@ -1,5 +1,5 @@
use crate::config::ShardConfig;
use append_merkle::MerkleTreeInitialData;
use ethereum_types::H256;
use shared_types::{
Chunk, ChunkArray, ChunkArrayWithProof, ChunkWithProof, DataRoot, FlowProof, FlowRangeProof,
@ -211,8 +211,6 @@ pub trait FlowRead {
/// For simplicity, `index_start` and `index_end` must be at the batch boundaries.
fn get_available_entries(&self, index_start: u64, index_end: u64) -> Result<Vec<ChunkArray>>;
fn get_chunk_root_list(&self) -> Result<MerkleTreeInitialData<DataRoot>>;
fn load_sealed_data(&self, chunk_index: u64) -> Result<Option<MineLoadChunk>>;
// An estimation of the number of entries in the flow db.