diff --git a/Cargo.lock b/Cargo.lock index d4c9188..e69b117 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -311,6 +311,7 @@ dependencies = [ "bdk_sp_oracles", "bdk_sp_wallet", "clap", + "electrum_streaming_client", "indexer", "miniscript", "rand 0.9.2", @@ -326,8 +327,10 @@ version = "0.1.0" dependencies = [ "bip157", "bitcoin", + "electrum_streaming_client", "futures", "indexer", + "jsonrpc", "rayon", "redb", "reqwest", @@ -840,6 +843,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "electrum_streaming_client" +version = "0.4.0" +source = "git+https://github.com/sdmg15/electrum_streaming_client?branch=master#225fc0cd8bb8a95afca39843471bd4f7b9fda824" +dependencies = [ + "bitcoin", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-util", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -2417,6 +2433,7 @@ checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", diff --git a/cli/v2/Cargo.toml b/cli/v2/Cargo.toml index c2aa2a5..58b205f 100644 --- a/cli/v2/Cargo.toml +++ b/cli/v2/Cargo.toml @@ -21,5 +21,6 @@ rand = "0.9.0" indexer = { version = "0.1.0", path = "../../indexer", features = ["serde"]} bdk_sp_wallet = { version = "0.1.0", path = "../../wallet", features = ["serde"]} bdk_sp_oracles = { version = "0.1.0", path = "../../oracles" } +electrum_streaming_client = { git = "https://github.com/sdmg15/electrum_streaming_client", branch = "master", features = ["frigate"]} tracing = "0.1.41" tracing-subscriber = "0.3.19" diff --git a/cli/v2/src/main.rs b/cli/v2/src/main.rs index a32d009..ca8bde2 100644 --- a/cli/v2/src/main.rs +++ b/cli/v2/src/main.rs @@ -10,7 +10,7 @@ use bdk_sp::{ self, address::NetworkUnchecked, bip32, - consensus::Decodable, + consensus::{self, deserialize, Decodable}, hex::{DisplayHex, FromHex}, key::Secp256k1, script::PushBytesBuf, @@ -34,6 +34,7 @@ use bdk_sp_oracles::{ TrustedPeer, UnboundedReceiver, Warning, }, filters::kyoto::{FilterEvent, FilterSubscriber}, + frigate::{FrigateClient, StreamExt, SubscribeRequest, UnsubscribeRequest, DUMMY_COINBASE}, tweaks::blindbit::{BlindbitSubscriber, TweakEvent}, }; use bdk_sp_wallet::{ @@ -45,6 +46,7 @@ use bdk_sp_wallet::{ ChangeSet, SpWallet, }; use clap::{self, ArgGroup, Args, Parser, Subcommand}; +use electrum_streaming_client::{notification::Notification, Event}; use indexer::bdk_chain::BlockId; use rand::RngCore; use serde_json::json; @@ -161,6 +163,14 @@ pub enum Commands { #[clap(long)] hash: Option, }, + ScanFrigate { + #[clap(flatten)] + rpc_args: RpcArgs, + #[clap(long)] + height: Option, + #[clap(long)] + hash: Option, + }, Create { /// Network #[clap(long, short, default_value = "signet")] @@ -561,6 +571,132 @@ async fn main() -> anyhow::Result<()> { ); } } + Commands::ScanFrigate { + rpc_args, + height, + hash, + } => { + // The implementation done here differs from what is mentioned in the section + // https://github.com/sparrowwallet/frigate/tree/master?tab=readme-ov-file#blockchainsilentpaymentssubscribe + // This implementation is doing a one time scanning only. So instead of calling + // `blockchain.scripthash.subscribe` on each script from the wallet, we just subscribe + // and read the scanning result from the stream. On each result received we update the + // wallet state and once scanning progress reaches 1.0 (100%) we stop. + let sync_point = if let (Some(height), Some(hash)) = (height, hash) { + HeaderCheckpoint::new(height, hash) + } else if wallet.birthday.height <= wallet.chain().tip().height() { + let height = wallet.chain().tip().height(); + let hash = wallet.chain().tip().hash(); + HeaderCheckpoint::new(height, hash) + } else { + let checkpoint = wallet + .chain() + .get(wallet.birthday.height) + .expect("should be something"); + let height = checkpoint.height(); + let hash = checkpoint.hash(); + HeaderCheckpoint::new(height, hash) + }; + + let mut client = FrigateClient::connect(&rpc_args.url) + .await + .unwrap() + .with_timeout(tokio::time::Duration::from_secs(60)); + + let labels = wallet + .indexer() + .index() + .num_to_label + .clone() + .into_keys() + .collect::>(); + let labels = if !labels.is_empty() { + Some(labels) + } else { + None + }; + + let subscribe_params = SubscribeRequest { + scan_priv_key: *wallet.indexer().scan_sk(), + spend_pub_key: *wallet.indexer().spend_pk(), + start_height: Some(sync_point.height), + labels, + }; + + client.version().await?; + client.subscribe(&subscribe_params).await?; + + while let Some(event) = client.events.next().await { + if let Event::Notification(notification) = event { + match notification { + Notification::SpSubscribe(sp_subscribe_notification) => { + let histories = sp_subscribe_notification.history().clone(); + let progress = sp_subscribe_notification.progress(); + + let mut secrets_by_height: HashMap> = + HashMap::new(); + + tracing::debug!("Received history {:#?}", histories); + + histories.iter().for_each(|h| { + secrets_by_height + .entry(h.height) + .and_modify(|v| { + v.insert(h.tx_hash, h.tweak_key); + }) + .or_insert(HashMap::from([(h.tx_hash, h.tweak_key)])); + }); + + // Filter when the height is 0, because that would mean mempool transaction + for secret in secrets_by_height.into_iter().filter(|v| v.0 > 0) { + // Since frigate doesn't provide a blockchain.getblock we will mimick that here + // By constructing a block from the block header and the list of transactions + // received from the scan request + let header = client.get_block_header(secret.0).await?.header; + let mut raw_blk = consensus::serialize(&header); + raw_blk.push(0); + + // Push dummy coinbase + let coinbase: Transaction = + deserialize(&Vec::::from_hex(DUMMY_COINBASE).unwrap()) + .unwrap(); + let mut block: Block = deserialize(&raw_blk).unwrap(); + + let blockhash = header.block_hash(); + let mut txs: Vec = vec![coinbase]; + + for key in secret.1.keys() { + let tx_result = client.get_transaction(*key).await?; + txs.push(tx_result.tx); + } + + block.txdata = txs; + tracing::debug!("Final block {:?}", block); + wallet.apply_block_relevant(&block, secret.1, secret.0); + + let checkpoint = wallet.chain().tip().insert(BlockId { + height: secret.0, + hash: blockhash, + }); + wallet.update_chain(checkpoint); + } + + if progress >= 1.0 { + tracing::info!("Scanning completed"); + break; + } + } + _ => tracing::error!("Notification event not supported"), + } + } + } + // Unsubscribe once scanning is done + let unsub_req = UnsubscribeRequest { + scan_priv_key: *wallet.indexer().scan_sk(), + spend_pub_key: *wallet.indexer().spend_pk(), + }; + client.unsubscribe(&unsub_req).await?; + } Commands::Balance => { fn print_balances<'a>( title_str: &'a str, diff --git a/doc/tabconf7/frigate_playbook.sh b/doc/tabconf7/frigate_playbook.sh new file mode 100644 index 0000000..d2d4d06 --- /dev/null +++ b/doc/tabconf7/frigate_playbook.sh @@ -0,0 +1,51 @@ +#!/usr/bin/env bash + +########################### STAGE 1: setup #################################### + +# 1. Install dependencies locally and setup regtest environment +just non_nix_init +# 2. Check bitcoind is running on regtest +just cli getblockchaininfo +# 3. Check bdk-cli wallet was created correctly +just regtest-bdk balance +# 4. Check sp-cli wallet was created correctly +just regtest-sp balance +# 5. Synchronize bdk-cli wallet +just regtest-bdk sync + +###################### STAGE 2: fund bdk-cli wallet ########################### + +# 6. Get a new address from bdk-cli wallet +REGTEST_ADDRESS=$(just regtest-bdk unused_address | jq -r '.address' | tr -d '\n') +# 7. Mine a few more blocks to fund the wallet +just mine 1 $REGTEST_ADDRESS +# 8. Mine some of them to the internal wallet to confirm the bdk-cli balance +just mine 101 +# 9. Synchronize bdk-cli wallet +just regtest-bdk sync +# 10. Check balance +just regtest-bdk balance + +################ STAGE 3: create a silent payment output ###################### + +# 11. Get a silent payment code from sp-cli2 wallet +SP_CODE=$(just regtest-sp code | jq -r '.silent_payment_code' | tr -d '\n') +# 12. Create a transaction spending bdk-cli wallet UTXOs to a the previous silent payment code +RAW_TX=$(just regtest-bdk create_sp_tx --to-sp $SP_CODE:10000 --fee_rate 5 | jq -r '.raw_tx' | tr -d '\n') +TXID=$(just regtest-bdk broadcast --tx $RAW_TX | jq -r '.txid' | tr -d '\n') +# 14. Mine a new block +just mine 1 +# 15. Once the new transaction has been mined, synchronize bdk-cli wallet again +just regtest-bdk sync + +# ################## STAGE 4: find a silent payment output ###################### + +# 16. Now synchronize sp-cli2 wallet using frigate ephemeral scanning +FRIGATE_HOST="127.0.0.1:57001" +just regtest-sp scan-frigate --url $FRIGATE_HOST +# 17. Check balance on sp-cli2 wallet +just regtest-sp balance +# 18. Check balance on bdk-cli wallet +just regtest-bdk balance + +# At this point we will able to see SP outputs paid to out wallet! diff --git a/doc/tabconf7/justfile b/doc/tabconf7/justfile index 15259a3..5ca2233 100644 --- a/doc/tabconf7/justfile +++ b/doc/tabconf7/justfile @@ -121,7 +121,7 @@ build TAG="1.0.0" VERSION="29.0" RELEASE="29.0": machine RUN mkdir -p /build/frigate RUN mkdir -p /frigate WORKDIR /frigate - RUN git clone --recursive --branch 1.1.0 --depth 1 https://github.com/sparrowwallet/frigate.git . + RUN git clone --recursive --branch 1.3.2 --depth 1 https://github.com/sparrowwallet/frigate.git . RUN ./gradlew jpackage RUN cp -r ./build/jpackage/frigate /build/frigate RUN rm -rf /frigate diff --git a/oracles/Cargo.toml b/oracles/Cargo.toml index 2313138..cb9854d 100644 --- a/oracles/Cargo.toml +++ b/oracles/Cargo.toml @@ -13,9 +13,11 @@ redb = "2.4.0" rayon = "1.11.0" reqwest = { version = "0.12.23", features = ["json", "rustls-tls", "http2", "charset"], default-features = false } serde = { version = "1.0.219", features = ["serde_derive"] } -serde_json = "1.0.142" +serde_json = { version = "1.0.142", features = ["raw_value"]} +electrum_streaming_client = { git = "https://github.com/sdmg15/electrum_streaming_client", branch = "master", features = ["frigate"]} url = "2.5.4" tracing = "0.1.41" +jsonrpc = "=0.18.0" [lints] workspace = true diff --git a/oracles/src/frigate/mod.rs b/oracles/src/frigate/mod.rs new file mode 100644 index 0000000..3fe7289 --- /dev/null +++ b/oracles/src/frigate/mod.rs @@ -0,0 +1,213 @@ +use bip157::tokio; +use bip157::tokio::net::TcpStream; +use bip157::tokio::time::{timeout, Duration}; +use bitcoin::secp256k1::{PublicKey, SecretKey}; +use bitcoin::Txid; +use electrum_streaming_client::response::{FullTx, HeaderResp}; +use electrum_streaming_client::{request, AsyncClient, Event}; +use futures::channel::mpsc::UnboundedReceiver; +pub use futures::StreamExt; +use serde::{Deserialize, Serialize}; + +pub const DUMMY_COINBASE: &str = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000"; + +#[derive(Debug)] +pub enum FrigateError { + JsonRpc(jsonrpc::Error), + ParseUrl(url::ParseError), + Serde(serde_json::Error), + Generic(String), +} + +impl From for FrigateError { + fn from(value: serde_json::Error) -> Self { + FrigateError::Serde(value) + } +} + +impl From for FrigateError { + fn from(value: url::ParseError) -> Self { + Self::ParseUrl(value) + } +} + +impl From for FrigateError { + fn from(value: jsonrpc::Error) -> Self { + Self::JsonRpc(value) + } +} + +impl From for FrigateError { + fn from(value: std::io::Error) -> Self { + Self::Generic(format!("Generic error {:?}", value)) + } +} + +impl std::fmt::Display for FrigateError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FrigateError::Generic(str) => write!(f, "{str}"), + _ => write!(f, "Something wrong happened"), + } + } +} +impl std::error::Error for FrigateError {} + +pub struct FrigateClient { + pub host_url: String, + pub client: AsyncClient, + pub events: UnboundedReceiver, + pub worker: tokio::task::JoinHandle>, + pub request_timeout: Duration, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct History { + pub height: u32, + pub tx_hash: Txid, + pub tweak_key: PublicKey, +} + +#[derive(Serialize, Deserialize)] +pub struct NotifPayload { + scan_private_key: SecretKey, + spend_public_key: PublicKey, + address: String, + labels: Option>, + start_height: u32, + progress: f32, + history: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct SubscribeRequest { + pub scan_priv_key: SecretKey, + pub spend_pub_key: PublicKey, + pub start_height: Option, + pub labels: Option>, +} + +#[derive(Serialize, Deserialize)] +pub struct UnsubscribeRequest { + pub scan_priv_key: SecretKey, + pub spend_pub_key: PublicKey, +} + +#[derive(Serialize, Deserialize)] +pub struct GetRequest { + pub tx_hash: Txid, +} + +impl FrigateClient { + pub async fn connect(host_url: &str) -> Result { + let stream = TcpStream::connect(host_url) + .await + .map_err(|_| FrigateError::Generic("Can't connect to socket".to_string()))?; + + let (reader, writer) = stream.into_split(); + let (client, events, worker) = AsyncClient::new_tokio(reader, writer); + + let worker = tokio::spawn(async move { + tracing::debug!("Worker task started"); + match worker.await { + Ok(()) => { + tracing::debug!("Worker task completed successfully"); + Ok(()) + } + Err(e) => { + tracing::error!("Worker task failed: {}", e); + Err(e) + } + } + }); + + Ok(Self { + host_url: host_url.to_string(), + client, + events, + worker, + request_timeout: Duration::from_secs(10), + }) + } + + /// Sets a custom request timeout for this client. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + + pub async fn get_block_header(&mut self, height: u32) -> Result { + let res = timeout( + self.request_timeout, + self.client.send_request(request::Header { height }), + ) + .await + .map_err(|_| FrigateError::Generic("Header request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; + Ok(res) + } + + pub async fn get_transaction(&mut self, txid: Txid) -> Result { + let res = timeout( + self.request_timeout, + self.client.send_request(request::GetTx { txid }), + ) + .await + .map_err(|_| FrigateError::Generic("GetTx request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; + Ok(res) + } + + pub async fn version(&mut self) -> Result, FrigateError> { + let res = timeout( + self.request_timeout, + self.client.send_request(request::Version { + client_name: "bdk-sp".into(), + version: "1.4".into(), + }), + ) + .await + .map_err(|_| FrigateError::Generic("Version request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; + Ok(res) + } + + pub async fn subscribe(&mut self, req: &SubscribeRequest) -> Result { + let subscribe_req = request::Subscribe { + scan_priv_key: req.scan_priv_key, + scan_pub_key: req.spend_pub_key, + labels: req.labels.clone(), + start_height: req.start_height, + }; + + tracing::debug!("Sending subscribe event request..."); + let res = timeout( + self.request_timeout, + self.client.send_request(subscribe_req), + ) + .await + .map_err(|_| FrigateError::Generic("Subscribe request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; + + tracing::info!("Subscribed to silent payment address: {}", res); + Ok(res) + } + + pub async fn unsubscribe(&mut self, req: &UnsubscribeRequest) -> Result<(), FrigateError> { + let unsubscribe_req = request::UnSubscribe { + scan_priv_key: req.scan_priv_key, + scan_pub_key: req.spend_pub_key, + }; + + let res = timeout( + self.request_timeout, + self.client.send_request(unsubscribe_req), + ) + .await + .map_err(|_| FrigateError::Generic("Unsubscribe request timed out".to_string()))? + .map_err(|e| FrigateError::Generic(e.to_string()))?; + + tracing::info!("Unsubscribed to silent payment address: {:?}", res); + Ok(()) + } +} diff --git a/oracles/src/lib.rs b/oracles/src/lib.rs index 1d5a6a9..93a6146 100644 --- a/oracles/src/lib.rs +++ b/oracles/src/lib.rs @@ -1,3 +1,4 @@ pub mod filters; +pub mod frigate; pub mod tweaks; pub use bip157;