mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 00:27:39 +00:00 
			
		
		
		
	
							parent
							
								
									40104de891
								
							
						
					
					
						commit
						8f4dfff2f6
					
				@ -700,7 +700,7 @@ impl LogManager {
 | 
				
			|||||||
        data_db_source: Arc<dyn ZgsKeyValueDB>,
 | 
					        data_db_source: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
        config: LogConfig,
 | 
					        config: LogConfig,
 | 
				
			||||||
    ) -> Result<Self> {
 | 
					    ) -> Result<Self> {
 | 
				
			||||||
        let tx_store = TransactionStore::new(data_db_source.clone())?;
 | 
					        let tx_store = TransactionStore::new(flow_db_source.clone(), data_db_source.clone())?;
 | 
				
			||||||
        let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
 | 
					        let flow_db = Arc::new(FlowDBStore::new(flow_db_source.clone()));
 | 
				
			||||||
        let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
 | 
					        let data_db = Arc::new(FlowDBStore::new(data_db_source.clone()));
 | 
				
			||||||
        let flow_store = Arc::new(FlowStore::new(
 | 
					        let flow_store = Arc::new(FlowStore::new(
 | 
				
			||||||
 | 
				
			|||||||
@ -57,19 +57,24 @@ pub struct BlockHashAndSubmissionIndex {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
pub struct TransactionStore {
 | 
					pub struct TransactionStore {
 | 
				
			||||||
    kvdb: Arc<dyn ZgsKeyValueDB>,
 | 
					    flow_kvdb: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
 | 
					    data_kvdb: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
    /// This is always updated before writing the database to ensure no intermediate states.
 | 
					    /// This is always updated before writing the database to ensure no intermediate states.
 | 
				
			||||||
    next_tx_seq: AtomicU64,
 | 
					    next_tx_seq: AtomicU64,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
impl TransactionStore {
 | 
					impl TransactionStore {
 | 
				
			||||||
    pub fn new(kvdb: Arc<dyn ZgsKeyValueDB>) -> Result<Self> {
 | 
					    pub fn new(
 | 
				
			||||||
        let next_tx_seq = kvdb
 | 
					        flow_kvdb: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
 | 
					        data_kvdb: Arc<dyn ZgsKeyValueDB>,
 | 
				
			||||||
 | 
					    ) -> Result<Self> {
 | 
				
			||||||
 | 
					        let next_tx_seq = flow_kvdb
 | 
				
			||||||
            .get(COL_TX, NEXT_TX_KEY.as_bytes())?
 | 
					            .get(COL_TX, NEXT_TX_KEY.as_bytes())?
 | 
				
			||||||
            .map(|a| decode_tx_seq(&a))
 | 
					            .map(|a| decode_tx_seq(&a))
 | 
				
			||||||
            .unwrap_or(Ok(0))?;
 | 
					            .unwrap_or(Ok(0))?;
 | 
				
			||||||
        Ok(Self {
 | 
					        Ok(Self {
 | 
				
			||||||
            kvdb,
 | 
					            flow_kvdb,
 | 
				
			||||||
 | 
					            data_kvdb,
 | 
				
			||||||
            next_tx_seq: AtomicU64::new(next_tx_seq),
 | 
					            next_tx_seq: AtomicU64::new(next_tx_seq),
 | 
				
			||||||
        })
 | 
					        })
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -86,7 +91,7 @@ impl TransactionStore {
 | 
				
			|||||||
            return Ok(old_tx_seq_list);
 | 
					            return Ok(old_tx_seq_list);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        let mut db_tx = self.kvdb.transaction();
 | 
					        let mut db_tx = self.flow_kvdb.transaction();
 | 
				
			||||||
        if !tx.data.is_empty() {
 | 
					        if !tx.data.is_empty() {
 | 
				
			||||||
            tx.size = tx.data.len() as u64;
 | 
					            tx.size = tx.data.len() as u64;
 | 
				
			||||||
            let mut padded_data = tx.data.clone();
 | 
					            let mut padded_data = tx.data.clone();
 | 
				
			||||||
@ -113,7 +118,7 @@ impl TransactionStore {
 | 
				
			|||||||
            &new_tx_seq_list.as_ssz_bytes(),
 | 
					            &new_tx_seq_list.as_ssz_bytes(),
 | 
				
			||||||
        );
 | 
					        );
 | 
				
			||||||
        self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
 | 
					        self.next_tx_seq.store(tx.seq + 1, Ordering::SeqCst);
 | 
				
			||||||
        self.kvdb.write(db_tx)?;
 | 
					        self.flow_kvdb.write(db_tx)?;
 | 
				
			||||||
        metrics::TX_STORE_PUT.update_since(start_time);
 | 
					        metrics::TX_STORE_PUT.update_since(start_time);
 | 
				
			||||||
        Ok(old_tx_seq_list)
 | 
					        Ok(old_tx_seq_list)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -123,7 +128,7 @@ impl TransactionStore {
 | 
				
			|||||||
        if seq >= self.next_tx_seq() {
 | 
					        if seq >= self.next_tx_seq() {
 | 
				
			||||||
            return Ok(None);
 | 
					            return Ok(None);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        let value = try_option!(self.kvdb.get(COL_TX, &seq.to_be_bytes())?);
 | 
					        let value = try_option!(self.flow_kvdb.get(COL_TX, &seq.to_be_bytes())?);
 | 
				
			||||||
        let tx = Transaction::from_ssz_bytes(&value).map_err(Error::from)?;
 | 
					        let tx = Transaction::from_ssz_bytes(&value).map_err(Error::from)?;
 | 
				
			||||||
        metrics::TX_BY_SEQ_NUMBER.update_since(start_time);
 | 
					        metrics::TX_BY_SEQ_NUMBER.update_since(start_time);
 | 
				
			||||||
        Ok(Some(tx))
 | 
					        Ok(Some(tx))
 | 
				
			||||||
@ -132,15 +137,16 @@ impl TransactionStore {
 | 
				
			|||||||
    pub fn remove_tx_after(&self, min_seq: u64) -> Result<Vec<Transaction>> {
 | 
					    pub fn remove_tx_after(&self, min_seq: u64) -> Result<Vec<Transaction>> {
 | 
				
			||||||
        let mut removed_txs = Vec::new();
 | 
					        let mut removed_txs = Vec::new();
 | 
				
			||||||
        let max_seq = self.next_tx_seq();
 | 
					        let max_seq = self.next_tx_seq();
 | 
				
			||||||
        let mut db_tx = self.kvdb.transaction();
 | 
					        let mut flow_db_tx = self.flow_kvdb.transaction();
 | 
				
			||||||
 | 
					        let mut data_db_tx = self.data_kvdb.transaction();
 | 
				
			||||||
        let mut modified_merkle_root_map = HashMap::new();
 | 
					        let mut modified_merkle_root_map = HashMap::new();
 | 
				
			||||||
        for seq in min_seq..max_seq {
 | 
					        for seq in min_seq..max_seq {
 | 
				
			||||||
            let Some(tx) = self.get_tx_by_seq_number(seq)? else {
 | 
					            let Some(tx) = self.get_tx_by_seq_number(seq)? else {
 | 
				
			||||||
                error!(?seq, ?max_seq, "Transaction missing before the end");
 | 
					                error!(?seq, ?max_seq, "Transaction missing before the end");
 | 
				
			||||||
                break;
 | 
					                break;
 | 
				
			||||||
            };
 | 
					            };
 | 
				
			||||||
            db_tx.delete(COL_TX, &seq.to_be_bytes());
 | 
					            flow_db_tx.delete(COL_TX, &seq.to_be_bytes());
 | 
				
			||||||
            db_tx.delete(COL_TX_COMPLETED, &seq.to_be_bytes());
 | 
					            data_db_tx.delete(COL_TX_COMPLETED, &seq.to_be_bytes());
 | 
				
			||||||
            // We only remove tx when the blockchain reorgs.
 | 
					            // We only remove tx when the blockchain reorgs.
 | 
				
			||||||
            // If a tx is reverted, all data after it will also be reverted, so we call remove
 | 
					            // If a tx is reverted, all data after it will also be reverted, so we call remove
 | 
				
			||||||
            // all indices after it.
 | 
					            // all indices after it.
 | 
				
			||||||
@ -155,24 +161,25 @@ impl TransactionStore {
 | 
				
			|||||||
        }
 | 
					        }
 | 
				
			||||||
        for (merkle_root, tx_seq_list) in modified_merkle_root_map {
 | 
					        for (merkle_root, tx_seq_list) in modified_merkle_root_map {
 | 
				
			||||||
            if tx_seq_list.is_empty() {
 | 
					            if tx_seq_list.is_empty() {
 | 
				
			||||||
                db_tx.delete(COL_TX_DATA_ROOT_INDEX, merkle_root.as_bytes());
 | 
					                flow_db_tx.delete(COL_TX_DATA_ROOT_INDEX, merkle_root.as_bytes());
 | 
				
			||||||
            } else {
 | 
					            } else {
 | 
				
			||||||
                db_tx.put(
 | 
					                flow_db_tx.put(
 | 
				
			||||||
                    COL_TX_DATA_ROOT_INDEX,
 | 
					                    COL_TX_DATA_ROOT_INDEX,
 | 
				
			||||||
                    merkle_root.as_bytes(),
 | 
					                    merkle_root.as_bytes(),
 | 
				
			||||||
                    &tx_seq_list.as_ssz_bytes(),
 | 
					                    &tx_seq_list.as_ssz_bytes(),
 | 
				
			||||||
                );
 | 
					                );
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &min_seq.to_be_bytes());
 | 
					        flow_db_tx.put(COL_TX, NEXT_TX_KEY.as_bytes(), &min_seq.to_be_bytes());
 | 
				
			||||||
        self.next_tx_seq.store(min_seq, Ordering::SeqCst);
 | 
					        self.next_tx_seq.store(min_seq, Ordering::SeqCst);
 | 
				
			||||||
        self.kvdb.write(db_tx)?;
 | 
					        self.data_kvdb.write(data_db_tx)?;
 | 
				
			||||||
 | 
					        self.flow_kvdb.write(flow_db_tx)?;
 | 
				
			||||||
        Ok(removed_txs)
 | 
					        Ok(removed_txs)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> Result<Vec<u64>> {
 | 
					    pub fn get_tx_seq_list_by_data_root(&self, data_root: &DataRoot) -> Result<Vec<u64>> {
 | 
				
			||||||
        let value = match self
 | 
					        let value = match self
 | 
				
			||||||
            .kvdb
 | 
					            .flow_kvdb
 | 
				
			||||||
            .get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())?
 | 
					            .get(COL_TX_DATA_ROOT_INDEX, data_root.as_bytes())?
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            Some(v) => v,
 | 
					            Some(v) => v,
 | 
				
			||||||
@ -183,7 +190,7 @@ impl TransactionStore {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    #[instrument(skip(self))]
 | 
					    #[instrument(skip(self))]
 | 
				
			||||||
    pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
 | 
					    pub fn finalize_tx(&self, tx_seq: u64) -> Result<()> {
 | 
				
			||||||
        Ok(self.kvdb.put(
 | 
					        Ok(self.data_kvdb.put(
 | 
				
			||||||
            COL_TX_COMPLETED,
 | 
					            COL_TX_COMPLETED,
 | 
				
			||||||
            &tx_seq.to_be_bytes(),
 | 
					            &tx_seq.to_be_bytes(),
 | 
				
			||||||
            &[TxStatus::Finalized.into()],
 | 
					            &[TxStatus::Finalized.into()],
 | 
				
			||||||
@ -192,7 +199,7 @@ impl TransactionStore {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    #[instrument(skip(self))]
 | 
					    #[instrument(skip(self))]
 | 
				
			||||||
    pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
 | 
					    pub fn prune_tx(&self, tx_seq: u64) -> Result<()> {
 | 
				
			||||||
        Ok(self.kvdb.put(
 | 
					        Ok(self.data_kvdb.put(
 | 
				
			||||||
            COL_TX_COMPLETED,
 | 
					            COL_TX_COMPLETED,
 | 
				
			||||||
            &tx_seq.to_be_bytes(),
 | 
					            &tx_seq.to_be_bytes(),
 | 
				
			||||||
            &[TxStatus::Pruned.into()],
 | 
					            &[TxStatus::Pruned.into()],
 | 
				
			||||||
@ -200,7 +207,9 @@ impl TransactionStore {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
 | 
					    pub fn get_tx_status(&self, tx_seq: u64) -> Result<Option<TxStatus>> {
 | 
				
			||||||
        let value = try_option!(self.kvdb.get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?);
 | 
					        let value = try_option!(self
 | 
				
			||||||
 | 
					            .data_kvdb
 | 
				
			||||||
 | 
					            .get(COL_TX_COMPLETED, &tx_seq.to_be_bytes())?);
 | 
				
			||||||
        match value.first() {
 | 
					        match value.first() {
 | 
				
			||||||
            Some(v) => Ok(Some(TxStatus::try_from(*v)?)),
 | 
					            Some(v) => Ok(Some(TxStatus::try_from(*v)?)),
 | 
				
			||||||
            None => Ok(None),
 | 
					            None => Ok(None),
 | 
				
			||||||
@ -239,14 +248,14 @@ impl TransactionStore {
 | 
				
			|||||||
                (progress.1, p).as_ssz_bytes(),
 | 
					                (progress.1, p).as_ssz_bytes(),
 | 
				
			||||||
            ));
 | 
					            ));
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        Ok(self.kvdb.puts(items)?)
 | 
					        Ok(self.flow_kvdb.puts(items)?)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    #[instrument(skip(self))]
 | 
					    #[instrument(skip(self))]
 | 
				
			||||||
    pub fn get_progress(&self) -> Result<Option<(u64, H256)>> {
 | 
					    pub fn get_progress(&self) -> Result<Option<(u64, H256)>> {
 | 
				
			||||||
        Ok(Some(
 | 
					        Ok(Some(
 | 
				
			||||||
            <(u64, H256)>::from_ssz_bytes(&try_option!(self
 | 
					            <(u64, H256)>::from_ssz_bytes(&try_option!(self
 | 
				
			||||||
                .kvdb
 | 
					                .flow_kvdb
 | 
				
			||||||
                .get(COL_MISC, LOG_SYNC_PROGRESS_KEY.as_bytes())?))
 | 
					                .get(COL_MISC, LOG_SYNC_PROGRESS_KEY.as_bytes())?))
 | 
				
			||||||
            .map_err(Error::from)?,
 | 
					            .map_err(Error::from)?,
 | 
				
			||||||
        ))
 | 
					        ))
 | 
				
			||||||
@ -254,7 +263,7 @@ impl TransactionStore {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    #[instrument(skip(self))]
 | 
					    #[instrument(skip(self))]
 | 
				
			||||||
    pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
 | 
					    pub fn put_log_latest_block_number(&self, block_number: u64) -> Result<()> {
 | 
				
			||||||
        Ok(self.kvdb.put(
 | 
					        Ok(self.flow_kvdb.put(
 | 
				
			||||||
            COL_MISC,
 | 
					            COL_MISC,
 | 
				
			||||||
            LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(),
 | 
					            LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes(),
 | 
				
			||||||
            &block_number.as_ssz_bytes(),
 | 
					            &block_number.as_ssz_bytes(),
 | 
				
			||||||
@ -265,7 +274,7 @@ impl TransactionStore {
 | 
				
			|||||||
    pub fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
 | 
					    pub fn get_log_latest_block_number(&self) -> Result<Option<u64>> {
 | 
				
			||||||
        Ok(Some(
 | 
					        Ok(Some(
 | 
				
			||||||
            <u64>::from_ssz_bytes(&try_option!(self
 | 
					            <u64>::from_ssz_bytes(&try_option!(self
 | 
				
			||||||
                .kvdb
 | 
					                .flow_kvdb
 | 
				
			||||||
                .get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?))
 | 
					                .get(COL_MISC, LOG_LATEST_BLOCK_NUMBER_KEY.as_bytes())?))
 | 
				
			||||||
            .map_err(Error::from)?,
 | 
					            .map_err(Error::from)?,
 | 
				
			||||||
        ))
 | 
					        ))
 | 
				
			||||||
@ -277,7 +286,7 @@ impl TransactionStore {
 | 
				
			|||||||
    ) -> Result<Option<(H256, Option<u64>)>> {
 | 
					    ) -> Result<Option<(H256, Option<u64>)>> {
 | 
				
			||||||
        Ok(Some(
 | 
					        Ok(Some(
 | 
				
			||||||
            <(H256, Option<u64>)>::from_ssz_bytes(&try_option!(self
 | 
					            <(H256, Option<u64>)>::from_ssz_bytes(&try_option!(self
 | 
				
			||||||
                .kvdb
 | 
					                .flow_kvdb
 | 
				
			||||||
                .get(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?))
 | 
					                .get(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?))
 | 
				
			||||||
            .map_err(Error::from)?,
 | 
					            .map_err(Error::from)?,
 | 
				
			||||||
        ))
 | 
					        ))
 | 
				
			||||||
@ -285,7 +294,7 @@ impl TransactionStore {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    pub fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>> {
 | 
					    pub fn get_block_hashes(&self) -> Result<Vec<(u64, BlockHashAndSubmissionIndex)>> {
 | 
				
			||||||
        let mut block_numbers = vec![];
 | 
					        let mut block_numbers = vec![];
 | 
				
			||||||
        for r in self.kvdb.iter(COL_BLOCK_PROGRESS) {
 | 
					        for r in self.flow_kvdb.iter(COL_BLOCK_PROGRESS) {
 | 
				
			||||||
            let (key, val) = r?;
 | 
					            let (key, val) = r?;
 | 
				
			||||||
            let block_number =
 | 
					            let block_number =
 | 
				
			||||||
                u64::from_be_bytes(key.as_ref().try_into().map_err(|e| anyhow!("{:?}", e))?);
 | 
					                u64::from_be_bytes(key.as_ref().try_into().map_err(|e| anyhow!("{:?}", e))?);
 | 
				
			||||||
@ -305,7 +314,7 @@ impl TransactionStore {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    pub fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
 | 
					    pub fn delete_block_hash_by_number(&self, block_number: u64) -> Result<()> {
 | 
				
			||||||
        Ok(self
 | 
					        Ok(self
 | 
				
			||||||
            .kvdb
 | 
					            .flow_kvdb
 | 
				
			||||||
            .delete(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?)
 | 
					            .delete(COL_BLOCK_PROGRESS, &block_number.to_be_bytes())?)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -42,7 +42,7 @@ class ContractProxy:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        contract = self._get_contract(node_idx)
 | 
					        contract = self._get_contract(node_idx)
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
        return getattr(contract.events, event_name).create_filter(fromBlock =0, toBlock="latest").get_all_entries()
 | 
					        return getattr(contract.events, event_name).create_filter(from_block=0, to_block="latest").get_all_entries()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def transfer(self, value, node_idx = 0):
 | 
					    def transfer(self, value, node_idx = 0):
 | 
				
			||||||
        tx_params = copy(TX_PARAMS)
 | 
					        tx_params = copy(TX_PARAMS)
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user