Skip to content

Commit 55b298f

Browse files
jackluo923claude
andcommitted
refactor(api-server): move CompressionUsageParams and validation to Client
Move `CompressionUsageParams`, `default_limit`, and the validation logic from `routes.rs` into `client.rs` so that `Client` owns its input types and validation. Any future caller of `Client` (CLI, library, etc.) gets the same validated interface without depending on the HTTP layer. - Add `ClientError::InvalidInput` for validation errors - `CompressionUsageParams::validate()` replaces standalone function - `HandlerError` maps `InvalidInput` to `BadRequest` in routes.rs - Uses `validate()` method rather than the `TryFrom<Unchecked>` pattern to avoid leaking an internal name into the `OpenAPI` schema Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6e0b19e commit 55b298f

File tree

3 files changed

+82
-71
lines changed

3 files changed

+82
-71
lines changed

components/api-server/src/client.rs

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use futures::{Stream, StreamExt};
1515
use pin_project_lite::pin_project;
1616
use serde::{Deserialize, Serialize};
1717
use sqlx::Row;
18-
use utoipa::ToSchema;
18+
use utoipa::{IntoParams, ToSchema};
1919

2020
pub use crate::error::ClientError;
2121

@@ -30,6 +30,75 @@ pub const DEFAULT_JOB_STATUSES: &[CompressionJobStatus] = &[
3030
CompressionJobStatus::Killed,
3131
];
3232

33+
/// Query parameters for the compression usage endpoint.
34+
#[derive(Deserialize, IntoParams)]
35+
#[into_params(parameter_in = Query)]
36+
pub struct CompressionUsageParams {
37+
/// Start of usage window (epoch milliseconds, inclusive).
38+
pub begin_timestamp: i64,
39+
/// End of usage window (epoch milliseconds, inclusive).
40+
pub end_timestamp: i64,
41+
/// Job statuses to include as a comma-separated list (e.g.
42+
/// `job_status=succeeded,failed`). Recognized values (case-insensitive):
43+
/// `RUNNING`, `SUCCEEDED`, `FAILED`, `KILLED`.
44+
/// Defaults to `SUCCEEDED,FAILED,KILLED` (all terminal states).
45+
#[serde(default)]
46+
pub job_status: Option<String>,
47+
/// Maximum number of jobs to return. Defaults to 1000.
48+
#[serde(default = "default_limit")]
49+
pub limit: i64,
50+
}
51+
52+
const fn default_limit() -> i64 {
53+
1000
54+
}
55+
56+
impl CompressionUsageParams {
57+
/// Validates the parameters and resolves the requested job statuses into
58+
/// [`CompressionJobStatus`] variants.
59+
///
60+
/// We use a `validate()` method rather than the conventional
61+
/// `TryFrom<UncheckedX>` pattern because this struct also serves as the
62+
/// `OpenAPI` query-parameter schema. Introducing an `Unchecked` wrapper would
63+
/// leak an internal implementation detail into the generated API docs.
64+
///
65+
/// # Errors
66+
///
67+
/// Returns [`ClientError::InvalidInput`] if:
68+
/// - `begin_timestamp > end_timestamp`
69+
/// - `limit <= 0`
70+
/// - `job_status` contains unrecognized or empty values
71+
pub fn validate(&self) -> Result<Vec<CompressionJobStatus>, ClientError> {
72+
if self.begin_timestamp > self.end_timestamp {
73+
return Err(ClientError::InvalidInput(
74+
"begin_timestamp must be <= end_timestamp".to_owned(),
75+
));
76+
}
77+
if self.limit <= 0 {
78+
return Err(ClientError::InvalidInput("limit must be > 0".to_owned()));
79+
}
80+
let statuses = match &self.job_status {
81+
Some(s) => s
82+
.split(',')
83+
.map(str::trim)
84+
.filter(|t| !t.is_empty())
85+
.map(|token| {
86+
token.parse().map_err(|_| {
87+
ClientError::InvalidInput(format!("Unknown job_status: {token}"))
88+
})
89+
})
90+
.collect::<Result<Vec<_>, _>>()?,
91+
None => DEFAULT_JOB_STATUSES.to_vec(),
92+
};
93+
if statuses.is_empty() {
94+
return Err(ClientError::InvalidInput(
95+
"job_status must contain at least one valid status".to_owned(),
96+
));
97+
}
98+
Ok(statuses)
99+
}
100+
}
101+
33102
/// A single row returned by the compression usage query (one row per job).
34103
#[derive(Serialize, sqlx::FromRow, ToSchema)]
35104
pub struct CompressionUsage {

components/api-server/src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ pub enum ClientError {
3131
#[error("Invalid dataset name")]
3232
InvalidDatasetName,
3333

34+
#[error("Invalid input: {0}")]
35+
InvalidInput(String),
36+
3437
#[error("Dataset not found: {0}")]
3538
DatasetNotFound(String),
3639
}

components/api-server/src/routes.rs

Lines changed: 9 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,10 @@ use futures::{Stream, StreamExt};
1313
use serde::{Deserialize, Serialize};
1414
use thiserror::Error;
1515
use tower_http::cors::{Any, CorsLayer};
16-
use utoipa::{IntoParams, OpenApi, ToSchema};
16+
use utoipa::{OpenApi, ToSchema};
1717
use utoipa_axum::{router::OpenApiRouter, routes};
1818

19-
use crate::client::{
20-
Client,
21-
ClientError,
22-
CompressionJobStatus,
23-
CompressionUsage,
24-
DEFAULT_JOB_STATUSES,
25-
QueryConfig,
26-
};
19+
use crate::client::{Client, ClientError, CompressionUsage, CompressionUsageParams, QueryConfig};
2720

2821
/// Factory method to create an Axum router configured with all API routes.
2922
///
@@ -259,62 +252,6 @@ async fn get_timestamp_column_names(
259252
Ok(Json(names))
260253
}
261254

262-
#[derive(Deserialize, IntoParams)]
263-
#[into_params(parameter_in = Query)]
264-
struct CompressionUsageParams {
265-
/// Start of usage window (epoch milliseconds, inclusive).
266-
begin_timestamp: i64,
267-
/// End of usage window (epoch milliseconds, inclusive).
268-
end_timestamp: i64,
269-
/// Job statuses to include as a comma-separated list (e.g.
270-
/// `job_status=succeeded,failed`). Recognized values (case-insensitive):
271-
/// `RUNNING`, `SUCCEEDED`, `FAILED`, `KILLED`.
272-
/// Defaults to `SUCCEEDED,FAILED,KILLED` (all terminal states).
273-
#[serde(default)]
274-
job_status: Option<String>,
275-
/// Maximum number of jobs to return. Defaults to 1000.
276-
#[serde(default = "default_limit")]
277-
limit: i64,
278-
}
279-
280-
const fn default_limit() -> i64 {
281-
1000
282-
}
283-
284-
/// Validates compression usage parameters and resolves the requested job
285-
/// statuses into [`CompressionJobStatus`] variants.
286-
fn validate_compression_usage_params(
287-
params: &CompressionUsageParams,
288-
) -> Result<Vec<CompressionJobStatus>, HandlerError> {
289-
if params.begin_timestamp > params.end_timestamp {
290-
return Err(HandlerError::BadRequest(
291-
"begin_timestamp must be <= end_timestamp".to_owned(),
292-
));
293-
}
294-
if params.limit <= 0 {
295-
return Err(HandlerError::BadRequest("limit must be > 0".to_owned()));
296-
}
297-
let statuses = match &params.job_status {
298-
Some(s) => s
299-
.split(',')
300-
.map(str::trim)
301-
.filter(|t| !t.is_empty())
302-
.map(|token| {
303-
token
304-
.parse()
305-
.map_err(|_| HandlerError::BadRequest(format!("Unknown job_status: {token}")))
306-
})
307-
.collect::<Result<Vec<_>, _>>()?,
308-
None => DEFAULT_JOB_STATUSES.to_vec(),
309-
};
310-
if statuses.is_empty() {
311-
return Err(HandlerError::BadRequest(
312-
"job_status must contain at least one valid status".to_owned(),
313-
));
314-
}
315-
Ok(statuses)
316-
}
317-
318255
#[utoipa::path(
319256
get,
320257
path = "/usage/compression",
@@ -332,7 +269,7 @@ async fn compression_usage(
332269
State(client): State<Client>,
333270
Query(params): Query<CompressionUsageParams>,
334271
) -> Result<Json<Vec<CompressionUsage>>, HandlerError> {
335-
let job_statuses = validate_compression_usage_params(&params)?;
272+
let job_statuses = params.validate()?;
336273
tracing::info!(
337274
"Fetching compression usage: begin={}, end={}, job_statuses={:?}",
338275
params.begin_timestamp,
@@ -375,7 +312,9 @@ impl From<ClientError> for HandlerError {
375312
fn from(err: ClientError) -> Self {
376313
match err {
377314
ClientError::SearchJobNotFound(_) | ClientError::DatasetNotFound(_) => Self::NotFound,
378-
ClientError::InvalidDatasetName => Self::BadRequest(format!("{err}")),
315+
ClientError::InvalidDatasetName | ClientError::InvalidInput(_) => {
316+
Self::BadRequest(format!("{err}"))
317+
}
379318
_ => Self::InternalServer,
380319
}
381320
}
@@ -404,14 +343,14 @@ mod tests {
404343

405344
use super::*;
406345

407-
/// Builds a minimal Axum app that calls [`validate_compression_usage_params`]
408-
/// (the shared production validation function) and returns the resolved
346+
/// Builds a minimal Axum app that calls [`CompressionUsageParams::validate`]
347+
/// (the shared production validation method) and returns the resolved
409348
/// status integer codes on success. No real database is needed.
410349
fn test_app() -> axum::Router {
411350
axum::Router::new().route(
412351
"/usage/compression",
413352
get(|Query(params): Query<CompressionUsageParams>| async move {
414-
let statuses = validate_compression_usage_params(&params)?;
353+
let statuses = params.validate()?;
415354
let codes: Vec<i32> = statuses.into_iter().map(i32::from).collect();
416355
Ok::<_, HandlerError>(axum::Json(codes))
417356
}),

0 commit comments

Comments
 (0)