feat: Provider::watch_blocks_from and Provider::watch_canonical_blocks_from#3722
feat: Provider::watch_blocks_from and Provider::watch_canonical_blocks_from#37220xForerunner wants to merge 32 commits intoalloy-rs:mainfrom
Provider::watch_blocks_from and Provider::watch_canonical_blocks_from#3722Conversation
Provider::watch_blocks_from and Provider::watch_logs_fromProvider::watch_blocks_from and Provider::watch_canonical_blocks_from
|
I've updated this PR to remove |
|
@DaniPopes @mattsse Would love a review on this when you've got a minute! |
mattsse
left a comment
There was a problem hiding this comment.
this feature makes sense and I wanted something like this in the past.
because I believe we eventually want some additional logic on this, I really want this to be an actual stream type because maitnaining ~100 of stream! code is very difficult
| ) -> impl Stream< | ||
| Item = Pin<Box<dyn Future<Output = TransportResult<N::BlockResponse>> + Send + 'static>>, | ||
| > + Unpin |
There was a problem hiding this comment.
I want named types for this, because eventually we always need extra logic, and this isnt really easy to maintain
There was a problem hiding this comment.
Do you want a dedicated type here for the Stream, the future, or both?
|
|
||
| type Fut<T> = Pin<Box<dyn Future<Output = TransportResult<T>> + Send + 'static>>; | ||
|
|
||
| let stream = stream! { |
There was a problem hiding this comment.
I'd much prefer if we have a dedicated stream impl for this
There was a problem hiding this comment.
Works for me! Are you okay with futures::stream::unfold, or do you prefer a direct impl Stream
There was a problem hiding this comment.
Maybe you could let me know what you'd like the function signature to look like for WatchBlocksFrom::into_stream, so I can make sure I make the right types here.
|
@mattsse I've switched to using manual |
|
@mattsse I've also created concrete types for the futures here, and gotten rid of the Boxing. Feel free to revert the last two commits if you preferred it before. Should be good to go for another review, Thanks! |
|
@mattsse bumping again! |
We can now get a historical, concurrent, and reorg aware block stream with live tailing just like this:
provider .watch_blocks_from(start_block) .poll_interval(Duration::from_millis(500)) .hashes() .block_tag(BlockNumberOrTag::Latest) .canonical() .max_reorg_depth(16) .rpc_concurrency(rpc_concurrency) .into_stream() // Concurrently process canonical events. .for_each_concurrent(process_concurrency, |result| async move { match result { Ok(CanonicalEvent::Added(block)) => { let number = block.header().number(); let txs = block.transactions().len(); println!("processed block {number} ({txs} txs)"); } Ok(CanonicalEvent::Removed(block)) => { let number = block.header().number(); println!("reorged block {number}"); } Err(err) => eprintln!("failed to fetch block: {err}"), } }) .await;PR Checklist