mirror of
				https://github.com/0glabs/0g-storage-node.git
				synced 2025-11-04 08:37:27 +00:00 
			
		
		
		
	Query logs via LogQuery in watch loop (#177)
	
		
			
	
		
	
	
		
	
		
			Some checks failed
		
		
	
	
		
			
				
	
				abi-consistent-check / build-and-compare (push) Has been cancelled
				
			
		
			
				
	
				code-coverage / unittest-cov (push) Has been cancelled
				
			
		
			
				
	
				rust / check (push) Has been cancelled
				
			
		
			
				
	
				rust / test (push) Has been cancelled
				
			
		
			
				
	
				rust / lints (push) Has been cancelled
				
			
		
			
				
	
				functional-test / test (push) Has been cancelled
				
			
		
		
	
	
				
					
				
			
		
			Some checks failed
		
		
	
	abi-consistent-check / build-and-compare (push) Has been cancelled
				
			code-coverage / unittest-cov (push) Has been cancelled
				
			rust / check (push) Has been cancelled
				
			rust / test (push) Has been cancelled
				
			rust / lints (push) Has been cancelled
				
			functional-test / test (push) Has been cancelled
				
			* queyr logs via LogQuery in wath loop * fix lints error
This commit is contained in:
		
							parent
							
								
									e7a562fa61
								
							
						
					
					
						commit
						e20be63026
					
				@ -290,6 +290,7 @@ impl LogEntryFetcher {
 | 
			
		||||
        let contract = ZgsFlow::new(self.contract_address, self.provider.clone());
 | 
			
		||||
        let provider = self.provider.clone();
 | 
			
		||||
        let confirmation_delay = self.confirmation_delay;
 | 
			
		||||
        let log_page_size = self.log_page_size;
 | 
			
		||||
        executor.spawn(
 | 
			
		||||
            async move {
 | 
			
		||||
                debug!("start_watch starts, start={}", start_block_number);
 | 
			
		||||
@ -305,6 +306,7 @@ impl LogEntryFetcher {
 | 
			
		||||
                        confirmation_delay,
 | 
			
		||||
                        &contract,
 | 
			
		||||
                        &block_hash_cache,
 | 
			
		||||
                        log_page_size,
 | 
			
		||||
                    )
 | 
			
		||||
                    .await
 | 
			
		||||
                    {
 | 
			
		||||
@ -340,6 +342,7 @@ impl LogEntryFetcher {
 | 
			
		||||
        confirmation_delay: u64,
 | 
			
		||||
        contract: &ZgsFlow<Provider<RetryClient<Http>>>,
 | 
			
		||||
        block_hash_cache: &Arc<RwLock<BTreeMap<u64, Option<BlockHashAndSubmissionIndex>>>>,
 | 
			
		||||
        log_page_size: u64,
 | 
			
		||||
    ) -> Result<Option<(u64, H256, Option<Option<u64>>)>> {
 | 
			
		||||
        let latest_block_number = provider.get_block_number().await?.as_u64();
 | 
			
		||||
        debug!(
 | 
			
		||||
@ -408,8 +411,11 @@ impl LogEntryFetcher {
 | 
			
		||||
            .to_block(to_block_number)
 | 
			
		||||
            .address(contract.address().into())
 | 
			
		||||
            .filter;
 | 
			
		||||
        let mut stream = LogQuery::new(provider, &filter, Duration::from_millis(10))
 | 
			
		||||
            .with_page_size(log_page_size);
 | 
			
		||||
        let mut block_logs: BTreeMap<u64, Vec<Log>> = BTreeMap::new();
 | 
			
		||||
        for log in provider.get_logs(&filter).await? {
 | 
			
		||||
        while let Some(maybe_log) = stream.next().await {
 | 
			
		||||
            let log = maybe_log?;
 | 
			
		||||
            let block_number = log
 | 
			
		||||
                .block_number
 | 
			
		||||
                .ok_or_else(|| anyhow!("block number missing"))?
 | 
			
		||||
@ -496,20 +502,20 @@ impl LogEntryFetcher {
 | 
			
		||||
                } else {
 | 
			
		||||
                    None
 | 
			
		||||
                };
 | 
			
		||||
                for log in log_events.into_iter() {
 | 
			
		||||
                    if let Err(e) = watch_tx.send(log) {
 | 
			
		||||
                        warn!("send LogFetchProgress::Transaction failed: {:?}", e);
 | 
			
		||||
                        return Ok(progress);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                if let Some(p) = &new_progress {
 | 
			
		||||
                    if let Err(e) = watch_tx.send(LogFetchProgress::SyncedBlock(*p)) {
 | 
			
		||||
                        warn!("send LogFetchProgress failed: {:?}", e);
 | 
			
		||||
                        warn!("send LogFetchProgress::SyncedBlock failed: {:?}", e);
 | 
			
		||||
                        return Ok(progress);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        block_hash_cache.write().await.insert(p.0, None);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                for log in log_events.into_iter() {
 | 
			
		||||
                    if let Err(e) = watch_tx.send(log) {
 | 
			
		||||
                        warn!("send log failed: {:?}", e);
 | 
			
		||||
                        return Ok(progress);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                progress = new_progress;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -339,8 +339,7 @@ impl LogSyncManager {
 | 
			
		||||
                LogFetchProgress::Transaction(tx) => {
 | 
			
		||||
                    if !self.put_tx(tx.clone()).await {
 | 
			
		||||
                        // Unexpected error.
 | 
			
		||||
                        error!("log sync write error");
 | 
			
		||||
                        break;
 | 
			
		||||
                        bail!("log sync write error");
 | 
			
		||||
                    }
 | 
			
		||||
                    if let Err(e) = self.event_send.send(LogSyncEvent::TxSynced { tx }) {
 | 
			
		||||
                        // TODO: Do we need to wait until all receivers are initialized?
 | 
			
		||||
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user