From f58b2c62a3e03ebede51d1c0bb1c1d370f50722a Mon Sep 17 00:00:00 2001 From: Erick Tryzelaar Date: Tue, 17 Jul 2018 14:53:41 -0700 Subject: [PATCH 1/2] Convert Client::_fetch_target's lookup fn into a method This is a minor simplification of `Client::_fetch_target` to factor out the interior lookup function in order to cut out on a number of variables being passed in as arguments. --- src/client.rs | 278 +++++++++++++++++++++++--------------------------- 1 file changed, 130 insertions(+), 148 deletions(-) diff --git a/src/client.rs b/src/client.rs index 8d965ed0..c4012498 100644 --- a/src/client.rs +++ b/src/client.rs @@ -410,175 +410,157 @@ where // TODO this should check the local repo first fn _fetch_target(&mut self, target: &TargetPath) -> Result> { - fn lookup( - tuf: &mut Tuf, - config: &Config, - default_terminate: bool, - current_depth: u32, - target: &VirtualTargetPath, - snapshot: &SnapshotMetadata, - targets: Option<&TargetsMetadata>, - local: &mut L_, - remote: &mut R_, - ) -> (bool, Result) - where - D_: DataInterchange, - L_: Repository, - R_: Repository, - T_: PathTranslator, - { - if current_depth > config.max_delegation_depth { - warn!( - "Walking the delegation graph would have exceeded the configured max depth: {}", - config.max_delegation_depth - ); - return (default_terminate, Err(Error::NotFound)); - } + let virt = self.config.path_translator.real_to_virtual(target)?; - // these clones are dumb, but we need immutable values and not references for update - // tuf in the loop below - let targets = match targets { - Some(t) => t.clone(), - None => match tuf.targets() { - Some(t) => t.clone(), - None => { - return ( - default_terminate, - Err(Error::MissingMetadata(Role::Targets)), - ) - } - }, - }; + let snapshot = self.tuf + .snapshot() + .ok_or_else(|| Error::MissingMetadata(Role::Snapshot))? + .clone(); + let (_, target_description) = + self.lookup_target_description(false, 0, &virt, &snapshot, None); + let target_description = target_description?; - if let Some(t) = targets.targets().get(target) { - return (default_terminate, Ok(t.clone())); - } + self.remote.fetch_target( + target, + &target_description, + self.config.min_bytes_per_second, + ) + } - let delegations = match targets.delegations() { - Some(d) => d, - None => return (default_terminate, Err(Error::NotFound)), - }; + fn lookup_target_description( + &mut self, + default_terminate: bool, + current_depth: u32, + target: &VirtualTargetPath, + snapshot: &SnapshotMetadata, + targets: Option<&TargetsMetadata>, + ) -> (bool, Result) { + if current_depth > self.config.max_delegation_depth { + warn!( + "Walking the delegation graph would have exceeded the configured max depth: {}", + self.config.max_delegation_depth + ); + return (default_terminate, Err(Error::NotFound)); + } - for delegation in delegations.roles().iter() { - if !delegation.paths().iter().any(|p| target.is_child(p)) { - if delegation.terminating() { - return (true, Err(Error::NotFound)); - } else { - continue; - } + // these clones are dumb, but we need immutable values and not references for update + // tuf in the loop below + let targets = match targets { + Some(t) => t.clone(), + None => match self.tuf.targets() { + Some(t) => t.clone(), + None => { + return ( + default_terminate, + Err(Error::MissingMetadata(Role::Targets)), + ) } + }, + }; - let role_meta = match snapshot.meta().get(delegation.role()) { - Some(m) => m, - None if !delegation.terminating() => continue, - None => return (true, Err(Error::NotFound)), - }; + if let Some(t) = targets.targets().get(target) { + return (default_terminate, Ok(t.clone())); + } - let (alg, value) = match crypto::hash_preference(role_meta.hashes()) { - Ok(h) => h, - Err(e) => return (delegation.terminating(), Err(e)), - }; + let delegations = match targets.delegations() { + Some(d) => d, + None => return (default_terminate, Err(Error::NotFound)), + }; - let version = if tuf.root().consistent_snapshot() { - MetadataVersion::Hash(value.clone()) + for delegation in delegations.roles().iter() { + if !delegation.paths().iter().any(|p| target.is_child(p)) { + if delegation.terminating() { + return (true, Err(Error::NotFound)); } else { - MetadataVersion::None - }; + continue; + } + } + + let role_meta = match snapshot.meta().get(delegation.role()) { + Some(m) => m, + None if !delegation.terminating() => continue, + None => return (true, Err(Error::NotFound)), + }; - let signed_meta = match local - .fetch_metadata::( + let (alg, value) = match crypto::hash_preference(role_meta.hashes()) { + Ok(h) => h, + Err(e) => return (delegation.terminating(), Err(e)), + }; + + let version = if self.tuf.root().consistent_snapshot() { + MetadataVersion::Hash(value.clone()) + } else { + MetadataVersion::None + }; + + let signed_meta = match self.local + .fetch_metadata::( + delegation.role(), + &MetadataVersion::None, + &Some(role_meta.size()), + self.config.min_bytes_per_second(), + Some((alg, value.clone())), + ) + .or_else(|_| { + self.remote.fetch_metadata::( delegation.role(), - &MetadataVersion::None, + &version, &Some(role_meta.size()), - config.min_bytes_per_second(), + self.config.min_bytes_per_second(), Some((alg, value.clone())), ) - .or_else(|_| { - remote.fetch_metadata::( + }) { + Ok(m) => m, + Err(ref e) if !delegation.terminating() => { + warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e); + continue; + } + Err(e) => { + warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e); + return (true, Err(e)); + } + }; + + match self.tuf.update_delegation(delegation.role(), &signed_meta) { + Ok(_) => { + match self.local.store_metadata( + delegation.role(), + &MetadataVersion::None, + &signed_meta, + ) { + Ok(_) => (), + Err(e) => warn!( + "Error storing metadata {:?} locally: {:?}", delegation.role(), - &version, - &Some(role_meta.size()), - config.min_bytes_per_second(), - Some((alg, value.clone())), - ) - }) { - Ok(m) => m, - Err(ref e) if !delegation.terminating() => { - warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e); - continue; + e + ), } - Err(e) => { - warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e); - return (true, Err(e)); - } - }; - match tuf.update_delegation(delegation.role(), &signed_meta) { - Ok(_) => { - match local.store_metadata( - delegation.role(), - &MetadataVersion::None, - &signed_meta, - ) { - Ok(_) => (), - Err(e) => warn!( - "Error storing metadata {:?} locally: {:?}", - delegation.role(), - e - ), - } - - let meta = tuf.delegations().get(delegation.role()).unwrap().clone(); - let (term, res) = lookup( - tuf, - config, - delegation.terminating(), - current_depth + 1, - target, - snapshot, - Some(&meta), - local, - remote, - ); - - if term && res.is_err() { - return (true, res); - } - - // TODO end recursion early + let meta = self.tuf + .delegations() + .get(delegation.role()) + .unwrap() + .clone(); + let (term, res) = self.lookup_target_description( + delegation.terminating(), + current_depth + 1, + target, + snapshot, + Some(&meta), + ); + + if term && res.is_err() { + return (true, res); } - Err(_) if !delegation.terminating() => continue, - Err(e) => return (true, Err(e)), - }; - } - (default_terminate, Err(Error::NotFound)) + // TODO end recursion early + } + Err(_) if !delegation.terminating() => continue, + Err(e) => return (true, Err(e)), + }; } - let virt = self.config.path_translator.real_to_virtual(target)?; - - let snapshot = self.tuf - .snapshot() - .ok_or_else(|| Error::MissingMetadata(Role::Snapshot))? - .clone(); - let (_, target_description) = lookup( - &mut self.tuf, - &self.config, - false, - 0, - &virt, - &snapshot, - None, - &mut self.local, - &mut self.remote, - ); - let target_description = target_description?; - - self.remote.fetch_target( - target, - &target_description, - self.config.min_bytes_per_second, - ) + (default_terminate, Err(Error::NotFound)) } } From 5e24ebe38a4588702f520179d8ecaca9e156dc86 Mon Sep 17 00:00:00 2001 From: Erick Tryzelaar Date: Mon, 23 Jul 2018 20:39:10 -0700 Subject: [PATCH 2/2] WIP: Migrate over to futures 0.1 and hyper 0.12 This patch rewrites tuf to be asynchronous, which required a substantial amount of changes, and frankly made some things quite a bit uglier. Async/Await however should really clean this up though. --- Cargo.toml | 10 +- src/client.rs | 919 +++++++++++++++++++++++++--------------- src/error.rs | 11 +- src/lib.rs | 18 + src/macros.rs | 21 + src/metadata.rs | 2 +- src/repository.rs | 488 ++++++++++++--------- src/tuf.rs | 73 ++-- src/util.rs | 273 ++++++++---- tests/integration.rs | 4 +- tests/simple_example.rs | 41 +- 11 files changed, 1164 insertions(+), 696 deletions(-) create mode 100644 src/macros.rs diff --git a/Cargo.toml b/Cargo.toml index cdad101d..2298b7a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,15 @@ name = "tuf" path = "./src/lib.rs" [dependencies] +bytes = "0.4" chrono = { version = "0.4", features = [ "serde" ] } data-encoding = "2.0.0-rc.2" derp = "0.0.10" -hyper = "0.10" +futures = "0.1" +futures-cpupool = "0.1" +futures-fs = "0.0.4" +http = "0.1" +hyper = "0.12" itoa = "0.4" log = "0.4" ring = { version = "0.12", features = [ "rsa_signing" ] } @@ -32,7 +37,10 @@ serde = "1" serde_derive = "1" serde_json = "1" tempfile = "3" +tokio = "0.1" +tokio-serde = "0.2" untrusted = "0.5" +url = "1" [dev-dependencies] lazy_static = "1" diff --git a/src/client.rs b/src/client.rs index c4012498..3293db55 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,11 +3,14 @@ //! # Example //! //! ```no_run +//! extern crate futures; +//! extern crate futures_fs; //! extern crate hyper; //! extern crate tuf; +//! extern crate url; //! +//! use futures::Future; //! use hyper::client::Client as HttpClient; -//! use hyper::Url; //! use std::path::PathBuf; //! use tuf::Tuf; //! use tuf::crypto::KeyId; @@ -16,6 +19,7 @@ //! MetadataVersion}; //! use tuf::interchange::Json; //! use tuf::repository::{Repository, FileSystemRepository, HttpRepository}; +//! use url::Url; //! //! static TRUSTED_ROOT_KEY_IDS: &'static [&str] = &[ //! "diNfThTFm0PI8R-Bq7NztUIvZbZiaC_weJBgcqaHlWw=", @@ -28,7 +32,8 @@ //! .map(|k| KeyId::from_string(k).unwrap()) //! .collect(); //! -//! let local = FileSystemRepository::::new(PathBuf::from("~/.rustup")) +//! let pool = futures_fs::FsPool::new(1); +//! let local = FileSystemRepository::::new(pool, PathBuf::from("~/.rustup")) //! .unwrap(); //! //! let remote = HttpRepository::new( @@ -38,20 +43,25 @@ //! None); //! //! let mut client = Client::with_root_pinned( -//! &key_ids, +//! key_ids, //! Config::default(), //! local, //! remote, -//! ).unwrap(); -//! let _ = client.update_local().unwrap(); -//! let _ = client.update_remote().unwrap(); +//! ).wait().unwrap(); +//! let _ = client.update_local().wait().unwrap(); +//! let _ = client.update_remote().wait().unwrap(); //! } //! ``` -use std::io::{Read, Write}; +use std::io::Write; +use std::iter::Iterator; +use std::sync::{Arc, Mutex}; +use bytes::Bytes; use crypto::{self, KeyId}; use error::Error; +use futures::future::{Either, Loop, loop_fn}; +use futures::{future, Future, Stream}; use interchange::DataInterchange; use metadata::{ MetadataPath, MetadataVersion, Role, RootMetadata, SnapshotMetadata, TargetDescription, @@ -59,8 +69,8 @@ use metadata::{ }; use repository::Repository; use tuf::Tuf; -use util::SafeReader; -use Result; +use util::{future_ok, future_err}; +use {TufFuture, TufStream, Result}; /// Translates real paths (where a file is stored) into virtual paths (how it is addressed in TUF) /// and back. @@ -110,457 +120,678 @@ impl PathTranslator for DefaultTranslator { /// A client that interacts with TUF repositories. pub struct Client where - D: DataInterchange, - L: Repository, - R: Repository, - T: PathTranslator, + D: DataInterchange + 'static, + L: Repository + 'static, + R: Repository + 'static, + T: PathTranslator + 'static, { - tuf: Tuf, - config: Config, - local: L, - remote: R, + tuf: Arc>>, + config: Arc>, + local: Arc, + remote: Arc, } impl Client where - D: DataInterchange, - L: Repository, - R: Repository, - T: PathTranslator, + D: DataInterchange + 'static, + L: Repository + 'static, + R: Repository + 'static, + T: PathTranslator + 'static, { /// Create a new TUF client. It will attempt to load initial root metadata from the local repo /// and return an error if it cannot do so. /// /// **WARNING**: This method offers weaker security guarantees than the related method /// `with_root_pinned`. - pub fn new(config: Config, local: L, remote: R) -> Result { - let root = local - .fetch_metadata( + pub fn new(config: Config, local: L, remote: R) -> TufFuture { + let local = Arc::new(local); + let remote = Arc::new(remote); + + let max_root_size = config.max_root_size; + let min_bytes_per_second = config.min_bytes_per_second; + + let client = local.fetch_metadata( &MetadataPath::from_role(&Role::Root), &MetadataVersion::Number(1), - &config.max_root_size, - config.min_bytes_per_second, + max_root_size, + min_bytes_per_second, None, - )?; - - let tuf = Tuf::from_root(&root)?; - - Ok(Client { - tuf, - config, - local, - remote, - }) + ) + .and_then(move |root| { + let tuf = Tuf::from_root(&root)?; + + Ok(Client { + tuf: Arc::new(Mutex::new(tuf)), + config: Arc::new(config), + local, + remote, + }) + }); + + Box::new(client) } /// Create a new TUF client. It will attempt to load initial root metadata the local and remote /// repositories using the provided key IDs to pin the verification. /// /// This is the preferred method of creating a client. - pub fn with_root_pinned<'a, I>( + pub fn with_root_pinned( trusted_root_keys: I, config: Config, local: L, remote: R, - ) -> Result + ) -> TufFuture where - I: IntoIterator, + I: IntoIterator + 'static, T: PathTranslator, { - let root = local + let local = Arc::new(local); + let remote = Arc::new(remote); + + let remote_ = remote.clone(); + let max_root_size = config.max_root_size; + let min_bytes_per_second = config.min_bytes_per_second; + + let client = local .fetch_metadata( &MetadataPath::from_role(&Role::Root), &MetadataVersion::Number(1), - &config.max_root_size, - config.min_bytes_per_second, + max_root_size, + min_bytes_per_second, None, ) - .or_else(|_| { - remote.fetch_metadata( + .or_else(move |_| { + remote_.fetch_metadata( &MetadataPath::from_role(&Role::Root), &MetadataVersion::Number(1), - &config.max_root_size, - config.min_bytes_per_second, + max_root_size, + min_bytes_per_second, None, ) - })?; - - let tuf = Tuf::from_root_pinned(root, trusted_root_keys)?; - - Ok(Client { - tuf, - config, - local, - remote, - }) + }) + .and_then(move |root| { + let tuf = Tuf::from_root_pinned(root, trusted_root_keys)?; + + Ok(Client { + tuf: Arc::new(Mutex::new(tuf)), + config: Arc::new(config), + local, + remote, + }) + }); + + Box::new(client) } /// Update TUF metadata from the local repository. /// /// Returns `true` if an update occurred and `false` otherwise. - pub fn update_local(&mut self) -> Result { - let r = Self::update_root(&mut self.tuf, &mut self.local, &self.config)?; - let ts = match Self::update_timestamp(&mut self.tuf, &mut self.local, &self.config) { - Ok(b) => b, - Err(e) => { - warn!( - "Error updating timestamp metadata from local sources: {:?}", - e - ); - false - } - }; - let sn = match Self::update_snapshot(&mut self.tuf, &mut self.local, &self.config) { - Ok(b) => b, - Err(e) => { - warn!( - "Error updating snapshot metadata from local sources: {:?}", - e - ); - false - } - }; - let ta = match Self::update_targets(&mut self.tuf, &mut self.local, &self.config) { - Ok(b) => b, - Err(e) => { - warn!( - "Error updating targets metadata from local sources: {:?}", - e - ); - false - } - }; - - Ok(r || ts || sn || ta) + pub fn update_local(&mut self) -> TufFuture { + let tuf1 = self.tuf.clone(); + let tuf2 = self.tuf.clone(); + let tuf3 = self.tuf.clone(); + let tuf4 = self.tuf.clone(); + + let config1 = self.config.clone(); + let config2 = self.config.clone(); + let config3 = self.config.clone(); + let config4 = self.config.clone(); + + let local1 = self.local.clone(); + let local2 = self.local.clone(); + let local3 = self.local.clone(); + let local4 = self.local.clone(); + + Box::new( + Self::update_root(tuf1, local1, config1) + .and_then(move |r| { + Self::update_timestamp(tuf2, local2, config2) + .or_else(|e| { + warn!( + "Error updating root metadata from local sources: {:?}", + e + ); + Ok(false) + }) + .and_then(move |ts| { + Self::update_snapshot(tuf3, local3, config3) + .or_else(|e| { + warn!( + "Error updating snapshot metadata from local sources: {:?}", + e + ); + Ok(false) + }) + .and_then(move |sn| { + Self::update_targets(tuf4, local4, config4) + .or_else(|e| { + warn!( + "Error updating targets metadata from local sources: {:?}", + e + ); + Ok(false) + }) + .and_then(move |ta| { + Ok(r || ts || sn || ta) + }) + }) + }) + }) + ) } /// Update TUF metadata from the remote repository. /// /// Returns `true` if an update occurred and `false` otherwise. - pub fn update_remote(&mut self) -> Result { - let r = Self::update_root(&mut self.tuf, &mut self.remote, &self.config)?; - let ts = Self::update_timestamp(&mut self.tuf, &mut self.remote, &self.config)?; - let sn = Self::update_snapshot(&mut self.tuf, &mut self.remote, &self.config)?; - let ta = Self::update_targets(&mut self.tuf, &mut self.remote, &self.config)?; - - Ok(r || ts || sn || ta) + pub fn update_remote(&mut self) -> TufFuture { + let tuf1 = self.tuf.clone(); + let tuf2 = self.tuf.clone(); + let tuf3 = self.tuf.clone(); + let tuf4 = self.tuf.clone(); + + let config1 = self.config.clone(); + let config2 = self.config.clone(); + let config3 = self.config.clone(); + let config4 = self.config.clone(); + + let remote1 = self.remote.clone(); + let remote2 = self.remote.clone(); + let remote3 = self.remote.clone(); + let remote4 = self.remote.clone(); + + Box::new( + Self::update_root(tuf1, remote1, config1) + .and_then(move |r| { + Self::update_timestamp(tuf2, remote2, config2) + .and_then(move |ts| { + Self::update_snapshot(tuf3, remote3, config3) + .and_then(move |sn| { + Self::update_targets(tuf4, remote4, config4) + .and_then(move |ta| { + Ok(r || ts || sn || ta) + }) + }) + }) + }) + ) } /// Returns `true` if an update occurred and `false` otherwise. - fn update_root(tuf: &mut Tuf, repo: &mut V, config: &Config) -> Result + fn update_root(tuf: Arc>>, repo: Arc, config: Arc>) -> TufFuture where - V: Repository, + V: Repository + 'static, U: PathTranslator, { - let latest_root = repo.fetch_metadata( - &MetadataPath::from_role(&Role::Root), - &MetadataVersion::None, - &config.max_root_size, - config.min_bytes_per_second, - None, - )?; - let latest_version = D::deserialize::(latest_root.signed())?.version(); - - if latest_version < tuf.root().version() { - return Err(Error::VerificationFailure(format!( - "Latest root version is lower than current root version: {} < {}", - latest_version, - tuf.root().version() - ))); - } else if latest_version == tuf.root().version() { - return Ok(false); - } - let err_msg = "TUF claimed no update occurred when one should have. \ This is a programming error. Please report this as a bug."; - for i in (tuf.root().version() + 1)..latest_version { - let signed = repo.fetch_metadata( + //let tuf = self.tuf.clone(); + let max_root_size = config.max_root_size; + let min_bytes_per_second = config.min_bytes_per_second; + + let updated = + repo.fetch_metadata( &MetadataPath::from_role(&Role::Root), - &MetadataVersion::Number(i), - &config.max_root_size, - config.min_bytes_per_second, + &MetadataVersion::None, + max_root_size, + min_bytes_per_second, None, - )?; - if !tuf.update_root(&signed)? { - error!("{}", err_msg); - return Err(Error::Programming(err_msg.into())); - } - } + ) + .and_then(move |latest_root| { + let latest_version = + D::deserialize::(latest_root.signed())?.version(); + + let root_version = { + let tuf = tuf.lock().expect("poisoned lock"); + tuf.root().version() + }; + + if latest_version < root_version { + return Err(Error::VerificationFailure(format!( + "Latest root version is lower than current root version: {} < {}", + latest_version, root_version, + ))); + } else if latest_version == root_version { + return Ok(Either::A(future::ok(false))); + } - if !tuf.update_root(&latest_root)? { - error!("{}", err_msg); - return Err(Error::Programming(err_msg.into())); - } - Ok(true) + let mut updated_roots = Vec::new(); + for i in (root_version + 1)..latest_version { + let tuf = tuf.clone(); + + updated_roots.push( + repo + .fetch_metadata( + &MetadataPath::from_role(&Role::Root), + &MetadataVersion::Number(i), + max_root_size, + min_bytes_per_second, + None, + ) + .map(move |signed| { + let mut tuf = tuf.lock().expect("poisoned lock"); + + if !tuf.update_root(&signed)? { + error!("{}", err_msg); + return Err(Error::Programming(err_msg.into())); + } + + Ok(()) + }), + ); + } + + Ok(Either::B(future::join_all(updated_roots).and_then( + move |_| { + let mut tuf = tuf.lock().expect("poisoned lock"); + + if !tuf.update_root(&latest_root)? { + error!("{}", err_msg); + return Err(Error::Programming(err_msg.into())); + } + + Ok(true) + } + ))) + }) + .flatten(); + + Box::new(updated) } /// Returns `true` if an update occurred and `false` otherwise. - fn update_timestamp(tuf: &mut Tuf, repo: &mut V, config: &Config) -> Result + fn update_timestamp(tuf: Arc>>, repo: Arc, config: Arc>) -> TufFuture where V: Repository, U: PathTranslator, { - let ts = repo.fetch_metadata( - &MetadataPath::from_role(&Role::Timestamp), - &MetadataVersion::None, - &config.max_timestamp_size, - config.min_bytes_per_second, - None, - )?; - tuf.update_timestamp(&ts) + let ts = repo + .fetch_metadata( + &MetadataPath::from_role(&Role::Timestamp), + &MetadataVersion::None, + config.max_timestamp_size, + config.min_bytes_per_second, + None, + ) + .and_then(move |ts| { + let mut tuf = tuf.lock().expect("poisoned lock"); + tuf.update_timestamp(&ts) + }); + + Box::new(ts) } /// Returns `true` if an update occurred and `false` otherwise. - fn update_snapshot(tuf: &mut Tuf, repo: &mut V, config: &Config) -> Result + fn update_snapshot(tuf: Arc>>, repo: Arc, config: Arc>) -> TufFuture where V: Repository, U: PathTranslator, { - let snapshot_description = match tuf.timestamp() { - Some(ts) => Ok(ts.snapshot()), - None => Err(Error::MissingMetadata(Role::Timestamp)), - }?.clone(); + let snap = { + let tuf = tuf.lock().expect("poisoned lock"); - if snapshot_description.version() <= tuf.snapshot().map(|s| s.version()).unwrap_or(0) { - return Ok(false); - } + let snapshot_description = match tuf.timestamp() { + Some(ts) => ts.snapshot().clone(), + None => { + return future_err(Error::MissingMetadata(Role::Timestamp)); + } + }; - let (alg, value) = crypto::hash_preference(snapshot_description.hashes())?; + let (alg, value) = try_future!( + crypto::hash_preference(snapshot_description.hashes()) + ); - let version = if tuf.root().consistent_snapshot() { - MetadataVersion::Number(snapshot_description.version()) - } else { - MetadataVersion::None + if snapshot_description.version() <= tuf.snapshot().map(|s| s.version()).unwrap_or(0) { + return future_ok(false); + } + + let version = if tuf.root().consistent_snapshot() { + MetadataVersion::Number(snapshot_description.version()) + } else { + MetadataVersion::None + }; + + repo.fetch_metadata( + &MetadataPath::from_role(&Role::Snapshot), + &version, + Some(snapshot_description.size()), + config.min_bytes_per_second, + Some((alg, value.clone())), + ) }; - let snap = repo.fetch_metadata( - &MetadataPath::from_role(&Role::Snapshot), - &version, - &Some(snapshot_description.size()), - config.min_bytes_per_second, - Some((alg, value.clone())), - )?; - tuf.update_snapshot(&snap) + Box::new( + snap.and_then(move |snap| { + let mut tuf = tuf.lock().unwrap(); + tuf.update_snapshot(&snap) + }) + ) } /// Returns `true` if an update occurred and `false` otherwise. - fn update_targets(tuf: &mut Tuf, repo: &mut V, config: &Config) -> Result + fn update_targets(tuf: Arc>>, repo: Arc, config: Arc>) -> TufFuture where V: Repository, U: PathTranslator, { - let targets_description = match tuf.snapshot() { - Some(sn) => match sn.meta().get(&MetadataPath::from_role(&Role::Targets)) { - Some(d) => Ok(d), - None => Err(Error::VerificationFailure( - "Snapshot metadata did not contain a description of the \ - current targets metadata." - .into(), - )), - }, - None => Err(Error::MissingMetadata(Role::Snapshot)), - }?.clone(); - - if targets_description.version() <= tuf.targets().map(|t| t.version()).unwrap_or(0) { - return Ok(false); - } + let targets = { + let tuf = tuf.lock().expect("poisoned lock"); + + let targets_description = match tuf.snapshot() { + Some(sn) => match sn.meta().get(&MetadataPath::from_role(&Role::Targets)) { + Some(d) => d, + None => { + return Box::new(future::err(Error::VerificationFailure( + "Snapshot metadata did not contain a description of the \ + current targets metadata." + .into(), + ))); + } + }, + None => { + return Box::new(future::err(Error::MissingMetadata(Role::Snapshot))); + } + }; + + if targets_description.version() <= tuf.targets().map(|t| t.version()).unwrap_or(0) { + return Box::new(future::ok(false)); + } + + let (alg, value) = try_future!(crypto::hash_preference(targets_description.hashes())); - let (alg, value) = crypto::hash_preference(targets_description.hashes())?; + let version = if tuf.root().consistent_snapshot() { + MetadataVersion::Hash(value.clone()) + } else { + MetadataVersion::None + }; - let version = if tuf.root().consistent_snapshot() { - MetadataVersion::Hash(value.clone()) - } else { - MetadataVersion::None + repo.fetch_metadata( + &MetadataPath::from_role(&Role::Targets), + &version, + Some(targets_description.size()), + config.min_bytes_per_second, + Some((alg, value.clone())), + ) }; - let targets = repo.fetch_metadata( - &MetadataPath::from_role(&Role::Targets), - &version, - &Some(targets_description.size()), - config.min_bytes_per_second, - Some((alg, value.clone())), - )?; - tuf.update_targets(&targets) + Box::new( + targets.and_then(move |targets| { + let mut tuf = tuf.lock().unwrap(); + tuf.update_targets(&targets) + }) + ) } /// Fetch a target from the remote repo and write it to the local repo. - pub fn fetch_target(&mut self, target: &TargetPath) -> Result<()> { - let read = self._fetch_target(target)?; - self.local.store_target(read, target) + pub fn fetch_target(&mut self, target: &TargetPath) -> TufFuture<()> { + let stream = self.fetch_target_stream(target); + self.local.store_target(stream, target) } /// Fetch a target from the remote repo and write it to the provided writer. - pub fn fetch_target_to_writer( + pub fn fetch_target_to_writer( &mut self, target: &TargetPath, mut write: W, - ) -> Result<()> { - let mut read = self._fetch_target(&target)?; - let mut buf = [0; 1024]; - loop { - let bytes_read = read.read(&mut buf)?; - if bytes_read == 0 { - break; - } - write.write_all(&buf[..bytes_read])? - } - Ok(()) + ) -> TufFuture<()> { + Box::new( + self.fetch_target_stream(&target) + .for_each(move |bytes| { + write.write_all(&bytes)?; + Ok(()) + }) + ) } // TODO this should check the local repo first - fn _fetch_target(&mut self, target: &TargetPath) -> Result> { - let virt = self.config.path_translator.real_to_virtual(target)?; - - let snapshot = self.tuf - .snapshot() - .ok_or_else(|| Error::MissingMetadata(Role::Snapshot))? - .clone(); - let (_, target_description) = - self.lookup_target_description(false, 0, &virt, &snapshot, None); - let target_description = target_description?; - - self.remote.fetch_target( - target, - &target_description, - self.config.min_bytes_per_second, + fn fetch_target_stream(&mut self, target: &TargetPath) -> TufStream { + let virt = try_stream!(self.config.path_translator.real_to_virtual(target)); + + let snapshot = { + let tuf = self.tuf.lock().unwrap(); + try_stream!(tuf.snapshot().ok_or_else(|| Error::MissingMetadata(Role::Snapshot))) + .clone() + }; + + let target = target.clone(); + let remote = self.remote.clone(); + let min_bytes_per_second = self.config.min_bytes_per_second; + + Box::new( + Self::lookup_target_description( + self.tuf.clone(), + self.config.max_delegation_depth, + min_bytes_per_second, + false, + 0, + Arc::new(virt), + snapshot, + None, + self.local.clone(), + self.remote.clone(), + ) + .map(move |target_description| { + remote.fetch_target( + &target, + &target_description, + min_bytes_per_second, + ) + }) + .map_err(|(_, err)| err) + .flatten_stream() ) } - fn lookup_target_description( - &mut self, + fn lookup_target_description( + tuf: Arc>>, + max_delegation_depth: u32, + min_bytes_per_second: u32, default_terminate: bool, current_depth: u32, - target: &VirtualTargetPath, - snapshot: &SnapshotMetadata, - targets: Option<&TargetsMetadata>, - ) -> (bool, Result) { - if current_depth > self.config.max_delegation_depth { + target: Arc, + snapshot: Arc, + targets: Option>, + local: Arc, + remote: Arc, + ) -> Box> + where + D_: DataInterchange + 'static, + L_: Repository + 'static, + R_: Repository + 'static, + { + if current_depth > max_delegation_depth { warn!( "Walking the delegation graph would have exceeded the configured max depth: {}", - self.config.max_delegation_depth + max_delegation_depth ); - return (default_terminate, Err(Error::NotFound)); + return future_err((default_terminate, Error::NotFound)); } - // these clones are dumb, but we need immutable values and not references for update - // tuf in the loop below - let targets = match targets { - Some(t) => t.clone(), - None => match self.tuf.targets() { - Some(t) => t.clone(), - None => { - return ( - default_terminate, - Err(Error::MissingMetadata(Role::Targets)), - ) - } - }, - }; - - if let Some(t) = targets.targets().get(target) { - return (default_terminate, Ok(t.clone())); - } - - let delegations = match targets.delegations() { - Some(d) => d, - None => return (default_terminate, Err(Error::NotFound)), - }; - - for delegation in delegations.roles().iter() { - if !delegation.paths().iter().any(|p| target.is_child(p)) { - if delegation.terminating() { - return (true, Err(Error::NotFound)); - } else { - continue; - } - } + let delegations = { + let tuf = tuf.lock().expect("poisoned lock"); - let role_meta = match snapshot.meta().get(delegation.role()) { - Some(m) => m, - None if !delegation.terminating() => continue, - None => return (true, Err(Error::NotFound)), + let targets = if let Some(targets) = targets { + targets.clone() + } else if let Some(targets) = tuf.targets() { + targets.clone() + } else { + return future_err((default_terminate, Error::MissingMetadata(Role::Targets))); }; - let (alg, value) = match crypto::hash_preference(role_meta.hashes()) { - Ok(h) => h, - Err(e) => return (delegation.terminating(), Err(e)), - }; + if let Some(target) = targets.targets().get(&target) { + return future_ok(target.clone()); + } - let version = if self.tuf.root().consistent_snapshot() { - MetadataVersion::Hash(value.clone()) + if let Some(delegations) = targets.delegations() { + // this clone is dumb, but we need immutable values and not references for update + // tuf in the loop below + delegations.roles().clone().into_iter() } else { - MetadataVersion::None - }; + return future_err((default_terminate, Error::NotFound)); + } + }; - let signed_meta = match self.local - .fetch_metadata::( - delegation.role(), - &MetadataVersion::None, - &Some(role_meta.size()), - self.config.min_bytes_per_second(), - Some((alg, value.clone())), - ) - .or_else(|_| { - self.remote.fetch_metadata::( - delegation.role(), - &version, - &Some(role_meta.size()), - self.config.min_bytes_per_second(), - Some((alg, value.clone())), - ) - }) { - Ok(m) => m, - Err(ref e) if !delegation.terminating() => { - warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e); - continue; - } - Err(e) => { - warn!("Failed to fetch metadata {:?}: {:?}", delegation.role(), e); - return (true, Err(e)); - } - }; + Box::new( + loop_fn(delegations, move |mut delegations| { + let snapshot = snapshot.clone(); - match self.tuf.update_delegation(delegation.role(), &signed_meta) { - Ok(_) => { - match self.local.store_metadata( - delegation.role(), - &MetadataVersion::None, - &signed_meta, - ) { - Ok(_) => (), - Err(e) => warn!( - "Error storing metadata {:?} locally: {:?}", - delegation.role(), - e - ), + let delegation = if let Some(delegation) = delegations.next() { + delegation + } else { + return future_err((default_terminate, Error::NotFound)); + }; + + if !delegation.paths().iter().any(|p| target.is_child(p)) { + if delegation.terminating() { + return future_err((true, Error::NotFound)); + } else { + return future_ok(Loop::Continue(delegations)); } + } - let meta = self.tuf - .delegations() - .get(delegation.role()) - .unwrap() - .clone(); - let (term, res) = self.lookup_target_description( - delegation.terminating(), - current_depth + 1, - target, - snapshot, - Some(&meta), - ); + let role_meta = if let Some(m) = snapshot.meta().get(delegation.role()) { + m + } else { + if delegation.terminating() { + return future_err((true, Error::NotFound)); + } else { + return future_ok(Loop::Continue(delegations)); + } + }; - if term && res.is_err() { - return (true, res); + let (alg, value) = match crypto::hash_preference(role_meta.hashes()) { + Ok((alg, value)) => (alg, value.clone()), + Err(e) => { + return future_err((delegation.terminating(), e)); } + }; - // TODO end recursion early - } - Err(_) if !delegation.terminating() => continue, - Err(e) => return (true, Err(e)), - }; - } + let version = { + let tuf = tuf.lock().unwrap(); - (default_terminate, Err(Error::NotFound)) + if tuf.root().consistent_snapshot() { + MetadataVersion::Hash(value.clone()) + } else { + MetadataVersion::None + } + }; + + let tuf = tuf.clone(); + let target = target.clone(); + let snapshot = snapshot.clone(); + let local = local.clone(); + let remote1 = remote.clone(); + let remote2 = remote.clone(); + let delegation = Arc::new(delegation); + let delegation1 = delegation.clone(); + let role_meta_size = role_meta.size(); + let value1 = value.clone(); + + Box::new( + local + .fetch_metadata( + delegation.role(), + &MetadataVersion::None, + Some(role_meta_size), + min_bytes_per_second, + Some((alg, value)), + ) + .or_else(move |_| { + remote1.fetch_metadata( + delegation.role(), + &version, + Some(role_meta_size), + min_bytes_per_second, + Some((alg, value1)), + ) + }) + .then(move |result| { + let signed_meta = match result { + Ok(m) => m, + Err(e) => { + warn!("Failed to fetch metadata {:?}: {:?}", delegation1.role(), e); + if delegation1.terminating() { + return future_err((true, e)); + } else { + return future_ok(Loop::Continue(delegations)); + } + } + }; + + let result = { + let mut tuf = tuf.lock().expect("poisoned lock"); + tuf.update_delegation(delegation1.role(), &signed_meta) + }; + + if let Err(err) = result { + if delegation1.terminating() { + return future_err((true, err)); + } else { + return future_ok(Loop::Continue(delegations)); + } + } + + Box::new( + local + .store_metadata( + delegation1.role(), + &MetadataVersion::None, + &signed_meta, + ) + .then(move |result| { + if let Err(err) = result { + warn!( + "Error storing metadata {:?} locally: {:?}", + delegation1.role(), + err + ); + } + + let meta = { + let tuf = tuf.lock().unwrap(); + tuf.get_delegation(delegation1.role()).unwrap() + }; + + Self::lookup_target_description::( + tuf, + max_delegation_depth, + min_bytes_per_second, + delegation1.terminating(), + current_depth + 1, + target, + snapshot, + Some(meta), + local, + remote2, + ).then(|result| { + match result { + Err((true, err)) => { + Err((true, err)) + } + Err((false, err)) => { + warn!( + "Error looking up target description: {:?}", + err + ); + Ok(Loop::Continue(delegations)) + } + _ => { + Ok(Loop::Continue(delegations)) + } + } + }) + }) + ) + }) + ) + }) + .and_then(move |loop_result: Loop<_, (bool, Error)>| { + match loop_result { + Loop::Break(result) => Ok(result), + Loop::Continue(_) => Err((default_terminate, Error::NotFound)), + } + }) + ) } } @@ -767,7 +998,7 @@ mod test { &MetadataPath::from_role(&Role::Root), &MetadataVersion::Number(1), &root, - ).unwrap(); + ).wait().unwrap(); let root = RootMetadata::new( 2, @@ -788,7 +1019,7 @@ mod test { &MetadataPath::from_role(&Role::Root), &MetadataVersion::Number(2), &root, - ).unwrap(); + ).wait().unwrap(); let root = RootMetadata::new( 3, @@ -809,19 +1040,19 @@ mod test { &MetadataPath::from_role(&Role::Root), &MetadataVersion::Number(3), &root, - ).unwrap(); + ).wait().unwrap(); repo.store_metadata( &MetadataPath::from_role(&Role::Root), &MetadataVersion::None, &root, - ).unwrap(); + ).wait().unwrap(); let mut client = Client::new( Config::build().finish().unwrap(), repo, EphemeralRepository::new(), - ).unwrap(); - assert_eq!(client.update_local(), Ok(true)); - assert_eq!(client.tuf.root().version(), 3); + ).wait().unwrap(); + assert_eq!(client.update_local().wait(), Ok(true)); + assert_eq!(client.tuf.lock().unwrap().root().version(), 3); } } diff --git a/src/error.rs b/src/error.rs index 45cc70b9..2d609b86 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,6 +2,7 @@ use data_encoding::DecodeError; use derp; +use http; use hyper; use json; use std::fmt; @@ -89,14 +90,14 @@ impl From for Error { } } -impl From for Error { - fn from(err: hyper::error::Error) -> Error { - Error::Opaque(format!("Hyper: {:?}", err)) +impl From for Error { + fn from(err: http::Error) -> Error { + Error::Opaque(format!("Http: {:?}", err)) } } -impl From for Error { - fn from(err: hyper::error::ParseError) -> Error { +impl From for Error { + fn from(err: hyper::Error) -> Error { Error::Opaque(format!("Hyper: {:?}", err)) } } diff --git a/src/lib.rs b/src/lib.rs index 761519d3..c5d4458f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,9 +107,15 @@ allow(collapsible_if, implicit_hasher, new_ret_no_self, op_ref, too_many_arguments) )] +extern crate bytes; extern crate chrono; extern crate data_encoding; extern crate derp; +#[macro_use] +extern crate futures; +extern crate futures_cpupool; +extern crate futures_fs; +extern crate http; extern crate hyper; extern crate itoa; #[cfg(test)] @@ -134,13 +140,25 @@ extern crate serde_json as json; #[cfg(test)] extern crate tempdir; extern crate tempfile; +extern crate tokio; +extern crate tokio_serde; extern crate untrusted; +extern crate url; + +#[macro_use] +mod macros; pub mod error; /// Alias for `Result`. pub type Result = ::std::result::Result; +/// Alias for `Box>`. +pub type TufFuture = Box>; + +/// Alias for `Box>`. +pub type TufStream = Box>; + pub mod client; pub mod crypto; pub mod interchange; diff --git a/src/macros.rs b/src/macros.rs new file mode 100644 index 00000000..1ae4a28f --- /dev/null +++ b/src/macros.rs @@ -0,0 +1,21 @@ +macro_rules! try_future { + ($e:expr) => { + match $e { + Ok(value) => value, + Err(err) => { + return Box::new(::futures::future::err(err.into())); + } + } + } +} + +macro_rules! try_stream { + ($e:expr) => { + match $e { + Ok(value) => value, + Err(err) => { + return Box::new(::futures::stream::once(Err(err.into()))); //::std::from::From::from(err)))); + } + } + } +} diff --git a/src/metadata.rs b/src/metadata.rs index 4f9f5810..dc69833c 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -54,7 +54,7 @@ static PATH_ILLEGAL_COMPONENTS_CASE_INSENSITIVE: &'static [&str] = &[ ]; static PATH_ILLEGAL_STRINGS: &'static [&str] = &[ - ":", // for *nix compatibility + ":", // for *nix compatibility "\\", // for windows compatibility "<", ">", diff --git a/src/repository.rs b/src/repository.rs index 1bfb705c..bcc2fc98 100644 --- a/src/repository.rs +++ b/src/repository.rs @@ -1,12 +1,17 @@ //! Interfaces for interacting with different types of TUF repositories. -use hyper::client::response::Response; -use hyper::header::{Headers, UserAgent}; -use hyper::status::StatusCode; -use hyper::{Client, Url}; +use bytes::Bytes; +use futures::{stream, Future, Stream}; +use futures_fs::FsPool; +use http::Uri; +use hyper::body::Body; +use hyper::client::connect::Connect; +use hyper::client::ResponseFuture; +use hyper::Request; +use hyper::Client; use std::collections::HashMap; use std::fs::{self, DirBuilder, File}; -use std::io::{Cursor, Read, Write}; +use std::io::Write; use std::marker::PhantomData; use std::path::PathBuf; use std::sync::{Arc, RwLock}; @@ -18,8 +23,9 @@ use interchange::DataInterchange; use metadata::{ Metadata, MetadataPath, MetadataVersion, SignedMetadata, TargetDescription, TargetPath, }; -use util::SafeReader; -use Result; +use url::Url; +use util::{SafeStreamExt, future_ok, future_err, stream_err}; +use {TufStream, Result, TufFuture}; /// Top-level trait that represents a TUF repository and contains all the ways it can be interacted /// with. @@ -27,9 +33,6 @@ pub trait Repository where D: DataInterchange, { - /// The type returned when reading a target. - type TargetRead: Read; - /// Store signed metadata. /// /// Note: This **MUST** canonicalize the bytes before storing them as a read will expect the @@ -39,7 +42,7 @@ where meta_path: &MetadataPath, version: &MetadataVersion, metadata: &SignedMetadata, - ) -> Result<()> + ) -> TufFuture<()> where M: Metadata; @@ -48,17 +51,17 @@ where &self, meta_path: &MetadataPath, version: &MetadataVersion, - max_size: &Option, + max_size: Option, min_bytes_per_second: u32, - hash_data: Option<(&HashAlgorithm, HashValue)>, - ) -> Result> + hash_data: Option<(&'static HashAlgorithm, HashValue)>, + ) -> TufFuture> where - M: Metadata; + M: Metadata + 'static; /// Store the given target. - fn store_target(&self, read: R, target_path: &TargetPath) -> Result<()> + fn store_target(&self, stream: S, target_path: &TargetPath) -> TufFuture<()> where - R: Read; + S: Stream + 'static; /// Fetch the given target. fn fetch_target( @@ -66,7 +69,7 @@ where target_path: &TargetPath, target_description: &TargetDescription, min_bytes_per_second: u32, - ) -> Result>; + ) -> TufStream; /// Perform a sanity check that `M`, `Role`, and `MetadataPath` all desrcribe the same entity. fn check(meta_path: &MetadataPath) -> Result<()> @@ -90,16 +93,17 @@ pub struct FileSystemRepository where D: DataInterchange, { + pool: FsPool, local_path: PathBuf, interchange: PhantomData, } impl FileSystemRepository where - D: DataInterchange, + D: DataInterchange + 'static, { /// Create a new repository on the local file system. - pub fn new(local_path: PathBuf) -> Result { + pub fn new(pool: FsPool, local_path: PathBuf) -> Result { for p in &["metadata", "targets", "temp"] { DirBuilder::new() .recursive(true) @@ -107,6 +111,7 @@ where } Ok(FileSystemRepository { + pool, local_path, interchange: PhantomData, }) @@ -115,20 +120,19 @@ where impl Repository for FileSystemRepository where - D: DataInterchange, + D: DataInterchange + 'static, { - type TargetRead = File; - fn store_metadata( &self, meta_path: &MetadataPath, version: &MetadataVersion, metadata: &SignedMetadata, - ) -> Result<()> + ) -> TufFuture<()> where M: Metadata, { - Self::check::(meta_path)?; + try_future!(Self::check::(meta_path)); + let components = meta_path.components::(version); let mut path = self.local_path.join("metadata"); @@ -136,18 +140,19 @@ where if path.exists() { debug!("Metadata path exists. Deleting: {:?}", path); - fs::remove_file(&path)? + try_future!(fs::remove_file(&path)); } if components.len() > 1 { let mut path = self.local_path.clone(); path.extend(&components[..(components.len() - 1)]); - DirBuilder::new().recursive(true).create(path)?; + try_future!(DirBuilder::new().recursive(true).create(path)); } - let mut file = File::create(&path)?; - D::to_writer(&mut file, metadata)?; - Ok(()) + let mut file = try_future!(File::create(&path)); + try_future!(D::to_writer(&mut file, metadata)); + + future_ok(()) } /// Fetch signed metadata. @@ -155,55 +160,61 @@ where &self, meta_path: &MetadataPath, version: &MetadataVersion, - max_size: &Option, + max_size: Option, min_bytes_per_second: u32, hash_data: Option<(&HashAlgorithm, HashValue)>, - ) -> Result> + ) -> TufFuture> where - M: Metadata, + M: Metadata + 'static, { - Self::check::(meta_path)?; + try_future!(Self::check::(meta_path)); let mut path = self.local_path.join("metadata"); path.extend(meta_path.components::(&version)); - let read = SafeReader::new( - File::open(&path)?, - max_size.unwrap_or(::std::usize::MAX) as u64, - min_bytes_per_second, - hash_data, - )?; - - Ok(D::from_reader(read)?) + Box::new( + try_future!( + self.pool + .read(path, Default::default()) + .map_err(Error::from) + .safe_stream( + max_size.unwrap_or(::std::usize::MAX) as u64, + min_bytes_per_second, + hash_data, + ) + ) + .concat2() + .and_then(|bytes| D::from_reader(&bytes[..])) + ) } - fn store_target(&self, mut read: R, target_path: &TargetPath) -> Result<()> + fn store_target(&self, stream: S, target_path: &TargetPath) -> TufFuture<()> where - R: Read, + S: Stream + 'static, { - let mut temp_file = NamedTempFile::new_in(self.local_path.join("temp"))?; - let mut buf = [0; 1024]; - loop { - let bytes_read = read.read(&mut buf)?; - if bytes_read == 0 { - break; - } - temp_file.write_all(&buf[..bytes_read])? - } + let temp_file = try_future!(NamedTempFile::new_in(self.local_path.join("temp"))); - let mut path = self.local_path.clone().join("targets"); + let local_path = self.local_path.clone(); let components = target_path.components(); - if components.len() > 1 { - let mut path = path.clone(); - path.extend(&components[..(components.len() - 1)]); - DirBuilder::new().recursive(true).create(path)?; - } - - path.extend(components); - temp_file.persist(&path)?; - - Ok(()) + Box::new( + stream + .fold(temp_file, |mut temp_file, bytes| -> Result { + temp_file.write_all(&bytes)?; + Ok(temp_file) + }) + .and_then(move |temp_file| { + let mut path = local_path.clone().join("targets"); + if components.len() > 1 { + let mut path = path.clone(); + path.extend(&components[..(components.len() - 1)]); + DirBuilder::new().recursive(true).create(path)?; + } + path.extend(components); + temp_file.persist(&path)?; + Ok(()) + }) + ) } fn fetch_target( @@ -211,42 +222,54 @@ where target_path: &TargetPath, target_description: &TargetDescription, min_bytes_per_second: u32, - ) -> Result> { + ) -> TufStream { let mut path = self.local_path.join("targets"); path.extend(target_path.components()); if !path.exists() { - return Err(Error::NotFound); + return stream_err(Error::NotFound); } - let (alg, value) = crypto::hash_preference(target_description.hashes())?; - - SafeReader::new( - File::open(&path)?, - target_description.size(), - min_bytes_per_second, - Some((alg, value.clone())), + let (alg, value) = try_stream!(crypto::hash_preference(target_description.hashes())); + + Box::new( + try_stream!( + self.pool + .read(path, Default::default()) + .map_err(Error::from) + .safe_stream( + target_description.size(), + min_bytes_per_second, + Some((alg, value.clone())), + ) + ) ) } } /// A repository accessible over HTTP. -pub struct HttpRepository +pub struct HttpRepository where + C: Connect + Sync + 'static, + C::Transport: 'static, + C::Future: 'static, D: DataInterchange, { url: Url, - client: Client, + client: Client, user_agent: String, metadata_prefix: Option>, interchange: PhantomData, } -impl HttpRepository +impl HttpRepository where - D: DataInterchange, + C: Connect + Sync + 'static, + C::Transport: 'static, + C::Future: 'static, + D: DataInterchange + 'static, { - /// Create a new repository with the given `Url` and `Client`. + /// Create a new repository with the given `Uri` and `Client`. /// /// Callers *should* include a custom User-Agent prefix to help maintainers of TUF repositories /// keep track of which client versions exist in the field. @@ -258,7 +281,7 @@ where /// `https://tuf.example.com/meta/root.json`. pub fn new( url: Url, - client: Client, + client: Client, user_agent_prefix: Option, metadata_prefix: Option>, ) -> Self { @@ -276,10 +299,7 @@ where } } - fn get(&self, prefix: &Option>, components: &[String]) -> Result { - let mut headers = Headers::new(); - headers.set(UserAgent(self.user_agent.clone())); - + fn get(&self, prefix: &Option>, components: &[String]) -> Result { let mut url = self.url.clone(); { let mut segments = url.path_segments_mut().map_err(|_| { @@ -291,75 +311,81 @@ where segments.extend(components); } - let req = self.client.get(url.clone()).headers(headers); - let resp = req.send()?; + let uri: Uri = url.into_string().parse().map_err(|_| { + Error::IllegalArgument(format!("URL was 'cannot-be-a-base': {:?}", self.url)) + })?; - if !resp.status.is_success() { - if resp.status == StatusCode::NotFound { - Err(Error::NotFound) - } else { - Err(Error::Opaque(format!( - "Error getting {:?}: {:?}", - url, resp - ))) - } - } else { - Ok(resp) - } + let req = Request::builder() + .uri(uri) + .header("User-Agent", &*self.user_agent) + .body(Body::default())?; + + Ok(self.client.request(req)) } } -impl Repository for HttpRepository +impl Repository for HttpRepository where - D: DataInterchange, + C: Connect + Sync + 'static, + C::Transport: 'static, + C::Future: 'static, + D: DataInterchange + 'static, { - type TargetRead = Response; - /// This always returns `Err` as storing over HTTP is not yet supported. fn store_metadata( &self, _: &MetadataPath, _: &MetadataVersion, _: &SignedMetadata, - ) -> Result<()> + ) -> TufFuture<()> where M: Metadata, { - Err(Error::Opaque( - "Http repo store metadata not implemented".to_string(), - )) + future_err(Error::Opaque("Http repo store metadata not implemented".to_string())) } fn fetch_metadata( &self, meta_path: &MetadataPath, version: &MetadataVersion, - max_size: &Option, + max_size: Option, min_bytes_per_second: u32, - hash_data: Option<(&HashAlgorithm, HashValue)>, - ) -> Result> + hash_data: Option<(&'static HashAlgorithm, HashValue)>, + ) -> TufFuture> where - M: Metadata, + M: Metadata + 'static, { - Self::check::(meta_path)?; + try_future!(Self::check::(meta_path)); + + let resp = try_future!( + self.get(&self.metadata_prefix, &meta_path.components::(&version)) + ); - let resp = self.get(&self.metadata_prefix, &meta_path.components::(&version))?; + let bytes = try_future!( + resp + .map(|resp| resp.into_body().map(|chunk| chunk.into_bytes())) + .flatten_stream() + .map_err(Error::from) + .safe_stream( + max_size.unwrap_or(::std::usize::MAX) as u64, + min_bytes_per_second, + hash_data, + ) + ); - let read = SafeReader::new( - resp, - max_size.unwrap_or(::std::usize::MAX) as u64, - min_bytes_per_second, - hash_data, - )?; - Ok(D::from_reader(read)?) + Box::new( + bytes + .concat2() + .and_then(|bytes| D::from_reader(&bytes[..])) + ) } /// This always returns `Err` as storing over HTTP is not yet supported. - fn store_target(&self, _: R, _: &TargetPath) -> Result<()> + fn store_target(&self, _: S, _: &TargetPath) -> TufFuture<()> where - R: Read, + S: Stream + 'static, { - Err(Error::Opaque("Http repo store not implemented".to_string())) + future_err(Error::Opaque("Http repo store not implemented".to_string())) } fn fetch_target( @@ -367,31 +393,44 @@ where target_path: &TargetPath, target_description: &TargetDescription, min_bytes_per_second: u32, - ) -> Result> { - let resp = self.get(&None, &target_path.components())?; - let (alg, value) = crypto::hash_preference(target_description.hashes())?; - Ok(SafeReader::new( - resp, - target_description.size(), - min_bytes_per_second, - Some((alg, value.clone())), - )?) + ) -> TufStream { + let (alg, value) = try_stream!( + crypto::hash_preference(target_description.hashes()) + ); + + let resp = try_stream!( + self.get(&None, &target_path.components()) + ); + + Box::new( + try_stream!(resp + .map(|resp| resp.into_body()) + .flatten_stream() + .map_err(Error::from) + .map(|chunk| chunk.into_bytes()) + .safe_stream( + target_description.size(), + min_bytes_per_second, + Some((alg, value.clone())), + ) + ) + ) } } /// An ephemeral repository contained solely in memory. pub struct EphemeralRepository where - D: DataInterchange, + D: DataInterchange + 'static, { - metadata: Arc>>>, - targets: Arc>>>, + metadata: Arc>>, + targets: Arc>>, interchange: PhantomData, } impl EphemeralRepository where - D: DataInterchange, + D: DataInterchange + 'static, { /// Create a new ephemercal repository. pub fn new() -> Self { @@ -405,7 +444,7 @@ where impl Default for EphemeralRepository where - D: DataInterchange, + D: DataInterchange + 'static, { fn default() -> Self { EphemeralRepository::new() @@ -414,64 +453,89 @@ where impl Repository for EphemeralRepository where - D: DataInterchange, + D: DataInterchange + 'static, { - type TargetRead = Cursor>; - fn store_metadata( &self, meta_path: &MetadataPath, version: &MetadataVersion, metadata: &SignedMetadata, - ) -> Result<()> + ) -> TufFuture<()> where M: Metadata, { - Self::check::(meta_path)?; + try_future!(Self::check::(meta_path)); + let mut buf = Vec::new(); - D::to_writer(&mut buf, metadata)?; + try_future!(D::to_writer(&mut buf, metadata)); + let mut metadata = self.metadata.write().unwrap(); - let _ = metadata.insert((meta_path.clone(), version.clone()), buf); - Ok(()) + let _ = metadata.insert((meta_path.clone(), version.clone()), Bytes::from(buf)); + + future_ok(()) } fn fetch_metadata( &self, meta_path: &MetadataPath, version: &MetadataVersion, - max_size: &Option, + max_size: Option, min_bytes_per_second: u32, - hash_data: Option<(&HashAlgorithm, HashValue)>, - ) -> Result> + hash_data: Option<(&'static HashAlgorithm, HashValue)>, + ) -> TufFuture> where - M: Metadata, + M: Metadata + 'static, { - Self::check::(meta_path)?; + try_future!(Self::check::(meta_path)); - let metadata = self.metadata.read().unwrap(); - match metadata.get(&(meta_path.clone(), version.clone())) { - Some(bytes) => { - let reader = SafeReader::new( - &**bytes, + let bytes = { + let metadata = self.metadata.read().expect("poisoned lock"); + + if let Some(bytes) = metadata.get(&(meta_path.clone(), version.clone())) { + bytes.clone() + } else { + return future_err(Error::NotFound); + } + }; + + // FIXME: we probably only need to validate the hash once on insert instead of + // every time we read it. + let bytes = try_future!( + stream::once(Ok(bytes)) + .safe_stream( max_size.unwrap_or(::std::usize::MAX) as u64, min_bytes_per_second, hash_data, - )?; - D::from_reader(reader) - } - None => Err(Error::NotFound), - } + ) + ); + + Box::new( + bytes + .concat2() + .and_then(|bytes| D::from_reader(&bytes[..])) + ) } - fn store_target(&self, mut read: R, target_path: &TargetPath) -> Result<()> + fn store_target(&self, stream: S, target_path: &TargetPath) -> TufFuture<()> where - R: Read, + S: Stream + 'static, { - let mut buf = Vec::new(); - read.read_to_end(&mut buf)?; - let mut targets = self.targets.write().unwrap(); - let _ = targets.insert(target_path.clone(), buf); - Ok(()) + let buf = Vec::new(); + let targets = self.targets.clone(); + let target_path = target_path.clone(); + + Box::new( + stream + .fold(buf, |mut buf, bytes| -> Result<_> { + buf.extend(&bytes); + Ok(buf) + }) + .and_then(move |buf| { + let mut targets = targets.write().unwrap(); + let _ = targets.insert(target_path, Bytes::from(buf)); + Ok(()) + }) + ) } fn fetch_target( @@ -479,22 +543,30 @@ where target_path: &TargetPath, target_description: &TargetDescription, min_bytes_per_second: u32, - ) -> Result> { - let targets = self.targets.read().unwrap(); - match targets.get(target_path) { - Some(bytes) => { - let cur = Cursor::new(bytes.clone()); - let (alg, value) = crypto::hash_preference(target_description.hashes())?; - let read = SafeReader::new( - cur, - target_description.size(), - min_bytes_per_second, - Some((alg, value.clone())), - )?; - Ok(read) + ) -> TufStream { + let (alg, value) = try_stream!(crypto::hash_preference(target_description.hashes())); + + let bytes = { + let targets = self.targets.read().expect("poisoned lock"); + + if let Some(bytes) = targets.get(target_path) { + bytes.clone() + } else { + return stream_err(Error::NotFound); } - None => Err(Error::NotFound), - } + }; + + // FIXME: we probably only need to validate the hash once on insert instead of + // every time we read it. + let stream = try_stream!( + stream::once(Ok(bytes)).safe_stream( + target_description.size(), + min_bytes_per_second, + Some((alg, value.clone())), + ) + ); + + Box::new(stream) } } @@ -512,23 +584,33 @@ mod test { let target_description = TargetDescription::from_reader(data, &[HashAlgorithm::Sha256]).unwrap(); let path = TargetPath::new("batty".into()).unwrap(); - repo.store_target(data, &path).unwrap(); + repo.store_target(stream::once(Ok(Bytes::from_static(data))), &path) + .wait() + .unwrap(); - let mut read = repo.fetch_target(&path, &target_description, 0).unwrap(); - let mut buf = Vec::new(); - read.read_to_end(&mut buf).unwrap(); - assert_eq!(buf.as_slice(), data); + let buf = repo.fetch_target(&path, &target_description, 0) + .concat2() + .wait() + .unwrap(); + assert_eq!(data, &buf); let bad_data: &[u8] = b"you're in a desert"; - repo.store_target(bad_data, &path).unwrap(); - let mut read = repo.fetch_target(&path, &target_description, 0).unwrap(); - assert!(read.read_to_end(&mut buf).is_err()); + repo.store_target(stream::once(Ok(Bytes::from_static(bad_data))), &path) + .wait() + .unwrap(); + let buf = repo.fetch_target(&path, &target_description, 0) + .concat2() + .wait(); + assert!(buf.is_err()); } #[test] fn file_system_repo_targets() { let temp_dir = TempDir::new("rust-tuf").unwrap(); - let repo = FileSystemRepository::::new(temp_dir.path().to_path_buf()).unwrap(); + let repo = FileSystemRepository::::new( + FsPool::new(1), + temp_dir.path().to_path_buf(), + ).unwrap(); // test that init worked assert!(temp_dir.path().join("metadata").exists()); @@ -539,7 +621,9 @@ mod test { let target_description = TargetDescription::from_reader(data, &[HashAlgorithm::Sha256]).unwrap(); let path = TargetPath::new("foo/bar/baz".into()).unwrap(); - repo.store_target(data, &path).unwrap(); + repo.store_target(stream::once(Ok(Bytes::from_static(data))), &path) + .wait() + .unwrap(); assert!( temp_dir .path() @@ -550,20 +634,24 @@ mod test { .exists() ); - let mut buf = Vec::new(); - // Enclose `fetch_target` in a scope to make sure the file is closed. // This is needed for `tempfile` on Windows, which doesn't open the // files in a mode that allows the file to be opened multiple times. - { - let mut read = repo.fetch_target(&path, &target_description, 0).unwrap(); - read.read_to_end(&mut buf).unwrap(); - assert_eq!(buf.as_slice(), data); - } + let buf = { + repo.fetch_target(&path, &target_description, 0) + .concat2() + .wait() + .unwrap() + }; + assert_eq!(data, &buf); let bad_data: &[u8] = b"you're in a desert"; - repo.store_target(bad_data, &path).unwrap(); - let mut read = repo.fetch_target(&path, &target_description, 0).unwrap(); - assert!(read.read_to_end(&mut buf).is_err()); + repo.store_target(stream::once(Ok(Bytes::from_static(bad_data))), &path) + .wait() + .unwrap(); + let buf = repo.fetch_target(&path, &target_description, 0) + .concat2() + .wait(); + assert!(buf.is_err()); } } diff --git a/src/tuf.rs b/src/tuf.rs index 9e06704a..81ebb216 100644 --- a/src/tuf.rs +++ b/src/tuf.rs @@ -3,6 +3,7 @@ use chrono::offset::Utc; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; +use std::sync::{Arc, Mutex}; use crypto::KeyId; use error::Error; @@ -16,25 +17,25 @@ use Result; /// Contains trusted TUF metadata and can be used to verify other metadata and targets. #[derive(Debug)] pub struct Tuf { - root: RootMetadata, - snapshot: Option, - targets: Option, - timestamp: Option, - delegations: HashMap, + root: Arc, + snapshot: Option>, + targets: Option>, + timestamp: Option>, + delegations: Arc>>>, interchange: PhantomData, } impl Tuf { /// Create a new `TUF` struct from a known set of pinned root keys that are used to verify the /// signed metadata. - pub fn from_root_pinned<'a, I>( + pub fn from_root_pinned( mut signed_root: SignedMetadata, root_key_ids: I, ) -> Result where - I: IntoIterator, + I: IntoIterator, { - let root_key_ids = root_key_ids.into_iter().collect::>(); + let root_key_ids = root_key_ids.into_iter().collect::>(); signed_root .signatures_mut() @@ -59,11 +60,11 @@ impl Tuf { }), )?; Ok(Tuf { - root, + root: Arc::new(root), snapshot: None, targets: None, timestamp: None, - delegations: HashMap::new(), + delegations: Arc::new(Mutex::new(HashMap::new())), interchange: PhantomData, }) } @@ -74,23 +75,24 @@ impl Tuf { } /// An immutable reference to the optional snapshot metadata. - pub fn snapshot(&self) -> Option<&SnapshotMetadata> { + pub fn snapshot(&self) -> Option<&Arc> { self.snapshot.as_ref() } /// An immutable reference to the optional targets metadata. - pub fn targets(&self) -> Option<&TargetsMetadata> { + pub fn targets(&self) -> Option<&Arc> { self.targets.as_ref() } /// An immutable reference to the optional timestamp metadata. - pub fn timestamp(&self) -> Option<&TimestampMetadata> { + pub fn timestamp(&self) -> Option<&Arc> { self.timestamp.as_ref() } - /// An immutable reference to the delegated metadata. - pub fn delegations(&self) -> &HashMap { - &self.delegations + /// An immutable reference to the delegated metadata for a role. + pub fn get_delegation(&self, role: &MetadataPath) -> Option> { + let delegations = self.delegations.lock().unwrap(); + delegations.get(role).map(|meta| meta.clone()) } /// Verify and update the root metadata. @@ -139,7 +141,7 @@ impl Tuf { self.purge_metadata(); - self.root = root; + self.root = Arc::new(root); Ok(true) } @@ -181,7 +183,7 @@ impl Tuf { self.snapshot = None; } - self.timestamp = Some(timestamp); + self.timestamp = Some(Arc::new(timestamp)); Ok(true) } @@ -245,12 +247,14 @@ impl Tuf { self.targets = None; } - self.snapshot = Some(snapshot); + self.snapshot = Some(Arc::new(snapshot)); self.purge_delegations(); Ok(true) } fn purge_delegations(&mut self) { + let mut delegations = self.delegations.lock().unwrap(); + let purge = { let snapshot = match self.snapshot() { Some(s) => s, @@ -258,7 +262,7 @@ impl Tuf { }; let mut purge = HashSet::new(); for (role, definition) in snapshot.meta().iter() { - let delegation = match self.delegations.get(role) { + let delegation = match delegations.get(role) { Some(d) => d, None => continue, }; @@ -273,7 +277,7 @@ impl Tuf { }; for role in &purge { - let _ = self.delegations.remove(role); + let _ = delegations.remove(role); } } @@ -334,7 +338,7 @@ impl Tuf { targets }; - self.targets = Some(targets); + self.targets = Some(Arc::new(targets)); Ok(true) } @@ -344,6 +348,8 @@ impl Tuf { role: &MetadataPath, signed: &SignedMetadata, ) -> Result { + let mut delegations = self.delegations.lock().unwrap(); + let delegation = { let _ = self.safe_root_ref()?; let snapshot = self.safe_snapshot_ref()?; @@ -367,7 +373,7 @@ impl Tuf { } }; - let current_version = self.delegations.get(role).map(|t| t.version()).unwrap_or(0); + let current_version = delegations.get(role).map(|t| t.version()).unwrap_or(0); if delegation_description.version() < current_version { return Err(Error::VerificationFailure(format!( "Snapshot metadata did listed delegation {:?} version as {} but current\ @@ -380,7 +386,7 @@ impl Tuf { return Ok(false); } - for delegated_targets in self.delegations.values() { + for delegated_targets in delegations.values() { let parent = match delegated_targets.delegations() { Some(d) => d, None => &targets_delegations, @@ -422,7 +428,7 @@ impl Tuf { delegation }; - let _ = self.delegations.insert(role.clone(), delegation); + let _ = delegations.insert(role.clone(), Arc::new(delegation)); Ok(true) } @@ -461,7 +467,7 @@ impl Tuf { return (delegation.terminating(), None); } - let targets = match tuf.delegations.get(delegation.role()) { + let targets = match tuf.get_delegation(delegation.role()) { Some(t) => t, None => return (delegation.terminating(), None), }; @@ -511,17 +517,18 @@ impl Tuf { self.snapshot = None; self.targets = None; self.timestamp = None; - self.delegations.clear(); + let mut delegations = self.delegations.lock().unwrap(); + delegations.clear(); } - fn safe_root_ref(&self) -> Result<&RootMetadata> { + fn safe_root_ref(&self) -> Result<&Arc> { if self.root.expires() <= &Utc::now() { return Err(Error::ExpiredMetadata(Role::Root)); } Ok(&self.root) } - fn safe_snapshot_ref(&self) -> Result<&SnapshotMetadata> { + fn safe_snapshot_ref(&self) -> Result<&Arc> { match self.snapshot { Some(ref snapshot) => { if snapshot.expires() <= &Utc::now() { @@ -533,7 +540,7 @@ impl Tuf { } } - fn safe_targets_ref(&self) -> Result<&TargetsMetadata> { + fn safe_targets_ref(&self) -> Result<&Arc> { match self.targets { Some(ref targets) => { if targets.expires() <= &Utc::now() { @@ -544,7 +551,7 @@ impl Tuf { None => Err(Error::MissingMetadata(Role::Targets)), } } - fn safe_timestamp_ref(&self) -> Result<&TimestampMetadata> { + fn safe_timestamp_ref(&self) -> Result<&Arc> { match self.timestamp { Some(ref timestamp) => { if timestamp.expires() <= &Utc::now() { @@ -597,7 +604,7 @@ mod test { let root: SignedMetadata = SignedMetadata::new(&root, &root_key).unwrap(); - assert!(Tuf::from_root_pinned(root, &[root_key.key_id().clone()]).is_ok()); + assert!(Tuf::from_root_pinned(root, vec![root_key.key_id().clone()]).is_ok()); } #[test] @@ -615,7 +622,7 @@ mod test { let root: SignedMetadata = SignedMetadata::new(&root, &KEYS[0]).unwrap(); - assert!(Tuf::from_root_pinned(root, &[KEYS[1].key_id().clone()]).is_err()); + assert!(Tuf::from_root_pinned(root, vec![KEYS[1].key_id().clone()]).is_err()); } #[test] diff --git a/src/util.rs b/src/util.rs index c6ba9577..ad9406c0 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,7 +1,9 @@ +use bytes::Bytes; use chrono::offset::Utc; use chrono::DateTime; +use futures::{Async, Future, Poll, Stream, future, stream}; use ring::digest::{self, SHA256, SHA512}; -use std::io::{self, ErrorKind, Read}; +use std::io::{self, ErrorKind}; use crypto::{HashAlgorithm, HashValue}; use error::Error; @@ -9,7 +11,7 @@ use Result; /// Wrapper to verify a byte stream as it is read. /// -/// Wraps a `Read` to ensure that the consumer can't read more than a capped maximum number of +/// Wraps a `Stream` to ensure that the consumer can't read more than a capped maximum number of /// bytes. Also, this ensures that a minimum bitrate and returns an `Err` if it is not. Finally, /// when the underlying `Read` is fully consumed, the hash of the data is optionally calculated. If /// the calculated hash does not match the given hash, it will return an `Err`. Consumers of a @@ -17,24 +19,82 @@ use Result; /// /// It is **critical** that none of the bytes from this struct are used until it has been fully /// consumed as the data is untrusted. -pub struct SafeReader { - inner: R, - max_size: u64, - min_bytes_per_second: u32, - hasher: Option<(digest::Context, HashValue)>, - start_time: Option>, - bytes_read: u64, +pub trait SafeStreamExt: Stream + Sized { + /// Wrap a byte stream. + fn safe_stream( + self, + max_size: u64, + min_bytes_per_second: u32, + hash_data: Option<(&HashAlgorithm, HashValue)>, + ) -> Result> { + SafeStream::new(self, max_size, min_bytes_per_second, hash_data) + } +} + +impl> SafeStreamExt for T {} + +/// Wrapper to verify a byte stream as it is read. +/// +/// Wraps a `Stream` to ensure that the consumer can't read more than a capped maximum number of +/// bytes. Also, this ensures that a minimum bitrate and returns an `Err` if it is not. Finally, +/// when the underlying `Read` is fully consumed, the hash of the data is optionally calculated. If +/// the calculated hash does not match the given hash, it will return an `Err`. Consumers of a +/// `SafeReader` should purge and untrust all read bytes if this ever returns an `Err`. +/// +/// It is **critical** that none of the bytes from this struct are used until it has been fully +/// consumed as the data is untrusted. +pub struct SafeStream> { + inner: T, + hasher: SafeHasher, } -impl SafeReader { - /// Create a new `SafeReader`. +impl> SafeStream { + /// Create a new `SafeStream`. /// /// The argument `hash_data` takes a `HashAlgorithm` and expected `HashValue`. The given /// algorithm is used to hash the data as it is read. At the end of the stream, the digest is /// calculated and compared against `HashValue`. If the two are not equal, it means the data /// stream has been tampered with in some way. pub fn new( - read: R, + stream: T, + max_size: u64, + min_bytes_per_second: u32, + hash_data: Option<(&HashAlgorithm, HashValue)>, + ) -> Result { + let hasher = SafeHasher::new(max_size, min_bytes_per_second, hash_data)?; + + Ok(SafeStream { + inner: stream, + hasher: hasher, + }) + } +} + +impl> Stream for SafeStream { + type Item = Bytes; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + if let Some(bytes) = try_ready!(self.inner.poll()) { + self.hasher.update(&*bytes)?; + Ok(Async::Ready(Some(bytes))) + } else { + self.hasher.finish()?; + Ok(Async::Ready(None)) + } + } +} + +struct SafeHasher { + max_size: u64, + min_bytes_per_second: u32, + hasher: Option<(digest::Context, HashValue)>, + start_time: Option>, + bytes_read: u64, +} + +impl SafeHasher { + fn new( max_size: u64, min_bytes_per_second: u32, hash_data: Option<(&HashAlgorithm, HashValue)>, @@ -56,8 +116,7 @@ impl SafeReader { None => None, }; - Ok(SafeReader { - inner: read, + Ok(SafeHasher { max_size, min_bytes_per_second, hasher, @@ -65,109 +124,139 @@ impl SafeReader { bytes_read: 0, }) } -} -impl Read for SafeReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match self.inner.read(buf) { - Ok(read_bytes) => { - if self.start_time.is_none() { - self.start_time = Some(Utc::now()) - } - - if read_bytes == 0 { - if let Some((context, expected_hash)) = self.hasher.take() { - let generated_hash = context.finish(); - if generated_hash.as_ref() != expected_hash.value() { - return Err(io::Error::new( - ErrorKind::InvalidData, - "Calculated hash did not match the required hash.", - )); - } - } + fn update(&mut self, buf: &[u8]) -> io::Result<()> { + if self.start_time.is_none() { + self.start_time = Some(Utc::now()) + } - return Ok(0); - } + match self.bytes_read.checked_add(buf.len() as u64) { + Some(sum) if sum <= self.max_size => self.bytes_read = sum, + _ => { + return Err(io::Error::new( + ErrorKind::InvalidData, + "Read exceeded the maximum allowed bytes.", + )); + } + } - match self.bytes_read.checked_add(read_bytes as u64) { - Some(sum) if sum <= self.max_size => self.bytes_read = sum, - _ => { - return Err(io::Error::new( - ErrorKind::InvalidData, - "Read exceeded the maximum allowed bytes.", - )); - } - } - - let duration = Utc::now().signed_duration_since(self.start_time.unwrap()); - // 30 second grace period before we start checking the bitrate - if duration.num_seconds() >= 30 { - if self.bytes_read as f32 / (duration.num_seconds() as f32) - < self.min_bytes_per_second as f32 - { - return Err(io::Error::new( - ErrorKind::TimedOut, - "Read aborted. Bitrate too low.", - )); - } - } + let duration = Utc::now().signed_duration_since(self.start_time.unwrap()); + // 30 second grace period before we start checking the bitrate + if duration.num_seconds() >= 30 { + if self.bytes_read as f32 / (duration.num_seconds() as f32) + < self.min_bytes_per_second as f32 + { + return Err(io::Error::new( + ErrorKind::TimedOut, + "Read aborted. Bitrate too low.", + )); + } + } - if let Some((ref mut context, _)) = self.hasher { - context.update(&buf[..(read_bytes)]); - } + if let Some((ref mut context, _)) = self.hasher { + context.update(buf); + } - Ok(read_bytes) + Ok(()) + } + + fn finish(&mut self) -> io::Result<()> { + if let Some((context, expected_hash)) = self.hasher.take() { + let generated_hash = context.finish(); + if generated_hash.as_ref() != expected_hash.value() { + return Err(io::Error::new( + ErrorKind::InvalidData, + "Calculated hash did not match the required hash.", + )); } - e @ Err(_) => e, } + + Ok(()) } } +/// Helper function to convert an ok type into a future. +pub fn future_ok(value: T) -> Box> { + Box::new(future::ok(value)) +} + +/// Helper function to convert a error type into a future. +pub fn future_err(err: E) -> Box> { + Box::new(future::err(err)) +} + +/// Helper function to convert an error type into a stream. +pub fn stream_err(err: E) -> Box> { + Box::new(stream::once(Err::(err))) +} + #[cfg(test)] mod test { + use futures::stream; + use bytes::Bytes; use super::*; #[test] fn valid_read() { let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03]; - let mut reader = SafeReader::new(bytes, bytes.len() as u64, 0, None).unwrap(); - let mut buf = Vec::new(); - assert!(reader.read_to_end(&mut buf).is_ok()); + let reader = SafeStream::new( + stream::once(Ok(Bytes::from_static(bytes))), + bytes.len() as u64, + 0, + None, + ).unwrap(); + let buf = reader.concat2().wait().unwrap(); assert_eq!(buf, bytes); } #[test] fn valid_read_large_data() { let bytes: &[u8] = &[0x00; 64 * 1024]; - let mut reader = SafeReader::new(bytes, bytes.len() as u64, 0, None).unwrap(); - let mut buf = Vec::new(); - assert!(reader.read_to_end(&mut buf).is_ok()); + let reader = SafeStream::new( + stream::once(Ok(Bytes::from_static(bytes))), + bytes.len() as u64, + 0, + None, + ).unwrap(); + let buf = reader.concat2().wait().unwrap(); assert_eq!(buf, bytes); } #[test] fn valid_read_below_max_size() { let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03]; - let mut reader = SafeReader::new(bytes, (bytes.len() as u64) + 1, 0, None).unwrap(); - let mut buf = Vec::new(); - assert!(reader.read_to_end(&mut buf).is_ok()); + let reader = SafeStream::new( + stream::once(Ok(Bytes::from_static(bytes))), + (bytes.len() as u64) + 1, + 0, + None, + ).unwrap(); + let buf = reader.concat2().wait().unwrap(); assert_eq!(buf, bytes); } #[test] fn invalid_read_above_max_size() { let bytes: &[u8] = &[0x00, 0x01, 0x02, 0x03]; - let mut reader = SafeReader::new(bytes, (bytes.len() as u64) - 1, 0, None).unwrap(); - let mut buf = Vec::new(); - assert!(reader.read_to_end(&mut buf).is_err()); + let reader = SafeStream::new( + stream::once(Ok(Bytes::from_static(bytes))), + (bytes.len() as u64) - 1, + 0, + None, + ).unwrap(); + assert!(reader.concat2().wait().is_err()); } #[test] fn invalid_read_above_max_size_large_data() { let bytes: &[u8] = &[0x00; 64 * 1024]; - let mut reader = SafeReader::new(bytes, (bytes.len() as u64) - 1, 0, None).unwrap(); - let mut buf = Vec::new(); - assert!(reader.read_to_end(&mut buf).is_err()); + let reader = SafeStream::new( + stream::once(Ok(Bytes::from_static(bytes))), + (bytes.len() as u64) - 1, + 0, + None, + ).unwrap(); + assert!(reader.concat2().wait().is_err()); } #[test] @@ -176,14 +265,14 @@ mod test { let mut context = digest::Context::new(&SHA256); context.update(&bytes); let hash_value = HashValue::new(context.finish().as_ref().to_vec()); - let mut reader = SafeReader::new( - bytes, + let reader = SafeStream::new( + stream::once(Ok(Bytes::from_static(bytes))), bytes.len() as u64, 0, Some((&HashAlgorithm::Sha256, hash_value)), ).unwrap(); - let mut buf = Vec::new(); - assert!(reader.read_to_end(&mut buf).is_ok()); + + let buf = reader.concat2().wait().unwrap(); assert_eq!(buf, bytes); } @@ -194,14 +283,13 @@ mod test { context.update(&bytes); context.update(&[0xFF]); // evil bytes let hash_value = HashValue::new(context.finish().as_ref().to_vec()); - let mut reader = SafeReader::new( - bytes, + let reader = SafeStream::new( + stream::once(Ok(Bytes::from_static(bytes))), bytes.len() as u64, 0, Some((&HashAlgorithm::Sha256, hash_value)), ).unwrap(); - let mut buf = Vec::new(); - assert!(reader.read_to_end(&mut buf).is_err()); + assert!(reader.concat2().wait().is_err()); } #[test] @@ -210,14 +298,14 @@ mod test { let mut context = digest::Context::new(&SHA256); context.update(&bytes); let hash_value = HashValue::new(context.finish().as_ref().to_vec()); - let mut reader = SafeReader::new( - bytes, + let reader = SafeStream::new( + stream::once(Ok(Bytes::from_static(bytes))), bytes.len() as u64, 0, Some((&HashAlgorithm::Sha256, hash_value)), ).unwrap(); - let mut buf = Vec::new(); - assert!(reader.read_to_end(&mut buf).is_ok()); + + let buf = reader.concat2().wait().unwrap(); assert_eq!(buf, bytes); } @@ -228,13 +316,12 @@ mod test { context.update(&bytes); context.update(&[0xFF]); // evil bytes let hash_value = HashValue::new(context.finish().as_ref().to_vec()); - let mut reader = SafeReader::new( - bytes, + let reader = SafeStream::new( + stream::once(Ok(Bytes::from_static(bytes))), bytes.len() as u64, 0, Some((&HashAlgorithm::Sha256, hash_value)), ).unwrap(); - let mut buf = Vec::new(); - assert!(reader.read_to_end(&mut buf).is_err()); + assert!(reader.concat2().wait().is_err()); } } diff --git a/tests/integration.rs b/tests/integration.rs index 92af93e7..d805499e 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -56,7 +56,7 @@ fn simple_delegation() { let signed = SignedMetadata::::new(&root, &root_key).unwrap(); - let mut tuf = Tuf::::from_root_pinned(signed, &[root_key.key_id().clone()]).unwrap(); + let mut tuf = Tuf::::from_root_pinned(signed, vec![root_key.key_id().clone()]).unwrap(); //// build the timestamp //// let snap = MetadataDescription::from_reader(&*vec![0u8], 1, &[HashAlgorithm::Sha256]).unwrap(); @@ -167,7 +167,7 @@ fn nested_delegation() { let signed = SignedMetadata::::new(&root, &root_key).unwrap(); - let mut tuf = Tuf::::from_root_pinned(signed, &[root_key.key_id().clone()]).unwrap(); + let mut tuf = Tuf::::from_root_pinned(signed, vec![root_key.key_id().clone()]).unwrap(); //// build the timestamp //// let snap = MetadataDescription::from_reader(&*vec![0u8], 1, &[HashAlgorithm::Sha256]).unwrap(); diff --git a/tests/simple_example.rs b/tests/simple_example.rs index 5782fa03..467ea601 100644 --- a/tests/simple_example.rs +++ b/tests/simple_example.rs @@ -1,10 +1,14 @@ +extern crate bytes; extern crate chrono; +extern crate futures; #[macro_use] extern crate maplit; extern crate tuf; +use bytes::Bytes; use chrono::offset::Utc; use chrono::prelude::*; +use futures::{Future, stream}; use tuf::client::{Client, Config, PathTranslator}; use tuf::crypto::{HashAlgorithm, KeyId, PrivateKey, SignatureScheme}; use tuf::interchange::{DataInterchange, Json}; @@ -40,7 +44,7 @@ fn with_translator() { let mut remote = EphemeralRepository::::new(); let config = Config::default(); let root_key_ids = init_server(&mut remote, &config).unwrap(); - init_client(&root_key_ids, remote, config).unwrap(); + init_client(root_key_ids, remote, config).unwrap(); } #[test] @@ -51,25 +55,25 @@ fn without_translator() { .finish() .unwrap(); let root_key_ids = init_server(&mut remote, &config).unwrap(); - init_client(&root_key_ids, remote, config).unwrap(); + init_client(root_key_ids, remote, config).unwrap(); } fn init_client( - root_key_ids: &[KeyId], + root_key_ids: Vec, remote: EphemeralRepository, config: Config, ) -> Result<()> where - T: PathTranslator, + T: PathTranslator + 'static, { let local = EphemeralRepository::::new(); - let mut client = Client::with_root_pinned(root_key_ids, config, local, remote)?; - match client.update_local() { + let mut client = Client::with_root_pinned(root_key_ids, config, local, remote).wait()?; + match client.update_local().wait() { Ok(_) => (), Err(e) => println!("{:?}", e), } - let _ = client.update_remote()?; - client.fetch_target(&TargetPath::new("foo-bar".into())?) + let _ = client.update_remote().wait()?; + client.fetch_target(&TargetPath::new("foo-bar".into())?).wait() } fn init_server(remote: &mut EphemeralRepository, config: &Config) -> Result> @@ -113,19 +117,22 @@ where &MetadataPath::new("root".into())?, &MetadataVersion::Number(1), &signed, - )?; + ).wait()?; remote.store_metadata( &MetadataPath::new("root".into())?, &MetadataVersion::None, &signed, - )?; + ).wait()?; //// build the targets //// let target_file: &[u8] = b"things fade, alternatives exclude"; let target_path = TargetPath::new("foo-bar".into())?; let target_description = TargetDescription::from_reader(target_file, &[HashAlgorithm::Sha256])?; - let _ = remote.store_target(target_file, &target_path); + let _ = remote.store_target( + stream::once(Ok(Bytes::from_static(target_file))), + &target_path, + ).wait(); let target_map = hashmap!(config.path_translator().real_to_virtual(&target_path)? => target_description); @@ -137,12 +144,12 @@ where &MetadataPath::new("targets".into())?, &MetadataVersion::Number(1), &signed, - )?; + ).wait()?; remote.store_metadata( &MetadataPath::new("targets".into())?, &MetadataVersion::None, &signed, - )?; + ).wait()?; let targets_bytes = Json::canonicalize(&Json::serialize(&signed)?)?; @@ -159,12 +166,12 @@ where &MetadataPath::new("snapshot".into())?, &MetadataVersion::Number(1), &signed, - )?; + ).wait()?; remote.store_metadata( &MetadataPath::new("snapshot".into())?, &MetadataVersion::None, &signed, - )?; + ).wait()?; let snapshot_bytes = Json::canonicalize(&Json::serialize(&signed)?)?; @@ -178,12 +185,12 @@ where &MetadataPath::new("timestamp".into())?, &MetadataVersion::Number(1), &signed, - )?; + ).wait()?; remote.store_metadata( &MetadataPath::new("timestamp".into())?, &MetadataVersion::None, &signed, - )?; + ).wait()?; Ok(vec![root_key.key_id().clone()]) }