diff --git a/daemon/src/analysis.rs b/daemon/src/analysis.rs index 48c29b94..41efd959 100644 --- a/daemon/src/analysis.rs +++ b/daemon/src/analysis.rs @@ -18,7 +18,7 @@ use tokio::sync::mpsc::Receiver; use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio_util::task::TaskTracker; -use crate::qmdl_store::RecordingStore; +use crate::qmdl_store::{FileKind, RecordingStore}; use crate::server::ServerState; pub struct AnalysisWriter { @@ -145,9 +145,10 @@ async fn perform_analysis( .await .map_err(|e| format!("{e:?}"))?; let qmdl_file = qmdl_store - .open_entry_qmdl(entry_index) + .open_file(entry_index, FileKind::Qmdl) .await - .map_err(|e| format!("{e:?}"))?; + .map_err(|e| format!("{e:?}"))? + .ok_or("QMDL file not found")?; (analysis_file, qmdl_file) }; diff --git a/daemon/src/diag.rs b/daemon/src/diag.rs index 7b72af7f..7e339129 100644 --- a/daemon/src/diag.rs +++ b/daemon/src/diag.rs @@ -31,7 +31,7 @@ use crate::analysis::{AnalysisCtrlMessage, AnalysisWriter}; use crate::config::GpsMode; use crate::display; use crate::notifications::{Notification, NotificationType}; -use crate::qmdl_store::{RecordingStore, RecordingStoreError}; +use crate::qmdl_store::{FileKind, RecordingStore, RecordingStoreError}; use crate::server::ServerState; use crate::stats::DiskStats; @@ -747,9 +747,10 @@ pub async fn get_analysis_report( ))? }; let analysis_file = qmdl_store - .open_entry_analysis(entry_index) + .open_file(entry_index, FileKind::Analysis) .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?; + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))? + .ok_or((StatusCode::NOT_FOUND, "Analysis file not found".to_string()))?; // Read and normalize the NDJSON file let reader = BufReader::new(analysis_file); diff --git a/daemon/src/pcap.rs b/daemon/src/pcap.rs index c0c7a6b5..bad30b01 100644 --- a/daemon/src/pcap.rs +++ b/daemon/src/pcap.rs @@ -1,4 +1,5 @@ use crate::gps::{GpsRecord, load_gps_records}; +use crate::qmdl_store::FileKind; use crate::server::ServerState; use crate::config::GpsMode; @@ -52,9 +53,10 @@ pub async fn get_pcap( } let qmdl_size_bytes = entry.qmdl_size_bytes; let qmdl_file = qmdl_store - .open_entry_qmdl(entry_index) + .open_file(entry_index, FileKind::Qmdl) .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?; + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))? + .ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?; let (reader, writer) = duplex(1024); let gps_records = load_gps_records_for_entry(&state, entry_index).await; drop(qmdl_store); @@ -75,7 +77,7 @@ pub(crate) async fn load_gps_records_for_entry( entry_index: usize, ) -> Vec { let qmdl_store = state.qmdl_store_lock.read().await; - match qmdl_store.open_entry_gps(entry_index).await { + match qmdl_store.open_file(entry_index, FileKind::Gps).await { Ok(Some(file)) => load_gps_records(file).await, Ok(None) => { let gps_mode = qmdl_store diff --git a/daemon/src/qmdl_store.rs b/daemon/src/qmdl_store.rs index bf197ecf..d15c1f77 100644 --- a/daemon/src/qmdl_store.rs +++ b/daemon/src/qmdl_store.rs @@ -1,3 +1,4 @@ +use std::fmt::Display; use std::io::{self, ErrorKind}; use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; @@ -43,6 +44,40 @@ pub enum RecordingStoreError { SerializationError(#[from] serde_json::Error), } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FileKind { + Qmdl, + Analysis, + Gps, +} + +impl FileKind { + // List of all possible physical files on disk. + pub const ALL: &'static [FileKind] = &[FileKind::Qmdl, FileKind::Analysis, FileKind::Gps]; + + pub fn get_filename(&self, entry_name: &str) -> String { + match self { + FileKind::Qmdl => format!("{}.qmdl", entry_name), + FileKind::Analysis => format!("{}.ndjson", entry_name), + FileKind::Gps => format!("{}-gps.ndjson", entry_name), + } + } + + pub fn get_filepath>(&self, entry_name: &str, base_path: P) -> PathBuf { + base_path.as_ref().join(self.get_filename(entry_name)) + } +} + +impl Display for FileKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + FileKind::Qmdl => write!(f, "QMDL"), + FileKind::Analysis => write!(f, "analysis"), + FileKind::Gps => write!(f, "GPS"), + } + } +} + pub struct RecordingStore { pub path: PathBuf, pub manifest: Manifest, @@ -101,20 +136,8 @@ impl ManifestEntry { } } - pub fn get_qmdl_filepath>(&self, path: P) -> PathBuf { - let mut filepath = path.as_ref().join(&self.name); - filepath.set_extension("qmdl"); - filepath - } - - pub fn get_analysis_filepath>(&self, path: P) -> PathBuf { - let mut filepath = path.as_ref().join(&self.name); - filepath.set_extension("ndjson"); - filepath - } - - pub fn get_gps_filepath>(&self, path: P) -> PathBuf { - path.as_ref().join(format!("{}-gps.ndjson", self.name)) + pub fn get_filepath>(&self, file_kind: FileKind, path: P) -> PathBuf { + file_kind.get_filepath(&self.name, path) } } @@ -275,15 +298,15 @@ impl RecordingStore { self.close_current_entry().await?; } let new_entry = ManifestEntry::new(gps_mode); - let qmdl_filepath = new_entry.get_qmdl_filepath(&self.path); + let qmdl_filepath = new_entry.get_filepath(FileKind::Qmdl, &self.path); let qmdl_file = File::create(&qmdl_filepath) .await .map_err(RecordingStoreError::CreateFileError)?; - let analysis_filepath = new_entry.get_analysis_filepath(&self.path); + let analysis_filepath = new_entry.get_filepath(FileKind::Analysis, &self.path); let analysis_file = File::create(&analysis_filepath) .await .map_err(RecordingStoreError::CreateFileError)?; - let gps_filepath = new_entry.get_gps_filepath(&self.path); + let gps_filepath = new_entry.get_filepath(FileKind::Gps, &self.path); File::create(&gps_filepath) .await .map_err(RecordingStoreError::CreateFileError)?; @@ -293,31 +316,15 @@ impl RecordingStore { Ok((qmdl_file, analysis_file)) } - // Returns the corresponding QMDL file for a given entry - pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result { - let entry = &self.manifest.entries[entry_index]; - File::open(entry.get_qmdl_filepath(&self.path)) - .await - .map_err(RecordingStoreError::ReadFileError) - } - - // Returns the corresponding QMDL file for a given entry - pub async fn open_entry_analysis( - &self, - entry_index: usize, - ) -> Result { - let entry = &self.manifest.entries[entry_index]; - File::open(entry.get_analysis_filepath(&self.path)) - .await - .map_err(RecordingStoreError::ReadFileError) - } - - pub async fn open_entry_gps( + pub async fn open_file( &self, entry_index: usize, + file_kind: FileKind, ) -> Result, RecordingStoreError> { let entry = &self.manifest.entries[entry_index]; - match File::open(entry.get_gps_filepath(&self.path)).await { + let filepath = file_kind.get_filepath(&entry.name, &self.path); + + match File::open(&filepath).await { Ok(file) => Ok(Some(file)), Err(e) if e.kind() == ErrorKind::NotFound => Ok(None), Err(e) => Err(RecordingStoreError::ReadFileError(e)), @@ -332,7 +339,7 @@ impl RecordingStore { match OpenOptions::new() .create(true) .append(true) - .open(entry.get_gps_filepath(&self.path)) + .open(entry.get_filepath(FileKind::Gps, &self.path)) .await { Ok(file) => Ok(Some(file)), @@ -349,7 +356,7 @@ impl RecordingStore { let file = OpenOptions::new() .write(true) .truncate(true) - .open(entry.get_analysis_filepath(&self.path)) + .open(entry.get_filepath(FileKind::Analysis, &self.path)) .await .map_err(RecordingStoreError::ReadFileError)?; Ok(file) @@ -487,18 +494,13 @@ impl RecordingStore { }; let entry_to_delete = self.manifest.entries.remove(entry_to_delete_idx); self.write_manifest().await?; - let qmdl_filepath = entry_to_delete.get_qmdl_filepath(&self.path); - let analysis_filepath = entry_to_delete.get_analysis_filepath(&self.path); - let gps_filepath = entry_to_delete.get_gps_filepath(&self.path); - remove_file_if_exists(&qmdl_filepath) - .await - .map_err(RecordingStoreError::DeleteFileError)?; - remove_file_if_exists(&analysis_filepath) - .await - .map_err(RecordingStoreError::DeleteFileError)?; - remove_file_if_exists(&gps_filepath) - .await - .map_err(RecordingStoreError::DeleteFileError)?; + + for &file_kind in FileKind::ALL { + let filepath = file_kind.get_filepath(&entry_to_delete.name, &self.path); + remove_file_if_exists(&filepath) + .await + .map_err(RecordingStoreError::DeleteFileError)?; + } Ok(()) } @@ -509,25 +511,18 @@ impl RecordingStore { let mut keep = Vec::new(); - for entry in &self.manifest.entries { - let qmdl_filepath = entry.get_qmdl_filepath(&self.path); - let analysis_filepath = entry.get_analysis_filepath(&self.path); - - if let Err(e) = remove_file_if_exists(&qmdl_filepath).await { - log::warn!("failed to remove {qmdl_filepath:?}: {e:?}"); - keep.push(true); - continue; - } - - if let Err(e) = remove_file_if_exists(&analysis_filepath).await { - log::warn!("failed to remove {analysis_filepath:?}: {e:?}"); - keep.push(true); - continue; + 'entries: for entry in &self.manifest.entries { + for &file_kind in FileKind::ALL { + let filepath = file_kind.get_filepath(&entry.name, &self.path); + if let Err(e) = remove_file_if_exists(&filepath).await { + log::warn!("failed to remove {filepath:?}: {e:?}"); + // Some error happened with deleting this entry, abort and go to the next one. + // Also *keep* the manifest entry. + keep.push(true); + continue 'entries; + } } - let gps_filepath = entry.get_gps_filepath(&self.path); - remove_file_if_exists(&gps_filepath).await.ok(); - keep.push(false); } diff --git a/daemon/src/server.rs b/daemon/src/server.rs index 1f72f131..c898a488 100644 --- a/daemon/src/server.rs +++ b/daemon/src/server.rs @@ -28,7 +28,7 @@ use crate::display::DisplayState; use crate::gps::GpsData; use crate::notifications::DEFAULT_NOTIFICATION_TIMEOUT; use crate::pcap::{generate_pcap_data, load_gps_records_for_entry}; -use crate::qmdl_store::RecordingStore; +use crate::qmdl_store::{FileKind, RecordingStore}; use crate::update::UpdateStatus; pub struct ServerState { @@ -72,14 +72,15 @@ pub async fn get_qmdl( format!("couldn't find qmdl file with name {qmdl_idx}"), ))?; let qmdl_file = qmdl_store - .open_entry_qmdl(entry_index) + .open_file(entry_index, FileKind::Qmdl) .await .map_err(|err| { ( StatusCode::INTERNAL_SERVER_ERROR, format!("error opening QMDL file: {err}"), ) - })?; + })? + .ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?; let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64); let qmdl_stream = ReaderStream::new(limited_qmdl_file); @@ -359,24 +360,40 @@ pub async fn get_zip( let result: Result<(), Error> = async { let mut zip = ZipFileWriter::with_tokio(writer); - // Add QMDL file - { - let entry = - ZipEntryBuilder::new(format!("{qmdl_idx}.qmdl").into(), Compression::Stored); + const EXCLUDED_FROM_ZIP: &[FileKind] = &[FileKind::Analysis]; + + // Add stored files + for &file_kind in FileKind::ALL { + if EXCLUDED_FROM_ZIP.contains(&file_kind) { + continue; + } + + let file_opt = { + let qmdl_store = qmdl_store_lock.read().await; + qmdl_store.open_file(entry_index, file_kind).await? + }; + + let Some(mut file) = file_opt else { + continue; + }; + + let entry = ZipEntryBuilder::new( + file_kind.get_filename(&qmdl_idx).into(), + Compression::Stored, + ); // FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does // not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed // once https://github.com/Majored/rs-async-zip/pull/160 is released. let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write(); - let mut qmdl_file = { - let qmdl_store = qmdl_store_lock.read().await; - qmdl_store - .open_entry_qmdl(entry_index) - .await? - .take(qmdl_size_bytes as u64) - }; + // Truncating to qmdl_size_bytes is an attempt to ignore partial writes by the diag + // thread. + if file_kind == FileKind::Qmdl { + copy(&mut file.take(qmdl_size_bytes as u64), &mut entry_writer).await?; + } else { + copy(&mut file, &mut entry_writer).await?; + } - copy(&mut qmdl_file, &mut entry_writer).await?; entry_writer.into_inner().close().await?; } @@ -389,8 +406,9 @@ pub async fn get_zip( let qmdl_file_for_pcap = { let qmdl_store = qmdl_store_lock.read().await; qmdl_store - .open_entry_qmdl(entry_index) + .open_file(entry_index, FileKind::Qmdl) .await? + .ok_or_else(|| anyhow::anyhow!("QMDL file not found"))? .take(qmdl_size_bytes as u64) }; @@ -615,7 +633,11 @@ mod tests { assert_eq!( filenames, - vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),] + vec![ + format!("{entry_name}.qmdl"), + format!("{entry_name}-gps.ndjson"), + format!("{entry_name}.pcapng"), + ] ); } } diff --git a/daemon/src/webdav.rs b/daemon/src/webdav.rs index a23a443f..6369f9ff 100644 --- a/daemon/src/webdav.rs +++ b/daemon/src/webdav.rs @@ -1,18 +1,17 @@ -use std::fmt::Display; use std::{sync::Arc, time::Duration}; use chrono::TimeDelta; +use futures::future::join_all; use log::{info, warn}; use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE}; use reqwest::{Body, Client, Response}; use tokio::fs::File; -use tokio::join; use tokio::{select, sync::RwLock, time}; use tokio_util::io::ReaderStream; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use crate::config::WebdavConfig; -use crate::qmdl_store::RecordingStore; +use crate::qmdl_store::{FileKind, RecordingStore}; pub struct WebdavUploadWorkerConfig { poll_interval: Duration, @@ -38,29 +37,6 @@ impl From for WebdavUploadWorkerConfig { } } -enum FileKind { - Analysis, - Qmdl, -} - -impl FileKind { - fn as_extension(&self) -> &'static str { - match self { - FileKind::Analysis => ".ndjson", - FileKind::Qmdl => ".qmdl", - } - } -} - -impl Display for FileKind { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - FileKind::Analysis => write!(f, "analysis"), - FileKind::Qmdl => write!(f, "QMDL"), - } - } -} - #[derive(Debug, Clone)] struct WebDavClient { client: Client, @@ -127,22 +103,22 @@ async fn try_upload_entry( ) -> Option<()> { let read_lock = store.read().await; let entry_idx = read_lock.entry_for_name(&entry_name)?.0; - let file = match file_kind { - FileKind::Analysis => read_lock.open_entry_analysis(entry_idx).await, - FileKind::Qmdl => read_lock.open_entry_qmdl(entry_idx).await, - }; + let file = read_lock.open_file(entry_idx, file_kind).await; drop(read_lock); - let Ok(file) = file.map_err(|err| { - warn!( - "Unable to open entry: {} {} file: {:?}", - entry_name, file_kind, err - ) - }) else { - return None; + let file = match file { + Ok(Some(f)) => f, + Ok(None) => return Some(()), // File doesn't exist (e.g., GPS for old recordings) + Err(err) => { + warn!( + "Unable to open entry: {} {} file: {:?}", + entry_name, file_kind, err + ); + return None; + } }; - let file_name = format!("{}{}", entry_name, file_kind.as_extension()); + let file_name = file_kind.get_filename(&entry_name); let res = select! { _ = shutdown_token.cancelled() => { @@ -205,24 +181,23 @@ pub fn run_webdav_upload_worker( break; }; - let (Some(()), Some(())) = join!( - try_upload_entry( - webdav_client.clone(), - qmdl_store_lock.clone(), - unuploaded_entry.clone(), - FileKind::Qmdl, - shutdown_token.clone(), - ), - try_upload_entry( - webdav_client.clone(), - qmdl_store_lock.clone(), - unuploaded_entry.clone(), - FileKind::Analysis, - shutdown_token.clone() - ), - ) else { + let upload_futures: Vec<_> = FileKind::ALL + .iter() + .map(|&file_kind| { + try_upload_entry( + webdav_client.clone(), + qmdl_store_lock.clone(), + unuploaded_entry.clone(), + file_kind, + shutdown_token.clone(), + ) + }) + .collect(); + + let results = join_all(upload_futures).await; + if !results.iter().all(|r| r.is_some()) { break; - }; + } if config.delete_on_upload { match qmdl_store_lock.write().await.delete_entry(&unuploaded_entry).await { @@ -354,12 +329,14 @@ mod tests { cleanup_worker(shutdown, tracker).await; let recorded = captured.lock().await; - assert_eq!(recorded.len(), 2); + assert_eq!(recorded.len(), 3); let paths: Vec<&str> = recorded.iter().map(|r| r.path.as_str()).collect(); let qmdl_path = format!("dav/{}.qmdl", entry_name); let ndjson_path = format!("dav/{}.ndjson", entry_name); + let gps_path = format!("dav/{}-gps.ndjson", entry_name); assert!(paths.contains(&qmdl_path.as_str())); assert!(paths.contains(&ndjson_path.as_str())); + assert!(paths.contains(&gps_path.as_str())); for put in recorded.iter() { assert_eq!(put.auth.as_deref(), Some("Basic dXNlcjpwYXNzd29yZA==")); } @@ -408,7 +385,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; cleanup_worker(shutdown, tracker).await; - assert_eq!(captured.lock().await.len(), 2); + assert_eq!(captured.lock().await.len(), 3); let store_read = store.read().await; assert!(store_read.entry_for_name(&entry_name).is_none());