Conversation
- Rename task acquire/create/publish events to block-level variants - Update server auth checks to use immutable API borrows
- Change CreateTaskBlock/PublishTaskBlock to work on TaskBlock payloads - Add LEASED task state and move queued/solved access to sled helpers - Use direct DB helpers for queued/solved deletes, scans, and inserts
|
Warning Review limit reached
More reviews will be available in 42 minutes and 39 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
📝 WalkthroughWalkthroughThis PR refactors the task system from single-task operations to block-based operations. Proto definitions, data structures, and server handlers were updated to use concurrent DashMap-backed task queues and async channels. Single-instance task events were replaced with block variants supporting multiple instances per operation. ChangesTask Block Operations Refactor
🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Greptile SummaryThis PR migrates the engine's task lifecycle from single-task operations to block-based processing, replacing
Confidence Score: 3/5The core acquire path has a timing gap between block delivery and lease registration that can result in the same task being dispatched to two workers simultaneously under concurrent load. The fill_locks mechanism closes the double-fill race called out in a prior review, but a narrower window remains: after recv() returns a block and before the next .await completes to register leases, a concurrent acquirer can see the channel as empty, snapshot an empty leased set, find the same tasks in sled, and re-enqueue them. Both workers then process identical task IDs. Additionally, the previously-flagged issues with lost in-flight leases on restart and ERROR-only logging in release builds remain unresolved. engine/src/bin/server.rs (aquire_task_block lease registration gap) and enginelib/src/api.rs (init_db and release log level). Important Files Changed
Sequence DiagramsequenceDiagram
participant C as Client Worker
participant S as gRPC Server
participant CH as async_channel
participant LT as LeasedTaskQueue
participant SL as Sled DB
C->>S: AquireTaskBlock(task_id, block_size)
S->>S: CheckAuth
S->>CH: is_empty()?
alt channel empty
S->>S: acquire fill_lock
S->>SL: scan_prefix(tasks:ns:task:)
SL-->>S: StoredTasks (excl. leased)
S->>CH: try_send(StoredTaskBlock)
S->>S: release fill_lock
end
S->>CH: recv().await
CH-->>S: StoredTaskBlock
Note over S,LT: Race window: another worker can see<br/>empty channel before leases registered
S->>LT: push LeasedTask(id, uid, now)
S-->>C: "TaskBlock{tasks}"
C->>S: PublishTaskBlock(TaskBlock)
S->>S: CheckAuth
loop per task
S->>LT: find and remove lease(id, uid)
S->>SL: put_solved(key, StoredTask)
S->>SL: delete_queued(key, id)
end
S-->>C: "Empty{}"
Note over S,SL: Lease GC (periodic)
S->>LT: "retain leases where age < 1h"
Note over LT,SL: Expired tasks remain in sled<br/>re-delivered on next fill_queue
Reviews (2): Last reviewed commit: "Prevent duplicate task refills and stale..." | Re-trigger Greptile |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
engine/src/bin/client.rs (1)
65-69:⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy liftMigrate the worker loop off the removed single-task RPCs.
TaskRequest,aquire_task, andpublish_taskwere removed by the block-based proto refactor, so this path neither builds nor matches the server contract anymore. The loop needs to acquire and publishTaskBlocks, then process the tasks inside each block.Also applies to: 156-156
engine/proto/engine.proto (1)
9-32:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
TaskBlockSelectoris unusable in its current shape.It only nests more
TaskBlockSelectormessages, so clients cannot express any concrete task selection with it.DeleteTaskBlockstill takesTaskSelector, which leaves the new block selector disconnected from the RPC surface.Proposed schema fix
- rpc DeleteTaskBlock(TaskSelector) returns (empty); + rpc DeleteTaskBlock(TaskBlockSelector) returns (empty); ... message TaskBlockSelector { - repeated TaskBlockSelector selector = 1; + repeated TaskSelector selectors = 1; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@engine/proto/engine.proto` around lines 9 - 32, TaskBlockSelector currently only nests itself and is unusable; replace it with a concrete selector (e.g., include a TaskSelector field and block identifier(s)) and wire the DeleteTaskBlock RPC to accept TaskBlockSelector instead of TaskSelector. Concretely: modify message TaskBlockSelector to reference TaskSelector (e.g., field name "task = 1") and add a block identifier field (e.g., "block_id" or "repeated block_ids") so clients can target blocks, and update rpc DeleteTaskBlock to take TaskBlockSelector as its input so the block selector is actually exposed on the RPC surface.
🧹 Nitpick comments (7)
enginelib/src/api.rs (3)
240-243: ⚡ Quick winERROR-only logging in release builds is very restrictive.
Setting
max_leveltoLevel::ERRORin non-debug builds suppresses all INFO and WARN logs, which are often essential for operational visibility. Consider usingLevel::INFOorLevel::WARNfor production.♻️ Proposed fix
#[cfg(not(debug_assertions))] let _ = tracing_subscriber::FmtSubscriber::builder() - .with_max_level(Level::ERROR) + .with_max_level(Level::INFO) .try_init();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@enginelib/src/api.rs` around lines 240 - 243, The subscriber is forcing ERROR-only output by calling .with_max_level(Level::ERROR) in the tracing_subscriber::FmtSubscriber::builder(), which silences INFO/WARN in release; change it to a more appropriate production level (e.g., Level::INFO or Level::WARN) or make the level configurable (use cfg!(debug_assertions) to pick TRACE/DEBUG in debug and INFO/WARN in release, or read from RUST_LOG/env) and replace the hardcoded .with_max_level(Level::ERROR) accordingly.
1-5: ⚡ Quick winRemove unused imports flagged by the pipeline.
The CI reports unused imports:
chrono::Utc,crossbeam::queue::ArrayQueue, andtracing::error. These should be removed to clean up the build warnings.♻️ Proposed fix
-use chrono::Utc; -use crossbeam::queue::ArrayQueue; use dashmap::DashMap; use tokio::{spawn, sync::RwLock, time::interval}; -use tracing::{Level, debug, error, info, instrument}; +use tracing::{Level, debug, info, instrument};🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@enginelib/src/api.rs` around lines 1 - 5, Remove the unused imports from the top-level use statement: delete chrono::Utc, crossbeam::queue::ArrayQueue, and tracing::error so only the actually used symbols (e.g., dashmap::DashMap, tokio::{spawn, sync::RwLock, time::interval}, tracing::{Level, debug, info, instrument}) remain; adjust the use line(s) accordingly and run a build or cargo check to verify warnings are resolved.
91-92: ⚖️ Poor tradeoffUnbounded channels may cause memory exhaustion under load.
Using
async_channel::unbounded()means producers can push unlimited messages if consumers lag. Consider using a bounded channel withtask_queue_sizefrom config to provide backpressure.♻️ Proposed bounded channel approach
for (id, _tsk) in api.task_registry.tasks.clone() { - let (s, r) = async_channel::unbounded(); + let cap = api.cfg.config_toml.task_queue_size as usize; + let (s, r) = async_channel::bounded(cap.max(1)); api.task_queue.tasks.entry(id.clone()).insert((r, s)); api.leased_tasks.tasks.entry(id.clone()).or_default(); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@enginelib/src/api.rs` around lines 91 - 92, Replace the unbounded channel with a bounded one using the configured task_queue_size to add backpressure: instead of async_channel::unbounded() call async_channel::bounded(task_queue_size as usize) (preserving the returned sender/receiver tuple order assigned to (s, r) in the same insertion into api.task_queue.tasks.entry(id.clone()).insert((r, s))); also update any call sites that send into this channel to handle the SendError/await behavior when the channel is full (propagate or retry/timeout per existing semantics). Ensure task_queue_size is read from config (or defaulted safely) and used as the bound.enginelib/src/task.rs (1)
2-2: ⚡ Quick winRemove unused imports flagged by the pipeline.
The CI reports unused imports:
std::collections::HashMap,crossbeam::queue::ArrayQueue, anddashmap::DashSet.♻️ Proposed fix
use std::fmt::Debug; -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use crate::Identifier; use chrono::{DateTime, Utc}; -use crossbeam::queue::ArrayQueue; -use dashmap::{DashMap, DashSet}; +use dashmap::DashMap; use serde::{Deserialize, Serialize}; use tracing::{error, instrument, warn};Also applies to: 6-7
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@enginelib/src/task.rs` at line 2, The import list contains unused items—remove the unused symbols `std::collections::HashMap`, `crossbeam::queue::ArrayQueue`, and `dashmap::DashSet` from the `use` statements in this module; search for any remaining references to `HashMap`, `ArrayQueue`, or `DashSet` in `task.rs` and either delete those references or add the necessary usage, then run `cargo build`/`cargo clippy` to verify the warnings are gone.engine/src/bin/server.rs (3)
39-41: ⚡ Quick winPrefix unused
requestparameter with underscore.The pipeline warns that
requestis unused inget_metadata. Prefix it with_to silence the warning.♻️ Proposed fix
async fn get_metadata( &self, - request: tonic::Request<proto::Empty>, + _request: tonic::Request<proto::Empty>, ) -> Result<Response<proto::ServerMetadata>, Status> {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@engine/src/bin/server.rs` around lines 39 - 41, Rename the unused parameter in the async function get_metadata from request to _request to silence the unused-variable warning; update the function signature (async fn get_metadata(&self, _request: tonic::Request<proto::Empty>, ...)) and ensure any internal references (if any) are updated to use the new name (likely none).
380-383: ⚖️ Poor tradeoff
recv().awaitblocks indefinitely without a timeout.If no tasks are available and no producer ever sends, the client will hang forever. Consider using
tokio::time::timeoutor advising clients to use gRPC deadlines, as noted in the comment. However, relying solely on client-side deadlines may not be sufficient for server resource management.♻️ Example timeout approach
+use tokio::time::{timeout, Duration}; + -let block = receiver - .recv() - .await - .map_err(|_| Status::unavailable("Task queue closed"))?; +let block = timeout(Duration::from_secs(30), receiver.recv()) + .await + .map_err(|_| Status::deadline_exceeded("No tasks available within timeout"))? + .map_err(|_| Status::unavailable("Task queue closed"))?;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@engine/src/bin/server.rs` around lines 380 - 383, The recv().await call on the task queue (the receiver.recv().await in the server loop) can block forever; wrap that await in tokio::time::timeout with a configurable Duration (or a sensible default) and map a timeout error to an appropriate gRPC Status (e.g., Status::deadline_exceeded or Status::unavailable) instead of hanging; update the code that currently maps recv errors (the map_err on receiver.recv()) to also handle the timeout branch and return a clear Status, and optionally document/configure that clients should still set gRPC deadlines.
1-24: ⚡ Quick winRemove unused imports flagged by the pipeline.
The CI reports multiple unused imports including
LibraryMetadata,LibraryManager,Task,TaskQueue,OS,Read,Ipv6Addr,RS_RwLock,Request, andMetadataValue.♻️ Proposed fix
use engine::{get_auth, get_uid}; -use enginelib::plugin::LibraryMetadata; use enginelib::{ Identifier, RawIdentifier, Registry, api::ServerAPI, chrono::Utc, event::{debug, info, warn}, events::{Events, ID}, - plugin::LibraryManager, - task::{LeasedTask, StoredTask, StoredTaskBlock, Task, TaskQueue}, + task::{LeasedTask, StoredTask, StoredTaskBlock}, }; use proto::{ TaskState, engine_server::{Engine, EngineServer}, }; use std::{ collections::HashMap, - env::consts::OS, - io::Read, - net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}, - sync::{Arc, RwLock as RS_RwLock}, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + sync::Arc, }; use tokio::sync::RwLock; -use tonic::{Request, Response, Status, metadata::MetadataValue, transport::Server}; +use tonic::{Response, Status, transport::Server};🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@engine/src/bin/server.rs` around lines 1 - 24, Remove the unused imports reported by CI: delete LibraryMetadata and LibraryManager from plugin imports, remove Task and TaskQueue from task imports, drop OS from env::consts, remove Read from std::io, drop Ipv6Addr from std::net, replace RS_RwLock import (std::sync::RwLock as RS_RwLock) with no alias or remove it if unused, and remove Request and MetadataValue from tonic imports; update the use lines that reference LibraryMetadata, LibraryManager, Task, TaskQueue, OS, Read, Ipv6Addr, RS_RwLock, Request, and MetadataValue so only actually used symbols remain (e.g., keep Registry, api::ServerAPI, chrono::Utc, event::{debug, info, warn}, events::{self, Events, ID}, task::{LeasedTask, StoredTask, StoredTaskBlock}, and tonic::{Response, Status, transport::Server}).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@engine/src/bin/client.rs`:
- Line 2: The client imports and references the removed symbol `api::EngineAPI`
(and threads it through code using `Registry`, `events::Events`,
`plugin::LibraryInstance`, etc.), so update all imports and signatures to the
refactored API surface: remove `api::EngineAPI` from the use list, import the
new API type(s) exported from `enginelib::api` (replace `EngineAPI` usages with
the current type name(s)), update function signatures and variables that accept
or return `EngineAPI` to the new type, and adapt any method calls to the new API
methods; ensure `Registry`, `Events`, and `LibraryInstance` usages call the new
API surface where they previously relied on `EngineAPI`.
In `@engine/src/bin/server.rs`:
- Around line 484-502: The current flow removes the lease from api.leased_tasks
(via leased.remove(idx)) before calling api.put_solved, which can lose the task
if the process crashes between those calls; change the order so you create
StoredTask and call api.put_solved(&key, &stored) first and only on success
remove the lease from the leased list (the entry returned by
api.leased_tasks.tasks.entry(...) / the vector referenced by leased) and drop it
afterwards; also handle the case where put_solved reports the task already
exists (tolerate duplicate/Idempotent write) and log errors without removing the
lease when put_solved fails so tasks remain recoverable.
In `@enginelib/src/api.rs`:
- Around line 269-306: The function clear_sled_periodically is currently an
empty async and still being spawned from init_chron; either restore its periodic
purge logic or stop spawning it. To fix, either (A) re-enable the commented cron
loop inside clear_sled_periodically: recreate the interval.tick().await loop,
use api.write().await to access rw_api.executing_tasks.tasks, retain/filter
tasks by timestamp and push moved entries as StoredTask (preserve the
moved_tasks and touched_exec behavior), or (B) remove the spawn call in
init_chron and delete the clear_sled_periodically function entirely so no no-op
task is scheduled; reference clear_sled_periodically, init_chron,
executing_tasks.tasks, and StoredTask to locate the code to change.
---
Outside diff comments:
In `@engine/proto/engine.proto`:
- Around line 9-32: TaskBlockSelector currently only nests itself and is
unusable; replace it with a concrete selector (e.g., include a TaskSelector
field and block identifier(s)) and wire the DeleteTaskBlock RPC to accept
TaskBlockSelector instead of TaskSelector. Concretely: modify message
TaskBlockSelector to reference TaskSelector (e.g., field name "task = 1") and
add a block identifier field (e.g., "block_id" or "repeated block_ids") so
clients can target blocks, and update rpc DeleteTaskBlock to take
TaskBlockSelector as its input so the block selector is actually exposed on the
RPC surface.
---
Nitpick comments:
In `@engine/src/bin/server.rs`:
- Around line 39-41: Rename the unused parameter in the async function
get_metadata from request to _request to silence the unused-variable warning;
update the function signature (async fn get_metadata(&self, _request:
tonic::Request<proto::Empty>, ...)) and ensure any internal references (if any)
are updated to use the new name (likely none).
- Around line 380-383: The recv().await call on the task queue (the
receiver.recv().await in the server loop) can block forever; wrap that await in
tokio::time::timeout with a configurable Duration (or a sensible default) and
map a timeout error to an appropriate gRPC Status (e.g.,
Status::deadline_exceeded or Status::unavailable) instead of hanging; update the
code that currently maps recv errors (the map_err on receiver.recv()) to also
handle the timeout branch and return a clear Status, and optionally
document/configure that clients should still set gRPC deadlines.
- Around line 1-24: Remove the unused imports reported by CI: delete
LibraryMetadata and LibraryManager from plugin imports, remove Task and
TaskQueue from task imports, drop OS from env::consts, remove Read from std::io,
drop Ipv6Addr from std::net, replace RS_RwLock import (std::sync::RwLock as
RS_RwLock) with no alias or remove it if unused, and remove Request and
MetadataValue from tonic imports; update the use lines that reference
LibraryMetadata, LibraryManager, Task, TaskQueue, OS, Read, Ipv6Addr, RS_RwLock,
Request, and MetadataValue so only actually used symbols remain (e.g., keep
Registry, api::ServerAPI, chrono::Utc, event::{debug, info, warn},
events::{self, Events, ID}, task::{LeasedTask, StoredTask, StoredTaskBlock}, and
tonic::{Response, Status, transport::Server}).
In `@enginelib/src/api.rs`:
- Around line 240-243: The subscriber is forcing ERROR-only output by calling
.with_max_level(Level::ERROR) in the
tracing_subscriber::FmtSubscriber::builder(), which silences INFO/WARN in
release; change it to a more appropriate production level (e.g., Level::INFO or
Level::WARN) or make the level configurable (use cfg!(debug_assertions) to pick
TRACE/DEBUG in debug and INFO/WARN in release, or read from RUST_LOG/env) and
replace the hardcoded .with_max_level(Level::ERROR) accordingly.
- Around line 1-5: Remove the unused imports from the top-level use statement:
delete chrono::Utc, crossbeam::queue::ArrayQueue, and tracing::error so only the
actually used symbols (e.g., dashmap::DashMap, tokio::{spawn, sync::RwLock,
time::interval}, tracing::{Level, debug, info, instrument}) remain; adjust the
use line(s) accordingly and run a build or cargo check to verify warnings are
resolved.
- Around line 91-92: Replace the unbounded channel with a bounded one using the
configured task_queue_size to add backpressure: instead of
async_channel::unbounded() call async_channel::bounded(task_queue_size as usize)
(preserving the returned sender/receiver tuple order assigned to (s, r) in the
same insertion into api.task_queue.tasks.entry(id.clone()).insert((r, s))); also
update any call sites that send into this channel to handle the SendError/await
behavior when the channel is full (propagate or retry/timeout per existing
semantics). Ensure task_queue_size is read from config (or defaulted safely) and
used as the bound.
In `@enginelib/src/task.rs`:
- Line 2: The import list contains unused items—remove the unused symbols
`std::collections::HashMap`, `crossbeam::queue::ArrayQueue`, and
`dashmap::DashSet` from the `use` statements in this module; search for any
remaining references to `HashMap`, `ArrayQueue`, or `DashSet` in `task.rs` and
either delete those references or add the necessary usage, then run `cargo
build`/`cargo clippy` to verify the warnings are gone.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 82d2f4be-8495-47aa-812f-141bf3c754ff
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (16)
engine/proto/engine.protoengine/src/bin/client.rsengine/src/bin/server.rsenginelib/Cargo.tomlenginelib/src/api.rsenginelib/src/config.rsenginelib/src/events/admin_auth_event.rsenginelib/src/events/auth_event.rsenginelib/src/events/mod.rsenginelib/src/events/server_task_block_acquired_event.rsenginelib/src/events/server_task_block_created_event.rsenginelib/src/events/server_task_block_published_event.rsenginelib/src/events/server_task_created_event.rsenginelib/src/prelude.rsenginelib/src/task.rsenginelib/tests/event_tests.rs
💤 Files with no reviewable changes (1)
- enginelib/src/events/server_task_created_event.rs
| event::info, | ||
| plugin::LibraryInstance, | ||
| prelude::debug, | ||
| Registry, api::EngineAPI, event::info, events::Events, plugin::LibraryInstance, prelude::debug, |
There was a problem hiding this comment.
Update the client to the current API type.
This binary still imports and threads EngineAPI, but that symbol no longer exists in enginelib::api, so the client target does not compile. The client needs to be migrated to the refactored API surface before this can merge.
Also applies to: 18-22, 26-28, 48-53, 173-173
🧰 Tools
🪛 GitHub Actions: Rust / 0_Rust project - latest (stable).txt
[error] 2-2: E0432: unresolved import enginelib::api::EngineAPI — no EngineAPI in api
🪛 GitHub Actions: Rust / 1_Rust project - latest (nightly).txt
[error] 2-2: rustc --crate-name client failed: unresolved import enginelib::api::EngineAPI (no EngineAPI in api). Error[E0432].
🪛 GitHub Actions: Rust / 2_Rust project - latest (beta).txt
[error] 2-2: Rustc E0432: unresolved import enginelib::api::EngineAPI (no EngineAPI in api).
🪛 GitHub Actions: Rust / Rust project - latest (beta)
[error] 2-2: E0432 unresolved import: enginelib::api::EngineAPI (no EngineAPI in api)
🪛 GitHub Actions: Rust / Rust project - latest (nightly)
[error] 2-2: rustc error[E0432]: unresolved import enginelib::api::EngineAPI (no EngineAPI in api). Command: /home/runner/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/bin/rustc --crate-name client ... --crate-type bin ...
🪛 GitHub Actions: Rust / Rust project - latest (stable)
[error] 2-2: error[E0432]: unresolved import enginelib::api::EngineAPI (no EngineAPI in api).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@engine/src/bin/client.rs` at line 2, The client imports and references the
removed symbol `api::EngineAPI` (and threads it through code using `Registry`,
`events::Events`, `plugin::LibraryInstance`, etc.), so update all imports and
signatures to the refactored API surface: remove `api::EngineAPI` from the use
list, import the new API type(s) exported from `enginelib::api` (replace
`EngineAPI` usages with the current type name(s)), update function signatures
and variables that accept or return `EngineAPI` to the new type, and adapt any
method calls to the new API methods; ensure `Registry`, `Events`, and
`LibraryInstance` usages call the new API surface where they previously relied
on `EngineAPI`.
| let mut leased = api.leased_tasks.tasks.entry(key.clone()).or_default(); | ||
| let Some(idx) = leased | ||
| .iter() | ||
| .position(|f| f.id == instance_id && f.user_id == uid) | ||
| .ok_or_else(|| tonic::Status::not_found("Invalid taskid or userid"))?; | ||
|
|
||
| exec_tasks.remove(idx).id | ||
| }; | ||
|
|
||
| api.solved_tasks.tasks.entry(key.clone()).or_default().push( | ||
| enginelib::task::StoredTask { | ||
| bytes: publish_payload.clone(), | ||
| id: solved_id.clone(), | ||
| }, | ||
| ); | ||
|
|
||
| let exec_key_state = api | ||
| .executing_tasks | ||
| .tasks | ||
| .get(&key) | ||
| .cloned() | ||
| .unwrap_or_default(); | ||
| let solved_key_state = api | ||
| .solved_tasks | ||
| .tasks | ||
| .get(&key) | ||
| .cloned() | ||
| .unwrap_or_default(); | ||
| let db = api.db.clone(); | ||
|
|
||
| (solved_id, exec_key_state, solved_key_state, db) | ||
| }; | ||
|
|
||
| let exec_op = EngineAPI::state_op_executing(&key, &exec_key_state) | ||
| .map_err(|e| Status::internal(format!("Serialization error: {}", e)))?; | ||
| let solved_op = EngineAPI::state_op_solved(&key, &solved_key_state) | ||
| .map_err(|e| Status::internal(format!("Serialization error: {}", e)))?; | ||
|
|
||
| EngineAPI::apply_batch_ops(&db, vec![exec_op, solved_op]) | ||
| .map_err(|e| Status::internal(format!("DB insert error: {}", e)))?; | ||
| .position(|l| l.stored_task.id == t.id && l.user_id == uid) | ||
| else { | ||
| info!("publish: no lease for {} held by {}", t.id, uid); | ||
| continue; | ||
| }; | ||
| leased.remove(idx); | ||
| drop(leased); | ||
|
|
||
| let stored = StoredTask { | ||
| id: t.id.clone(), | ||
| bytes: payload, | ||
| }; | ||
| if let Err(e) = api.put_solved(&key, &stored) { | ||
| info!("publish: sled put_solved failed for {}: {}", t.id, e); | ||
| continue; | ||
| } |
There was a problem hiding this comment.
Lease removal and put_solved are not atomic — crash between them may cause data loss.
If the server crashes after leased.remove(idx) (line 492) but before put_solved succeeds (line 499), the task is lost: it's no longer leased and not written to solved storage. Consider writing to solved first, then removing the lease, so a crash leaves the task in both places (recoverable) rather than neither.
🐛 Proposed fix to reorder operations
- let mut leased = api.leased_tasks.tasks.entry(key.clone()).or_default();
- let Some(idx) = leased
- .iter()
- .position(|l| l.stored_task.id == t.id && l.user_id == uid)
- else {
- info!("publish: no lease for {} held by {}", t.id, uid);
- continue;
- };
- leased.remove(idx);
- drop(leased);
-
let stored = StoredTask {
id: t.id.clone(),
bytes: payload,
};
+ // Write to solved BEFORE removing lease — crash-safe ordering
if let Err(e) = api.put_solved(&key, &stored) {
info!("publish: sled put_solved failed for {}: {}", t.id, e);
continue;
}
+
+ let mut leased = api.leased_tasks.tasks.entry(key.clone()).or_default();
+ let Some(idx) = leased
+ .iter()
+ .position(|l| l.stored_task.id == t.id && l.user_id == uid)
+ else {
+ // Task written to solved but no lease found — log but continue
+ info!("publish: no lease for {} held by {} (already solved)", t.id, uid);
+ published_ids.push(t.id);
+ continue;
+ };
+ leased.remove(idx);
+ drop(leased);
+
published_ids.push(t.id);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@engine/src/bin/server.rs` around lines 484 - 502, The current flow removes
the lease from api.leased_tasks (via leased.remove(idx)) before calling
api.put_solved, which can lose the task if the process crashes between those
calls; change the order so you create StoredTask and call api.put_solved(&key,
&stored) first and only on success remove the lease from the leased list (the
entry returned by api.leased_tasks.tasks.entry(...) / the vector referenced by
leased) and drop it afterwards; also handle the case where put_solved reports
the task already exists (tolerate duplicate/Idempotent write) and log errors
without removing the lease when put_solved fails so tasks remain recoverable.
- Serialize queue refills per identifier to avoid TOCTOU duplicates - Remove queued entries after publish and prune expired leases periodically
| let block = receiver | ||
| .recv() | ||
| .await | ||
| .map_err(|_| Status::unavailable("Task queue closed"))?; | ||
|
|
||
| api.executing_tasks | ||
| .tasks | ||
| .entry(key.clone()) | ||
| .or_default() | ||
| .push(enginelib::task::StoredExecutingTask { | ||
| bytes: task_payload, | ||
| // Lease the block and fire one per-block acquire event. | ||
| let instance_ids: Vec<String> = { | ||
| let api = self.EngineAPI.read().await; | ||
| let mut entry = api.leased_tasks.tasks.entry(key.clone()).or_default(); | ||
| let now = Utc::now(); | ||
| let mut ids = Vec::with_capacity(block.tasks.len()); | ||
| for task in &block.tasks { | ||
| ids.push(task.id.clone()); | ||
| entry.push(LeasedTask { | ||
| stored_task: Arc::new(task.clone()), | ||
| user_id: uid.clone(), | ||
| given_at: Utc::now(), | ||
| id: ttask.id.clone(), | ||
| given_at: now, | ||
| }); | ||
|
|
||
| let tasks_key_state = api.task_queue.tasks.get(&key).cloned().unwrap_or_default(); | ||
| let exec_key_state = api | ||
| .executing_tasks | ||
| .tasks | ||
| .get(&key) | ||
| .cloned() | ||
| .unwrap_or_default(); | ||
| let db = api.db.clone(); | ||
| (ttask, tasks_key_state, exec_key_state, db) | ||
| } |
There was a problem hiding this comment.
Lease registration window allows duplicate task delivery
After receiver.recv() returns a block (line 390–393), the tasks' leases are not recorded until the next self.EngineAPI.read().await completes (line 397). Between these two .await points the Tokio scheduler can run a second concurrent aquire_task_block call. That second caller sees the channel empty (the block was just drained), acquires the fill_lock, snapshots leased_tasks — which does not yet contain the unregistered tasks — finds those same task IDs in sled, and re-pushes them as a new block. Both workers end up holding references to the same task IDs, both add leases, and both attempt to publish. The fill_lock guards against two simultaneous fill_queue scans but does not cover this window between recv() and lease insertion.
Summary
enginelib.Testing
cargo test --workspacecargo build --workspaceSummary by CodeRabbit
New Features
Refactor