From ba5088ac212c8b2b926a4a5cf31818ae962b218e Mon Sep 17 00:00:00 2001 From: Rokurolize <1701388+Rokurolize@users.noreply.github.com> Date: Sat, 20 Jun 2026 03:36:05 +0900 Subject: [PATCH] Add post-commit text block cleanup --- deepwell/src/api.rs | 23 ++++- deepwell/src/services/context.rs | 94 ++++++++++++++++++++- deepwell/src/services/mod.rs | 4 +- deepwell/src/services/text_block/service.rs | 45 +++++++--- deepwell/tests/text_block.rs | 87 +++++++++++++++++++ 5 files changed, 235 insertions(+), 18 deletions(-) create mode 100644 deepwell/tests/text_block.rs diff --git a/deepwell/src/api.rs b/deepwell/src/api.rs index 3595f76d77..0452bfe8b4 100644 --- a/deepwell/src/api.rs +++ b/deepwell/src/api.rs @@ -33,7 +33,9 @@ use crate::locales::Localizations; use crate::middleware::{RequestContextHeaders, RequestContextLayer}; use crate::services::blob::MimeAnalyzer; use crate::services::job::JobWorker; -use crate::services::{RequestContext, ServiceContext, SessionService}; +use crate::services::{ + PostCommitActions, RequestContext, ServiceContext, SessionService, +}; use crate::utils::debug_pointer; use crate::{database, info, redis as redis_db}; use jsonrpsee::server::{RpcModule, Server, ServerHandle}; @@ -222,11 +224,15 @@ async fn build_module(app_state: ServerState) -> Result> // // At this level, we take the database-or-RPC error and make it just an RPC error. let db_state = Arc::clone(&state); - db_state + let post_commit_actions = PostCommitActions::default(); + let transaction_actions = post_commit_actions.clone(); + let result = db_state .database .transaction(move |txn| { + let post_commit_actions = transaction_actions.clone(); Box::pin(async move { - let ctx = ServiceContext::new(&state, &txn); + let ctx = ServiceContext::new(&state, &txn) + .with_post_commit_actions(post_commit_actions); let make_error = || Error::new( format!("method '{}' failed", $name), ErrorType::Request, @@ -249,7 +255,16 @@ async fn build_module(app_state: ServerState) -> Result> .await .map_err(unwrap_transaction_error) .inspect_err(|error| error!("JSONRPC method {} failed: {}", $name, error)) - .map_err(exn_error_to_rpc_error) + .map_err(exn_error_to_rpc_error); + + // The transaction helper rolls back whenever the endpoint returns Err. + // In that path, queued cleanup must stay inert: deleting S3 objects + // would make the rolled-back database state point at missing blobs. + if result.is_ok() { + post_commit_actions.run_after_commit(&db_state).await; + } + + result }) .or_raise(|| Error::new( format!("failed to register JSONRPC method '{}'", $name), diff --git a/deepwell/src/services/context.rs b/deepwell/src/services/context.rs index 619c1d5762..8d4801dcb6 100644 --- a/deepwell/src/services/context.rs +++ b/deepwell/src/services/context.rs @@ -31,9 +31,83 @@ use rsmq_async::Rsmq; use s3::bucket::Bucket; use sea_orm::DatabaseTransaction; use std::collections::HashSet; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use tokio::sync::OnceCell; +#[derive(Debug)] +enum PostCommitAction { + DeleteTextBlockObjects(Vec), +} + +/// Request-scoped work that must only run after the database transaction commits. +/// +/// These actions protect transaction consistency: database rollback must not +/// delete external objects that still describe committed rows. The queue is +/// intentionally best-effort and in-memory; guaranteed reclamation across +/// process crashes or persistent S3 failures requires a separate durable +/// cleanup outbox. +#[derive(Debug, Clone, Default)] +pub struct PostCommitActions { + actions: Arc>>, +} + +impl PostCommitActions { + pub fn delete_text_block_objects(&self, filenames: I) + where + I: IntoIterator, + { + let filenames: Vec = filenames + .into_iter() + .filter(|filename| !filename.is_empty()) + .collect(); + if filenames.is_empty() { + return; + } + + self.actions + .lock() + .expect("post-commit actions mutex poisoned") + .push(PostCommitAction::DeleteTextBlockObjects(filenames)); + } + + pub fn pending_count(&self) -> usize { + self.actions + .lock() + .expect("post-commit actions mutex poisoned") + .len() + } + + pub async fn run_after_commit(&self, state: &ServerState) { + let actions = { + let mut actions = self + .actions + .lock() + .expect("post-commit actions mutex poisoned"); + std::mem::take(&mut *actions) + }; + + for action in actions { + match action { + PostCommitAction::DeleteTextBlockObjects(filenames) => { + delete_text_block_objects_after_commit(state, filenames).await; + } + } + } + } +} + +async fn delete_text_block_objects_after_commit( + state: &ServerState, + filenames: Vec, +) { + let bucket = &state.s3_tblocks_bucket; + for filename in filenames { + if let Err(error) = bucket.delete_object(&filename).await { + warn!("Failed to delete committed stale S3 text block {filename}: {error}"); + } + } +} + /// Per-request context derived from HTTP headers by the middleware layer. #[derive(Debug, Clone, Default)] pub struct RequestContext { @@ -84,6 +158,7 @@ pub struct ServiceContext<'txn> { state: ServerState, transaction: &'txn DatabaseTransaction, request_ctx: RequestContext, + post_commit_actions: PostCommitActions, user_permissions: OnceCell>>, } @@ -97,6 +172,7 @@ impl<'txn> ServiceContext<'txn> { state: Arc::clone(state), transaction, request_ctx: RequestContext::default(), + post_commit_actions: PostCommitActions::default(), user_permissions: OnceCell::new(), } } @@ -109,6 +185,17 @@ impl<'txn> ServiceContext<'txn> { } } + #[inline] + pub fn with_post_commit_actions( + self, + post_commit_actions: PostCommitActions, + ) -> Self { + Self { + post_commit_actions, + ..self + } + } + #[inline] /// Internal method to update the request context, for use in testing only. pub fn set_request_for_test(&mut self, request_ctx: RequestContext) { @@ -164,6 +251,11 @@ impl<'txn> ServiceContext<'txn> { self.transaction } + #[inline] + pub fn post_commit_actions(&self) -> &PostCommitActions { + &self.post_commit_actions + } + #[inline] pub fn request(&self) -> &RequestContext { &self.request_ctx diff --git a/deepwell/src/services/mod.rs b/deepwell/src/services/mod.rs index a43f80f1e7..8675b30758 100644 --- a/deepwell/src/services/mod.rs +++ b/deepwell/src/services/mod.rs @@ -37,7 +37,7 @@ //! services or by route implementations found in the `methods` module. mod prelude { - pub use super::context::{RequestContext, ServiceContext}; + pub use super::context::{PostCommitActions, RequestContext, ServiceContext}; pub use crate::config::Config; pub use crate::error::prelude::*; pub use crate::types::{Maybe, Reference}; @@ -110,7 +110,7 @@ pub use self::blob::BlobService; pub use self::blueprint::BlueprintPageService; pub use self::caddy::CaddyService; pub use self::category::CategoryService; -pub use self::context::{RequestContext, ServiceContext}; +pub use self::context::{PostCommitActions, RequestContext, ServiceContext}; pub use self::domain::DomainService; pub use self::file::FileService; pub use self::file_revision::FileRevisionService; diff --git a/deepwell/src/services/text_block/service.rs b/deepwell/src/services/text_block/service.rs index 8985d80570..f7ccb678ce 100644 --- a/deepwell/src/services/text_block/service.rs +++ b/deepwell/src/services/text_block/service.rs @@ -144,10 +144,13 @@ impl TextBlockService { // If there are more or the same number of blocks now, // then this will do nothing. + let mut stale_filenames = Vec::new(); for index in max_index..=prev_max_index { - let filename = filename!(index); - debug!("Deleting now-out-of-range S3 text block {filename}"); - bucket.delete_object(filename).await.or_raise(make_error)?; + let filename = filename!(index).to_owned(); + debug!( + "Queueing now-out-of-range S3 text block {filename} for post-commit cleanup" + ); + stale_filenames.push(filename); } // Upload the new text blocks to S3. @@ -216,6 +219,16 @@ impl TextBlockService { "Deleted row count do not match previous maximum block index", ); + if !stale_filenames.is_empty() { + debug!( + "Queueing {} replaced text block S3 objects for page ID {} after transaction commit", + stale_filenames.len(), + page_id, + ); + ctx.post_commit_actions() + .delete_text_block_objects(stale_filenames); + } + // Finally, insert the batch of new text block rows, then return. if !models.is_empty() { TextBlockTable::insert_many(models) @@ -320,8 +333,8 @@ impl TextBlockService { /// becomes unnecessary. pub async fn delete_blocks(ctx: &ServiceContext<'_>, page_id: i64) -> Result<()> { let txn = ctx.transaction(); - let bucket = ctx.s3_tblocks_bucket(); let mut buffer = String::new(); + let mut filenames = Vec::new(); let make_error = || { Error::new( @@ -330,8 +343,8 @@ impl TextBlockService { ) }; - // For each kind of text block type, find out how many - // blocks exist and then delete the objects in S3. + // For each kind of text block type, find out how many blocks exist + // and queue their S3 objects for post-commit deletion. for block_type in TextBlockType::iter() { macro_rules! filename { ($index:expr) => {{ @@ -345,20 +358,30 @@ impl TextBlockService { .or_raise(make_error)?; for index in 1..=max_index { - let filename = filename!(index); - debug!("Deleting text block {filename}"); - bucket.delete_object(filename).await.or_raise(make_error)?; + let filename = filename!(index).to_owned(); + debug!("Queueing text block {filename} for post-commit cleanup"); + filenames.push(filename); } } - // Now that S3 is cleared out, we can delete all the - // database rows in one sweep. + // Delete the database rows in one sweep. The S3 objects are queued for + // deletion only after the outer transaction commits. TextBlockTable::delete_many() .filter(text_block::Column::PageId.eq(page_id)) .exec(txn) .await .or_raise(make_error)?; + if !filenames.is_empty() { + debug!( + "Queueing {} deleted text block S3 objects for page ID {} after transaction commit", + filenames.len(), + page_id, + ); + ctx.post_commit_actions() + .delete_text_block_objects(filenames); + } + Ok(()) } } diff --git a/deepwell/tests/text_block.rs b/deepwell/tests/text_block.rs new file mode 100644 index 0000000000..ec9bd6287c --- /dev/null +++ b/deepwell/tests/text_block.rs @@ -0,0 +1,87 @@ +/* + * tests/text_block.rs + * + * DEEPWELL - Wikijump API provider and database manager + * Copyright (C) 2019-2026 Wikijump Team + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#[allow(unused_imports)] +mod common; + +use self::common::TestRunner; +use deepwell::services::PostCommitActions; +use uuid::Uuid; + +#[tokio::test] +async fn post_commit_text_block_cleanup_deletes_only_after_drain() { + let runner = TestRunner::setup().await; + let bucket = runner.context().s3_tblocks_bucket(); + let filename = format!("test-post-commit-text-block-{}", Uuid::new_v4()); + + bucket + .put_object_with_content_type(&filename, b"stale text block", "text/plain") + .await + .expect("test text-block object should upload"); + assert_object_status(bucket, &filename, 200).await; + + let actions = PostCommitActions::default(); + actions.delete_text_block_objects([filename.clone()]); + assert_eq!(actions.pending_count(), 1); + + actions.run_after_commit(runner.state()).await; + + assert_eq!(actions.pending_count(), 0); + assert_object_status(bucket, &filename, 404).await; +} + +#[tokio::test] +async fn queued_text_block_cleanup_preserves_objects_when_not_drained() { + let runner = TestRunner::setup().await; + let bucket = runner.context().s3_tblocks_bucket(); + let filename = format!("test-rollback-text-block-{}", Uuid::new_v4()); + + bucket + .put_object_with_content_type(&filename, b"rollback text block", "text/plain") + .await + .expect("test text-block object should upload"); + assert_object_status(bucket, &filename, 200).await; + + let actions = PostCommitActions::default(); + actions.delete_text_block_objects([filename.clone()]); + assert_eq!(actions.pending_count(), 1); + + assert_object_status(bucket, &filename, 200).await; + + bucket + .delete_object(&filename) + .await + .expect("rollback preservation test object should clean up"); +} + +async fn assert_object_status( + bucket: &s3::bucket::Bucket, + filename: &str, + expected: u16, +) { + let (_head, status) = bucket + .head_object(filename) + .await + .expect("test text-block object HEAD should complete"); + assert_eq!( + status, expected, + "unexpected S3 status for text-block object {filename}", + ); +}