From 21ede68c6f6e7dedd92a6dfd1e1e88b2017c5388 Mon Sep 17 00:00:00 2001 From: Philipp Falk <80761729+philippfalk@users.noreply.github.com> Date: Tue, 14 Apr 2026 12:41:21 +0200 Subject: [PATCH 1/9] fix(quota): only enforce quota if feature licensed Before this commit, the quota retrieval and distribution of exceeded IDs would be disabled on mgmtd startup if the feature wasn't licensed. This lead to a couple of issues with enforcement consistency: * When a license expired while mgmtd was running, enforcement would continue to work but only until mgmtd restarts * When mgmtd restarted, collection and distribution of exceeded quotas would be disabled, leading to other nodes enforcing based on an old state * When other nodes restarted, they would still be able to initially fetch outdated quota states and enforce based on them. This commit fixes all of the above by not allowing intial downloads of quota state if the feature is not licensed and not entirely disabling the quota distribution mechanism. If the quota feature is not licensed nodes quota collection from the nodes will be disabled for efficiency reasons, but nodes will still receive exceeded quota updates to allow for online changes in license state. These updates will not contain any information about exceeded IDs and will simply clear out the state on the nodes so quotas will no longer be enforced. --- mgmtd/src/bee_msg/request_exceeded_quota.rs | 4 + mgmtd/src/quota.rs | 87 +++++++++++++-------- mgmtd/src/timer.rs | 17 ++-- 3 files changed, 64 insertions(+), 44 deletions(-) diff --git a/mgmtd/src/bee_msg/request_exceeded_quota.rs b/mgmtd/src/bee_msg/request_exceeded_quota.rs index cf9915c..f513d4c 100644 --- a/mgmtd/src/bee_msg/request_exceeded_quota.rs +++ b/mgmtd/src/bee_msg/request_exceeded_quota.rs @@ -1,3 +1,5 @@ +use crate::license::LicensedFeature; + use super::*; use rusqlite::params; use shared::bee_msg::quota::*; @@ -13,6 +15,8 @@ impl HandleWithResponse for RequestExceededQuota { } async fn handle(self, app: &impl App, _req: &mut impl Request) -> Result { + app.verify_licensed_feature(LicensedFeature::Quota)?; + let inner = app .read_tx(move |tx| { // Quota is calculated per pool, so if a target ID is given, use its assigned pools diff --git a/mgmtd/src/quota.rs b/mgmtd/src/quota.rs index d4129e2..352fc0e 100644 --- a/mgmtd/src/quota.rs +++ b/mgmtd/src/quota.rs @@ -3,6 +3,7 @@ mod system_id; use crate::app::*; +use crate::license::LicensedFeature; use crate::types::SqliteEnumExt; use anyhow::{Context as AnyhowContext, Result}; use rusqlite::params; @@ -16,10 +17,16 @@ use sqlite_check::sql; use std::collections::HashSet; use std::path::Path; -/// Fetches quota information for all storage targets, calculates exceeded IDs and distributes them. -pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { - // Fetch quota data from storage daemons +/// Fetches quota information for all storage targets and updates the quota usage database +pub(crate) async fn fetch_and_update(app: &impl App) -> Result<()> { + if app.verify_licensed_feature(LicensedFeature::Quota).is_err() { + log::info!( + "Quota is enabled but feature not licensed. Skipping quota collection" + ); + return Ok(()); + } + // Fetch quota data from storage daemons let targets: Vec<(TargetId, PoolId, Uid)> = app .read_tx(move |tx| { tx.query_map_collect( @@ -196,19 +203,20 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { } } - if app.static_info().user_config.quota_enforce { - exceeded_quota(app).await?; - } - Ok(()) } -/// Calculate and push exceeded quota info to the nodes -async fn exceeded_quota(app: &impl App) -> Result<()> { +/// Calculates and pushes exceeded quota info to the nodes +pub(crate) async fn distribute_exceeded(app: &impl App) -> Result<()> { + if !app.static_info().user_config.quota_enforce { + return Ok(()); + } log::info!("Calculating and pushing exceeded quota"); + let quota_licensed = app.verify_licensed_feature(LicensedFeature::Quota).is_ok(); + let (msges, nodes) = app - .read_tx(|tx| { + .read_tx(move |tx| { let pools: Vec<_> = tx.query_map_collect(sql!("SELECT pool_id FROM pools"), [], |row| row.get(0))?; @@ -229,29 +237,36 @@ async fn exceeded_quota(app: &impl App) -> Result<()> { } } - // Fill the prepared messages with matching exceeded quota ids - let mut stmt = tx.prepare_cached(sql!( - "SELECT DISTINCT e.quota_id, e.id_type, e.quota_type, st.pool_id - FROM quota_usage AS e - INNER JOIN targets AS st USING(node_type, target_id) - LEFT JOIN quota_default_limits AS d USING(id_type, quota_type, pool_id) - LEFT JOIN quota_limits AS l USING(quota_id, id_type, quota_type, pool_id) - GROUP BY e.quota_id, e.id_type, e.quota_type, st.pool_id - HAVING SUM(e.value) > COALESCE(l.value, d.value)" - ))?; - let mut rows = stmt.query([])?; - while let Some(row) = rows.next()? { - for m in &mut msges { - if row.get::<_, PoolId>(3)? == m.pool_id - && QuotaIdType::from_row(row, 1)? == m.id_type - && QuotaType::from_row(row, 2)? == m.quota_type - { - m.exceeded_quota_ids.push(row.get(0)?); - break; + if quota_licensed { + // Fill the prepared messages with matching exceeded quota ids + let mut stmt = tx.prepare_cached(sql!( + "SELECT DISTINCT e.quota_id, e.id_type, e.quota_type, st.pool_id + FROM quota_usage AS e + INNER JOIN targets AS st USING(node_type, target_id) + LEFT JOIN quota_default_limits AS d USING(id_type, quota_type, pool_id) + LEFT JOIN quota_limits AS l USING(quota_id, id_type, quota_type, pool_id) + GROUP BY e.quota_id, e.id_type, e.quota_type, st.pool_id + HAVING SUM(e.value) > COALESCE(l.value, d.value)" + ))?; + let mut rows = stmt.query([])?; + while let Some(row) = rows.next()? { + for m in &mut msges { + if row.get::<_, PoolId>(3)? == m.pool_id + && QuotaIdType::from_row(row, 1)? == m.id_type + && QuotaType::from_row(row, 2)? == m.quota_type + { + m.exceeded_quota_ids.push(row.get(0)?); + break; + } } } + } else { + log::info!( + "Quota enforcement enabled but feature not licensed. Removing quota limits from nodes" + ); } + // Get all node uids to send the messages to let nodes: Vec = tx.query_map_collect( sql!("SELECT node_uid FROM nodes WHERE node_type IN (?1,?2)"), @@ -369,7 +384,9 @@ mod test { })) }); - super::update_and_distribute(&app).await.unwrap(); + super::fetch_and_update(&app).await.unwrap(); + super::distribute_exceeded(&app).await.unwrap(); + // Find the amount of target 1 entries which values match the schema they have been reported // with @@ -425,7 +442,8 @@ mod test { })) }); - super::update_and_distribute(&app).await.unwrap(); + super::fetch_and_update(&app).await.unwrap(); + super::distribute_exceeded(&app).await.unwrap(); // Now target 2 quota should be empty, target 1 quota should be completely untouched due to // the error (even if it only failed for user quota request) @@ -465,7 +483,8 @@ mod test { })) }); - super::update_and_distribute(&app).await.unwrap(); + super::fetch_and_update(&app).await.unwrap(); + super::distribute_exceeded(&app).await.unwrap(); // Target 1 should now only have the couple of entries resulting from above app.db @@ -488,7 +507,7 @@ mod test { } #[tokio::test] - async fn exceeded_quota() { + async fn distribute_exceeded() { // This fn doesn't need special config let app = TestApp::new().await; @@ -521,6 +540,6 @@ mod test { })) }); - super::exceeded_quota(&app).await.unwrap(); + super::distribute_exceeded(&app).await.unwrap(); } } diff --git a/mgmtd/src/timer.rs b/mgmtd/src/timer.rs index 568caed..48dd359 100644 --- a/mgmtd/src/timer.rs +++ b/mgmtd/src/timer.rs @@ -3,8 +3,7 @@ use crate::App; use crate::app::RuntimeApp; use crate::db::{self}; -use crate::license::LicensedFeature; -use crate::quota::update_and_distribute; +use crate::quota::{fetch_and_update,distribute_exceeded}; use shared::bee_msg::target::RefreshTargetStates; use shared::run_state::RunStateHandle; use shared::types::NodeType; @@ -19,13 +18,7 @@ pub(crate) fn start_tasks(app: RuntimeApp, run_state: RunStateHandle) { tokio::spawn(switchover(app.clone(), run_state.clone())); if app.info.user_config.quota_enable { - if let Err(err) = app.license.verify_licensed_feature(LicensedFeature::Quota) { - log::error!( - "Quota is enabled in the config, but the feature could not be verified. Continuing without quota support: {err}" - ); - } else { - tokio::spawn(update_quota(app, run_state)); - } + tokio::spawn(update_quota(app, run_state)); } } @@ -63,10 +56,14 @@ async fn update_quota(app: RuntimeApp, mut run_state: RunStateHandle) { loop { log::debug!("Running quota update"); - match update_and_distribute(&app).await { + match fetch_and_update(&app).await { Ok(_) => {} Err(err) => log::error!("Updating quota failed: {err:#}"), } + match distribute_exceeded(&app).await { + Ok(_) => {} + Err(err) => log::error!("Distributing exceeded quota failed: {err:#}"), + } tokio::select! { _ = sleep(app.info.user_config.quota_update_interval) => {} From dda621d93c36c81257ae66c23eaf6ea546d96082 Mon Sep 17 00:00:00 2001 From: Philipp Falk <80761729+philippfalk@users.noreply.github.com> Date: Wed, 15 Apr 2026 15:47:23 +0200 Subject: [PATCH 2/9] feat(license): disallow client mounts when no license loaded --- mgmtd/src/bee_msg/common.rs | 38 +++++++++++++++++++++++++++++- mgmtd/src/bee_msg/heartbeat.rs | 1 + mgmtd/src/bee_msg/register_node.rs | 6 ++++- mgmtd/src/db/node.rs | 13 ++++++++++ shared/src/conn/msg_dispatch.rs | 13 ++++++++++ 5 files changed, 69 insertions(+), 2 deletions(-) diff --git a/mgmtd/src/bee_msg/common.rs b/mgmtd/src/bee_msg/common.rs index 6d37186..f11ace3 100644 --- a/mgmtd/src/bee_msg/common.rs +++ b/mgmtd/src/bee_msg/common.rs @@ -1,6 +1,7 @@ use super::*; use crate::db::node_nic::ReplaceNic; use db::misc::MetaRoot; +use protobuf::license::VerifyResult; use rusqlite::Transaction; use shared::bee_msg::node::*; use shared::bee_msg::target::*; @@ -9,12 +10,37 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +const NUM_CLIENTS: u32 = 5; + /// Processes incoming node information. Registers new nodes if config allows it -pub(super) async fn update_node(msg: RegisterNode, app: &impl App) -> Result { +pub(super) async fn update_node(msg: RegisterNode, app: &impl App, reject: bool) -> Result { let nics = msg.nics.clone(); let requested_node_id = msg.node_id; let registration_disable = app.static_info().user_config.registration_disable; + let licensed_clients: Option = match app.get_license_cert_data() { + Ok(r) => match r.result { + // If license is valid, no limit to client count + _ if r.result == VerifyResult::VerifyValid as i32 => None, + // no license file loaded, limit number of clients to NUM_CLIENTS + _ if r.result == VerifyResult::VerifyError as i32 => Some(NUM_CLIENTS), + // license file was loaded and is outside validity period + _ if r.result == VerifyResult::VerifyInvalid as i32 => None, + _ => { + log::error!( + "Unexpected error during license verification, limiting number of clients to {NUM_CLIENTS}: {0}", r.message + ); + Some(NUM_CLIENTS) + }, + }, + Err(e) => { + log::error!( + "Unexpected error during license verification, limiting number of clients to {NUM_CLIENTS}: {e:#}", + ); + Some(NUM_CLIENTS) + }, + }; + let licensed_machines = match app.get_licensed_machines() { Ok(n) => n, Err(err) => { @@ -97,6 +123,16 @@ registration token ({new_alias_or_reg_token}) does not match the stored token ({ bail!("Registration of new nodes is not allowed"); } + if msg.node_type == NodeType::Client + && let Some(cs) = licensed_clients + && db::node::count_clients(tx)? >= cs { + if reject { + bail!("Number of licensed clients ({NUM_CLIENTS}) exhausted. Client registration denied."); + } else { + log::warn!("Number of licensed clients ({NUM_CLIENTS}) exhausted but client doesn't support rejection."); + } + } + let new_alias = if msg.node_type == NodeType::Client { // In versions prior to 8.0 the string node ID generated by the client // started with a number which is not allowed by the new alias schema. diff --git a/mgmtd/src/bee_msg/heartbeat.rs b/mgmtd/src/bee_msg/heartbeat.rs index f3127fd..3d75c24 100644 --- a/mgmtd/src/bee_msg/heartbeat.rs +++ b/mgmtd/src/bee_msg/heartbeat.rs @@ -24,6 +24,7 @@ impl HandleWithResponse for Heartbeat { machine_uuid: self.machine_uuid, }, app, + false, ) .await?; diff --git a/mgmtd/src/bee_msg/register_node.rs b/mgmtd/src/bee_msg/register_node.rs index 7d817ad..37e1d89 100644 --- a/mgmtd/src/bee_msg/register_node.rs +++ b/mgmtd/src/bee_msg/register_node.rs @@ -2,13 +2,17 @@ use super::*; use common::update_node; use shared::bee_msg::node::*; +const REGISTERNODEMSG_COMPATFLAG_CLIENT_SUPPORTS_REGREJ: u8 = 1; + impl HandleWithResponse for RegisterNode { type Response = RegisterNodeResp; async fn handle(self, app: &impl App, _req: &mut impl Request) -> Result { fail_on_pre_shutdown(app)?; - let node_id = update_node(self, app).await?; + let reject = (_req.msg_compat_feature_flags() & REGISTERNODEMSG_COMPATFLAG_CLIENT_SUPPORTS_REGREJ) != 0; + + let node_id = update_node(self, app, reject).await?; let fs_uuid: String = app .read_tx(|tx| db::config::get(tx, db::config::Config::FsUuid)) diff --git a/mgmtd/src/db/node.rs b/mgmtd/src/db/node.rs index 0ecce5c..7baf8bb 100644 --- a/mgmtd/src/db/node.rs +++ b/mgmtd/src/db/node.rs @@ -194,6 +194,19 @@ pub(crate) fn count_machines( .map_err(|e| anyhow!(e)) } +/// Counts the number of currently registered distinct clients. +/// +/// # Return value +/// Returns the number of currently registered distinct clients if successful. +pub(crate) fn count_clients(tx: &Transaction) -> Result { + tx.query_row( + sql!("SELECT COUNT(DISTINCT node_uid) FROM nodes WHERE node_type = ?1"), + params![NodeType::Client.sql_variant()], + |row| row.get(0), + ) + .map_err(|e| anyhow!(e)) +} + /// Delete a node from the database. pub(crate) fn delete(tx: &Transaction, node_uid: Uid) -> Result<()> { let affected = tx.execute_cached(sql!("DELETE FROM nodes WHERE node_uid = ?1"), [node_uid])?; diff --git a/shared/src/conn/msg_dispatch.rs b/shared/src/conn/msg_dispatch.rs index 2932148..337e84e 100644 --- a/shared/src/conn/msg_dispatch.rs +++ b/shared/src/conn/msg_dispatch.rs @@ -26,6 +26,7 @@ pub trait Request: Send + Sync { fn authenticate_connection(&mut self); fn addr(&self) -> SocketAddr; fn msg_id(&self) -> MsgId; + fn msg_compat_feature_flags(&self) -> u8; fn deserialize_msg(&self) -> Result; } @@ -64,6 +65,10 @@ impl Request for StreamRequest<'_> { fn msg_id(&self) -> MsgId { self.header.msg_id() } + + fn msg_compat_feature_flags(&self) -> u8 { + self.header.msg_compat_feature_flags + } } /// Represents a request made via a UDP datagram @@ -99,6 +104,10 @@ impl Request for SocketRequest<'_> { fn msg_id(&self) -> MsgId { self.header.msg_id() } + + fn msg_compat_feature_flags(&self) -> u8 { + self.header.msg_compat_feature_flags + } } pub mod test { @@ -137,6 +146,10 @@ pub mod test { self.msg_id } + fn msg_compat_feature_flags(&self) -> u8 { + 0 + } + fn deserialize_msg(&self) -> Result { unimplemented!() } From be31b42a4793e48d4d49d3fc75cd91bba224d848 Mon Sep 17 00:00:00 2001 From: Philipp Falk <80761729+philippfalk@users.noreply.github.com> Date: Tue, 12 May 2026 15:11:26 +0200 Subject: [PATCH 3/9] feat: only allow a single trial license --- Cargo.lock | 2 +- Cargo.toml | 2 +- mgmtd/src/app.rs | 1 + mgmtd/src/app/runtime.rs | 6 +++-- mgmtd/src/app/test.rs | 4 +++- mgmtd/src/db/config.rs | 2 ++ mgmtd/src/grpc/get_license.rs | 17 +++++++++++++-- mgmtd/src/lib.rs | 41 ++++++++++++++++++++++++++++++++++- mgmtd/src/license.rs | 17 +++++++++++++-- mgmtd/src/main.rs | 27 ----------------------- 10 files changed, 82 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e89fccc..e378a11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -869,7 +869,7 @@ dependencies = [ [[package]] name = "protobuf" version = "0.0.0" -source = "git+https://github.com/thinkparq/protobuf?rev=e2e774e7db7e3d4474d6e7232bb06bbdffc5610c#e2e774e7db7e3d4474d6e7232bb06bbdffc5610c" +source = "git+https://github.com/thinkparq/protobuf?rev=4d5e5db085065acbbaa5bb76ce4b81d6d733e446#4d5e5db085065acbbaa5bb76ce4b81d6d733e446" dependencies = [ "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 22f5dd9..d15d815 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ itertools = "0" libc = "0" log = { version = "0", features = ["std"] } prost = "0.14" -protobuf = { git = "https://github.com/thinkparq/protobuf", rev = "e2e774e7db7e3d4474d6e7232bb06bbdffc5610c" } +protobuf = { git = "https://github.com/thinkparq/protobuf", rev = "4d5e5db085065acbbaa5bb76ce4b81d6d733e446" } regex = "1" ring = "0" rusqlite = { version = "0", features = ["bundled", "vtab", "array", "fallible_uint"] } diff --git a/mgmtd/src/app.rs b/mgmtd/src/app.rs index c978c81..0ed1b89 100644 --- a/mgmtd/src/app.rs +++ b/mgmtd/src/app.rs @@ -81,6 +81,7 @@ pub(crate) trait App: Debug + Clone + Send + 'static { fn load_and_verify_license_cert( &self, cert_path: &Path, + prev_trial_serial: Option, ) -> impl Future> + Send; /// Get license certificate data diff --git a/mgmtd/src/app/runtime.rs b/mgmtd/src/app/runtime.rs index 7431200..d81394c 100644 --- a/mgmtd/src/app/runtime.rs +++ b/mgmtd/src/app/runtime.rs @@ -164,8 +164,10 @@ impl App for RuntimeApp { } } - async fn load_and_verify_license_cert(&self, cert_path: &Path) -> Result { - LicenseVerifier::load_and_verify_license_cert(&self.license, cert_path).await + async fn load_and_verify_license_cert(&self, cert_path: &Path, + prev_trial_serial: Option) -> Result + { + LicenseVerifier::load_and_verify_license_cert(&self.license, cert_path, prev_trial_serial).await } fn get_license_cert_data(&self) -> Result { diff --git a/mgmtd/src/app/test.rs b/mgmtd/src/app/test.rs index 7d5f424..11daee6 100644 --- a/mgmtd/src/app/test.rs +++ b/mgmtd/src/app/test.rs @@ -160,7 +160,9 @@ impl App for TestApp { fn notify_client_pulled_state(&self, _node_type: NodeType, _node_id: NodeId) {} - async fn load_and_verify_license_cert(&self, _cert_path: &std::path::Path) -> Result { + async fn load_and_verify_license_cert(&self, _cert_path: &std::path::Path, + _prev_trial_serial: Option) -> Result + { Ok("dummy cert".to_string()) } diff --git a/mgmtd/src/db/config.rs b/mgmtd/src/db/config.rs index 10720ac..1a4c5e5 100644 --- a/mgmtd/src/db/config.rs +++ b/mgmtd/src/db/config.rs @@ -17,6 +17,7 @@ pub(crate) enum Config { #[allow(unused)] FsName, CounterLastClientID, + TrialSerial, } // Config entries that should not be changed after initially set. Note that this only controls the @@ -31,6 +32,7 @@ impl Config { Config::FsInitDateSecs => "fs_init_date_secs", Config::FsName => "fs_name", Config::CounterLastClientID => "counter_last_client_id", + Config::TrialSerial => "trial_serial", } } } diff --git a/mgmtd/src/grpc/get_license.rs b/mgmtd/src/grpc/get_license.rs index f8b4479..3741a96 100644 --- a/mgmtd/src/grpc/get_license.rs +++ b/mgmtd/src/grpc/get_license.rs @@ -1,5 +1,7 @@ use super::*; +use crate::db::config::Config; use protobuf::management::{self as pm, GetLicenseResponse}; +use protobuf::license::CertType; pub(crate) async fn get_license( app: &impl App, @@ -7,8 +9,19 @@ pub(crate) async fn get_license( ) -> Result { let reload: bool = required_field(req.reload)?; if reload { - app.load_and_verify_license_cert(&app.static_info().user_config.license_cert_file) - .await?; + let prev_trial_serial: Option = app.read_tx(|tx| { + db::config::get(tx, Config::TrialSerial) + }) + .await?; + + let serial = app.load_and_verify_license_cert(&app.static_info().user_config.license_cert_file, + prev_trial_serial).await?; + + if app.get_license_cert_data()?.data.is_some_and(|d| d.r#type == CertType::Trial.into()) + { + app.write_tx(|tx| db::config::set(tx, Config::TrialSerial, serial)).await?; + } + } let cert_data = app.get_license_cert_data()?; Ok(GetLicenseResponse { diff --git a/mgmtd/src/lib.rs b/mgmtd/src/lib.rs index cac458a..097b64a 100644 --- a/mgmtd/src/lib.rs +++ b/mgmtd/src/lib.rs @@ -16,8 +16,10 @@ use crate::app::RuntimeApp; use crate::config::Config; use anyhow::{Context, Result}; use app::App; +use db::config::Config as dbConfig; use db::node_nic::ReplaceNic; use license::LicenseVerifier; +use protobuf::license::CertType; use shared::bee_msg::target::RefreshTargetStates; use shared::conn::incoming; use shared::conn::outgoing::Pool; @@ -54,7 +56,7 @@ pub struct StaticInfo { /// Returns after all setup work is done and all tasks are started. The caller is responsible for /// keeping the shutdown control handle and send a shutdown request when the program shall /// be terminated. -pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result { +pub async fn start(info: StaticInfo) -> Result { // Initialization let (run_state, run_state_control) = run_state::new(); @@ -114,6 +116,43 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result = db.read_tx(|tx| { + db::config::get(tx, db::config::Config::TrialSerial) + }) + .await?; + + // Load the licensing library + let license = if !info.user_config.license_disable { + // SAFETY: + // There is no way to verify that the user loaded dynamic library matches the + // requirements of LicenseVerifier. After all, users can load anything they + // want. Therefore, this is just not safe to do from the Rust compilers + // perspective and loading anything with non-matching fp signatures or not + // behaving as expected will lead to undefined behavior. + let license = unsafe { LicenseVerifier::with_lib(&info.user_config.license_lib_file) }; + + match license + .load_and_verify_license_cert(&info.user_config.license_cert_file, prev_trial_serial) + .await + { + Ok(serial) => { + if license.get_license_cert_data()?.data.is_some_and( + |d| d.r#type == CertType::Trial.into()) + { + db.write_tx(|tx| db::config::set(tx, dbConfig::TrialSerial, serial)).await?; + } + }, + Err(err) => log::warn!( + "Initializing licensing library failed. \ + Licensed features will be unavailable: {err}" + ), + } + + license + } else { + LicenseVerifier::with_no_lib() + }; + // Fill node addrs store from db db.read_tx(db::node_nic::get_all_addrs) .await? diff --git a/mgmtd/src/license.rs b/mgmtd/src/license.rs index 76eeb5b..00c91d1 100644 --- a/mgmtd/src/license.rs +++ b/mgmtd/src/license.rs @@ -203,6 +203,7 @@ impl LicenseVerifier { pub async fn load_and_verify_license_cert( &self, cert_path: impl AsRef, + prev_trial_serial: Option, ) -> Result { let Some(ref library) = self.0 else { bail!("License verification library not loaded."); @@ -224,8 +225,20 @@ impl LicenseVerifier { match result { VerifyResult::VerifyValid => { - log::info!("Successfully loaded license certificate: {serial}"); - Ok(serial) + match self.get_license_cert_data() { + Ok(c) => { + if c.data.is_some_and(|d| d.r#type() == CertType::Trial + && prev_trial_serial.is_some_and(|s| serial != s)) + { + library.init_cert_store(); + Err(anyhow!("System has previously used different trial license.")) + } else { + log::info!("Successfully loaded license certificate: {serial}"); + Ok(serial) + } + }, + Err(err) => Err(anyhow!("Error getting license data: {err}")), + } } VerifyResult::VerifyInvalid => Err(anyhow!(message)), VerifyResult::VerifyError => Err(anyhow!( diff --git a/mgmtd/src/main.rs b/mgmtd/src/main.rs index 0fa6617..dd78095 100644 --- a/mgmtd/src/main.rs +++ b/mgmtd/src/main.rs @@ -2,7 +2,6 @@ use anyhow::{Context, Result, anyhow}; use log::LevelFilter; use mgmtd::config::LogTarget; use mgmtd::db::{self}; -use mgmtd::license::LicenseVerifier; use mgmtd::{StaticInfo, start}; use shared::journald_logger; use shared::nic::check_ipv6; @@ -115,31 +114,6 @@ If you want to initialize a new system, refer to --help or doc.beegfs.io.", // Run the tokio executor rt.block_on(async move { - // Load the licensing library - let license = if !user_config.license_disable { - // SAFETY: - // There is no way to verify that the user loaded dynamic library matches the - // requirements of LicenseVerifier. After all, users can load anything they - // want. Therefore, this is just not safe to do from the Rust compilers - // perspective and loading anything with non-matching fp signatures or not - // behaving as expected will lead to undefined behavior. - let license = unsafe { LicenseVerifier::with_lib(&user_config.license_lib_file) }; - - if let Err(err) = license - .load_and_verify_license_cert(&user_config.license_cert_file) - .await - { - log::warn!( - "Initializing licensing library failed. \ - Licensed features will be unavailable: {err}" - ); - } - - license - } else { - LicenseVerifier::with_no_lib() - }; - // Start the actual daemon let run = start( StaticInfo { @@ -148,7 +122,6 @@ If you want to initialize a new system, refer to --help or doc.beegfs.io.", auth_secret, network_addrs, }, - license, ) .await?; From 53ae0e1031b9a02cd3a9d3193d2c9bd56e4a322f Mon Sep 17 00:00:00 2001 From: Philipp Falk <80761729+philippfalk@users.noreply.github.com> Date: Tue, 19 May 2026 19:33:00 +0200 Subject: [PATCH 4/9] chore (formatting): apply cargo fmt --- mgmtd/src/app/runtime.rs | 11 +++++--- mgmtd/src/app/test.rs | 8 ++++-- mgmtd/src/bee_msg/common.rs | 13 +++++---- mgmtd/src/bee_msg/register_node.rs | 4 ++- mgmtd/src/bee_msg/request_exceeded_quota.rs | 3 +- mgmtd/src/grpc/get_license.rs | 26 ++++++++++------- mgmtd/src/lib.rs | 18 ++++++------ mgmtd/src/license.rs | 31 +++++++++++---------- mgmtd/src/main.rs | 14 ++++------ mgmtd/src/quota.rs | 5 +--- mgmtd/src/timer.rs | 2 +- 11 files changed, 73 insertions(+), 62 deletions(-) diff --git a/mgmtd/src/app/runtime.rs b/mgmtd/src/app/runtime.rs index d81394c..941d7eb 100644 --- a/mgmtd/src/app/runtime.rs +++ b/mgmtd/src/app/runtime.rs @@ -164,10 +164,13 @@ impl App for RuntimeApp { } } - async fn load_and_verify_license_cert(&self, cert_path: &Path, - prev_trial_serial: Option) -> Result - { - LicenseVerifier::load_and_verify_license_cert(&self.license, cert_path, prev_trial_serial).await + async fn load_and_verify_license_cert( + &self, + cert_path: &Path, + prev_trial_serial: Option, + ) -> Result { + LicenseVerifier::load_and_verify_license_cert(&self.license, cert_path, prev_trial_serial) + .await } fn get_license_cert_data(&self) -> Result { diff --git a/mgmtd/src/app/test.rs b/mgmtd/src/app/test.rs index 11daee6..7e5b65a 100644 --- a/mgmtd/src/app/test.rs +++ b/mgmtd/src/app/test.rs @@ -160,9 +160,11 @@ impl App for TestApp { fn notify_client_pulled_state(&self, _node_type: NodeType, _node_id: NodeId) {} - async fn load_and_verify_license_cert(&self, _cert_path: &std::path::Path, - _prev_trial_serial: Option) -> Result - { + async fn load_and_verify_license_cert( + &self, + _cert_path: &std::path::Path, + _prev_trial_serial: Option, + ) -> Result { Ok("dummy cert".to_string()) } diff --git a/mgmtd/src/bee_msg/common.rs b/mgmtd/src/bee_msg/common.rs index f11ace3..8b4f6fa 100644 --- a/mgmtd/src/bee_msg/common.rs +++ b/mgmtd/src/bee_msg/common.rs @@ -28,17 +28,18 @@ pub(super) async fn update_node(msg: RegisterNode, app: &impl App, reject: bool) _ if r.result == VerifyResult::VerifyInvalid as i32 => None, _ => { log::error!( - "Unexpected error during license verification, limiting number of clients to {NUM_CLIENTS}: {0}", r.message + "Unexpected error during license verification, limiting number of clients to {NUM_CLIENTS}: {0}", + r.message ); Some(NUM_CLIENTS) - }, + } }, Err(e) => { log::error!( - "Unexpected error during license verification, limiting number of clients to {NUM_CLIENTS}: {e:#}", - ); - Some(NUM_CLIENTS) - }, + "Unexpected error during license verification, limiting number of clients to {NUM_CLIENTS}: {e:#}", + ); + Some(NUM_CLIENTS) + } }; let licensed_machines = match app.get_licensed_machines() { diff --git a/mgmtd/src/bee_msg/register_node.rs b/mgmtd/src/bee_msg/register_node.rs index 37e1d89..7ee9bf6 100644 --- a/mgmtd/src/bee_msg/register_node.rs +++ b/mgmtd/src/bee_msg/register_node.rs @@ -10,7 +10,9 @@ impl HandleWithResponse for RegisterNode { async fn handle(self, app: &impl App, _req: &mut impl Request) -> Result { fail_on_pre_shutdown(app)?; - let reject = (_req.msg_compat_feature_flags() & REGISTERNODEMSG_COMPATFLAG_CLIENT_SUPPORTS_REGREJ) != 0; + let reject = (_req.msg_compat_feature_flags() + & REGISTERNODEMSG_COMPATFLAG_CLIENT_SUPPORTS_REGREJ) + != 0; let node_id = update_node(self, app, reject).await?; diff --git a/mgmtd/src/bee_msg/request_exceeded_quota.rs b/mgmtd/src/bee_msg/request_exceeded_quota.rs index f513d4c..50064e3 100644 --- a/mgmtd/src/bee_msg/request_exceeded_quota.rs +++ b/mgmtd/src/bee_msg/request_exceeded_quota.rs @@ -1,6 +1,5 @@ -use crate::license::LicensedFeature; - use super::*; +use crate::license::LicensedFeature; use rusqlite::params; use shared::bee_msg::quota::*; diff --git a/mgmtd/src/grpc/get_license.rs b/mgmtd/src/grpc/get_license.rs index 3741a96..6bbce0b 100644 --- a/mgmtd/src/grpc/get_license.rs +++ b/mgmtd/src/grpc/get_license.rs @@ -1,7 +1,7 @@ use super::*; use crate::db::config::Config; -use protobuf::management::{self as pm, GetLicenseResponse}; use protobuf::license::CertType; +use protobuf::management::{self as pm, GetLicenseResponse}; pub(crate) async fn get_license( app: &impl App, @@ -9,19 +9,25 @@ pub(crate) async fn get_license( ) -> Result { let reload: bool = required_field(req.reload)?; if reload { - let prev_trial_serial: Option = app.read_tx(|tx| { - db::config::get(tx, Config::TrialSerial) - }) - .await?; + let prev_trial_serial: Option = app + .read_tx(|tx| db::config::get(tx, Config::TrialSerial)) + .await?; - let serial = app.load_and_verify_license_cert(&app.static_info().user_config.license_cert_file, - prev_trial_serial).await?; + let serial = app + .load_and_verify_license_cert( + &app.static_info().user_config.license_cert_file, + prev_trial_serial, + ) + .await?; - if app.get_license_cert_data()?.data.is_some_and(|d| d.r#type == CertType::Trial.into()) + if app + .get_license_cert_data()? + .data + .is_some_and(|d| d.r#type == CertType::Trial.into()) { - app.write_tx(|tx| db::config::set(tx, Config::TrialSerial, serial)).await?; + app.write_tx(|tx| db::config::set(tx, Config::TrialSerial, serial)) + .await?; } - } let cert_data = app.get_license_cert_data()?; Ok(GetLicenseResponse { diff --git a/mgmtd/src/lib.rs b/mgmtd/src/lib.rs index 097b64a..01eba0a 100644 --- a/mgmtd/src/lib.rs +++ b/mgmtd/src/lib.rs @@ -116,10 +116,9 @@ pub async fn start(info: StaticInfo) -> Result { }) .await?; - let prev_trial_serial: Option = db.read_tx(|tx| { - db::config::get(tx, db::config::Config::TrialSerial) - }) - .await?; + let prev_trial_serial: Option = db + .read_tx(|tx| db::config::get(tx, db::config::Config::TrialSerial)) + .await?; // Load the licensing library let license = if !info.user_config.license_disable { @@ -136,12 +135,15 @@ pub async fn start(info: StaticInfo) -> Result { .await { Ok(serial) => { - if license.get_license_cert_data()?.data.is_some_and( - |d| d.r#type == CertType::Trial.into()) + if license + .get_license_cert_data()? + .data + .is_some_and(|d| d.r#type == CertType::Trial.into()) { - db.write_tx(|tx| db::config::set(tx, dbConfig::TrialSerial, serial)).await?; + db.write_tx(|tx| db::config::set(tx, dbConfig::TrialSerial, serial)) + .await?; } - }, + } Err(err) => log::warn!( "Initializing licensing library failed. \ Licensed features will be unavailable: {err}" diff --git a/mgmtd/src/license.rs b/mgmtd/src/license.rs index 00c91d1..a292792 100644 --- a/mgmtd/src/license.rs +++ b/mgmtd/src/license.rs @@ -224,22 +224,23 @@ impl LicenseVerifier { let message = res.message; match result { - VerifyResult::VerifyValid => { - match self.get_license_cert_data() { - Ok(c) => { - if c.data.is_some_and(|d| d.r#type() == CertType::Trial - && prev_trial_serial.is_some_and(|s| serial != s)) - { - library.init_cert_store(); - Err(anyhow!("System has previously used different trial license.")) - } else { - log::info!("Successfully loaded license certificate: {serial}"); - Ok(serial) - } - }, - Err(err) => Err(anyhow!("Error getting license data: {err}")), + VerifyResult::VerifyValid => match self.get_license_cert_data() { + Ok(c) => { + if c.data.is_some_and(|d| { + d.r#type() == CertType::Trial + && prev_trial_serial.is_some_and(|s| serial != s) + }) { + library.init_cert_store(); + Err(anyhow!( + "System has previously used different trial license." + )) + } else { + log::info!("Successfully loaded license certificate: {serial}"); + Ok(serial) + } } - } + Err(err) => Err(anyhow!("Error getting license data: {err}")), + }, VerifyResult::VerifyInvalid => Err(anyhow!(message)), VerifyResult::VerifyError => Err(anyhow!( "Internal error during certificate verification: {message}" diff --git a/mgmtd/src/main.rs b/mgmtd/src/main.rs index dd78095..dd976cc 100644 --- a/mgmtd/src/main.rs +++ b/mgmtd/src/main.rs @@ -115,14 +115,12 @@ If you want to initialize a new system, refer to --help or doc.beegfs.io.", // Run the tokio executor rt.block_on(async move { // Start the actual daemon - let run = start( - StaticInfo { - use_ipv6, - user_config, - auth_secret, - network_addrs, - }, - ) + let run = start(StaticInfo { + use_ipv6, + user_config, + auth_secret, + network_addrs, + }) .await?; // Mgmtds systemd unit is set to service type "notify". Here we send out the diff --git a/mgmtd/src/quota.rs b/mgmtd/src/quota.rs index 352fc0e..800243d 100644 --- a/mgmtd/src/quota.rs +++ b/mgmtd/src/quota.rs @@ -20,9 +20,7 @@ use std::path::Path; /// Fetches quota information for all storage targets and updates the quota usage database pub(crate) async fn fetch_and_update(app: &impl App) -> Result<()> { if app.verify_licensed_feature(LicensedFeature::Quota).is_err() { - log::info!( - "Quota is enabled but feature not licensed. Skipping quota collection" - ); + log::info!("Quota is enabled but feature not licensed. Skipping quota collection"); return Ok(()); } @@ -387,7 +385,6 @@ mod test { super::fetch_and_update(&app).await.unwrap(); super::distribute_exceeded(&app).await.unwrap(); - // Find the amount of target 1 entries which values match the schema they have been reported // with let t1_sql = format!( diff --git a/mgmtd/src/timer.rs b/mgmtd/src/timer.rs index 48dd359..052a479 100644 --- a/mgmtd/src/timer.rs +++ b/mgmtd/src/timer.rs @@ -3,7 +3,7 @@ use crate::App; use crate::app::RuntimeApp; use crate::db::{self}; -use crate::quota::{fetch_and_update,distribute_exceeded}; +use crate::quota::{distribute_exceeded, fetch_and_update}; use shared::bee_msg::target::RefreshTargetStates; use shared::run_state::RunStateHandle; use shared::types::NodeType; From 567c2bbdd7019d677e1ffafe48f396b5e9e680ee Mon Sep 17 00:00:00 2001 From: Philipp Falk <80761729+philippfalk@users.noreply.github.com> Date: Thu, 21 May 2026 14:59:06 +0200 Subject: [PATCH 5/9] fix(review): address comments --- mgmtd/src/bee_msg/common.rs | 60 ++++++++++++++++++------------ mgmtd/src/bee_msg/register_node.rs | 8 ++-- mgmtd/src/db/node.rs | 13 ------- mgmtd/src/grpc/get_license.rs | 2 +- mgmtd/src/lib.rs | 50 +++++++++---------------- mgmtd/src/main.rs | 26 ++++++++++--- mgmtd/src/quota.rs | 4 -- 7 files changed, 78 insertions(+), 85 deletions(-) diff --git a/mgmtd/src/bee_msg/common.rs b/mgmtd/src/bee_msg/common.rs index 8b4f6fa..1f01675 100644 --- a/mgmtd/src/bee_msg/common.rs +++ b/mgmtd/src/bee_msg/common.rs @@ -2,7 +2,7 @@ use super::*; use crate::db::node_nic::ReplaceNic; use db::misc::MetaRoot; use protobuf::license::VerifyResult; -use rusqlite::Transaction; +use rusqlite::{Transaction, params}; use shared::bee_msg::node::*; use shared::bee_msg::target::*; use shared::types::{NodeId, TargetId}; @@ -10,7 +10,8 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -const NUM_CLIENTS: u32 = 5; +// Maximum number of clients that can register if license verification fails or license is invalid +const MAX_NUM_CLIENTS: u32 = 5; /// Processes incoming node information. Registers new nodes if config allows it pub(super) async fn update_node(msg: RegisterNode, app: &impl App, reject: bool) -> Result { @@ -18,28 +19,33 @@ pub(super) async fn update_node(msg: RegisterNode, app: &impl App, reject: bool) let requested_node_id = msg.node_id; let registration_disable = app.static_info().user_config.registration_disable; - let licensed_clients: Option = match app.get_license_cert_data() { - Ok(r) => match r.result { - // If license is valid, no limit to client count - _ if r.result == VerifyResult::VerifyValid as i32 => None, - // no license file loaded, limit number of clients to NUM_CLIENTS - _ if r.result == VerifyResult::VerifyError as i32 => Some(NUM_CLIENTS), - // license file was loaded and is outside validity period - _ if r.result == VerifyResult::VerifyInvalid as i32 => None, - _ => { - log::error!( - "Unexpected error during license verification, limiting number of clients to {NUM_CLIENTS}: {0}", - r.message + let licensed_clients: Option = if msg.node_type == NodeType::Client { + match app.get_license_cert_data() { + Ok(r) => match r.result() { + // If license is valid, no limit to client count + VerifyResult::VerifyValid => None, + // no license file loaded, limit number of clients to NUM_CLIENTS + VerifyResult::VerifyError => Some(MAX_NUM_CLIENTS), + // license file was loaded and is outside validity period + VerifyResult::VerifyInvalid => None, + _ => { + log::debug!( + "Unexpected error during license verification, limiting number of clients to {MAX_NUM_CLIENTS}: {0}", + r.message + ); + Some(MAX_NUM_CLIENTS) + } + }, + Err(e) => { + log::debug!( + "Error during license verification, limiting number of clients to {MAX_NUM_CLIENTS}: {e:#}", ); - Some(NUM_CLIENTS) + Some(MAX_NUM_CLIENTS) } - }, - Err(e) => { - log::error!( - "Unexpected error during license verification, limiting number of clients to {NUM_CLIENTS}: {e:#}", - ); - Some(NUM_CLIENTS) } + } else { + // not a client registration, so not going to be used anyway. But let's be defensive + Some(MAX_NUM_CLIENTS) }; let licensed_machines = match app.get_licensed_machines() { @@ -124,13 +130,19 @@ registration token ({new_alias_or_reg_token}) does not match the stored token ({ bail!("Registration of new nodes is not allowed"); } + let num_reg_clients: u32 = tx.query_row( + sql!("SELECT COUNT(DISTINCT node_uid) FROM nodes WHERE node_type = ?1"), + params![NodeType::Client.sql_variant()], + |row| row.get(0), + )?; + if msg.node_type == NodeType::Client && let Some(cs) = licensed_clients - && db::node::count_clients(tx)? >= cs { + && num_reg_clients >= cs { if reject { - bail!("Number of licensed clients ({NUM_CLIENTS}) exhausted. Client registration denied."); + bail!("Number of licensed clients ({MAX_NUM_CLIENTS}) exhausted. Client registration denied."); } else { - log::warn!("Number of licensed clients ({NUM_CLIENTS}) exhausted but client doesn't support rejection."); + log::warn!("Number of licensed clients ({MAX_NUM_CLIENTS}) exhausted but client doesn't support rejection."); } } diff --git a/mgmtd/src/bee_msg/register_node.rs b/mgmtd/src/bee_msg/register_node.rs index 7ee9bf6..a6782d5 100644 --- a/mgmtd/src/bee_msg/register_node.rs +++ b/mgmtd/src/bee_msg/register_node.rs @@ -2,17 +2,15 @@ use super::*; use common::update_node; use shared::bee_msg::node::*; -const REGISTERNODEMSG_COMPATFLAG_CLIENT_SUPPORTS_REGREJ: u8 = 1; +const COMPATFLAG_CLIENT_SUPPORTS_REGREJ: u8 = 1; impl HandleWithResponse for RegisterNode { type Response = RegisterNodeResp; - async fn handle(self, app: &impl App, _req: &mut impl Request) -> Result { + async fn handle(self, app: &impl App, req: &mut impl Request) -> Result { fail_on_pre_shutdown(app)?; - let reject = (_req.msg_compat_feature_flags() - & REGISTERNODEMSG_COMPATFLAG_CLIENT_SUPPORTS_REGREJ) - != 0; + let reject = (req.msg_compat_feature_flags() & COMPATFLAG_CLIENT_SUPPORTS_REGREJ) != 0; let node_id = update_node(self, app, reject).await?; diff --git a/mgmtd/src/db/node.rs b/mgmtd/src/db/node.rs index 7baf8bb..0ecce5c 100644 --- a/mgmtd/src/db/node.rs +++ b/mgmtd/src/db/node.rs @@ -194,19 +194,6 @@ pub(crate) fn count_machines( .map_err(|e| anyhow!(e)) } -/// Counts the number of currently registered distinct clients. -/// -/// # Return value -/// Returns the number of currently registered distinct clients if successful. -pub(crate) fn count_clients(tx: &Transaction) -> Result { - tx.query_row( - sql!("SELECT COUNT(DISTINCT node_uid) FROM nodes WHERE node_type = ?1"), - params![NodeType::Client.sql_variant()], - |row| row.get(0), - ) - .map_err(|e| anyhow!(e)) -} - /// Delete a node from the database. pub(crate) fn delete(tx: &Transaction, node_uid: Uid) -> Result<()> { let affected = tx.execute_cached(sql!("DELETE FROM nodes WHERE node_uid = ?1"), [node_uid])?; diff --git a/mgmtd/src/grpc/get_license.rs b/mgmtd/src/grpc/get_license.rs index 6bbce0b..29d131b 100644 --- a/mgmtd/src/grpc/get_license.rs +++ b/mgmtd/src/grpc/get_license.rs @@ -23,7 +23,7 @@ pub(crate) async fn get_license( if app .get_license_cert_data()? .data - .is_some_and(|d| d.r#type == CertType::Trial.into()) + .is_some_and(|d| d.r#type() == CertType::Trial) { app.write_tx(|tx| db::config::set(tx, Config::TrialSerial, serial)) .await?; diff --git a/mgmtd/src/lib.rs b/mgmtd/src/lib.rs index 01eba0a..15a3915 100644 --- a/mgmtd/src/lib.rs +++ b/mgmtd/src/lib.rs @@ -56,7 +56,7 @@ pub struct StaticInfo { /// Returns after all setup work is done and all tasks are started. The caller is responsible for /// keeping the shutdown control handle and send a shutdown request when the program shall /// be terminated. -pub async fn start(info: StaticInfo) -> Result { +pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result { // Initialization let (run_state, run_state_control) = run_state::new(); @@ -120,39 +120,25 @@ pub async fn start(info: StaticInfo) -> Result { .read_tx(|tx| db::config::get(tx, db::config::Config::TrialSerial)) .await?; - // Load the licensing library - let license = if !info.user_config.license_disable { - // SAFETY: - // There is no way to verify that the user loaded dynamic library matches the - // requirements of LicenseVerifier. After all, users can load anything they - // want. Therefore, this is just not safe to do from the Rust compilers - // perspective and loading anything with non-matching fp signatures or not - // behaving as expected will lead to undefined behavior. - let license = unsafe { LicenseVerifier::with_lib(&info.user_config.license_lib_file) }; - - match license - .load_and_verify_license_cert(&info.user_config.license_cert_file, prev_trial_serial) - .await - { - Ok(serial) => { - if license - .get_license_cert_data()? - .data - .is_some_and(|d| d.r#type == CertType::Trial.into()) - { - db.write_tx(|tx| db::config::set(tx, dbConfig::TrialSerial, serial)) - .await?; - } + // Load and verify license certificate + match license + .load_and_verify_license_cert(&info.user_config.license_cert_file, prev_trial_serial) + .await + { + Ok(serial) => { + if license + .get_license_cert_data()? + .data + .is_some_and(|d| d.r#type == CertType::Trial.into()) + { + db.write_tx(|tx| db::config::set(tx, dbConfig::TrialSerial, serial)) + .await?; } - Err(err) => log::warn!( - "Initializing licensing library failed. \ - Licensed features will be unavailable: {err}" - ), } - - license - } else { - LicenseVerifier::with_no_lib() + Err(err) => log::warn!( + "Initializing licensing library failed. \ + Licensed features will be unavailable: {err}" + ), }; // Fill node addrs store from db diff --git a/mgmtd/src/main.rs b/mgmtd/src/main.rs index dd976cc..622e315 100644 --- a/mgmtd/src/main.rs +++ b/mgmtd/src/main.rs @@ -2,6 +2,7 @@ use anyhow::{Context, Result, anyhow}; use log::LevelFilter; use mgmtd::config::LogTarget; use mgmtd::db::{self}; +use mgmtd::license::LicenseVerifier; use mgmtd::{StaticInfo, start}; use shared::journald_logger; use shared::nic::check_ipv6; @@ -112,15 +113,28 @@ If you want to initialize a new system, refer to --help or doc.beegfs.io.", .max_blocking_threads(user_config.max_blocking_threads) .build()?; + // Load the licensing library + // + // SAFETY: + // There is no way to verify that the user loaded dynamic library matches the + // requirements of LicenseVerifier. After all, users can load anything they + // want. Therefore, this is just not safe to do from the Rust compilers + // perspective and loading anything with non-matching fp signatures or not + // behaving as expected will lead to undefined behavior. + let license = unsafe { LicenseVerifier::with_lib(&user_config.license_lib_file) }; + // Run the tokio executor rt.block_on(async move { // Start the actual daemon - let run = start(StaticInfo { - use_ipv6, - user_config, - auth_secret, - network_addrs, - }) + let run = start( + StaticInfo { + use_ipv6, + user_config, + auth_secret, + network_addrs, + }, + license, + ) .await?; // Mgmtds systemd unit is set to service type "notify". Here we send out the diff --git a/mgmtd/src/quota.rs b/mgmtd/src/quota.rs index 800243d..3630a21 100644 --- a/mgmtd/src/quota.rs +++ b/mgmtd/src/quota.rs @@ -343,7 +343,6 @@ mod test { async fn update() { let app = TestApp::with_config(Config { quota_enable: true, - quota_enforce: false, // Exceeded calculation and push is tested separately quota_user_ids_range: Some(0..=9), quota_group_ids_range: Some(0..=9), ..Default::default() @@ -383,7 +382,6 @@ mod test { }); super::fetch_and_update(&app).await.unwrap(); - super::distribute_exceeded(&app).await.unwrap(); // Find the amount of target 1 entries which values match the schema they have been reported // with @@ -440,7 +438,6 @@ mod test { }); super::fetch_and_update(&app).await.unwrap(); - super::distribute_exceeded(&app).await.unwrap(); // Now target 2 quota should be empty, target 1 quota should be completely untouched due to // the error (even if it only failed for user quota request) @@ -481,7 +478,6 @@ mod test { }); super::fetch_and_update(&app).await.unwrap(); - super::distribute_exceeded(&app).await.unwrap(); // Target 1 should now only have the couple of entries resulting from above app.db From f9f1d8a02ca247c14efe507e7944a1b0dbdf9f66 Mon Sep 17 00:00:00 2001 From: Philipp Falk <80761729+philippfalk@users.noreply.github.com> Date: Fri, 22 May 2026 13:51:07 +0200 Subject: [PATCH 6/9] tweak(licensing): clarify verification error message Drop the somewhat misleading "Internal" prefix for errors during license verification. These errors are usually caused by invalid or non-existing license files, which isn't an "internal" problem and requires user action. --- mgmtd/src/license.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/mgmtd/src/license.rs b/mgmtd/src/license.rs index a292792..0af2b46 100644 --- a/mgmtd/src/license.rs +++ b/mgmtd/src/license.rs @@ -242,9 +242,9 @@ impl LicenseVerifier { Err(err) => Err(anyhow!("Error getting license data: {err}")), }, VerifyResult::VerifyInvalid => Err(anyhow!(message)), - VerifyResult::VerifyError => Err(anyhow!( - "Internal error during certificate verification: {message}" - )), + VerifyResult::VerifyError => { + Err(anyhow!("Error during license verification: {message}")) + } VerifyResult::VerifyUnspecified => Err(anyhow!("Unspecified result.")), } } @@ -302,9 +302,9 @@ impl LicenseVerifier { match result { VerifyResult::VerifyValid => Ok(()), VerifyResult::VerifyInvalid => Err(anyhow!(message)), - VerifyResult::VerifyError => Err(anyhow!( - "Internal error during feature verification: {message}" - )), + VerifyResult::VerifyError => { + Err(anyhow!("Error during feature verification: {message}")) + } VerifyResult::VerifyUnspecified => Err(anyhow!("Unspecified result.")), } } From 23cc154413978376699c5d6d588cf911f31c8fe0 Mon Sep 17 00:00:00 2001 From: Philipp Falk <80761729+philippfalk@users.noreply.github.com> Date: Wed, 27 May 2026 09:39:15 +0200 Subject: [PATCH 7/9] fix(review): address comments --- mgmtd/src/bee_msg.rs | 6 +++--- mgmtd/src/bee_msg/register_node.rs | 3 ++- mgmtd/src/db/config.rs | 2 +- mgmtd/src/lib.rs | 2 +- mgmtd/src/quota.rs | 2 +- mgmtd/src/timer.rs | 10 ++++------ shared/src/conn/msg_dispatch.rs | 30 ++++++++++-------------------- 7 files changed, 22 insertions(+), 33 deletions(-) diff --git a/mgmtd/src/bee_msg.rs b/mgmtd/src/bee_msg.rs index c7cd2bd..4152b0f 100644 --- a/mgmtd/src/bee_msg.rs +++ b/mgmtd/src/bee_msg.rs @@ -92,14 +92,14 @@ pub(crate) async fn dispatch_request(app: &RuntimeApp, mut req: impl Request) -> macro_rules! dispatch_msg { ($({$msg_type:path => $r:tt, $ctx_str:literal})*) => { // Match on the message ID provided by the request - match req.msg_id() { + match req.header().msg_id() { $( <$msg_type>::ID => { let des: $msg_type = req.deserialize_msg().with_context(|| { format!( "{} ({}) from {:?}", stringify!($msg_type), - req.msg_id(), + req.header().msg_id(), req.addr() ) })?; @@ -186,7 +186,7 @@ async fn handle_unspecified_msg(req: impl Request) -> Result<()> { log::warn!( "Unhandled msg INCOMING from {:?} with ID {}", req.addr(), - req.msg_id() + req.header().msg_id() ); // Signal to the caller that the msg is not handled. The generic response diff --git a/mgmtd/src/bee_msg/register_node.rs b/mgmtd/src/bee_msg/register_node.rs index a6782d5..10d7cf5 100644 --- a/mgmtd/src/bee_msg/register_node.rs +++ b/mgmtd/src/bee_msg/register_node.rs @@ -10,7 +10,8 @@ impl HandleWithResponse for RegisterNode { async fn handle(self, app: &impl App, req: &mut impl Request) -> Result { fail_on_pre_shutdown(app)?; - let reject = (req.msg_compat_feature_flags() & COMPATFLAG_CLIENT_SUPPORTS_REGREJ) != 0; + let reject = + (req.header().msg_compat_feature_flags & COMPATFLAG_CLIENT_SUPPORTS_REGREJ) != 0; let node_id = update_node(self, app, reject).await?; diff --git a/mgmtd/src/db/config.rs b/mgmtd/src/db/config.rs index 1a4c5e5..96f8948 100644 --- a/mgmtd/src/db/config.rs +++ b/mgmtd/src/db/config.rs @@ -22,7 +22,7 @@ pub(crate) enum Config { // Config entries that should not be changed after initially set. Note that this only controls the // functions below, the database entries could still be changed by manual query -const IMMUTABLE: &[Config] = &[Config::FsUuid, Config::FsInitDateSecs]; +const IMMUTABLE: &[Config] = &[Config::FsUuid, Config::FsInitDateSecs, Config::TrialSerial]; impl Config { /// The string representation of the config key as it is written to the db diff --git a/mgmtd/src/lib.rs b/mgmtd/src/lib.rs index 15a3915..158e449 100644 --- a/mgmtd/src/lib.rs +++ b/mgmtd/src/lib.rs @@ -136,7 +136,7 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result log::warn!( - "Initializing licensing library failed. \ + "Loading and verifying license certificate failed. \ Licensed features will be unavailable: {err}" ), }; diff --git a/mgmtd/src/quota.rs b/mgmtd/src/quota.rs index 3630a21..1b8e8bc 100644 --- a/mgmtd/src/quota.rs +++ b/mgmtd/src/quota.rs @@ -20,7 +20,7 @@ use std::path::Path; /// Fetches quota information for all storage targets and updates the quota usage database pub(crate) async fn fetch_and_update(app: &impl App) -> Result<()> { if app.verify_licensed_feature(LicensedFeature::Quota).is_err() { - log::info!("Quota is enabled but feature not licensed. Skipping quota collection"); + log::warn!("Quota is enabled but feature not licensed. Skipping quota collection"); return Ok(()); } diff --git a/mgmtd/src/timer.rs b/mgmtd/src/timer.rs index 052a479..e56bab4 100644 --- a/mgmtd/src/timer.rs +++ b/mgmtd/src/timer.rs @@ -56,13 +56,11 @@ async fn update_quota(app: RuntimeApp, mut run_state: RunStateHandle) { loop { log::debug!("Running quota update"); - match fetch_and_update(&app).await { - Ok(_) => {} - Err(err) => log::error!("Updating quota failed: {err:#}"), + if let Err(e) = fetch_and_update(&app).await { + log::error!("Updating quota failed: {e:#}"); } - match distribute_exceeded(&app).await { - Ok(_) => {} - Err(err) => log::error!("Distributing exceeded quota failed: {err:#}"), + if let Err(e) = distribute_exceeded(&app).await { + log::error!("Distributing exceeded quota failed: {e:#}"); } tokio::select! { diff --git a/shared/src/conn/msg_dispatch.rs b/shared/src/conn/msg_dispatch.rs index 337e84e..215613e 100644 --- a/shared/src/conn/msg_dispatch.rs +++ b/shared/src/conn/msg_dispatch.rs @@ -25,8 +25,7 @@ pub trait Request: Send + Sync { fn respond(self, msg: &M) -> impl Future> + Send; fn authenticate_connection(&mut self); fn addr(&self) -> SocketAddr; - fn msg_id(&self) -> MsgId; - fn msg_compat_feature_flags(&self) -> u8; + fn header(&self) -> &Header; fn deserialize_msg(&self) -> Result; } @@ -62,12 +61,8 @@ impl Request for StreamRequest<'_> { deserialize_body(self.header, &self.buf[Header::LEN..]) } - fn msg_id(&self) -> MsgId { - self.header.msg_id() - } - - fn msg_compat_feature_flags(&self) -> u8 { - self.header.msg_compat_feature_flags + fn header(&self) -> &Header { + self.header } } @@ -101,21 +96,19 @@ impl Request for SocketRequest<'_> { deserialize_body(self.header, &self.buf[Header::LEN..]) } - fn msg_id(&self) -> MsgId { - self.header.msg_id() - } - - fn msg_compat_feature_flags(&self) -> u8 { - self.header.msg_compat_feature_flags + fn header(&self) -> &Header { + self.header } } pub mod test { use super::*; + use crate::bee_msg::Header; use std::net::{Ipv4Addr, SocketAddrV4}; pub struct TestRequest { pub msg_id: MsgId, + pub header: Header, pub authenticate_connection: bool, } @@ -123,6 +116,7 @@ pub mod test { pub fn new(msg_id: MsgId) -> Self { Self { msg_id, + header: Header::default(), authenticate_connection: false, } } @@ -142,12 +136,8 @@ pub mod test { SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into() } - fn msg_id(&self) -> MsgId { - self.msg_id - } - - fn msg_compat_feature_flags(&self) -> u8 { - 0 + fn header(&self) -> &Header { + &self.header } fn deserialize_msg(&self) -> Result { From fd9f05e9e9b8f3b64764f6acb2b599ed1bfe567c Mon Sep 17 00:00:00 2001 From: Philipp Falk <80761729+philippfalk@users.noreply.github.com> Date: Wed, 27 May 2026 13:02:09 +0200 Subject: [PATCH 8/9] fix(review): remove msg_id from TestRequest --- mgmtd/src/bee_msg/authenticate_channel.rs | 3 ++- mgmtd/src/bee_msg/change_target_consistency_states.rs | 5 +++-- mgmtd/src/bee_msg/get_nodes.rs | 3 ++- mgmtd/src/bee_msg/request_exceeded_quota.rs | 3 ++- shared/src/conn/msg_dispatch.rs | 8 +++----- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/mgmtd/src/bee_msg/authenticate_channel.rs b/mgmtd/src/bee_msg/authenticate_channel.rs index bfb053a..d6d5789 100644 --- a/mgmtd/src/bee_msg/authenticate_channel.rs +++ b/mgmtd/src/bee_msg/authenticate_channel.rs @@ -27,11 +27,12 @@ impl HandleNoResponse for AuthenticateChannel { mod test { use super::*; use crate::app::test::*; + use shared::bee_msg::Header; #[tokio::test] async fn authenticate_channel() { let app = TestApp::new().await; - let mut req = TestRequest::new(AuthenticateChannel::ID); + let mut req = TestRequest::new(Header::default()); AuthenticateChannel { auth_secret: AuthSecret::hash_from_bytes("secret"), diff --git a/mgmtd/src/bee_msg/change_target_consistency_states.rs b/mgmtd/src/bee_msg/change_target_consistency_states.rs index efe66a4..34c4051 100644 --- a/mgmtd/src/bee_msg/change_target_consistency_states.rs +++ b/mgmtd/src/bee_msg/change_target_consistency_states.rs @@ -95,11 +95,12 @@ doesn't match stored state {old_stored}, no consistency state changes will be ma mod test { use super::*; use crate::app::test::*; + use shared::bee_msg::Header; #[tokio::test] async fn change_target_consistency_states() { let app = TestApp::new().await; - let mut req = TestRequest::new(ChangeTargetConsistencyStates::ID); + let mut req = TestRequest::new(Header::default()); // Prepare times app.db @@ -174,7 +175,7 @@ mod test { #[tokio::test] async fn change_target_consistency_states_old_states() { let app = TestApp::new().await; - let mut req = TestRequest::new(ChangeTargetConsistencyStates::ID); + let mut req = TestRequest::new(Header::default()); // Mismatch of reported old state should not change the consistency states let msg = ChangeTargetConsistencyStates { diff --git a/mgmtd/src/bee_msg/get_nodes.rs b/mgmtd/src/bee_msg/get_nodes.rs index acacf68..a2f9e9e 100644 --- a/mgmtd/src/bee_msg/get_nodes.rs +++ b/mgmtd/src/bee_msg/get_nodes.rs @@ -61,11 +61,12 @@ impl HandleWithResponse for GetNodes { mod test { use super::*; use crate::app::test::*; + use shared::bee_msg::Header; #[tokio::test] async fn get_nodes() { let app = TestApp::new().await; - let mut req = TestRequest::new(GetNodes::ID); + let mut req = TestRequest::new(Header::default()); let resp = GetNodes { node_type: NodeType::Meta, diff --git a/mgmtd/src/bee_msg/request_exceeded_quota.rs b/mgmtd/src/bee_msg/request_exceeded_quota.rs index 50064e3..17ba72f 100644 --- a/mgmtd/src/bee_msg/request_exceeded_quota.rs +++ b/mgmtd/src/bee_msg/request_exceeded_quota.rs @@ -69,11 +69,12 @@ mod test { use super::*; use crate::app::test::*; use crate::bee_msg::HandleWithResponse; + use shared::bee_msg::Header; #[tokio::test] async fn request_exceeded_quota() { let app = TestApp::new().await; - let mut req = TestRequest::new(RequestExceededQuota::ID); + let mut req = TestRequest::new(Header::default()); let tests: &[(_, &[u32])] = &[ ( diff --git a/shared/src/conn/msg_dispatch.rs b/shared/src/conn/msg_dispatch.rs index 215613e..4bf7824 100644 --- a/shared/src/conn/msg_dispatch.rs +++ b/shared/src/conn/msg_dispatch.rs @@ -1,7 +1,7 @@ //! Facilities for dispatching TCP and UDP messages to their message handlers use super::stream::Stream; -use crate::bee_msg::{Header, Msg, MsgId, deserialize_body, serialize}; +use crate::bee_msg::{Header, Msg, deserialize_body, serialize}; use crate::bee_serde::{Deserializable, Serializable}; use anyhow::Result; use std::fmt::Debug; @@ -107,16 +107,14 @@ pub mod test { use std::net::{Ipv4Addr, SocketAddrV4}; pub struct TestRequest { - pub msg_id: MsgId, pub header: Header, pub authenticate_connection: bool, } impl TestRequest { - pub fn new(msg_id: MsgId) -> Self { + pub fn new(header: Header) -> Self { Self { - msg_id, - header: Header::default(), + header, authenticate_connection: false, } } From a5ce0fd464a5a1a3e7dcd2c28b997ab56c3661fa Mon Sep 17 00:00:00 2001 From: Philipp Falk <80761729+philippfalk@users.noreply.github.com> Date: Fri, 29 May 2026 18:12:35 +0200 Subject: [PATCH 9/9] fix(review): address more comments --- mgmtd/assets/beegfs-mgmtd.toml | 5 +---- mgmtd/src/config.rs | 4 +++- mgmtd/src/grpc/get_license.rs | 3 ++- mgmtd/src/lib.rs | 8 ++++++-- mgmtd/src/license.rs | 5 +++-- 5 files changed, 15 insertions(+), 10 deletions(-) diff --git a/mgmtd/assets/beegfs-mgmtd.toml b/mgmtd/assets/beegfs-mgmtd.toml index 6e43c28..8562550 100644 --- a/mgmtd/assets/beegfs-mgmtd.toml +++ b/mgmtd/assets/beegfs-mgmtd.toml @@ -90,9 +90,6 @@ # Defines after which time without contact a client is considered gone and will be removed. # client-auto-remove-timeout = "30m" -# Disables loading the license library. This disables all enterprise features. -# license-disable = false - # The BeeGFS license certificate file. # license-cert-file = "/etc/beegfs/license.pem" @@ -151,7 +148,7 @@ # quota-group-ids-range = "1000-1100" -### Capacity pools ### +### Capacity pools ### # Sets the limits / boundaries of the meta capacity pools. If changed, the whole block must # be uncommented and set. These cannot be lower than the cap-pool-dynamic-meta-limits below. diff --git a/mgmtd/src/config.rs b/mgmtd/src/config.rs index 7c912fe..bf21083 100644 --- a/mgmtd/src/config.rs +++ b/mgmtd/src/config.rs @@ -279,9 +279,11 @@ generate_structs! { /// Disables loading the license library. /// - /// This disables all enterprise features. + /// Deprecated. Loading a license is now mandatory. #[arg(long)] #[arg(num_args = 0..=1, default_missing_value = "true")] + #[arg(hide = true)] + #[serde(skip)] license_disable: bool = false, /// The BeeGFS license certificate file. [default: /etc/beegfs/license.pem] diff --git a/mgmtd/src/grpc/get_license.rs b/mgmtd/src/grpc/get_license.rs index 29d131b..d41b34b 100644 --- a/mgmtd/src/grpc/get_license.rs +++ b/mgmtd/src/grpc/get_license.rs @@ -16,7 +16,7 @@ pub(crate) async fn get_license( let serial = app .load_and_verify_license_cert( &app.static_info().user_config.license_cert_file, - prev_trial_serial, + prev_trial_serial.clone(), ) .await?; @@ -24,6 +24,7 @@ pub(crate) async fn get_license( .get_license_cert_data()? .data .is_some_and(|d| d.r#type() == CertType::Trial) + && prev_trial_serial.is_none() { app.write_tx(|tx| db::config::set(tx, Config::TrialSerial, serial)) .await?; diff --git a/mgmtd/src/lib.rs b/mgmtd/src/lib.rs index 158e449..652165b 100644 --- a/mgmtd/src/lib.rs +++ b/mgmtd/src/lib.rs @@ -122,14 +122,18 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result { if license .get_license_cert_data()? .data - .is_some_and(|d| d.r#type == CertType::Trial.into()) + .is_some_and(|d| d.r#type() == CertType::Trial) + && prev_trial_serial.is_none() { db.write_tx(|tx| db::config::set(tx, dbConfig::TrialSerial, serial)) .await?; diff --git a/mgmtd/src/license.rs b/mgmtd/src/license.rs index 0af2b46..4a02d73 100644 --- a/mgmtd/src/license.rs +++ b/mgmtd/src/license.rs @@ -228,11 +228,12 @@ impl LicenseVerifier { Ok(c) => { if c.data.is_some_and(|d| { d.r#type() == CertType::Trial - && prev_trial_serial.is_some_and(|s| serial != s) + && prev_trial_serial.clone().is_some_and(|s| serial != s) }) { library.init_cert_store(); Err(anyhow!( - "System has previously used different trial license." + "Unable to apply trial license {serial}, because system was previously used with trial license {}", + prev_trial_serial.unwrap() )) } else { log::info!("Successfully loaded license certificate: {serial}");