diff --git a/Cargo.lock b/Cargo.lock index 287b6d6..1f3eb0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,6 +91,18 @@ dependencies = [ "rustversion", ] +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -378,6 +390,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -520,9 +541,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.1.0" +version = "6.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" dependencies = [ "cfg-if", "crossbeam-utils", @@ -669,8 +690,10 @@ dependencies = [ name = "enginelib" version = "0.2.0" dependencies = [ + "async-channel", "chrono", "crossbeam", + "dashmap", "directories", "inventory", "libloading", @@ -703,6 +726,27 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "faster-hex" version = "0.10.0" @@ -2142,6 +2186,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.11.2" diff --git a/engine/proto/engine.proto b/engine/proto/engine.proto index 14a429d..6af5c55 100644 --- a/engine/proto/engine.proto +++ b/engine/proto/engine.proto @@ -1,12 +1,12 @@ syntax = "proto3"; package engine; service Engine { - rpc AquireTask(TaskRequest) returns (Task); + rpc AquireTaskBlock(TaskBlockRequest) returns (TaskBlock); rpc AquireTaskReg(empty) returns (TaskRegistry); - rpc PublishTask(Task) returns (empty); + rpc PublishTaskBlock(TaskBlock) returns (empty); rpc cgrpc(cgrpcmsg) returns (cgrpcmsg); - rpc CreateTask(Task) returns (Task); - rpc DeleteTask(TaskSelector) returns (empty); + rpc CreateTaskBlock(TaskBlock) returns (TaskBlock); + rpc DeleteTaskBlock(TaskSelector) returns (empty); rpc GetTasks(TaskPageRequest) returns (TaskPage); rpc CheckAuth(empty) returns (empty); rpc GetMetadata(empty) returns (ServerMetadata); @@ -30,8 +30,8 @@ message TaskSelector { message empty {} enum TaskState { QUEUED = 0; - PROCESSING = 1; - SOLVED = 2; + SOLVED = 1; + LEASED = 2; } message TaskPageRequest { string namespace = 1; @@ -59,9 +59,9 @@ message TaskRegistry { repeated string tasks = 1; // namespace:task } -message TaskRequest { +message TaskBlockRequest { string task_id = 1; // namespace:task - // bytes payload = 2; + uint32 block_size = 2; } message Task { string id = 4; // the task unique identifier @@ -69,3 +69,6 @@ message Task { string task_id = 2; // namespace:task bytes payload = 3; } +message TaskBlock { + repeated Task tasks = 1; +} diff --git a/engine/src/bin/client.rs b/engine/src/bin/client.rs index 546d42e..e8bc800 100644 --- a/engine/src/bin/client.rs +++ b/engine/src/bin/client.rs @@ -1,10 +1,5 @@ use enginelib::{ - Registry, - api::EngineAPI, - events::Events, - event::info, - plugin::LibraryInstance, - prelude::debug, + Registry, api::EngineAPI, event::info, events::Events, plugin::LibraryInstance, prelude::debug, }; use proto::engine_client; use std::{collections::HashMap, error::Error, sync::Arc}; @@ -88,14 +83,18 @@ async fn worker_loop( continue; } Err(status) => { - debug!("worker {}: acquire failed for {}: {:?}", worker_id, task_id, status); + debug!( + "worker {}: acquire failed for {}: {:?}", + worker_id, task_id, status + ); continue; } }; let task_payload = task_req.get_mut(); - let acquired_payload = Arc::new(std::sync::RwLock::new(task_payload.task_payload.clone())); + let acquired_payload = + Arc::new(std::sync::RwLock::new(task_payload.task_payload.clone())); Events::TaskAcquired( api.as_ref(), task_id.clone(), diff --git a/engine/src/bin/server.rs b/engine/src/bin/server.rs index 7b70ff8..18295cb 100644 --- a/engine/src/bin/server.rs +++ b/engine/src/bin/server.rs @@ -2,12 +2,12 @@ use engine::{get_auth, get_uid}; use enginelib::plugin::LibraryMetadata; use enginelib::{ Identifier, RawIdentifier, Registry, - api::EngineAPI, + api::ServerAPI, chrono::Utc, event::{debug, info, warn}, events::{self, Events, ID}, plugin::LibraryManager, - task::{SolvedTasks, StoredExecutingTask, StoredTask, Task, TaskQueue}, + task::{LeasedTask, StoredTask, StoredTaskBlock, Task, TaskQueue}, }; use proto::{ TaskState, @@ -32,7 +32,7 @@ mod proto { } #[allow(non_snake_case)] struct EngineService { - pub EngineAPI: Arc>, + pub EngineAPI: Arc>, } #[tonic::async_trait] impl Engine for EngineService { @@ -70,111 +70,57 @@ impl Engine for EngineService { request: tonic::Request, ) -> Result, Status> { let challenge = get_auth(&request); - let mut api = self.EngineAPI.write().await; + let api = self.EngineAPI.read().await; let db = api.db.clone(); - let output = Events::CheckAdminAuth(&mut api, challenge, ("".into(), "".into()), db); + let output = Events::CheckAdminAuth(&api, challenge, ("".into(), "".into()), db); if !output { warn!("Auth check failed - permission denied"); return Err(tonic::Status::permission_denied("Invalid Auth")); }; return Ok(tonic::Response::new(proto::Empty {})); } - async fn delete_task( + async fn delete_task_block( &self, request: tonic::Request, ) -> Result, Status> { - let mut api = self.EngineAPI.write().await; + let api = self.EngineAPI.read().await; let data = request.get_ref(); let challenge = get_auth(&request); let db = api.db.clone(); - let id = ID(&data.namespace, &data.task); + let key = ID(&data.namespace, &data.task); - let output = Events::CheckAdminAuth(&mut api, challenge, ("".into(), "".into()), db); - if !output { + if !Events::CheckAdminAuth(&api, challenge, ("".into(), "".into()), db) { warn!("Auth check failed - permission denied"); return Err(tonic::Status::permission_denied("Invalid Auth")); }; - // Generic helper for removing a task by id from a collection, using an id extractor closure - fn delete_task_from_collection( - collection: &mut HashMap<(String, String), Vec>, - id: &(String, String), - task_id: &str, - state_name: &str, - namespace: &str, - task: &str, - id_extractor: F, - ) -> Result<(), Status> - where - F: Fn(&T) -> &str, - { - match collection.get_mut(id) { - Some(query) => { - let orig_len = query.len(); - query.retain(|f| id_extractor(f) != task_id); - if query.len() == orig_len { - info!( - "DeleteTask: Task with id {} not found in {} state for namespace: {}, task: {}", - task_id, state_name, namespace, task - ); - return Err(Status::not_found(format!( - "Task with id {} not found in {} state", - task_id, state_name - ))); - } - Ok(()) - } - None => { - info!( - "DeleteTask: No tasks found in {} state for namespace: {}, task: {}", - state_name, namespace, task - ); - Err(Status::not_found(format!( - "No tasks found in {} state for given namespace and task", - state_name - ))) - } - } - } - // Use the helper for each state - let result = match data.state() { - TaskState::Processing => delete_task_from_collection( - &mut api.executing_tasks.tasks, - &id, - &data.id, - "Processing", - &data.namespace, - &data.task, - |f| &f.id, - ), - TaskState::Solved => delete_task_from_collection( - &mut api.solved_tasks.tasks, - &id, - &data.id, - "Solved", - &data.namespace, - &data.task, - |f| &f.id, - ), - TaskState::Queued => delete_task_from_collection( - &mut api.task_queue.tasks, - &id, - &data.id, - "Queued", - &data.namespace, - &data.task, - |f| &f.id, - ), + let removed = match data.state() { + TaskState::Queued => api + .delete_queued(&key, &data.id) + .map_err(|e| Status::internal(format!("sled: {e}")))?, + TaskState::Solved => api + .delete_solved(&key, &data.id) + .map_err(|e| Status::internal(format!("sled: {e}")))?, + TaskState::Leased => { + let mut leased = api.leased_tasks.tasks.entry(key.clone()).or_default(); + let orig = leased.len(); + leased.retain(|l| l.stored_task.id != data.id); + orig != leased.len() + } }; - if let Err(e) = result { - return Err(e); + if !removed { + return Err(Status::not_found(format!( + "Task {} not found in {:?} for {}:{}", + data.id, + data.state(), + data.namespace, + data.task, + ))); } - // Sync running memory into DB - EngineAPI::sync_db(&mut api); info!( - "DeleteTask: Successfully deleted task with id {} in state {:?} for namespace: {}, task: {}", + "DeleteTask: deleted {} in {:?} for {}:{}", data.id, data.state(), data.namespace, @@ -209,116 +155,76 @@ impl Engine for EngineService { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { - let mut api = self.EngineAPI.write().await; + let api = self.EngineAPI.read().await; let challenge = get_auth(&request); let db = api.db.clone(); - if !Events::CheckAdminAuth(&mut api, challenge, ("".into(), "".into()), db) { + if !Events::CheckAdminAuth(&api, challenge, ("".into(), "".into()), db) { info!("GetTask denied due to Invalid Auth"); return Err(Status::permission_denied("Invalid authentication")); }; let data = request.get_ref(); + let key = ID(&data.namespace, &data.task); + let task_id_str = format!("{}:{}", data.namespace, data.task); - let q: Vec = match data.clone().state() { - TaskState::Processing => { - match api - .executing_tasks - .tasks - .get(&(data.namespace.clone(), data.task.clone())) - { - Some(tasks) => { - let mut task_refs: Vec<_> = tasks.iter().collect(); - task_refs.sort_by_key(|f| &f.id); - task_refs - .iter() - .map(|f| proto::Task { - id: f.id.clone(), - task_id: format!("{}:{}", data.namespace, data.task), - task_payload: f.bytes.clone(), - payload: Vec::new(), - }) - .collect() - } - None => { - info!( - "Namespace {:?} and task {:?} not found in Processing state", - data.namespace, data.task - ); - Vec::new() - } - } - } + let to_proto = |id: String, bytes: Vec| proto::Task { + id, + task_id: task_id_str.clone(), + task_payload: bytes, + payload: Vec::new(), + }; + + let mut tasks: Vec = match data.state() { TaskState::Queued => { - match api - .task_queue - .tasks - .get(&(data.namespace.clone(), data.task.clone())) - { - Some(tasks) => { - let mut d = tasks.clone(); - d.sort_by_key(|f| f.id.clone()); - d.iter() - .map(|f| proto::Task { - id: f.id.clone(), - task_id: format!("{}:{}", data.namespace, data.task), - task_payload: f.bytes.clone(), - payload: Vec::new(), - }) - .collect() - } - None => { - info!( - "Namespace {:?} and task {:?} not found in Queued state", - data.namespace, data.task - ); - Vec::new() - } - } + let mut v: Vec<_> = api + .scan_queued(&key) + .map(|t| to_proto(t.id, t.bytes)) + .collect(); + v.sort_by(|a, b| a.id.cmp(&b.id)); + v } TaskState::Solved => { - match api - .solved_tasks + let mut v: Vec<_> = api + .scan_solved(&key) + .map(|t| to_proto(t.id, t.bytes)) + .collect(); + v.sort_by(|a, b| a.id.cmp(&b.id)); + v + } + TaskState::Leased => { + let mut v: Vec<_> = api + .leased_tasks .tasks - .get(&(data.namespace.clone(), data.task.clone())) - { - Some(tasks) => { - let mut d = tasks.clone(); - d.sort_by_key(|f| f.id.clone()); - d.iter() - .map(|f| proto::Task { - id: f.id.clone(), - task_id: format!("{}:{}", data.namespace, data.task), - task_payload: f.bytes.clone(), - payload: Vec::new(), - }) + .get(&key) + .map(|leased| { + leased + .iter() + .map(|l| to_proto(l.stored_task.id.clone(), l.stored_task.bytes.clone())) .collect() - } - None => { - info!( - "Namespace {:?} and task {:?} not found in Solved state", - data.namespace, data.task - ); - Vec::new() - } - } + }) + .unwrap_or_default(); + v.sort_by(|a, b| a.id.cmp(&b.id)); + v } }; - let index = data.page * data.page_size as u64; - let end = index + (api.cfg.config_toml.pagination_limit.min(data.page_size) as u64); - let final_vec: Vec<_> = q - .iter() - .skip(index as usize) - .take(data.page_size as usize) - .cloned() - .collect(); - return Ok(tonic::Response::new(proto::TaskPage { + + let page_size = api.cfg.config_toml.pagination_limit.min(data.page_size) as usize; + let start = (data.page as usize).saturating_mul(page_size); + let end = start.saturating_add(page_size).min(tasks.len()); + let final_vec = if start >= tasks.len() { + Vec::new() + } else { + tasks.drain(start..end).collect() + }; + + Ok(tonic::Response::new(proto::TaskPage { namespace: data.namespace.clone(), task: data.task.clone(), page: data.page, - page_size: data.page_size, + page_size: page_size as u32, state: data.state, tasks: final_vec, - })); + })) } /// Handles custom gRPC messages with admin-level authentication. /// @@ -392,11 +298,11 @@ impl Engine for EngineService { let uid = get_uid(&request); let challenge = get_auth(&request); info!("Task registry request received from user: {}", uid); - let mut api = self.EngineAPI.write().await; + let api = self.EngineAPI.read().await; let db = api.db.clone(); debug!("Validating authentication for task registry request"); - if !Events::CheckAuth(&mut api, uid.clone(), challenge, db) { + if !Events::CheckAuth(&api, uid.clone(), challenge, db) { info!( "Task registry request denied - invalid authentication for user: {}", uid @@ -404,20 +310,19 @@ impl Engine for EngineService { return Err(Status::permission_denied("Invalid authentication")); }; let mut tasks: Vec = Vec::new(); - for (k, v) in &api.task_registry.tasks { - let js: Vec = vec![k.0.clone(), k.1.clone()]; - let jstr = js.join(":"); - tasks.push(jstr); + for entry in api.task_registry.tasks.iter() { + let k = entry.key(); + tasks.push(format!("{}:{}", k.0, k.1)); } info!("Returning task registry with {} tasks", tasks.len()); let response = proto::TaskRegistry { tasks }; Ok(tonic::Response::new(response)) } - async fn aquire_task( + async fn aquire_task_block( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { + request: tonic::Request, + ) -> Result, tonic::Status> { let challenge = get_auth(&request); let task_id = request.get_ref().task_id.clone(); let uid = get_uid(&request); @@ -427,10 +332,9 @@ impl Engine for EngineService { ); { - let mut api = self.EngineAPI.write().await; + let api = self.EngineAPI.read().await; let db = api.db.clone(); - debug!("Validating authentication for task acquisition"); - if !Events::CheckAuth(&mut api, uid.clone(), challenge, db) { + if !Events::CheckAuth(&api, uid.clone(), challenge, db) { info!( "Task acquisition denied - invalid authentication for user: {}", uid @@ -440,302 +344,297 @@ impl Engine for EngineService { } let (namespace, task_name) = task_id.split_once(':').ok_or_else(|| { - info!("Invalid task ID format: {}", task_id); Status::invalid_argument("Invalid task ID format, expected 'namespace:task'") })?; - - debug!("Looking up task definition for {}:{}", namespace, task_name); let key = ID(namespace, task_name); { let api = self.EngineAPI.read().await; if api.task_registry.get(&key).is_none() { - warn!( - "Task acquisition failed - task does not exist: {}:{}", - namespace, task_name - ); return Err(Status::invalid_argument("Task Does not Exist")); } if Events::ServerBeforeTaskAcquire(&api, uid.clone(), task_id.clone()) { - info!( - "ServerBeforeTaskAcquire cancelled for user: {} task: {}", - uid, task_id - ); return Err(Status::aborted( "Task acquire cancelled by server event handler", )); } } - let (ttask, tasks_key_state, exec_key_state, db) = { - let mut api = self.EngineAPI.write().await; - - let queue = api - .task_queue + // Resolve the receiver, refill from sled if drained, then await a block. + // recv blocks until a producer (fill_queue or create_task_block) pushes one, + // so callers should use a gRPC deadline if they need an upper bound. + let receiver = { + let api = self.EngineAPI.read().await; + api.task_queue .tasks - .get_mut(&key) - .ok_or_else(|| Status::not_found("No queued tasks available"))?; + .get(&key) + .map(|entry| entry.0.clone()) + .ok_or_else(|| Status::not_found("Unknown task type"))? + }; - if queue.is_empty() { - info!("No queued tasks for {}:{}", namespace, task_name); - return Err(Status::not_found("No queued tasks available")); + if receiver.is_empty() { + // Serialize concurrent refills for this Identifier — without this, + // two acquires that both see an empty channel will both scan sled + // and push duplicate StoredTaskBlocks (P1 TOCTOU). + let lock_arc = { + let api = self.EngineAPI.read().await; + api.fill_locks.entry(key.clone()).or_default().clone() + }; + let _g = lock_arc.lock().await; + if receiver.is_empty() { + let api = self.EngineAPI.read().await; + ServerAPI::fill_queue(&api, key.clone()); } + } - let ttask = queue.remove(0); - let task_payload = ttask.bytes.clone(); + let block = receiver + .recv() + .await + .map_err(|_| Status::unavailable("Task queue closed"))?; - api.executing_tasks - .tasks - .entry(key.clone()) - .or_default() - .push(enginelib::task::StoredExecutingTask { - bytes: task_payload, + // Lease the block and fire one per-block acquire event. + let instance_ids: Vec = { + let api = self.EngineAPI.read().await; + let mut entry = api.leased_tasks.tasks.entry(key.clone()).or_default(); + let now = Utc::now(); + let mut ids = Vec::with_capacity(block.tasks.len()); + for task in &block.tasks { + ids.push(task.id.clone()); + entry.push(LeasedTask { + stored_task: Arc::new(task.clone()), user_id: uid.clone(), - given_at: Utc::now(), - id: ttask.id.clone(), + given_at: now, }); - - let tasks_key_state = api.task_queue.tasks.get(&key).cloned().unwrap_or_default(); - let exec_key_state = api - .executing_tasks - .tasks - .get(&key) - .cloned() - .unwrap_or_default(); - let db = api.db.clone(); - (ttask, tasks_key_state, exec_key_state, db) + } + drop(entry); + Events::ServerTaskBlockAcquired(&api, uid.clone(), task_id.clone(), ids.clone()); + ids }; + let _ = instance_ids; - let tasks_op = EngineAPI::state_op_tasks(&key, &tasks_key_state) - .map_err(|e| Status::internal(format!("Serialization error: {}", e)))?; - let exec_op = EngineAPI::state_op_executing(&key, &exec_key_state) - .map_err(|e| Status::internal(format!("Serialization error: {}", e)))?; - - EngineAPI::apply_batch_ops(&db, vec![tasks_op, exec_op]) - .map_err(|e| Status::internal(format!("DB insert error: {}", e)))?; - - { - let api = self.EngineAPI.read().await; - Events::ServerTaskAcquired(&api, uid.clone(), task_id.clone(), ttask.id.clone()); - } + let tasks = block + .tasks + .into_iter() + .map(|t| proto::Task { + id: t.id, + task_id: task_id.clone(), + task_payload: t.bytes, + payload: Vec::new(), + }) + .collect(); - Ok(tonic::Response::new(proto::Task { - id: ttask.id, - task_id, - task_payload: ttask.bytes, - payload: Vec::new(), - })) + Ok(tonic::Response::new(proto::TaskBlock { tasks })) } - async fn publish_task( + async fn publish_task_block( &self, - request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { let challenge = get_auth(&request); let uid = get_uid(&request); - - let task_id = request.get_ref().task_id.clone(); - let instance_id = request.get_ref().id.clone(); - let payload_for_event = Arc::new(std::sync::RwLock::new( - request.get_ref().task_payload.clone(), - )); + let api = self.EngineAPI.read().await; { - let mut api = self.EngineAPI.write().await; let db = api.db.clone(); - if !Events::CheckAuth(&mut api, uid.clone(), challenge, db) { - info!("Aquire Task denied due to Invalid Auth"); + if !Events::CheckAuth(&api, uid.clone(), challenge, db) { return Err(Status::permission_denied("Invalid authentication")); }; } - { - let api = self.EngineAPI.read().await; - if Events::ServerBeforeTaskPublish( - &api, - uid.clone(), - task_id.clone(), - instance_id.clone(), - payload_for_event.clone(), - ) { - info!( - "ServerBeforeTaskPublish cancelled for user: {} task: {}", - uid, task_id - ); - return Err(Status::aborted( - "Task publish cancelled by server event handler", - )); - } + // Group incoming tasks by their (namespace:task) — the proto allows mixing + // but in practice they'll be uniform per call. Grouping defensively also + // keeps registry lookups + event firing once-per-group. + let mut groups: HashMap> = HashMap::new(); + for t in request.into_inner().tasks { + let Some((ns, name)) = t.task_id.split_once(':') else { + info!("publish: skipping malformed task_id {}", t.task_id); + continue; + }; + groups + .entry((ns.to_string(), name.to_string())) + .or_default() + .push(t); } - let publish_payload = payload_for_event - .read() - .map(|p| p.clone()) - .map_err(|_| Status::internal("Task publish payload lock poisoned"))?; - - let (namespace, task_name) = task_id - .split_once(':') - .ok_or_else(|| Status::invalid_argument("Invalid Params"))?; - let key = ID(namespace, task_name); - - let reg_tsk = { - let api = self.EngineAPI.read().await; - if !api.task_registry.tasks.contains_key(&key) { - warn!( - "Task acquisition failed - task does not exist: {}:{}", - namespace, task_name - ); - return Err(Status::invalid_argument("Task Does not Exist")); - } + for (key, tasks) in groups { + let task_id_str = format!("{}:{}", key.0, key.1); + let Some(reg_tsk) = api.task_registry.get(&key) else { + info!("publish: unknown task {}:{}, skipping group", key.0, key.1); + continue; + }; - match api.task_registry.get(&key) { - Some(r) => r, - None => { - warn!("Task registry missing for {}:{}", namespace, task_name); - return Err(Status::invalid_argument("Task Does not Exist")); + let mut published_ids: Vec = Vec::with_capacity(tasks.len()); + + for t in tasks { + let payload_for_event = + Arc::new(std::sync::RwLock::new(t.task_payload.clone())); + if Events::ServerBeforeTaskPublish( + &api, + uid.clone(), + task_id_str.clone(), + t.id.clone(), + payload_for_event.clone(), + ) { + info!("publish: handler cancelled {}:{}", task_id_str, t.id); + continue; } - } - }; - - if !reg_tsk.verify(publish_payload.clone()) { - info!("Failed to parse task"); - return Err(Status::invalid_argument("Failed to parse given task bytes")); - } - let (solved_id, exec_key_state, solved_key_state, db) = { - let mut api = self.EngineAPI.write().await; + let payload = match payload_for_event.read() { + Ok(p) => p.clone(), + Err(_) => { + info!("publish: payload lock poisoned for {}", t.id); + continue; + } + }; - let solved_id = { - let exec_tasks = api - .executing_tasks - .tasks - .get_mut(&key) - .ok_or_else(|| tonic::Status::not_found("Invalid taskid or userid"))?; + if !reg_tsk.clone().verify(payload.clone()) { + info!("publish: verify failed for {}", t.id); + continue; + } - let idx = exec_tasks + let mut leased = api.leased_tasks.tasks.entry(key.clone()).or_default(); + let Some(idx) = leased .iter() - .position(|f| f.id == instance_id && f.user_id == uid) - .ok_or_else(|| tonic::Status::not_found("Invalid taskid or userid"))?; - - exec_tasks.remove(idx).id - }; - - api.solved_tasks.tasks.entry(key.clone()).or_default().push( - enginelib::task::StoredTask { - bytes: publish_payload.clone(), - id: solved_id.clone(), - }, - ); - - let exec_key_state = api - .executing_tasks - .tasks - .get(&key) - .cloned() - .unwrap_or_default(); - let solved_key_state = api - .solved_tasks - .tasks - .get(&key) - .cloned() - .unwrap_or_default(); - let db = api.db.clone(); - - (solved_id, exec_key_state, solved_key_state, db) - }; - - let exec_op = EngineAPI::state_op_executing(&key, &exec_key_state) - .map_err(|e| Status::internal(format!("Serialization error: {}", e)))?; - let solved_op = EngineAPI::state_op_solved(&key, &solved_key_state) - .map_err(|e| Status::internal(format!("Serialization error: {}", e)))?; - - EngineAPI::apply_batch_ops(&db, vec![exec_op, solved_op]) - .map_err(|e| Status::internal(format!("DB insert error: {}", e)))?; + .position(|l| l.stored_task.id == t.id && l.user_id == uid) + else { + info!("publish: no lease for {} held by {}", t.id, uid); + continue; + }; + leased.remove(idx); + drop(leased); + + let stored = StoredTask { + id: t.id.clone(), + bytes: payload, + }; + if let Err(e) = api.put_solved(&key, &stored) { + info!("publish: sled put_solved failed for {}: {}", t.id, e); + continue; + } + if let Err(e) = api.delete_queued(&key, &t.id) { + info!("publish: sled delete_queued failed for {}: {}", t.id, e); + } + published_ids.push(t.id); + } - { - let api = self.EngineAPI.read().await; - Events::ServerTaskPublished(&api, uid.clone(), task_id.clone(), solved_id); + if !published_ids.is_empty() { + Events::ServerTaskBlockPublished( + &api, + uid.clone(), + task_id_str, + published_ids, + ); + } } - info!("Task published successfully: {} by user: {}", task_id, uid); Ok(tonic::Response::new(proto::Empty {})) } - async fn create_task( + async fn create_task_block( &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let mut api = self.EngineAPI.write().await; + request: tonic::Request, + ) -> Result, tonic::Status> { let challenge = get_auth(&request); let uid = get_uid(&request); + let api = self.EngineAPI.read().await; let db = api.db.clone(); - if !Events::CheckAuth(&mut api, uid, challenge, db) { + if !Events::CheckAuth(&api, uid, challenge, db) { //TODO: change to AdminSpecific Auth info!("Create Task denied due to Invalid Auth"); return Err(Status::permission_denied("Invalid authentication")); }; - let task = request.get_ref(); - let task_id = task.task_id.clone(); - let payload_for_event = Arc::new(std::sync::RwLock::new(task.task_payload.clone())); - if Events::ServerBeforeTaskCreate(&api, task_id.clone(), payload_for_event.clone()) { - info!("ServerBeforeTaskCreate cancelled for task: {}", task_id); - return Err(Status::aborted( - "Task create cancelled by server event handler", - )); - } - let task_payload = payload_for_event - .read() - .map(|p| p.clone()) - .map_err(|_| Status::internal("Task create payload lock poisoned"))?; - - let parts: Vec<&str> = task_id.splitn(2, ':').collect(); - if parts.len() != 2 || parts[0].is_empty() || parts[1].is_empty() { - return Err(Status::invalid_argument( - "Invalid task ID format, expected 'namespace:task'", - )); - } - let id: Identifier = (parts[0].to_string(), parts[1].to_string()); - let tsk_reg = api.task_registry.get(&id); - if let Some(tsk_reg) = tsk_reg { - if !tsk_reg.clone().verify(task_payload.clone()) { - warn!("Failed to parse given task bytes"); - return Err(Status::invalid_argument("Failed to parse given task bytes")); - } - let tbp_tsk = StoredTask { - bytes: task_payload.clone(), - id: druid::Druid::default().to_hex(), + + // Group incoming tasks by (namespace:task). + let mut groups: HashMap> = HashMap::new(); + for t in request.into_inner().tasks { + let Some((ns, name)) = t.task_id.split_once(':') else { + return Err(Status::invalid_argument(format!( + "Invalid task ID format: {}", + t.task_id + ))); }; - api.task_queue - .tasks - .entry(id.clone()) + if ns.is_empty() || name.is_empty() { + return Err(Status::invalid_argument( + "Invalid task ID format, expected 'namespace:task'", + )); + } + groups + .entry((ns.to_string(), name.to_string())) .or_default() - .push(tbp_tsk.clone()); + .push(t); + } + + let mut created: Vec = Vec::new(); + + for (key, tasks) in groups { + let task_id_str = format!("{}:{}", key.0, key.1); + let Some(reg_tsk) = api.task_registry.get(&key) else { + return Err(Status::invalid_argument(format!( + "Task does not exist: {}", + task_id_str + ))); + }; + + let mut block_tasks: Vec = Vec::with_capacity(tasks.len()); + let mut instance_ids: Vec = Vec::with_capacity(tasks.len()); + let mut payloads: Vec>>> = + Vec::with_capacity(tasks.len()); + + for t in tasks { + let payload_for_event = + Arc::new(std::sync::RwLock::new(t.task_payload.clone())); + if Events::ServerBeforeTaskCreate( + &api, + task_id_str.clone(), + payload_for_event.clone(), + ) { + info!("create: handler cancelled task in {}", task_id_str); + continue; + } + let payload = match payload_for_event.read() { + Ok(p) => p.clone(), + Err(_) => { + info!("create: payload lock poisoned in {}", task_id_str); + continue; + } + }; + if !reg_tsk.clone().verify(payload.clone()) { + info!("create: verify failed in {}", task_id_str); + continue; + } + let stored = StoredTask { + id: druid::Druid::default().to_hex(), + bytes: payload, + }; + if let Err(e) = api.put_queued(&key, &stored) { + info!("create: sled put_queued failed for {}: {}", stored.id, e); + continue; + } + instance_ids.push(stored.id.clone()); + payloads.push(payload_for_event); + created.push(proto::Task { + id: stored.id.clone(), + task_id: task_id_str.clone(), + task_payload: stored.bytes.clone(), + payload: Vec::new(), + }); + block_tasks.push(stored); + } - let tasks_key_state = api.task_queue.tasks.get(&id).cloned().unwrap_or_default(); - let task_op = EngineAPI::state_op_tasks(&id, &tasks_key_state) - .map_err(|e| Status::internal(format!("Serialization error: {}", e)))?; - if let Err(e) = EngineAPI::apply_batch_ops(&api.db, vec![task_op]) { - return Err(Status::internal(format!("DB insert error: {}", e))); + if !block_tasks.is_empty() { + if let Some(channel) = api.task_queue.tasks.get(&key) { + let _ = channel.1.try_send(StoredTaskBlock { tasks: block_tasks }); + } + Events::ServerTaskBlockCreated(&api, task_id_str, instance_ids, payloads); } - Events::ServerTaskCreated( - &api, - task_id.clone(), - tbp_tsk.id.clone(), - Arc::new(std::sync::RwLock::new(tbp_tsk.bytes.clone())), - ); - return Ok(tonic::Response::new(proto::Task { - id: tbp_tsk.id.clone(), - task_id: task_id.clone(), - payload: Vec::new(), - task_payload: tbp_tsk.bytes.clone(), - })); } - Err(tonic::Status::aborted("Error")) + + Ok(tonic::Response::new(proto::TaskBlock { tasks: created })) } } #[tokio::main(flavor = "multi_thread", worker_threads = 8)] async fn main() -> Result<(), Box> { - let mut api = EngineAPI::default(); - EngineAPI::init(&mut api); + let mut api = ServerAPI::default(); + ServerAPI::init(&mut api); Events::init_auth(&mut api); Events::StartEvent(&mut api); Events::ServerStart(&api); @@ -749,7 +648,7 @@ async fn main() -> Result<(), Box> { 50051, ))); let apii = Arc::new(RwLock::new(api)); - EngineAPI::init_chron(apii.clone()); + ServerAPI::init_chron(apii.clone()); let engine = EngineService { EngineAPI: apii }; // Build reflection service, mapping its concrete error into Box diff --git a/enginelib/Cargo.toml b/enginelib/Cargo.toml index 0481fa3..241f162 100644 --- a/enginelib/Cargo.toml +++ b/enginelib/Cargo.toml @@ -21,6 +21,8 @@ tokio = { version = "1.50.0", features = ["full"] } postcard = { version = "1.1.3", features = ["use-std"] } inventory = "0.3.22" crossbeam = "0.8.4" +dashmap = "6.2.1" +async-channel = "2.5.0" [build-dependencies] vergen-gix = { version = "9.1.0", features = ["build", "cargo", "rustc"] } [profile.release] diff --git a/enginelib/src/api.rs b/enginelib/src/api.rs index aab43f8..b556c59 100644 --- a/enginelib/src/api.rs +++ b/enginelib/src/api.rs @@ -1,5 +1,6 @@ use chrono::Utc; use crossbeam::queue::ArrayQueue; +use dashmap::DashMap; use tokio::{spawn, sync::RwLock, time::interval}; use tracing::{Level, debug, error, info, instrument}; @@ -8,7 +9,7 @@ use crate::{ config::Config, event::{EngineEventHandlerRegistry, EventBus}, plugin::LibraryManager, - task::{LeasedTaskQueue, StoredTask, Task, TaskQueue}, + task::{LeasedTaskQueue, StoredTask, StoredTaskBlock, Task, TaskQueue}, }; pub use postcard; pub use postcard::from_bytes; @@ -27,7 +28,9 @@ pub struct ServerAPI { pub event_bus: EventBus, // RW pub db: sled::Db, // R pub lib_manager: LibraryManager, // RW - pub client: bool, // RW + // Serializes fill_queue() calls per Identifier so concurrent acquires can't + // both refill an empty channel and double-enqueue the same StoredTaskBlock. + pub fill_locks: DashMap>>, } impl Default for ServerAPI { @@ -44,28 +47,11 @@ impl Default for ServerAPI { }, }, leased_tasks: LeasedTaskQueue::default(), - client: false, + fill_locks: DashMap::new(), } } } impl ServerAPI { - // pub fn default_client() -> Self { - // Self { - // cfg: Config::default(), - // task_queue: TaskQueue::default(), - // db: sled::open("engine_client_db").unwrap(), - // lib_manager: LibraryManager::default(), - // task_registry: EngineTaskRegistry::default(), - // event_bus: EventBus { - // event_handler_registry: EngineEventHandlerRegistry { - // event_handlers: HashMap::new(), - // }, - // }, - // solved_tasks: SolvedTasks::default(), - // executing_tasks: ExecutingTaskQueue::default(), - // client: true, - // } - //} pub fn test_default() -> Self { // `sled::Config::temporary(true)` defaults to `/dev/shm` on Linux when no path is set. // Some environments deny writes there, so force a unique temp path. @@ -80,7 +66,6 @@ impl ServerAPI { )); Self { - client: false, cfg: Config::new(), leased_tasks: LeasedTaskQueue::default(), task_queue: TaskQueue::default(), @@ -97,6 +82,7 @@ impl ServerAPI { event_handlers: HashMap::new(), }, }, + fill_locks: DashMap::new(), } } pub fn init(api: &mut Self) { @@ -106,13 +92,9 @@ impl ServerAPI { let mut new_lib_manager = LibraryManager::default(); new_lib_manager.load_modules(api); api.lib_manager = new_lib_manager; - for (id, _tsk) in api.task_registry.tasks.iter() { - api.task_queue - .tasks - .entry(id.clone()) - .insert_entry(ArrayQueue::new( - api.cfg.config_toml.task_block_size as usize, - )); + for (id, _tsk) in api.task_registry.tasks.clone() { + let (s, r) = async_channel::unbounded(); + api.task_queue.tasks.entry(id.clone()).insert((r, s)); api.leased_tasks.tasks.entry(id.clone()).or_default(); } @@ -126,152 +108,125 @@ impl ServerAPI { let t = api.try_read().unwrap().cfg.config_toml.clean_tasks; spawn(clear_sled_periodically(api, t)); } - const TASKS_PREFIX: &'static str = "tasks:"; - const LEASING_PREFIX: &'static str = "leasing:"; - const SOLVED_PREFIX: &'static str = "solved:"; + // type:namespace:task:id + pub const TASKS_PREFIX: &'static str = "tasks:"; + pub const SOLVED_PREFIX: &'static str = "solved:"; - fn state_key(prefix: &str, task_id: &Identifier, id: String) -> Vec { + fn state_key(prefix: &str, task_id: &Identifier, id: &str) -> Vec { format!("{}{}\u{1f}{}:{}", prefix, task_id.0, task_id.1, id).into_bytes() } - fn parse_state_key(prefix: &str, key: &[u8]) -> Option { - let key = std::str::from_utf8(key).ok()?; - let rest = key.strip_prefix(prefix)?; - let (task_id, id) = rest.split_once(":")?; - let (namespace, task) = task_id.split_once('\u{1f}')?; - Some((namespace.to_string(), task.to_string())) + fn state_prefix(prefix: &str, task_id: &Identifier) -> Vec { + format!("{}{}\u{1f}{}:", prefix, task_id.0, task_id.1).into_bytes() } - // pub fn apply_batch_entries( - // db: &sled::Db, - // entries: Vec<(&'static str, Vec)>, - // ) -> sled::Result<()> { - // let mut batch = sled::Batch::default(); - // for (key, value) in entries { - // batch.insert(key, value); - // } - // db.apply_batch(batch) - // } - - // pub fn apply_batch_ops( - // db: &sled::Db, - // ops: Vec<(Vec, Option>)>, - // ) -> sled::Result<()> { - // let mut batch = sled::Batch::default(); - // for (key, value) in ops { - // match value { - // Some(v) => batch.insert(key, v), - // None => batch.remove(key), - // } - // } - // db.apply_batch(batch) - // } - - // pub fn state_op_tasks( - // id: &Identifier, - // value: &Vec, - // ) -> Result<(Vec, Option>), postcard::Error> { - // if value.is_empty() { - // Ok((Self::state_key(Self::TASKS_PREFIX, id), None)) - // } else { - // Ok(( - // Self::state_key(Self::TASKS_PREFIX, id), - // Some(postcard::to_allocvec(value)?), - // )) - // } - // } - - // pub fn state_op_executing( - // id: &Identifier, - // value: &Vec, - // ) -> Result<(Vec, Option>), postcard::Error> { - // if value.is_empty() { - // Ok((Self::state_key(Self::LEASING_PREFIX, id), None)) - // } else { - // Ok(( - // Self::state_key(Self::LEASING_PREFIX, id), - // Some(postcard::to_allocvec(value)?), - // )) - // } - // } - - // pub fn state_op_solved( - // id: &Identifier, - // value: &Vec, - // ) -> Result<(Vec, Option>), postcard::Error> { - // if value.is_empty() { - // Ok((Self::state_key(Self::SOLVED_PREFIX, id), None)) - // } else { - // Ok(( - // Self::state_key(Self::SOLVED_PREFIX, id), - // Some(postcard::to_allocvec(value)?), - // )) - // } - // } - - // pub fn sync_db(api: &mut ServerAPI) { - // // IF THIS FN CAUSES PANIC SOMETHING IS VERY BROKEN - // let mut ops: Vec<(Vec, Option>)> = Vec::new(); - - // for prefix in [ - // Self::TASKS_PREFIX, - // Self::LEASING_PREFIX, - // Self::SOLVED_PREFIX, - // ] { - // for item in api.db.scan_prefix(prefix.as_bytes()) { - // if let Ok((key, _)) = item { - // ops.push((key.to_vec(), None)); - // } - // } - // } - - // for (id, tasks) in &api.task_queue.tasks { - // ops.push(Self::state_op_tasks(id, tasks).unwrap()); - // } - // for (id, tasks) in &api.executing_tasks.tasks { - // ops.push(Self::state_op_executing(id, tasks).unwrap()); - // } - // for (id, tasks) in &api.solved_tasks.tasks { - // ops.push(Self::state_op_solved(id, tasks).unwrap()); - // } - - // Self::apply_batch_ops(&api.db, ops).unwrap(); - // debug!("Synced in-memory state to keyed sled storage"); - // } + pub fn task_key(task_id: &Identifier, id: &str) -> Vec { + Self::state_key(Self::TASKS_PREFIX, task_id, id) + } - fn init_db(api: &mut ServerAPI) { - api.task_queue = TaskQueue::default(); - api.leased_tasks = LeasedTaskQueue::default(); + pub fn solved_key(task_id: &Identifier, id: &str) -> Vec { + Self::state_key(Self::SOLVED_PREFIX, task_id, id) + } - for item in api.db.scan_prefix(Self::TASKS_PREFIX.as_bytes()) { - if let Ok((key, value)) = item { - if let Some(id) = Self::parse_state_key(Self::TASKS_PREFIX, &key) { - if let Ok(tasks) = postcard::from_bytes::(&value) { - api.task_queue.tasks.get(&id).unwrap().push(Arc::new(tasks)); - } - } - } + pub fn task_prefix(task_id: &Identifier) -> Vec { + Self::state_prefix(Self::TASKS_PREFIX, task_id) + } + + pub fn solved_prefix(task_id: &Identifier) -> Vec { + Self::state_prefix(Self::SOLVED_PREFIX, task_id) + } + + pub fn put_queued(&self, task_id: &Identifier, task: &StoredTask) -> sled::Result<()> { + let bytes = postcard::to_allocvec(task) + .map_err(|e| sled::Error::Unsupported(format!("postcard: {e}")))?; + self.db.insert(Self::task_key(task_id, &task.id), bytes)?; + Ok(()) + } + + pub fn put_solved(&self, task_id: &Identifier, task: &StoredTask) -> sled::Result<()> { + let bytes = postcard::to_allocvec(task) + .map_err(|e| sled::Error::Unsupported(format!("postcard: {e}")))?; + self.db.insert(Self::solved_key(task_id, &task.id), bytes)?; + Ok(()) + } + + pub fn delete_queued(&self, task_id: &Identifier, id: &str) -> sled::Result { + Ok(self.db.remove(Self::task_key(task_id, id))?.is_some()) + } + + pub fn delete_solved(&self, task_id: &Identifier, id: &str) -> sled::Result { + Ok(self.db.remove(Self::solved_key(task_id, id))?.is_some()) + } + + pub fn scan_queued(&self, task_id: &Identifier) -> impl Iterator + '_ { + self.db + .scan_prefix(Self::task_prefix(task_id)) + .filter_map(|item| item.ok()) + .filter_map(|(_, value)| postcard::from_bytes::(&value).ok()) + } + + pub fn scan_solved(&self, task_id: &Identifier) -> impl Iterator + '_ { + self.db + .scan_prefix(Self::solved_prefix(task_id)) + .filter_map(|item| item.ok()) + .filter_map(|(_, value)| postcard::from_bytes::(&value).ok()) + } + + pub fn fill_queue(api: &ServerAPI, task_id: Identifier) { + let max_block = api.cfg.config_toml.task_block_size.max(1) as usize; + let max_queue = api.cfg.config_toml.task_queue_size as usize; + + let Some(channel) = api.task_queue.tasks.get(&task_id) else { + return; + }; + let sender = &channel.1; + + // Soft size lock: bail out if the channel is already at its configured cap. + // The cap is advisory — a partially-built trailing block may still push us + // one block past the limit, but no full block is enqueued once we hit it. + if sender.len() >= max_queue { + return; } - for item in api.db.scan_prefix(Self::LEASING_PREFIX.as_bytes()) { - if let Ok((key, value)) = item { - if let Some(id) = Self::parse_state_key(Self::LEASING_PREFIX, &key) { - if let Ok(tasks) = postcard::from_bytes::>(&value) { - api.executing_tasks.tasks.insert(id, tasks); - } - } + // Build a leased-id set once per call — O(L) instead of O(N·L) per scan item. + let leased: HashSet = api + .leased_tasks + .tasks + .get(&task_id) + .map(|v| v.iter().map(|l| l.stored_task.id.clone()).collect()) + .unwrap_or_default(); + + let mut block: Vec = Vec::with_capacity(max_block); + + for item in api.db.scan_prefix(Self::task_prefix(&task_id)) { + let Ok((_, value)) = item else { continue }; + let Ok(task) = postcard::from_bytes::(&value) else { + continue; + }; + if leased.contains(&task.id) { + continue; } - } - for item in api.db.scan_prefix(Self::SOLVED_PREFIX.as_bytes()) { - if let Ok((key, value)) = item { - if let Some(id) = Self::parse_state_key(Self::SOLVED_PREFIX, &key) { - if let Ok(tasks) = postcard::from_bytes::>(&value) { - api.solved_tasks.tasks.insert(id, tasks); - } + block.push(task); + if block.len() == max_block { + let full = std::mem::replace(&mut block, Vec::with_capacity(max_block)); + if sender.try_send(StoredTaskBlock { tasks: full }).is_err() { + return; // receiver dropped + } + if sender.len() >= max_queue { + return; } } } + + if !block.is_empty() { + let _ = sender.try_send(StoredTaskBlock { tasks: block }); + } + } + + fn init_db(api: &mut ServerAPI) { + api.task_queue = TaskQueue::default(); + api.leased_tasks = LeasedTaskQueue::default(); } pub fn setup_logger() { @@ -290,7 +245,7 @@ impl ServerAPI { let _ = tracing_subscriber::FmtSubscriber::builder() // all spans/events with a level higher than TRACE (e.g, debug, info, warn, etc.) // will be written to stdout. - .with_max_level(Level::INFO) + .with_max_level(Level::ERROR) // builds the subscriber. .try_init(); }); @@ -298,7 +253,7 @@ impl ServerAPI { } #[derive(Default, Clone, Debug)] pub struct EngineTaskRegistry { - pub tasks: HashMap>, + pub tasks: DashMap>, } impl Registry for EngineTaskRegistry { #[instrument] @@ -317,87 +272,21 @@ impl Registry for EngineTaskRegistry { } pub async fn clear_sled_periodically(api: Arc>, n_minutes: u64) { - info!("Sled Cron Job Started"); + info!("Lease GC started ({}m interval)", n_minutes); let mut interval = interval(Duration::from_secs(n_minutes * 60)); + let ttl = chrono::Duration::seconds(3600); loop { interval.tick().await; - info!("Purging Unsolved Tasks"); - - let now = Utc::now().timestamp(); - let mut rw_api = api.write().await; - - let mut moved_tasks: Vec<(Identifier, StoredTask)> = Vec::new(); - let mut touched_exec: HashSet = HashSet::new(); - - for (id, task_list) in rw_api.executing_tasks.tasks.iter_mut() { - let before_len = task_list.len(); - task_list.retain(|info| { - let age = now - info.given_at.timestamp(); - if age > 3600 { - info!("Task {:?} is older than an hour! Moving...", info); - moved_tasks.push(( - id.clone(), - StoredTask { - id: info.id.clone(), - bytes: info.bytes.clone(), - }, - )); - false - } else { - true - } - }); - - if task_list.len() != before_len { - touched_exec.insert(id.clone()); - } - } - - let mut touched_tasks: HashSet = HashSet::new(); - for (id, task) in moved_tasks { - rw_api - .task_queue - .tasks - .entry(id.clone()) - .or_default() - .push(task); - touched_tasks.insert(id); - } - - if touched_exec.is_empty() && touched_tasks.is_empty() { - continue; - } - - let mut ops: Vec<(Vec, Option>)> = Vec::new(); - - for id in touched_exec { - let value = rw_api - .executing_tasks - .tasks - .get(&id) - .cloned() - .unwrap_or_default(); - match ServerAPI::state_op_executing(&id, &value) { - Ok(op) => ops.push(op), - Err(e) => error!("Failed to serialize executing_tasks entry: {:?}", e), - } - } - - for id in touched_tasks { - let value = rw_api - .task_queue - .tasks - .get(&id) - .cloned() - .unwrap_or_default(); - match ServerAPI::state_op_tasks(&id, &value) { - Ok(op) => ops.push(op), - Err(e) => error!("Failed to serialize tasks entry: {:?}", e), - } + let api = api.read().await; + let now = Utc::now(); + let mut expired: u64 = 0; + for mut entry in api.leased_tasks.tasks.iter_mut() { + let before = entry.len(); + entry.retain(|l| now.signed_duration_since(l.given_at) < ttl); + expired += (before - entry.len()) as u64; } - - if let Err(e) = ServerAPI::apply_batch_ops(&rw_api.db, ops) { - error!("Failed to update task state in Sled batch: {:?}", e); + if expired > 0 { + info!("Lease GC: expired {} stale lease(s)", expired); } } } diff --git a/enginelib/src/config.rs b/enginelib/src/config.rs index 2d46cb5..5f09076 100644 --- a/enginelib/src/config.rs +++ b/enginelib/src/config.rs @@ -11,12 +11,14 @@ fn default_clean_tasks() -> u64 { 60 } fn default_task_block_size() -> u32 { - 256 + 0x8000 } fn default_pagination_limit() -> u32 { u32::MAX } - +fn default_task_queue_size() -> u32 { + 2048 +} #[derive(Debug, Serialize, Deserialize, Clone)] pub struct ConfigTomlServer { #[serde(default)] @@ -29,6 +31,8 @@ pub struct ConfigTomlServer { pub pagination_limit: u32, #[serde(default = "default_task_block_size")] pub task_block_size: u32, + #[serde(default = "default_task_queue_size")] + pub task_queue_size: u32, } impl Default for ConfigTomlServer { fn default() -> Self { @@ -38,6 +42,7 @@ impl Default for ConfigTomlServer { clean_tasks: default_clean_tasks(), pagination_limit: default_pagination_limit(), task_block_size: default_task_block_size(), + task_queue_size: default_task_queue_size(), } } } diff --git a/enginelib/src/events/admin_auth_event.rs b/enginelib/src/events/admin_auth_event.rs index c13533d..a1ce7d2 100644 --- a/enginelib/src/events/admin_auth_event.rs +++ b/enginelib/src/events/admin_auth_event.rs @@ -19,7 +19,7 @@ pub struct AdminAuthEvent { impl AdminAuthEvent { pub fn fire( - api: &mut ServerAPI, + api: &ServerAPI, payload: String, target: Identifier, db: Db, @@ -35,7 +35,7 @@ impl AdminAuthEvent { }); } - pub fn check(api: &mut ServerAPI, payload: String, target: Identifier, db: Db) -> bool { + pub fn check(api: &ServerAPI, payload: String, target: Identifier, db: Db) -> bool { let output = Arc::new(RwLock::new(false)); Self::fire(api, payload, target, db, output.clone()); *output.read().unwrap() diff --git a/enginelib/src/events/auth_event.rs b/enginelib/src/events/auth_event.rs index 1056a99..cd5fa0e 100644 --- a/enginelib/src/events/auth_event.rs +++ b/enginelib/src/events/auth_event.rs @@ -16,7 +16,7 @@ pub struct AuthEvent { } impl AuthEvent { pub fn fire( - api: &mut ServerAPI, + api: &ServerAPI, uid: String, challenge: String, db: Db, @@ -32,7 +32,7 @@ impl AuthEvent { }); } - pub fn check(api: &mut ServerAPI, uid: String, challenge: String, db: Db) -> bool { + pub fn check(api: &ServerAPI, uid: String, challenge: String, db: Db) -> bool { let output = Arc::new(RwLock::new(false)); Self::fire(api, uid, challenge, db, output.clone()); *output.read().unwrap() diff --git a/enginelib/src/events/mod.rs b/enginelib/src/events/mod.rs index 48ee7a5..7c687d6 100644 --- a/enginelib/src/events/mod.rs +++ b/enginelib/src/events/mod.rs @@ -10,9 +10,9 @@ pub mod server_before_task_acquire_event; pub mod server_before_task_create_event; pub mod server_before_task_publish_event; pub mod server_start_event; -pub mod server_task_acquired_event; -pub mod server_task_created_event; -pub mod server_task_published_event; +pub mod server_task_block_acquired_event; +pub mod server_task_block_created_event; +pub mod server_task_block_published_event; pub mod start_event; pub mod task_acquired_event; use std::collections::HashMap; @@ -32,12 +32,12 @@ pub struct Events; impl Events { pub fn init_auth(_api: &mut ServerAPI) {} - pub fn CheckAuth(api: &mut ServerAPI, uid: String, challenge: String, db: Db) -> bool { + pub fn CheckAuth(api: &ServerAPI, uid: String, challenge: String, db: Db) -> bool { auth_event::AuthEvent::check(api, uid, challenge, db) } pub fn CheckAdminAuth( - api: &mut ServerAPI, + api: &ServerAPI, payload: String, target: Identifier, db: Db, @@ -109,21 +109,36 @@ impl Events { server_before_task_create_event::ServerBeforeTaskCreateEvent::check(api, task_id, payload) } - pub fn ServerTaskCreated( + pub fn ServerTaskBlockCreated( api: &ServerAPI, task_id: String, - instance_id: String, - payload: Arc>>, + instance_ids: Vec, + payloads: Vec>>>, ) { - server_task_created_event::ServerTaskCreatedEvent::fire(api, task_id, instance_id, payload) + server_task_block_created_event::ServerTaskBlockCreatedEvent::fire( + api, + task_id, + instance_ids, + payloads, + ) } pub fn ServerBeforeTaskAcquire(api: &ServerAPI, uid: String, task_id: String) -> bool { server_before_task_acquire_event::ServerBeforeTaskAcquireEvent::check(api, uid, task_id) } - pub fn ServerTaskAcquired(api: &ServerAPI, uid: String, task_id: String, instance_id: String) { - server_task_acquired_event::ServerTaskAcquiredEvent::fire(api, uid, task_id, instance_id) + pub fn ServerTaskBlockAcquired( + api: &ServerAPI, + uid: String, + task_id: String, + instance_ids: Vec, + ) { + server_task_block_acquired_event::ServerTaskBlockAcquiredEvent::fire( + api, + uid, + task_id, + instance_ids, + ) } pub fn ServerBeforeTaskPublish( @@ -142,7 +157,17 @@ impl Events { ) } - pub fn ServerTaskPublished(api: &ServerAPI, uid: String, task_id: String, instance_id: String) { - server_task_published_event::ServerTaskPublishedEvent::fire(api, uid, task_id, instance_id) + pub fn ServerTaskBlockPublished( + api: &ServerAPI, + uid: String, + task_id: String, + instance_ids: Vec, + ) { + server_task_block_published_event::ServerTaskBlockPublishedEvent::fire( + api, + uid, + task_id, + instance_ids, + ) } } diff --git a/enginelib/src/events/server_task_acquired_event.rs b/enginelib/src/events/server_task_block_acquired_event.rs similarity index 51% rename from enginelib/src/events/server_task_acquired_event.rs rename to enginelib/src/events/server_task_block_acquired_event.rs index 116ecf1..4d3f7ce 100644 --- a/enginelib/src/events/server_task_acquired_event.rs +++ b/enginelib/src/events/server_task_block_acquired_event.rs @@ -3,23 +3,23 @@ use macros::Event; use crate::{Identifier, api::ServerAPI}; #[derive(Clone, Debug, Event)] -#[event(namespace = "server", name = "task_acquired")] -pub struct ServerTaskAcquiredEvent { +#[event(namespace = "server", name = "task_block_acquired")] +pub struct ServerTaskBlockAcquiredEvent { pub cancelled: bool, pub id: Identifier, pub uid: String, pub task_id: String, - pub instance_id: String, + pub instance_ids: Vec, } -impl ServerTaskAcquiredEvent { - pub fn fire(api: &ServerAPI, uid: String, task_id: String, instance_id: String) { - let mut event = ServerTaskAcquiredEvent { +impl ServerTaskBlockAcquiredEvent { + pub fn fire(api: &ServerAPI, uid: String, task_id: String, instance_ids: Vec) { + let mut event = ServerTaskBlockAcquiredEvent { cancelled: false, - id: ("server".to_string(), "task_acquired".to_string()), + id: ("server".to_string(), "task_block_acquired".to_string()), uid, task_id, - instance_id, + instance_ids, }; api.event_bus.fire(&mut event); } diff --git a/enginelib/src/events/server_task_block_created_event.rs b/enginelib/src/events/server_task_block_created_event.rs new file mode 100644 index 0000000..9f5a331 --- /dev/null +++ b/enginelib/src/events/server_task_block_created_event.rs @@ -0,0 +1,33 @@ +use std::sync::{Arc, RwLock}; + +use macros::Event; + +use crate::{Identifier, api::ServerAPI}; + +#[derive(Clone, Debug, Event)] +#[event(namespace = "server", name = "task_block_created")] +pub struct ServerTaskBlockCreatedEvent { + pub cancelled: bool, + pub id: Identifier, + pub task_id: String, + pub instance_ids: Vec, + pub payloads: Vec>>>, +} + +impl ServerTaskBlockCreatedEvent { + pub fn fire( + api: &ServerAPI, + task_id: String, + instance_ids: Vec, + payloads: Vec>>>, + ) { + let mut event = ServerTaskBlockCreatedEvent { + cancelled: false, + id: ("server".to_string(), "task_block_created".to_string()), + task_id, + instance_ids, + payloads, + }; + api.event_bus.fire(&mut event); + } +} diff --git a/enginelib/src/events/server_task_published_event.rs b/enginelib/src/events/server_task_block_published_event.rs similarity index 51% rename from enginelib/src/events/server_task_published_event.rs rename to enginelib/src/events/server_task_block_published_event.rs index 5e6ee8a..43b997e 100644 --- a/enginelib/src/events/server_task_published_event.rs +++ b/enginelib/src/events/server_task_block_published_event.rs @@ -3,23 +3,23 @@ use macros::Event; use crate::{Identifier, api::ServerAPI}; #[derive(Clone, Debug, Event)] -#[event(namespace = "server", name = "task_published")] -pub struct ServerTaskPublishedEvent { +#[event(namespace = "server", name = "task_block_published")] +pub struct ServerTaskBlockPublishedEvent { pub cancelled: bool, pub id: Identifier, pub uid: String, pub task_id: String, - pub instance_id: String, + pub instance_ids: Vec, } -impl ServerTaskPublishedEvent { - pub fn fire(api: &ServerAPI, uid: String, task_id: String, instance_id: String) { - let mut event = ServerTaskPublishedEvent { +impl ServerTaskBlockPublishedEvent { + pub fn fire(api: &ServerAPI, uid: String, task_id: String, instance_ids: Vec) { + let mut event = ServerTaskBlockPublishedEvent { cancelled: false, - id: ("server".to_string(), "task_published".to_string()), + id: ("server".to_string(), "task_block_published".to_string()), uid, task_id, - instance_id, + instance_ids, }; api.event_bus.fire(&mut event); } diff --git a/enginelib/src/events/server_task_created_event.rs b/enginelib/src/events/server_task_created_event.rs deleted file mode 100644 index 1326818..0000000 --- a/enginelib/src/events/server_task_created_event.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::sync::{Arc, RwLock}; - -use macros::Event; - -use crate::{Identifier, api::ServerAPI}; - -#[derive(Clone, Debug, Event)] -#[event(namespace = "server", name = "task_created")] -pub struct ServerTaskCreatedEvent { - pub cancelled: bool, - pub id: Identifier, - pub task_id: String, - pub instance_id: String, - pub payload: Arc>>, -} - -impl ServerTaskCreatedEvent { - pub fn fire( - api: &ServerAPI, - task_id: String, - instance_id: String, - payload: Arc>>, - ) { - let mut event = ServerTaskCreatedEvent { - cancelled: false, - id: ("server".to_string(), "task_created".to_string()), - task_id, - instance_id, - payload, - }; - api.event_bus.fire(&mut event); - } -} diff --git a/enginelib/src/prelude.rs b/enginelib/src/prelude.rs index bee62a5..c1af52d 100644 --- a/enginelib/src/prelude.rs +++ b/enginelib/src/prelude.rs @@ -22,9 +22,9 @@ pub use crate::events::server_before_task_acquire_event::ServerBeforeTaskAcquire pub use crate::events::server_before_task_create_event::ServerBeforeTaskCreateEvent; pub use crate::events::server_before_task_publish_event::ServerBeforeTaskPublishEvent; pub use crate::events::server_start_event::ServerStartEvent; -pub use crate::events::server_task_acquired_event::ServerTaskAcquiredEvent; -pub use crate::events::server_task_created_event::ServerTaskCreatedEvent; -pub use crate::events::server_task_published_event::ServerTaskPublishedEvent; +pub use crate::events::server_task_block_acquired_event::ServerTaskBlockAcquiredEvent; +pub use crate::events::server_task_block_created_event::ServerTaskBlockCreatedEvent; +pub use crate::events::server_task_block_published_event::ServerTaskBlockPublishedEvent; pub use crate::events::start_event::StartEvent; pub use crate::events::task_acquired_event::TaskAcquiredEvent; pub use crate::events::{Events, ID}; diff --git a/enginelib/src/task.rs b/enginelib/src/task.rs index 410a90f..09347f1 100644 --- a/enginelib/src/task.rs +++ b/enginelib/src/task.rs @@ -4,28 +4,40 @@ use std::{collections::HashMap, sync::Arc}; use crate::Identifier; use chrono::{DateTime, Utc}; use crossbeam::queue::ArrayQueue; +use dashmap::{DashMap, DashSet}; use serde::{Deserialize, Serialize}; use tracing::{error, instrument, warn}; -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct StoredTask { pub bytes: Vec, pub id: String, } -#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct LeasedTask { pub stored_task: Arc, pub user_id: String, pub given_at: DateTime, } + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct StoredTaskBlock { + pub tasks: Vec, +} #[derive(Debug, Default)] pub struct TaskQueue { - pub tasks: HashMap>>, + pub tasks: DashMap< + Identifier, + ( + async_channel::Receiver, + async_channel::Sender, + ), + >, } #[derive(Debug, Default, Clone)] pub struct LeasedTaskQueue { - pub tasks: HashMap>, + pub tasks: DashMap>, } pub trait Verifiable { diff --git a/enginelib/tests/event_tests.rs b/enginelib/tests/event_tests.rs index ed5ec9a..9288a3d 100644 --- a/enginelib/tests/event_tests.rs +++ b/enginelib/tests/event_tests.rs @@ -1,4 +1,4 @@ -use enginelib::{Registry, api::EngineAPI, events::ID, task::Verifiable}; +use enginelib::{Registry, api::ServerAPI, events::ID, task::Verifiable}; use macros::Verifiable; use std::sync::Arc; use tracing_test::traced_test; @@ -35,7 +35,7 @@ fn id() { #[traced_test] #[test] fn test_event_registration_and_handling() { - let mut api = EngineAPI::test_default(); + let mut api = ServerAPI::test_default(); let mut test_event = TestEvent { value: 0, @@ -51,7 +51,7 @@ fn test_event_registration_and_handling() { #[traced_test] #[test] fn test_stateful_event_auto_registration() { - let mut api = EngineAPI::test_default(); + let mut api = ServerAPI::test_default(); enginelib::event::register_inventory_handlers(&mut api); @@ -96,7 +96,7 @@ fn test_task_registration() { return Box::new(self.clone()); } } - let mut api = EngineAPI::test_default(); + let mut api = ServerAPI::test_default(); let task_id = ID("test", "test_task"); // Register the task type