Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions daemon/src/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::sync::mpsc::Receiver;
use tokio::sync::{RwLock, RwLockWriteGuard};
use tokio_util::task::TaskTracker;

use crate::qmdl_store::RecordingStore;
use crate::qmdl_store::{FileKind, RecordingStore};
use crate::server::ServerState;

pub struct AnalysisWriter {
Expand Down Expand Up @@ -145,9 +145,10 @@ async fn perform_analysis(
.await
.map_err(|e| format!("{e:?}"))?;
let qmdl_file = qmdl_store
.open_entry_qmdl(entry_index)
.open_file(entry_index, FileKind::Qmdl)
.await
.map_err(|e| format!("{e:?}"))?;
.map_err(|e| format!("{e:?}"))?
.ok_or("QMDL file not found")?;

(analysis_file, qmdl_file)
};
Expand Down
7 changes: 4 additions & 3 deletions daemon/src/diag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::analysis::{AnalysisCtrlMessage, AnalysisWriter};
use crate::config::GpsMode;
use crate::display;
use crate::notifications::{Notification, NotificationType};
use crate::qmdl_store::{RecordingStore, RecordingStoreError};
use crate::qmdl_store::{FileKind, RecordingStore, RecordingStoreError};
use crate::server::ServerState;
use crate::stats::DiskStats;

Expand Down Expand Up @@ -747,9 +747,10 @@ pub async fn get_analysis_report(
))?
};
let analysis_file = qmdl_store
.open_entry_analysis(entry_index)
.open_file(entry_index, FileKind::Analysis)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?
.ok_or((StatusCode::NOT_FOUND, "Analysis file not found".to_string()))?;

// Read and normalize the NDJSON file
let reader = BufReader::new(analysis_file);
Expand Down
8 changes: 5 additions & 3 deletions daemon/src/pcap.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::gps::{GpsRecord, load_gps_records};
use crate::qmdl_store::FileKind;
use crate::server::ServerState;

use crate::config::GpsMode;
Expand Down Expand Up @@ -52,9 +53,10 @@ pub async fn get_pcap(
}
let qmdl_size_bytes = entry.qmdl_size_bytes;
let qmdl_file = qmdl_store
.open_entry_qmdl(entry_index)
.open_file(entry_index, FileKind::Qmdl)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?;
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("{e:?}")))?
.ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?;
let (reader, writer) = duplex(1024);
let gps_records = load_gps_records_for_entry(&state, entry_index).await;
drop(qmdl_store);
Expand All @@ -75,7 +77,7 @@ pub(crate) async fn load_gps_records_for_entry(
entry_index: usize,
) -> Vec<GpsRecord> {
let qmdl_store = state.qmdl_store_lock.read().await;
match qmdl_store.open_entry_gps(entry_index).await {
match qmdl_store.open_file(entry_index, FileKind::Gps).await {
Ok(Some(file)) => load_gps_records(file).await,
Ok(None) => {
let gps_mode = qmdl_store
Expand Down
133 changes: 64 additions & 69 deletions daemon/src/qmdl_store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Display;
use std::io::{self, ErrorKind};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -43,6 +44,40 @@ pub enum RecordingStoreError {
SerializationError(#[from] serde_json::Error),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FileKind {
Qmdl,
Analysis,
Gps,
}

impl FileKind {
// List of all possible physical files on disk.
pub const ALL: &'static [FileKind] = &[FileKind::Qmdl, FileKind::Analysis, FileKind::Gps];

pub fn get_filename(&self, entry_name: &str) -> String {
match self {
FileKind::Qmdl => format!("{}.qmdl", entry_name),
FileKind::Analysis => format!("{}.ndjson", entry_name),
FileKind::Gps => format!("{}-gps.ndjson", entry_name),
}
}

pub fn get_filepath<P: AsRef<Path>>(&self, entry_name: &str, base_path: P) -> PathBuf {
base_path.as_ref().join(self.get_filename(entry_name))
}
}

impl Display for FileKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FileKind::Qmdl => write!(f, "QMDL"),
FileKind::Analysis => write!(f, "analysis"),
FileKind::Gps => write!(f, "GPS"),
}
}
}

pub struct RecordingStore {
pub path: PathBuf,
pub manifest: Manifest,
Expand Down Expand Up @@ -101,20 +136,8 @@ impl ManifestEntry {
}
}

pub fn get_qmdl_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
let mut filepath = path.as_ref().join(&self.name);
filepath.set_extension("qmdl");
filepath
}

pub fn get_analysis_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
let mut filepath = path.as_ref().join(&self.name);
filepath.set_extension("ndjson");
filepath
}

pub fn get_gps_filepath<P: AsRef<Path>>(&self, path: P) -> PathBuf {
path.as_ref().join(format!("{}-gps.ndjson", self.name))
pub fn get_filepath<P: AsRef<Path>>(&self, file_kind: FileKind, path: P) -> PathBuf {
file_kind.get_filepath(&self.name, path)
}
}

Expand Down Expand Up @@ -275,15 +298,15 @@ impl RecordingStore {
self.close_current_entry().await?;
}
let new_entry = ManifestEntry::new(gps_mode);
let qmdl_filepath = new_entry.get_qmdl_filepath(&self.path);
let qmdl_filepath = new_entry.get_filepath(FileKind::Qmdl, &self.path);
let qmdl_file = File::create(&qmdl_filepath)
.await
.map_err(RecordingStoreError::CreateFileError)?;
let analysis_filepath = new_entry.get_analysis_filepath(&self.path);
let analysis_filepath = new_entry.get_filepath(FileKind::Analysis, &self.path);
let analysis_file = File::create(&analysis_filepath)
.await
.map_err(RecordingStoreError::CreateFileError)?;
let gps_filepath = new_entry.get_gps_filepath(&self.path);
let gps_filepath = new_entry.get_filepath(FileKind::Gps, &self.path);
File::create(&gps_filepath)
.await
.map_err(RecordingStoreError::CreateFileError)?;
Expand All @@ -293,31 +316,15 @@ impl RecordingStore {
Ok((qmdl_file, analysis_file))
}

// Returns the corresponding QMDL file for a given entry
pub async fn open_entry_qmdl(&self, entry_index: usize) -> Result<File, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index];
File::open(entry.get_qmdl_filepath(&self.path))
.await
.map_err(RecordingStoreError::ReadFileError)
}

// Returns the corresponding QMDL file for a given entry
pub async fn open_entry_analysis(
&self,
entry_index: usize,
) -> Result<File, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index];
File::open(entry.get_analysis_filepath(&self.path))
.await
.map_err(RecordingStoreError::ReadFileError)
}

pub async fn open_entry_gps(
pub async fn open_file(
&self,
entry_index: usize,
file_kind: FileKind,
) -> Result<Option<File>, RecordingStoreError> {
let entry = &self.manifest.entries[entry_index];
match File::open(entry.get_gps_filepath(&self.path)).await {
let filepath = file_kind.get_filepath(&entry.name, &self.path);

match File::open(&filepath).await {
Ok(file) => Ok(Some(file)),
Err(e) if e.kind() == ErrorKind::NotFound => Ok(None),
Err(e) => Err(RecordingStoreError::ReadFileError(e)),
Expand All @@ -332,7 +339,7 @@ impl RecordingStore {
match OpenOptions::new()
.create(true)
.append(true)
.open(entry.get_gps_filepath(&self.path))
.open(entry.get_filepath(FileKind::Gps, &self.path))
.await
{
Ok(file) => Ok(Some(file)),
Expand All @@ -349,7 +356,7 @@ impl RecordingStore {
let file = OpenOptions::new()
.write(true)
.truncate(true)
.open(entry.get_analysis_filepath(&self.path))
.open(entry.get_filepath(FileKind::Analysis, &self.path))
.await
.map_err(RecordingStoreError::ReadFileError)?;
Ok(file)
Expand Down Expand Up @@ -487,18 +494,13 @@ impl RecordingStore {
};
let entry_to_delete = self.manifest.entries.remove(entry_to_delete_idx);
self.write_manifest().await?;
let qmdl_filepath = entry_to_delete.get_qmdl_filepath(&self.path);
let analysis_filepath = entry_to_delete.get_analysis_filepath(&self.path);
let gps_filepath = entry_to_delete.get_gps_filepath(&self.path);
remove_file_if_exists(&qmdl_filepath)
.await
.map_err(RecordingStoreError::DeleteFileError)?;
remove_file_if_exists(&analysis_filepath)
.await
.map_err(RecordingStoreError::DeleteFileError)?;
remove_file_if_exists(&gps_filepath)
.await
.map_err(RecordingStoreError::DeleteFileError)?;

for &file_kind in FileKind::ALL {
let filepath = file_kind.get_filepath(&entry_to_delete.name, &self.path);
remove_file_if_exists(&filepath)
.await
.map_err(RecordingStoreError::DeleteFileError)?;
}
Ok(())
}

Expand All @@ -509,25 +511,18 @@ impl RecordingStore {

let mut keep = Vec::new();

for entry in &self.manifest.entries {
let qmdl_filepath = entry.get_qmdl_filepath(&self.path);
let analysis_filepath = entry.get_analysis_filepath(&self.path);

if let Err(e) = remove_file_if_exists(&qmdl_filepath).await {
log::warn!("failed to remove {qmdl_filepath:?}: {e:?}");
keep.push(true);
continue;
}

if let Err(e) = remove_file_if_exists(&analysis_filepath).await {
log::warn!("failed to remove {analysis_filepath:?}: {e:?}");
keep.push(true);
continue;
'entries: for entry in &self.manifest.entries {
for &file_kind in FileKind::ALL {
let filepath = file_kind.get_filepath(&entry.name, &self.path);
if let Err(e) = remove_file_if_exists(&filepath).await {
log::warn!("failed to remove {filepath:?}: {e:?}");
// Some error happened with deleting this entry, abort and go to the next one.
// Also *keep* the manifest entry.
keep.push(true);
continue 'entries;
}
}

let gps_filepath = entry.get_gps_filepath(&self.path);
remove_file_if_exists(&gps_filepath).await.ok();

keep.push(false);
}

Expand Down
56 changes: 39 additions & 17 deletions daemon/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::display::DisplayState;
use crate::gps::GpsData;
use crate::notifications::DEFAULT_NOTIFICATION_TIMEOUT;
use crate::pcap::{generate_pcap_data, load_gps_records_for_entry};
use crate::qmdl_store::RecordingStore;
use crate::qmdl_store::{FileKind, RecordingStore};
use crate::update::UpdateStatus;

pub struct ServerState {
Expand Down Expand Up @@ -72,14 +72,15 @@ pub async fn get_qmdl(
format!("couldn't find qmdl file with name {qmdl_idx}"),
))?;
let qmdl_file = qmdl_store
.open_entry_qmdl(entry_index)
.open_file(entry_index, FileKind::Qmdl)
.await
.map_err(|err| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("error opening QMDL file: {err}"),
)
})?;
})?
.ok_or((StatusCode::NOT_FOUND, "QMDL file not found".to_string()))?;
let limited_qmdl_file = qmdl_file.take(entry.qmdl_size_bytes as u64);
let qmdl_stream = ReaderStream::new(limited_qmdl_file);

Expand Down Expand Up @@ -359,24 +360,40 @@ pub async fn get_zip(
let result: Result<(), Error> = async {
let mut zip = ZipFileWriter::with_tokio(writer);

// Add QMDL file
{
let entry =
ZipEntryBuilder::new(format!("{qmdl_idx}.qmdl").into(), Compression::Stored);
const EXCLUDED_FROM_ZIP: &[FileKind] = &[FileKind::Analysis];

// Add stored files
for &file_kind in FileKind::ALL {
if EXCLUDED_FROM_ZIP.contains(&file_kind) {
continue;
}

let file_opt = {
let qmdl_store = qmdl_store_lock.read().await;
qmdl_store.open_file(entry_index, file_kind).await?
};

let Some(mut file) = file_opt else {
continue;
};

let entry = ZipEntryBuilder::new(
file_kind.get_filename(&qmdl_idx).into(),
Compression::Stored,
);
// FuturesAsyncWriteCompatExt::compat_write because async-zip's entrystream does
// not impl tokio's AsyncWrite, but only future's AsyncWrite. This can be removed
// once https://github.com/Majored/rs-async-zip/pull/160 is released.
let mut entry_writer = zip.write_entry_stream(entry).await?.compat_write();

let mut qmdl_file = {
let qmdl_store = qmdl_store_lock.read().await;
qmdl_store
.open_entry_qmdl(entry_index)
.await?
.take(qmdl_size_bytes as u64)
};
// Truncating to qmdl_size_bytes is an attempt to ignore partial writes by the diag
// thread.
if file_kind == FileKind::Qmdl {
copy(&mut file.take(qmdl_size_bytes as u64), &mut entry_writer).await?;
} else {
copy(&mut file, &mut entry_writer).await?;
}

copy(&mut qmdl_file, &mut entry_writer).await?;
entry_writer.into_inner().close().await?;
}

Expand All @@ -389,8 +406,9 @@ pub async fn get_zip(
let qmdl_file_for_pcap = {
let qmdl_store = qmdl_store_lock.read().await;
qmdl_store
.open_entry_qmdl(entry_index)
.open_file(entry_index, FileKind::Qmdl)
.await?
.ok_or_else(|| anyhow::anyhow!("QMDL file not found"))?
.take(qmdl_size_bytes as u64)
};

Expand Down Expand Up @@ -615,7 +633,11 @@ mod tests {

assert_eq!(
filenames,
vec![format!("{entry_name}.qmdl"), format!("{entry_name}.pcapng"),]
vec![
format!("{entry_name}.qmdl"),
format!("{entry_name}-gps.ndjson"),
format!("{entry_name}.pcapng"),
]
);
}
}
Loading
Loading