Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions deepwell/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use crate::locales::Localizations;
use crate::middleware::{RequestContextHeaders, RequestContextLayer};
use crate::services::blob::MimeAnalyzer;
use crate::services::job::JobWorker;
use crate::services::{RequestContext, ServiceContext, SessionService};
use crate::services::{
PostCommitActions, RequestContext, ServiceContext, SessionService,
};
use crate::utils::debug_pointer;
use crate::{database, info, redis as redis_db};
use jsonrpsee::server::{RpcModule, Server, ServerHandle};
Expand Down Expand Up @@ -222,11 +224,15 @@ async fn build_module(app_state: ServerState) -> Result<RpcModule<ServerState>>
//
// At this level, we take the database-or-RPC error and make it just an RPC error.
let db_state = Arc::clone(&state);
db_state
let post_commit_actions = PostCommitActions::default();
let transaction_actions = post_commit_actions.clone();
let result = db_state
.database
.transaction(move |txn| {
let post_commit_actions = transaction_actions.clone();
Box::pin(async move {
let ctx = ServiceContext::new(&state, &txn);
let ctx = ServiceContext::new(&state, &txn)
.with_post_commit_actions(post_commit_actions);
let make_error = || Error::new(
format!("method '{}' failed", $name),
ErrorType::Request,
Expand All @@ -249,7 +255,16 @@ async fn build_module(app_state: ServerState) -> Result<RpcModule<ServerState>>
.await
.map_err(unwrap_transaction_error)
.inspect_err(|error| error!("JSONRPC method {} failed: {}", $name, error))
.map_err(exn_error_to_rpc_error)
.map_err(exn_error_to_rpc_error);

// The transaction helper rolls back whenever the endpoint returns Err.
// In that path, queued cleanup must stay inert: deleting S3 objects
// would make the rolled-back database state point at missing blobs.
if result.is_ok() {
post_commit_actions.run_after_commit(&db_state).await;
}
Comment on lines +263 to +265

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀 Performance & Scalability | 🔵 Trivial | ⚡ Quick win

Post-commit deletes run inline before returning the response.

run_after_commit awaits each S3 delete_object sequentially on the request task, so the client response is held until cleanup finishes (and slow/failing S3 inflates latency). Consider spawning the drain on a background task (tokio::spawn) so cleanup doesn't block the response path.


result
})
.or_raise(|| Error::new(
format!("failed to register JSONRPC method '{}'", $name),
Expand Down
94 changes: 93 additions & 1 deletion deepwell/src/services/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,83 @@ use rsmq_async::Rsmq;
use s3::bucket::Bucket;
use sea_orm::DatabaseTransaction;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use tokio::sync::OnceCell;

#[derive(Debug)]
enum PostCommitAction {
DeleteTextBlockObjects(Vec<String>),
}

/// Request-scoped work that must only run after the database transaction commits.
///
/// These actions protect transaction consistency: database rollback must not
/// delete external objects that still describe committed rows. The queue is
/// intentionally best-effort and in-memory; guaranteed reclamation across
/// process crashes or persistent S3 failures requires a separate durable
/// cleanup outbox.
#[derive(Debug, Clone, Default)]
pub struct PostCommitActions {
actions: Arc<Mutex<Vec<PostCommitAction>>>,
}

impl PostCommitActions {
pub fn delete_text_block_objects<I>(&self, filenames: I)
where
I: IntoIterator<Item = String>,
{
let filenames: Vec<String> = filenames
.into_iter()
.filter(|filename| !filename.is_empty())
.collect();
if filenames.is_empty() {
return;
}

self.actions
.lock()
.expect("post-commit actions mutex poisoned")
.push(PostCommitAction::DeleteTextBlockObjects(filenames));
}

pub fn pending_count(&self) -> usize {
self.actions
.lock()
.expect("post-commit actions mutex poisoned")
.len()
}

pub async fn run_after_commit(&self, state: &ServerState) {
let actions = {
let mut actions = self
.actions
.lock()
.expect("post-commit actions mutex poisoned");
std::mem::take(&mut *actions)
};

for action in actions {
match action {
PostCommitAction::DeleteTextBlockObjects(filenames) => {
delete_text_block_objects_after_commit(state, filenames).await;
}
}
}
}
}

async fn delete_text_block_objects_after_commit(
state: &ServerState,
filenames: Vec<String>,
) {
let bucket = &state.s3_tblocks_bucket;
for filename in filenames {
if let Err(error) = bucket.delete_object(&filename).await {
warn!("Failed to delete committed stale S3 text block {filename}: {error}");
}
}
}
Comment on lines +80 to +109

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🔵 Trivial

Make post-commit cleanup durable if S3 reclamation must be guaranteed.

run_after_commit drains the in-memory queue before awaiting S3 deletes, and delete_text_block_objects_after_commit only logs failures. A process crash, cancellation, or persistent S3 error after DB commit drops the cleanup with no retry, leaving stale objects behind. Consider a durable outbox/cleanup table or background retry path for failed deletes.


/// Per-request context derived from HTTP headers by the middleware layer.
#[derive(Debug, Clone, Default)]
pub struct RequestContext {
Expand Down Expand Up @@ -84,6 +158,7 @@ pub struct ServiceContext<'txn> {
state: ServerState,
transaction: &'txn DatabaseTransaction,
request_ctx: RequestContext,
post_commit_actions: PostCommitActions,
user_permissions: OnceCell<HashSet<Permission<'static>>>,
}

Expand All @@ -97,6 +172,7 @@ impl<'txn> ServiceContext<'txn> {
state: Arc::clone(state),
transaction,
request_ctx: RequestContext::default(),
post_commit_actions: PostCommitActions::default(),
user_permissions: OnceCell::new(),
}
}
Expand All @@ -109,6 +185,17 @@ impl<'txn> ServiceContext<'txn> {
}
}

#[inline]
pub fn with_post_commit_actions(
self,
post_commit_actions: PostCommitActions,
) -> Self {
Self {
post_commit_actions,
..self
}
}

#[inline]
/// Internal method to update the request context, for use in testing only.
pub fn set_request_for_test(&mut self, request_ctx: RequestContext) {
Expand Down Expand Up @@ -164,6 +251,11 @@ impl<'txn> ServiceContext<'txn> {
self.transaction
}

#[inline]
pub fn post_commit_actions(&self) -> &PostCommitActions {
&self.post_commit_actions
}

#[inline]
pub fn request(&self) -> &RequestContext {
&self.request_ctx
Expand Down
4 changes: 2 additions & 2 deletions deepwell/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
//! services or by route implementations found in the `methods` module.

mod prelude {
pub use super::context::{RequestContext, ServiceContext};
pub use super::context::{PostCommitActions, RequestContext, ServiceContext};
pub use crate::config::Config;
pub use crate::error::prelude::*;
pub use crate::types::{Maybe, Reference};
Expand Down Expand Up @@ -110,7 +110,7 @@ pub use self::blob::BlobService;
pub use self::blueprint::BlueprintPageService;
pub use self::caddy::CaddyService;
pub use self::category::CategoryService;
pub use self::context::{RequestContext, ServiceContext};
pub use self::context::{PostCommitActions, RequestContext, ServiceContext};
pub use self::domain::DomainService;
pub use self::file::FileService;
pub use self::file_revision::FileRevisionService;
Expand Down
45 changes: 34 additions & 11 deletions deepwell/src/services/text_block/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,13 @@ impl TextBlockService {
// If there are more or the same number of blocks now,
// then this will do nothing.

let mut stale_filenames = Vec::new();
for index in max_index..=prev_max_index {
let filename = filename!(index);
debug!("Deleting now-out-of-range S3 text block {filename}");
bucket.delete_object(filename).await.or_raise(make_error)?;
let filename = filename!(index).to_owned();
debug!(
"Queueing now-out-of-range S3 text block {filename} for post-commit cleanup"
);
stale_filenames.push(filename);
}

// Upload the new text blocks to S3.
Expand Down Expand Up @@ -216,6 +219,16 @@ impl TextBlockService {
"Deleted row count do not match previous maximum block index",
);

if !stale_filenames.is_empty() {
debug!(
"Queueing {} replaced text block S3 objects for page ID {} after transaction commit",
stale_filenames.len(),
page_id,
);
ctx.post_commit_actions()
.delete_text_block_objects(stale_filenames);
}

// Finally, insert the batch of new text block rows, then return.
if !models.is_empty() {
TextBlockTable::insert_many(models)
Expand Down Expand Up @@ -320,8 +333,8 @@ impl TextBlockService {
/// becomes unnecessary.
pub async fn delete_blocks(ctx: &ServiceContext<'_>, page_id: i64) -> Result<()> {
let txn = ctx.transaction();
let bucket = ctx.s3_tblocks_bucket();
let mut buffer = String::new();
let mut filenames = Vec::new();

let make_error = || {
Error::new(
Expand All @@ -330,8 +343,8 @@ impl TextBlockService {
)
};

// For each kind of text block type, find out how many
// blocks exist and then delete the objects in S3.
// For each kind of text block type, find out how many blocks exist
// and queue their S3 objects for post-commit deletion.
for block_type in TextBlockType::iter() {
macro_rules! filename {
($index:expr) => {{
Expand All @@ -345,20 +358,30 @@ impl TextBlockService {
.or_raise(make_error)?;

for index in 1..=max_index {
let filename = filename!(index);
debug!("Deleting text block {filename}");
bucket.delete_object(filename).await.or_raise(make_error)?;
let filename = filename!(index).to_owned();
debug!("Queueing text block {filename} for post-commit cleanup");
filenames.push(filename);
}
}

// Now that S3 is cleared out, we can delete all the
// database rows in one sweep.
// Delete the database rows in one sweep. The S3 objects are queued for
// deletion only after the outer transaction commits.
TextBlockTable::delete_many()
.filter(text_block::Column::PageId.eq(page_id))
.exec(txn)
.await
.or_raise(make_error)?;

if !filenames.is_empty() {
debug!(
"Queueing {} deleted text block S3 objects for page ID {} after transaction commit",
filenames.len(),
page_id,
);
ctx.post_commit_actions()
.delete_text_block_objects(filenames);
}

Ok(())
}
}
Expand Down
87 changes: 87 additions & 0 deletions deepwell/tests/text_block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* tests/text_block.rs
*
* DEEPWELL - Wikijump API provider and database manager
* Copyright (C) 2019-2026 Wikijump Team
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#[allow(unused_imports)]
mod common;

use self::common::TestRunner;
use deepwell::services::PostCommitActions;
use uuid::Uuid;

#[tokio::test]
async fn post_commit_text_block_cleanup_deletes_only_after_drain() {
let runner = TestRunner::setup().await;
let bucket = runner.context().s3_tblocks_bucket();
let filename = format!("test-post-commit-text-block-{}", Uuid::new_v4());

bucket
.put_object_with_content_type(&filename, b"stale text block", "text/plain")
.await
.expect("test text-block object should upload");
assert_object_status(bucket, &filename, 200).await;

let actions = PostCommitActions::default();
actions.delete_text_block_objects([filename.clone()]);
assert_eq!(actions.pending_count(), 1);

actions.run_after_commit(runner.state()).await;

assert_eq!(actions.pending_count(), 0);
assert_object_status(bucket, &filename, 404).await;
}

#[tokio::test]
async fn queued_text_block_cleanup_preserves_objects_when_not_drained() {
let runner = TestRunner::setup().await;
let bucket = runner.context().s3_tblocks_bucket();
let filename = format!("test-rollback-text-block-{}", Uuid::new_v4());

bucket
.put_object_with_content_type(&filename, b"rollback text block", "text/plain")
.await
.expect("test text-block object should upload");
assert_object_status(bucket, &filename, 200).await;

let actions = PostCommitActions::default();
actions.delete_text_block_objects([filename.clone()]);
assert_eq!(actions.pending_count(), 1);

assert_object_status(bucket, &filename, 200).await;

bucket
.delete_object(&filename)
.await
.expect("rollback preservation test object should clean up");
}
Comment on lines +50 to +72

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📐 Maintainability & Code Quality | 🔵 Trivial | 🏗️ Heavy lift

Exercise a real rollback instead of only skipping the drain.

This proves queued actions are inert until run_after_commit, but it would still pass if the JSON-RPC/transaction wrapper accidentally drained after a failed transaction. Add coverage that queues cleanup inside a transaction or RPC call that returns Err, then verifies the object remains after rollback.


async fn assert_object_status(
bucket: &s3::bucket::Bucket,
filename: &str,
expected: u16,
) {
let (_head, status) = bucket
.head_object(filename)
.await
.expect("test text-block object HEAD should complete");
assert_eq!(
status, expected,
"unexpected S3 status for text-block object {filename}",
);
}
Comment on lines +74 to +87

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🩺 Stability & Availability | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

rust-s3 0.37 Bucket head_object return value for 404 not found object error or status code

💡 Result:

In the rust-s3 crate (version 0.37 and later), the bucket.head_object method returns a Result<(HeadObjectResult, u16), S3Error> [1]. When an object is not found, the S3 service returns a 404 status code, which the crate treats as an error, typically surfacing it as an S3Error::HttpFailWithBody(404, ...) or similar error variant depending on your specific configuration [2][3]. If you specifically need to check for the existence of an object without handling raw S3 errors, it is recommended to use the bucket.object_exists(path) method instead of head_object [3]. This method is designed to return Ok(false) specifically when the server returns a 404 status code, and Ok(true) otherwise [4][3]. Summary of approaches: 1. head_object: Returns an error when the object is missing (e.g., 404 status code) [2][1]. You must catch and inspect the error to identify the 404 status. 2. object_exists: A higher-level helper that abstracts the 404 check, returning Result<bool, S3Error> where false indicates the object was not found [4][3].

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the relevant test and the rust-s3 dependency version used by the repo.
git ls-files deepwell/tests/text_block.rs Cargo.toml Cargo.lock | sed -n '1,20p'

printf '\n== deepwell/tests/text_block.rs ==\n'
cat -n deepwell/tests/text_block.rs | sed -n '1,180p'

printf '\n== rust-s3 references ==\n'
rg -n 'rust-s3|s3 =' Cargo.toml Cargo.lock deepwell -g 'Cargo.toml' -g 'Cargo.lock' -g '*.toml' -g '*.lock' -g '*.rs'

Repository: Rokurolize/wikijump

Length of output: 4148


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the relevant test and the rust-s3 dependency version used by the repo.
git ls-files deepwell/tests/text_block.rs Cargo.toml Cargo.lock | sed -n '1,20p'

printf '\n== deepwell/tests/text_block.rs ==\n'
cat -n deepwell/tests/text_block.rs | sed -n '1,180p'

printf '\n== rust-s3 references ==\n'
rg -n 'rust-s3|s3 =' Cargo.toml Cargo.lock deepwell -g 'Cargo.toml' -g 'Cargo.lock' -g '*.toml' -g '*.lock' -g '*.rs'

Repository: Rokurolize/wikijump

Length of output: 4148


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect the relevant test and the rust-s3 dependency version used by the repo.
git ls-files deepwell/tests/text_block.rs Cargo.toml Cargo.lock | sed -n '1,20p'

printf '\n== deepwell/tests/text_block.rs ==\n'
cat -n deepwell/tests/text_block.rs | sed -n '1,180p'

printf '\n== rust-s3 references ==\n'
rg -n 'rust-s3|s3 =' Cargo.toml Cargo.lock deepwell -g 'Cargo.toml' -g 'Cargo.lock' -g '*.toml' -g '*.lock' -g '*.rs'

Repository: Rokurolize/wikijump

Length of output: 4148


🏁 Script executed:

#!/bin/bash
set -euo pipefail

git ls-files deepwell/tests/text_block.rs Cargo.toml Cargo.lock | sed -n '1,20p'

printf '\n== deepwell/tests/text_block.rs ==\n'
cat -n deepwell/tests/text_block.rs | sed -n '1,180p'

printf '\n== rust-s3 references ==\n'
rg -n 'rust-s3|s3 =' Cargo.toml Cargo.lock deepwell -g 'Cargo.toml' -g 'Cargo.lock' -g '*.toml' -g '*.lock' -g '*.rs'

Repository: Rokurolize/wikijump

Length of output: 4148


Handle the 404 case explicitly. rust-s3 0.37 returns Err for a missing object on head_object, so .expect(...) will panic before the 404 assertion. Match the error or use bucket.object_exists(...) for the post-delete check.

Loading