From 9905dc1ec319b5bcfa19cd2cd4094173d7f12144 Mon Sep 17 00:00:00 2001 From: Sonkeng Maldini Date: Mon, 22 Dec 2025 21:32:43 +0100 Subject: [PATCH 1/2] feat(oracles): integrate frigate ephemeral scanning --- Cargo.lock | 1 + cli/v2/src/main.rs | 175 +++++++++++++++++- doc/tabconf7/frigate_playbook.sh | 51 ++++++ doc/tabconf7/justfile | 2 +- oracles/Cargo.toml | 3 +- oracles/src/frigate/mod.rs | 298 +++++++++++++++++++++++++++++++ oracles/src/lib.rs | 1 + 7 files changed, 528 insertions(+), 3 deletions(-) create mode 100644 doc/tabconf7/frigate_playbook.sh create mode 100644 oracles/src/frigate/mod.rs diff --git a/Cargo.lock b/Cargo.lock index d4c9188..58cf969 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -328,6 +328,7 @@ dependencies = [ "bitcoin", "futures", "indexer", + "jsonrpc", "rayon", "redb", "reqwest", diff --git a/cli/v2/src/main.rs b/cli/v2/src/main.rs index a32d009..db040b4 100644 --- a/cli/v2/src/main.rs +++ b/cli/v2/src/main.rs @@ -10,7 +10,8 @@ use bdk_sp::{ self, address::NetworkUnchecked, bip32, - consensus::Decodable, + consensus::{deserialize, Decodable}, + hashes::Hash, hex::{DisplayHex, FromHex}, key::Secp256k1, script::PushBytesBuf, @@ -34,6 +35,7 @@ use bdk_sp_oracles::{ TrustedPeer, UnboundedReceiver, Warning, }, filters::kyoto::{FilterEvent, FilterSubscriber}, + frigate::{FrigateClient, History, SubscribeRequest, UnsubscribeRequest, DUMMY_COINBASE}, tweaks::blindbit::{BlindbitSubscriber, TweakEvent}, }; use bdk_sp_wallet::{ @@ -161,6 +163,16 @@ pub enum Commands { #[clap(long)] hash: Option, }, + + ScanFrigate { + #[clap(flatten)] + rpc_args: RpcArgs, + #[clap(long)] + height: Option, + #[clap(long)] + hash: Option, + }, + Create { /// Network #[clap(long, short, default_value = "signet")] @@ -561,6 +573,167 @@ async fn main() -> anyhow::Result<()> { ); } } + Commands::ScanFrigate { + rpc_args, + height, + hash, + } => { + // The implementation done here differs from what is mentioned in the section + // https://github.com/sparrowwallet/frigate/tree/master?tab=readme-ov-file#blockchainsilentpaymentssubscribe + // This implementation is doing a one time scanning only. So instead of calling + // `blockchain.scripthash.subscribe` on each script from the wallet, we just subscribe + // and read the scanning result from the stream. On each result received we update the + // wallet state and once scanning progress reaches 1.0 (100%) we stop. + let sync_point = if let (Some(height), Some(hash)) = (height, hash) { + HeaderCheckpoint::new(height, hash) + } else if wallet.birthday.height <= wallet.chain().tip().height() { + let height = wallet.chain().tip().height(); + let hash = wallet.chain().tip().hash(); + HeaderCheckpoint::new(height, hash) + } else { + let checkpoint = wallet + .chain() + .get(wallet.birthday.height) + .expect("should be something"); + let height = checkpoint.height(); + let hash = checkpoint.hash(); + HeaderCheckpoint::new(height, hash) + }; + + let mut client = FrigateClient::connect(&rpc_args.url) + .await + .unwrap() + .with_timeout(tokio::time::Duration::from_secs(60)); + + let labels = wallet + .indexer() + .index() + .num_to_label + .clone() + .into_keys() + .collect::>(); + let labels = if !labels.is_empty() { + Some(labels) + } else { + None + }; + + let subscribe_params = SubscribeRequest { + scan_priv_key: *wallet.indexer().scan_sk(), + spend_pub_key: *wallet.indexer().spend_pk(), + start_height: Some(sync_point.height), + labels, + }; + + // Attempt to subscribe; any timeout will trigger unsubscribe automatically. + match client.subscribe_with_timeout(&subscribe_params).await { + Ok(Some((histories, progress))) => { + tracing::info!( + "Initial subscription result: {} histories, progress {}", + histories.len(), + progress + ); + } + Ok(None) => { + tracing::info!("Subscription acknowledged, awaiting notifications"); + } + Err(e) => { + tracing::error!("Subscribe failed: {}", e); + return Err(e.into()); + } + } + + tracing::info!("Starting frigate scanning loop..."); + loop { + match client.read_from_stream(4096).await { + Ok(subscribe_result) => { + if subscribe_result["params"].is_object() { + let histories: Vec = serde_json::from_value( + subscribe_result["params"]["history"].clone(), + )?; + let progress = subscribe_result["params"]["progress"] + .as_f64() + .unwrap_or(0.0) as f32; + + let mut secrets_by_height: HashMap> = + HashMap::new(); + + tracing::debug!("Received history {:#?}", histories); + + histories.iter().for_each(|h| { + secrets_by_height + .entry(h.height) + .and_modify(|v| { + v.insert(h.tx_hash, h.tweak_key); + }) + .or_insert(HashMap::from([(h.tx_hash, h.tweak_key)])); + }); + + // Filter when the height is 0, because that would mean mempool transaction + for secret in secrets_by_height.into_iter().filter(|v| v.0 > 0) { + // Since frigate doesn't provide a blockchain.getblock we will mimick that here + // By constructing a block from the block header and the list of transactions + // received from the scan request + let mut raw_blk = client.get_block_header(secret.0).await.unwrap(); + raw_blk.push_str("00"); + + // Push dummy coinbase + let coinbase: Transaction = + deserialize(&Vec::::from_hex(DUMMY_COINBASE).unwrap()) + .unwrap(); + let mut block: Block = + deserialize(&Vec::::from_hex(&raw_blk).unwrap()).unwrap(); + + let mut blockhash = BlockHash::all_zeros(); + + let mut txs: Vec = vec![coinbase]; + for key in secret.1.keys() { + let tx_result = + client.get_transaction(key.to_string()).await.unwrap(); + let tx: Transaction = + deserialize(&Vec::::from_hex(&tx_result.1).unwrap()) + .unwrap(); + txs.push(tx); + + blockhash = BlockHash::from_str(&tx_result.0).unwrap(); + } + + block.txdata = txs; + tracing::debug!("Final block {:?}", block); + wallet.apply_block_relevant(&block, secret.1, secret.0); + + tracing::debug!("Checkpoint hash {blockhash:?}"); + let checkpoint = wallet.chain().tip().insert(BlockId { + height: secret.0, + hash: blockhash, + }); + wallet.update_chain(checkpoint); + } + + tracing::info!("Progress {progress}"); + // Check the progress + if progress >= 1.0 { + tracing::info!("Scanning completed"); + break; + } + } + } + Err(e) if e.to_string().contains("timed out") => { + tracing::warn!("read_from_stream timeout, exiting scan"); + let unsubscribe_request = UnsubscribeRequest { + scan_privkey: *wallet.indexer().scan_sk(), + spend_pubkey: *wallet.indexer().spend_pk(), + }; + let _ = client.unsubscribe(&unsubscribe_request).await; + break; + } + Err(e) => { + tracing::error!("read_from_stream error: {}", e); + return Err(e.into()); + } + } + } + } Commands::Balance => { fn print_balances<'a>( title_str: &'a str, diff --git a/doc/tabconf7/frigate_playbook.sh b/doc/tabconf7/frigate_playbook.sh new file mode 100644 index 0000000..d2d4d06 --- /dev/null +++ b/doc/tabconf7/frigate_playbook.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash + +########################### STAGE 1: setup #################################### + +# 1. Install dependencies locally and setup regtest environment +just non_nix_init +# 2. Check bitcoind is running on regtest +just cli getblockchaininfo +# 3. Check bdk-cli wallet was created correctly +just regtest-bdk balance +# 4. Check sp-cli wallet was created correctly +just regtest-sp balance +# 5. Synchronize bdk-cli wallet +just regtest-bdk sync + +###################### STAGE 2: fund bdk-cli wallet ########################### + +# 6. Get a new address from bdk-cli wallet +REGTEST_ADDRESS=$(just regtest-bdk unused_address | jq -r '.address' | tr -d '\n') +# 7. Mine a few more blocks to fund the wallet +just mine 1 $REGTEST_ADDRESS +# 8. Mine some of them to the internal wallet to confirm the bdk-cli balance +just mine 101 +# 9. Synchronize bdk-cli wallet +just regtest-bdk sync +# 10. Check balance +just regtest-bdk balance + +################ STAGE 3: create a silent payment output ###################### + +# 11. Get a silent payment code from sp-cli2 wallet +SP_CODE=$(just regtest-sp code | jq -r '.silent_payment_code' | tr -d '\n') +# 12. Create a transaction spending bdk-cli wallet UTXOs to a the previous silent payment code +RAW_TX=$(just regtest-bdk create_sp_tx --to-sp $SP_CODE:10000 --fee_rate 5 | jq -r '.raw_tx' | tr -d '\n') +TXID=$(just regtest-bdk broadcast --tx $RAW_TX | jq -r '.txid' | tr -d '\n') +# 14. Mine a new block +just mine 1 +# 15. Once the new transaction has been mined, synchronize bdk-cli wallet again +just regtest-bdk sync + +# ################## STAGE 4: find a silent payment output ###################### + +# 16. Now synchronize sp-cli2 wallet using frigate ephemeral scanning +FRIGATE_HOST="127.0.0.1:57001" +just regtest-sp scan-frigate --url $FRIGATE_HOST +# 17. Check balance on sp-cli2 wallet +just regtest-sp balance +# 18. Check balance on bdk-cli wallet +just regtest-bdk balance + +# At this point we will able to see SP outputs paid to out wallet! diff --git a/doc/tabconf7/justfile b/doc/tabconf7/justfile index 15259a3..5ca2233 100644 --- a/doc/tabconf7/justfile +++ b/doc/tabconf7/justfile @@ -121,7 +121,7 @@ build TAG="1.0.0" VERSION="29.0" RELEASE="29.0": machine RUN mkdir -p /build/frigate RUN mkdir -p /frigate WORKDIR /frigate - RUN git clone --recursive --branch 1.1.0 --depth 1 https://github.com/sparrowwallet/frigate.git . + RUN git clone --recursive --branch 1.3.2 --depth 1 https://github.com/sparrowwallet/frigate.git . RUN ./gradlew jpackage RUN cp -r ./build/jpackage/frigate /build/frigate RUN rm -rf /frigate diff --git a/oracles/Cargo.toml b/oracles/Cargo.toml index 2313138..f3ba162 100644 --- a/oracles/Cargo.toml +++ b/oracles/Cargo.toml @@ -13,9 +13,10 @@ redb = "2.4.0" rayon = "1.11.0" reqwest = { version = "0.12.23", features = ["json", "rustls-tls", "http2", "charset"], default-features = false } serde = { version = "1.0.219", features = ["serde_derive"] } -serde_json = "1.0.142" +serde_json = { version = "1.0.142", features = ["raw_value"]} url = "2.5.4" tracing = "0.1.41" +jsonrpc = "=0.18.0" [lints] workspace = true diff --git a/oracles/src/frigate/mod.rs b/oracles/src/frigate/mod.rs new file mode 100644 index 0000000..b46dda2 --- /dev/null +++ b/oracles/src/frigate/mod.rs @@ -0,0 +1,298 @@ +use bip157::tokio::io::{AsyncReadExt, AsyncWriteExt}; +use bip157::tokio::net::TcpStream; +use bip157::tokio::time::{timeout, Duration}; +use bitcoin::secp256k1::{PublicKey, SecretKey}; +use bitcoin::Txid; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug)] +pub enum FrigateError { + JsonRpc(jsonrpc::Error), + ParseUrl(url::ParseError), + Serde(serde_json::Error), + Generic(String), +} + +impl From for FrigateError { + fn from(value: serde_json::Error) -> Self { + FrigateError::Serde(value) + } +} + +impl From for FrigateError { + fn from(value: url::ParseError) -> Self { + Self::ParseUrl(value) + } +} + +impl From for FrigateError { + fn from(value: jsonrpc::Error) -> Self { + Self::JsonRpc(value) + } +} + +impl From for FrigateError { + fn from(value: std::io::Error) -> Self { + Self::Generic(format!("Generic error {:?}", value)) + } +} + +impl std::fmt::Display for FrigateError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FrigateError::Generic(str) => write!(f, "{str}"), + _ => write!(f, "Something wrong happened"), + } + } +} +impl std::error::Error for FrigateError {} + +pub struct FrigateClient { + pub host_url: String, + client: Box, + pub request_timeout: Duration, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct History { + pub height: u32, + pub tx_hash: Txid, + pub tweak_key: PublicKey, +} + +#[derive(Serialize, Deserialize)] +pub struct NotifPayload { + scan_private_key: SecretKey, + spend_public_key: PublicKey, + address: String, + labels: Option>, + start_height: u32, + progress: f32, + history: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct SubscribeRequest { + pub scan_priv_key: SecretKey, + pub spend_pub_key: PublicKey, + pub start_height: Option, + pub labels: Option>, +} + +#[derive(Serialize, Deserialize)] +pub struct UnsubscribeRequest { + pub scan_privkey: SecretKey, + pub spend_pubkey: PublicKey, +} + +#[derive(Serialize, Deserialize)] +pub struct GetRequest { + pub tx_hash: Txid, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct RequestPayload { + pub method: String, + pub params: Value, + pub id: serde_json::Value, + pub jsonrpc: String, +} + +const SUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.subscribe"; +const UNSUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.unsubscribe"; +const GET_RPC_METHOD: &str = "blockchain.transaction.get"; +const VERSION_RPC_METHOD: &str = "server.version"; +const BLOCK_HEADER_RPC_METHOD: &str = "blockchain.block.header"; +const STREAM_READ_BYTES: usize = 4096; +pub const DUMMY_COINBASE: &str = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000"; + +impl FrigateClient { + pub async fn connect(host_url: &str) -> Result { + let stream = TcpStream::connect(host_url) + .await + .map_err(|_| FrigateError::Generic("Can't connect to socket".to_string()))?; + + Ok(Self { + host_url: host_url.to_string(), + client: Box::new(stream), + request_timeout: Duration::from_secs(10), + }) + } + + /// Sets a custom request timeout for this client. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + + pub async fn read_from_stream(&mut self, size: usize) -> Result { + let mut buffer = vec![0; size]; + let n = self.client.read(&mut buffer).await?; + + tracing::debug!("Read bytes from stream {n}"); + match n { + 0 => Err(FrigateError::Generic("Nothing read".to_string())), + _ => { + let response_str = String::from_utf8_lossy(&buffer[..n]); + let result: Value = + serde_json::from_str(&response_str).map_err(FrigateError::Serde)?; + + Ok(result) + } + } + } + + async fn send_request(&mut self, req_bytes: &[u8]) -> Result { + match timeout(self.request_timeout, async { + self.client.write_all(req_bytes).await?; + self.client.write_all(b"\n").await?; + self.client.flush().await?; + self.read_from_stream(STREAM_READ_BYTES).await + }) + .await + { + Ok(res) => res, + Err(_) => Err(FrigateError::Generic(format!( + "request timed out after {:?}", + self.request_timeout + ))), + } + } + + pub async fn get_block_header(&mut self, height: u32) -> Result { + let params = vec![height]; + let req = RequestPayload { + method: BLOCK_HEADER_RPC_METHOD.to_string(), + params: serde_json::json!(params), + id: serde_json::Value::from(5), + jsonrpc: "2.0".to_string(), + }; + let req_bytes = serde_json::to_vec(&req)?; + let res = self.send_request(&req_bytes).await?; + + tracing::debug!("[Block Header Request] Result {:?}", res); + Ok(String::from(res["result"].as_str().unwrap())) + } + + pub async fn get_transaction( + &mut self, + txid: String, + ) -> Result<(String, String), FrigateError> { + let params = vec![txid, "true".to_string()]; + let req = RequestPayload { + method: GET_RPC_METHOD.to_string(), + id: serde_json::Value::from(4), + params: serde_json::json!(params), + jsonrpc: "2.0".to_string(), + }; + let req_bytes = serde_json::to_vec(&req)?; + let res = self.send_request(&req_bytes).await?; + + tracing::debug!("[Get tx Request] Result {:#?}", res); + let blockhash = String::from(res["result"]["blockhash"].as_str().unwrap()); + let hex = String::from(res["result"]["hex"].as_str().unwrap()); + Ok((blockhash, hex)) + } + + pub async fn version(&mut self) -> Result<(), FrigateError> { + let params = vec!["frigate-cli", "1.4"]; + + let req = RequestPayload { + method: VERSION_RPC_METHOD.to_string(), + params: serde_json::json!(params), + id: serde_json::Value::from(3), + jsonrpc: "2.0".to_string(), + }; + + let req_bytes = serde_json::to_vec(&req)?; + self.send_request(&req_bytes).await?; + + Ok(()) + } + + pub async fn subscribe( + &mut self, + req: &SubscribeRequest, + ) -> Result, f32)>, FrigateError> { + self.version().await?; + let mut params: Vec = vec![ + serde_json::json!(req.scan_priv_key), + serde_json::json!(req.spend_pub_key), + ]; + + if let Some(start_height) = req.start_height { + params.push(serde_json::json!(start_height)); + } + + if let Some(labels) = &req.labels { + params.push(serde_json::json!(labels)); + } + + let req = RequestPayload { + method: SUBSCRIBE_RPC_METHOD.to_string(), + params: serde_json::json!(params), + id: serde_json::Value::from(2), + jsonrpc: "2.0".to_string(), + }; + + let req_bytes = serde_json::to_vec(&req)?; + let result = self.send_request(&req_bytes).await?; + + if result["result"].is_string() { + tracing::info!( + "Subscribed to silent payment address: {:?}", + result["result"] + ); + return Ok(None); + } else if result["params"].is_object() { + let histories: Vec = + serde_json::from_value(result["params"]["history"].clone()) + .map_err(FrigateError::Serde)?; + let progress = result["params"]["progress"].as_f64().unwrap_or(0.0) as f32; + return Ok(Some((histories, progress))); + } + + Ok(None) + } + + pub async fn unsubscribe(&mut self, req: &UnsubscribeRequest) -> Result { + let params: Vec = vec![ + serde_json::json!(req.scan_privkey), + serde_json::json!(req.spend_pubkey), + ]; + + self.version().await?; + let req = RequestPayload { + method: UNSUBSCRIBE_RPC_METHOD.to_string(), + id: serde_json::Value::from(1), + params: serde_json::json!(params), + jsonrpc: "2.0".to_string(), + }; + + let req_bytes = serde_json::to_vec(&req)?; + let result = self.send_request(&req_bytes).await?; + + Ok(result["result"].to_string()) + } + + pub async fn subscribe_with_timeout( + &mut self, + req: &SubscribeRequest, + ) -> Result, f32)>, FrigateError> { + match self.subscribe(req).await { + Ok(res) => Ok(res), + Err(e) => { + if e.to_string().contains("timed out") { + tracing::warn!("subscribe request timed out, attempting unsubscribe"); + let unsub = UnsubscribeRequest { + scan_privkey: req.scan_priv_key, + spend_pubkey: req.spend_pub_key, + }; + let _ = self.unsubscribe(&unsub).await; + } + Err(e) + } + } + } +} diff --git a/oracles/src/lib.rs b/oracles/src/lib.rs index 1d5a6a9..93a6146 100644 --- a/oracles/src/lib.rs +++ b/oracles/src/lib.rs @@ -1,3 +1,4 @@ pub mod filters; +pub mod frigate; pub mod tweaks; pub use bip157; From ec47641beb8141acc0f0d8f051f05730acfa8c7f Mon Sep 17 00:00:00 2001 From: Sonkeng Maldini Date: Tue, 14 Apr 2026 10:36:08 +0100 Subject: [PATCH 2/2] feat: use electrum_streaming_client for handling requests --- Cargo.lock | 16 +++ cli/v2/Cargo.toml | 1 + cli/v2/src/main.rs | 89 ++++--------- oracles/Cargo.toml | 1 + oracles/src/frigate/mod.rs | 265 +++++++++++++------------------------ 5 files changed, 134 insertions(+), 238 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58cf969..e69b117 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -311,6 +311,7 @@ dependencies = [ "bdk_sp_oracles", "bdk_sp_wallet", "clap", + "electrum_streaming_client", "indexer", "miniscript", "rand 0.9.2", @@ -326,6 +327,7 @@ version = "0.1.0" dependencies = [ "bip157", "bitcoin", + "electrum_streaming_client", "futures", "indexer", "jsonrpc", @@ -841,6 +843,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "electrum_streaming_client" +version = "0.4.0" +source = "git+https://github.com/sdmg15/electrum_streaming_client?branch=master#225fc0cd8bb8a95afca39843471bd4f7b9fda824" +dependencies = [ + "bitcoin", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-util", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -2418,6 +2433,7 @@ checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/cli/v2/Cargo.toml b/cli/v2/Cargo.toml index c2aa2a5..58b205f 100644 --- a/cli/v2/Cargo.toml +++ b/cli/v2/Cargo.toml @@ -21,5 +21,6 @@ rand = "0.9.0" indexer = { version = "0.1.0", path = "../../indexer", features = ["serde"]} bdk_sp_wallet = { version = "0.1.0", path = "../../wallet", features = ["serde"]} bdk_sp_oracles = { version = "0.1.0", path = "../../oracles" } +electrum_streaming_client = { git = "https://github.com/sdmg15/electrum_streaming_client", branch = "master", features = ["frigate"]} tracing = "0.1.41" tracing-subscriber = "0.3.19" diff --git a/cli/v2/src/main.rs b/cli/v2/src/main.rs index db040b4..ca8bde2 100644 --- a/cli/v2/src/main.rs +++ b/cli/v2/src/main.rs @@ -10,8 +10,7 @@ use bdk_sp::{ self, address::NetworkUnchecked, bip32, - consensus::{deserialize, Decodable}, - hashes::Hash, + consensus::{self, deserialize, Decodable}, hex::{DisplayHex, FromHex}, key::Secp256k1, script::PushBytesBuf, @@ -35,7 +34,7 @@ use bdk_sp_oracles::{ TrustedPeer, UnboundedReceiver, Warning, }, filters::kyoto::{FilterEvent, FilterSubscriber}, - frigate::{FrigateClient, History, SubscribeRequest, UnsubscribeRequest, DUMMY_COINBASE}, + frigate::{FrigateClient, StreamExt, SubscribeRequest, UnsubscribeRequest, DUMMY_COINBASE}, tweaks::blindbit::{BlindbitSubscriber, TweakEvent}, }; use bdk_sp_wallet::{ @@ -47,6 +46,7 @@ use bdk_sp_wallet::{ ChangeSet, SpWallet, }; use clap::{self, ArgGroup, Args, Parser, Subcommand}; +use electrum_streaming_client::{notification::Notification, Event}; use indexer::bdk_chain::BlockId; use rand::RngCore; use serde_json::json; @@ -163,7 +163,6 @@ pub enum Commands { #[clap(long)] hash: Option, }, - ScanFrigate { #[clap(flatten)] rpc_args: RpcArgs, @@ -172,7 +171,6 @@ pub enum Commands { #[clap(long)] hash: Option, }, - Create { /// Network #[clap(long, short, default_value = "signet")] @@ -625,35 +623,15 @@ async fn main() -> anyhow::Result<()> { labels, }; - // Attempt to subscribe; any timeout will trigger unsubscribe automatically. - match client.subscribe_with_timeout(&subscribe_params).await { - Ok(Some((histories, progress))) => { - tracing::info!( - "Initial subscription result: {} histories, progress {}", - histories.len(), - progress - ); - } - Ok(None) => { - tracing::info!("Subscription acknowledged, awaiting notifications"); - } - Err(e) => { - tracing::error!("Subscribe failed: {}", e); - return Err(e.into()); - } - } + client.version().await?; + client.subscribe(&subscribe_params).await?; - tracing::info!("Starting frigate scanning loop..."); - loop { - match client.read_from_stream(4096).await { - Ok(subscribe_result) => { - if subscribe_result["params"].is_object() { - let histories: Vec = serde_json::from_value( - subscribe_result["params"]["history"].clone(), - )?; - let progress = subscribe_result["params"]["progress"] - .as_f64() - .unwrap_or(0.0) as f32; + while let Some(event) = client.events.next().await { + if let Event::Notification(notification) = event { + match notification { + Notification::SpSubscribe(sp_subscribe_notification) => { + let histories = sp_subscribe_notification.history().clone(); + let progress = sp_subscribe_notification.progress(); let mut secrets_by_height: HashMap> = HashMap::new(); @@ -674,35 +652,28 @@ async fn main() -> anyhow::Result<()> { // Since frigate doesn't provide a blockchain.getblock we will mimick that here // By constructing a block from the block header and the list of transactions // received from the scan request - let mut raw_blk = client.get_block_header(secret.0).await.unwrap(); - raw_blk.push_str("00"); + let header = client.get_block_header(secret.0).await?.header; + let mut raw_blk = consensus::serialize(&header); + raw_blk.push(0); // Push dummy coinbase let coinbase: Transaction = deserialize(&Vec::::from_hex(DUMMY_COINBASE).unwrap()) .unwrap(); - let mut block: Block = - deserialize(&Vec::::from_hex(&raw_blk).unwrap()).unwrap(); - - let mut blockhash = BlockHash::all_zeros(); + let mut block: Block = deserialize(&raw_blk).unwrap(); + let blockhash = header.block_hash(); let mut txs: Vec = vec![coinbase]; + for key in secret.1.keys() { - let tx_result = - client.get_transaction(key.to_string()).await.unwrap(); - let tx: Transaction = - deserialize(&Vec::::from_hex(&tx_result.1).unwrap()) - .unwrap(); - txs.push(tx); - - blockhash = BlockHash::from_str(&tx_result.0).unwrap(); + let tx_result = client.get_transaction(*key).await?; + txs.push(tx_result.tx); } block.txdata = txs; tracing::debug!("Final block {:?}", block); wallet.apply_block_relevant(&block, secret.1, secret.0); - tracing::debug!("Checkpoint hash {blockhash:?}"); let checkpoint = wallet.chain().tip().insert(BlockId { height: secret.0, hash: blockhash, @@ -710,29 +681,21 @@ async fn main() -> anyhow::Result<()> { wallet.update_chain(checkpoint); } - tracing::info!("Progress {progress}"); - // Check the progress if progress >= 1.0 { tracing::info!("Scanning completed"); break; } } - } - Err(e) if e.to_string().contains("timed out") => { - tracing::warn!("read_from_stream timeout, exiting scan"); - let unsubscribe_request = UnsubscribeRequest { - scan_privkey: *wallet.indexer().scan_sk(), - spend_pubkey: *wallet.indexer().spend_pk(), - }; - let _ = client.unsubscribe(&unsubscribe_request).await; - break; - } - Err(e) => { - tracing::error!("read_from_stream error: {}", e); - return Err(e.into()); + _ => tracing::error!("Notification event not supported"), } } } + // Unsubscribe once scanning is done + let unsub_req = UnsubscribeRequest { + scan_priv_key: *wallet.indexer().scan_sk(), + spend_pub_key: *wallet.indexer().spend_pk(), + }; + client.unsubscribe(&unsub_req).await?; } Commands::Balance => { fn print_balances<'a>( diff --git a/oracles/Cargo.toml b/oracles/Cargo.toml index f3ba162..cb9854d 100644 --- a/oracles/Cargo.toml +++ b/oracles/Cargo.toml @@ -14,6 +14,7 @@ rayon = "1.11.0" reqwest = { version = "0.12.23", features = ["json", "rustls-tls", "http2", "charset"], default-features = false } serde = { version = "1.0.219", features = ["serde_derive"] } serde_json = { version = "1.0.142", features = ["raw_value"]} +electrum_streaming_client = { git = "https://github.com/sdmg15/electrum_streaming_client", branch = "master", features = ["frigate"]} url = "2.5.4" tracing = "0.1.41" jsonrpc = "=0.18.0" diff --git a/oracles/src/frigate/mod.rs b/oracles/src/frigate/mod.rs index b46dda2..3fe7289 100644 --- a/oracles/src/frigate/mod.rs +++ b/oracles/src/frigate/mod.rs @@ -1,10 +1,15 @@ -use bip157::tokio::io::{AsyncReadExt, AsyncWriteExt}; +use bip157::tokio; use bip157::tokio::net::TcpStream; use bip157::tokio::time::{timeout, Duration}; use bitcoin::secp256k1::{PublicKey, SecretKey}; use bitcoin::Txid; +use electrum_streaming_client::response::{FullTx, HeaderResp}; +use electrum_streaming_client::{request, AsyncClient, Event}; +use futures::channel::mpsc::UnboundedReceiver; +pub use futures::StreamExt; use serde::{Deserialize, Serialize}; -use serde_json::Value; + +pub const DUMMY_COINBASE: &str = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000"; #[derive(Debug)] pub enum FrigateError { @@ -50,7 +55,9 @@ impl std::error::Error for FrigateError {} pub struct FrigateClient { pub host_url: String, - client: Box, + pub client: AsyncClient, + pub events: UnboundedReceiver, + pub worker: tokio::task::JoinHandle>, pub request_timeout: Duration, } @@ -82,8 +89,8 @@ pub struct SubscribeRequest { #[derive(Serialize, Deserialize)] pub struct UnsubscribeRequest { - pub scan_privkey: SecretKey, - pub spend_pubkey: PublicKey, + pub scan_priv_key: SecretKey, + pub spend_pub_key: PublicKey, } #[derive(Serialize, Deserialize)] @@ -91,31 +98,34 @@ pub struct GetRequest { pub tx_hash: Txid, } -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct RequestPayload { - pub method: String, - pub params: Value, - pub id: serde_json::Value, - pub jsonrpc: String, -} - -const SUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.subscribe"; -const UNSUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.unsubscribe"; -const GET_RPC_METHOD: &str = "blockchain.transaction.get"; -const VERSION_RPC_METHOD: &str = "server.version"; -const BLOCK_HEADER_RPC_METHOD: &str = "blockchain.block.header"; -const STREAM_READ_BYTES: usize = 4096; -pub const DUMMY_COINBASE: &str = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000"; - impl FrigateClient { pub async fn connect(host_url: &str) -> Result { let stream = TcpStream::connect(host_url) .await .map_err(|_| FrigateError::Generic("Can't connect to socket".to_string()))?; + let (reader, writer) = stream.into_split(); + let (client, events, worker) = AsyncClient::new_tokio(reader, writer); + + let worker = tokio::spawn(async move { + tracing::debug!("Worker task started"); + match worker.await { + Ok(()) => { + tracing::debug!("Worker task completed successfully"); + Ok(()) + } + Err(e) => { + tracing::error!("Worker task failed: {}", e); + Err(e) + } + } + }); + Ok(Self { host_url: host_url.to_string(), - client: Box::new(stream), + client, + events, + worker, request_timeout: Duration::from_secs(10), }) } @@ -126,173 +136,78 @@ impl FrigateClient { self } - pub async fn read_from_stream(&mut self, size: usize) -> Result { - let mut buffer = vec![0; size]; - let n = self.client.read(&mut buffer).await?; - - tracing::debug!("Read bytes from stream {n}"); - match n { - 0 => Err(FrigateError::Generic("Nothing read".to_string())), - _ => { - let response_str = String::from_utf8_lossy(&buffer[..n]); - let result: Value = - serde_json::from_str(&response_str).map_err(FrigateError::Serde)?; - - Ok(result) - } - } - } - - async fn send_request(&mut self, req_bytes: &[u8]) -> Result { - match timeout(self.request_timeout, async { - self.client.write_all(req_bytes).await?; - self.client.write_all(b"\n").await?; - self.client.flush().await?; - self.read_from_stream(STREAM_READ_BYTES).await - }) + pub async fn get_block_header(&mut self, height: u32) -> Result { + let res = timeout( + self.request_timeout, + self.client.send_request(request::Header { height }), + ) .await - { - Ok(res) => res, - Err(_) => Err(FrigateError::Generic(format!( - "request timed out after {:?}", - self.request_timeout - ))), - } - } - - pub async fn get_block_header(&mut self, height: u32) -> Result { - let params = vec![height]; - let req = RequestPayload { - method: BLOCK_HEADER_RPC_METHOD.to_string(), - params: serde_json::json!(params), - id: serde_json::Value::from(5), - jsonrpc: "2.0".to_string(), - }; - let req_bytes = serde_json::to_vec(&req)?; - let res = self.send_request(&req_bytes).await?; - - tracing::debug!("[Block Header Request] Result {:?}", res); - Ok(String::from(res["result"].as_str().unwrap())) + .map_err(|_| FrigateError::Generic("Header request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; + Ok(res) } - pub async fn get_transaction( - &mut self, - txid: String, - ) -> Result<(String, String), FrigateError> { - let params = vec![txid, "true".to_string()]; - let req = RequestPayload { - method: GET_RPC_METHOD.to_string(), - id: serde_json::Value::from(4), - params: serde_json::json!(params), - jsonrpc: "2.0".to_string(), - }; - let req_bytes = serde_json::to_vec(&req)?; - let res = self.send_request(&req_bytes).await?; - - tracing::debug!("[Get tx Request] Result {:#?}", res); - let blockhash = String::from(res["result"]["blockhash"].as_str().unwrap()); - let hex = String::from(res["result"]["hex"].as_str().unwrap()); - Ok((blockhash, hex)) + pub async fn get_transaction(&mut self, txid: Txid) -> Result { + let res = timeout( + self.request_timeout, + self.client.send_request(request::GetTx { txid }), + ) + .await + .map_err(|_| FrigateError::Generic("GetTx request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; + Ok(res) } - pub async fn version(&mut self) -> Result<(), FrigateError> { - let params = vec!["frigate-cli", "1.4"]; - - let req = RequestPayload { - method: VERSION_RPC_METHOD.to_string(), - params: serde_json::json!(params), - id: serde_json::Value::from(3), - jsonrpc: "2.0".to_string(), - }; - - let req_bytes = serde_json::to_vec(&req)?; - self.send_request(&req_bytes).await?; - - Ok(()) + pub async fn version(&mut self) -> Result, FrigateError> { + let res = timeout( + self.request_timeout, + self.client.send_request(request::Version { + client_name: "bdk-sp".into(), + version: "1.4".into(), + }), + ) + .await + .map_err(|_| FrigateError::Generic("Version request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; + Ok(res) } - pub async fn subscribe( - &mut self, - req: &SubscribeRequest, - ) -> Result, f32)>, FrigateError> { - self.version().await?; - let mut params: Vec = vec![ - serde_json::json!(req.scan_priv_key), - serde_json::json!(req.spend_pub_key), - ]; - - if let Some(start_height) = req.start_height { - params.push(serde_json::json!(start_height)); - } - - if let Some(labels) = &req.labels { - params.push(serde_json::json!(labels)); - } - - let req = RequestPayload { - method: SUBSCRIBE_RPC_METHOD.to_string(), - params: serde_json::json!(params), - id: serde_json::Value::from(2), - jsonrpc: "2.0".to_string(), + pub async fn subscribe(&mut self, req: &SubscribeRequest) -> Result { + let subscribe_req = request::Subscribe { + scan_priv_key: req.scan_priv_key, + scan_pub_key: req.spend_pub_key, + labels: req.labels.clone(), + start_height: req.start_height, }; - let req_bytes = serde_json::to_vec(&req)?; - let result = self.send_request(&req_bytes).await?; - - if result["result"].is_string() { - tracing::info!( - "Subscribed to silent payment address: {:?}", - result["result"] - ); - return Ok(None); - } else if result["params"].is_object() { - let histories: Vec = - serde_json::from_value(result["params"]["history"].clone()) - .map_err(FrigateError::Serde)?; - let progress = result["params"]["progress"].as_f64().unwrap_or(0.0) as f32; - return Ok(Some((histories, progress))); - } + tracing::debug!("Sending subscribe event request..."); + let res = timeout( + self.request_timeout, + self.client.send_request(subscribe_req), + ) + .await + .map_err(|_| FrigateError::Generic("Subscribe request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; - Ok(None) + tracing::info!("Subscribed to silent payment address: {}", res); + Ok(res) } - pub async fn unsubscribe(&mut self, req: &UnsubscribeRequest) -> Result { - let params: Vec = vec![ - serde_json::json!(req.scan_privkey), - serde_json::json!(req.spend_pubkey), - ]; - - self.version().await?; - let req = RequestPayload { - method: UNSUBSCRIBE_RPC_METHOD.to_string(), - id: serde_json::Value::from(1), - params: serde_json::json!(params), - jsonrpc: "2.0".to_string(), + pub async fn unsubscribe(&mut self, req: &UnsubscribeRequest) -> Result<(), FrigateError> { + let unsubscribe_req = request::UnSubscribe { + scan_priv_key: req.scan_priv_key, + scan_pub_key: req.spend_pub_key, }; - let req_bytes = serde_json::to_vec(&req)?; - let result = self.send_request(&req_bytes).await?; - - Ok(result["result"].to_string()) - } + let res = timeout( + self.request_timeout, + self.client.send_request(unsubscribe_req), + ) + .await + .map_err(|_| FrigateError::Generic("Unsubscribe request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; - pub async fn subscribe_with_timeout( - &mut self, - req: &SubscribeRequest, - ) -> Result, f32)>, FrigateError> { - match self.subscribe(req).await { - Ok(res) => Ok(res), - Err(e) => { - if e.to_string().contains("timed out") { - tracing::warn!("subscribe request timed out, attempting unsubscribe"); - let unsub = UnsubscribeRequest { - scan_privkey: req.scan_priv_key, - spend_pubkey: req.spend_pub_key, - }; - let _ = self.unsubscribe(&unsub).await; - } - Err(e) - } - } + tracing::info!("Unsubscribed to silent payment address: {:?}", res); + Ok(()) } }