Skip to content

Commit 13bc99a

Browse files
boennemannclaude
andcommitted
feat: bulk feedback collection and thread locking (#111)
listen now returns ALL pending events at once (batch mode) and auto-claims their threads so multiple agents can work in parallel without conflicts. - Add claimed_by/claimed_at fields to Thread struct - Add ThreadClaimed/ThreadReleased event types - Add claim_thread/release_thread/is_claimed methods to FeedbackStore - Change listen to batch mode by default (--no-batch for legacy) - Auto-claim threads on listen, skip already-claimed threads - Add --agent flag (default: agent-<pid>) for agent identity - Add release subcommand with atomic comment + release - Add POST /feedback/api/threads/{id}/release HTTP endpoint - Add claim badges and release button to browser overlay - Handle thread_claimed/thread_released events in frontend polling - Extract process_batch() for testable batch orchestration logic - 18 new tests (7 store-level, 11 batch-logic) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 89b66f4 commit 13bc99a

File tree

9 files changed

+849
-36
lines changed

9 files changed

+849
-36
lines changed

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/veld-core/src/feedback.rs

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ pub struct Thread {
9090
/// The seq of the last message the human has viewed in this thread.
9191
#[serde(default, skip_serializing_if = "Option::is_none")]
9292
pub last_human_seen_seq: Option<u64>,
93+
/// Which agent has claimed this thread (agent name/id), if any.
94+
#[serde(default, skip_serializing_if = "Option::is_none")]
95+
pub claimed_by: Option<String>,
96+
/// When the claim was created.
97+
#[serde(default, skip_serializing_if = "Option::is_none")]
98+
pub claimed_at: Option<DateTime<Utc>>,
9399
#[serde(default, skip_serializing_if = "Option::is_none")]
94100
pub viewport_width: Option<u32>,
95101
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -127,6 +133,8 @@ pub enum EventType {
127133
AgentThreadCreated { thread: Thread },
128134
AgentListening,
129135
AgentStopped,
136+
ThreadClaimed { thread_id: String, agent_id: String },
137+
ThreadReleased { thread_id: String, agent_id: String },
130138
}
131139

132140
// ---------------------------------------------------------------------------
@@ -304,6 +312,72 @@ impl FeedbackStore {
304312
Ok(())
305313
}
306314

315+
/// Claim a thread atomically. Succeeds if unclaimed or already claimed by the same agent.
316+
/// Returns Err if already claimed by a different agent.
317+
pub fn claim_thread(&self, thread_id: &str, agent_id: &str) -> anyhow::Result<Thread> {
318+
let aid = agent_id.to_owned();
319+
self.modify_thread(thread_id, move |thread| {
320+
if let Some(ref existing) = thread.claimed_by {
321+
if existing != &aid {
322+
// Will be caught after modify_thread returns — see below.
323+
return;
324+
}
325+
}
326+
thread.claimed_by = Some(aid);
327+
thread.claimed_at = Some(Utc::now());
328+
})
329+
.and_then(|thread| {
330+
// Check if the claim actually took effect.
331+
if thread.claimed_by.as_deref() == Some(agent_id) {
332+
Ok(thread)
333+
} else {
334+
anyhow::bail!(
335+
"thread {} already claimed by {}",
336+
thread_id,
337+
thread.claimed_by.as_deref().unwrap_or("unknown")
338+
)
339+
}
340+
})
341+
}
342+
343+
/// Release a claim. If `agent_id` is Some, only releases if it matches the claimer.
344+
/// If `agent_id` is None, force-releases (for UI button use).
345+
pub fn release_thread(
346+
&self,
347+
thread_id: &str,
348+
agent_id: Option<&str>,
349+
) -> anyhow::Result<Thread> {
350+
let aid = agent_id.map(|s| s.to_owned());
351+
self.modify_thread(thread_id, move |thread| {
352+
if let Some(ref required) = aid {
353+
if thread.claimed_by.as_deref() != Some(required.as_str()) {
354+
return; // Don't release — wrong agent.
355+
}
356+
}
357+
thread.claimed_by = None;
358+
thread.claimed_at = None;
359+
})
360+
.and_then(|thread| {
361+
if let Some(required) = agent_id {
362+
// If the caller specified an agent_id, verify the release happened.
363+
if thread.claimed_by.is_some() {
364+
anyhow::bail!(
365+
"thread {} is claimed by {}, not {}",
366+
thread_id,
367+
thread.claimed_by.as_deref().unwrap_or("unknown"),
368+
required
369+
);
370+
}
371+
}
372+
Ok(thread)
373+
})
374+
}
375+
376+
/// Check if a thread is currently claimed.
377+
pub fn is_claimed(thread: &Thread) -> bool {
378+
thread.claimed_by.is_some()
379+
}
380+
307381
// -- Event log ------------------------------------------------------------
308382

309383
/// Atomically increment the sequence counter and return the new value.
@@ -509,6 +583,8 @@ pub fn new_thread(
509583
status: ThreadStatus::Open,
510584
messages: vec![initial_message],
511585
last_human_seen_seq: None,
586+
claimed_by: None,
587+
claimed_at: None,
512588
viewport_width,
513589
viewport_height,
514590
created_at: now,
@@ -889,4 +965,138 @@ mod tests {
889965
assert_eq!(all_seqs[0], 1);
890966
assert_eq!(*all_seqs.last().unwrap(), 100);
891967
}
968+
969+
#[test]
970+
fn test_claim_thread() {
971+
let tmp = TempDir::new().unwrap();
972+
let store = make_store(&tmp);
973+
974+
let t = make_thread("Needs fixing");
975+
store.save_thread(&t).unwrap();
976+
977+
let claimed = store.claim_thread(&t.id, "agent-1").unwrap();
978+
assert_eq!(claimed.claimed_by.as_deref(), Some("agent-1"));
979+
assert!(claimed.claimed_at.is_some());
980+
981+
// Same agent can re-claim (idempotent).
982+
let reclaimed = store.claim_thread(&t.id, "agent-1").unwrap();
983+
assert_eq!(reclaimed.claimed_by.as_deref(), Some("agent-1"));
984+
}
985+
986+
#[test]
987+
fn test_claim_already_claimed() {
988+
let tmp = TempDir::new().unwrap();
989+
let store = make_store(&tmp);
990+
991+
let t = make_thread("Contested");
992+
store.save_thread(&t).unwrap();
993+
994+
store.claim_thread(&t.id, "agent-1").unwrap();
995+
996+
// Different agent cannot claim.
997+
let err = store.claim_thread(&t.id, "agent-2").unwrap_err();
998+
assert!(err.to_string().contains("already claimed by agent-1"));
999+
}
1000+
1001+
#[test]
1002+
fn test_claim_no_expiry() {
1003+
let tmp = TempDir::new().unwrap();
1004+
let store = make_store(&tmp);
1005+
1006+
let t = make_thread("Long task");
1007+
store.save_thread(&t).unwrap();
1008+
1009+
store.claim_thread(&t.id, "agent-1").unwrap();
1010+
1011+
// Claim persists — another agent still cannot claim.
1012+
let err = store.claim_thread(&t.id, "agent-2").unwrap_err();
1013+
assert!(err.to_string().contains("already claimed"));
1014+
}
1015+
1016+
#[test]
1017+
fn test_release_thread() {
1018+
let tmp = TempDir::new().unwrap();
1019+
let store = make_store(&tmp);
1020+
1021+
let t = make_thread("Will release");
1022+
store.save_thread(&t).unwrap();
1023+
1024+
store.claim_thread(&t.id, "agent-1").unwrap();
1025+
1026+
let released = store.release_thread(&t.id, Some("agent-1")).unwrap();
1027+
assert!(released.claimed_by.is_none());
1028+
assert!(released.claimed_at.is_none());
1029+
1030+
// Now another agent can claim.
1031+
let claimed = store.claim_thread(&t.id, "agent-2").unwrap();
1032+
assert_eq!(claimed.claimed_by.as_deref(), Some("agent-2"));
1033+
}
1034+
1035+
#[test]
1036+
fn test_release_wrong_agent() {
1037+
let tmp = TempDir::new().unwrap();
1038+
let store = make_store(&tmp);
1039+
1040+
let t = make_thread("Guarded");
1041+
store.save_thread(&t).unwrap();
1042+
1043+
store.claim_thread(&t.id, "agent-1").unwrap();
1044+
1045+
let err = store.release_thread(&t.id, Some("agent-2")).unwrap_err();
1046+
assert!(err.to_string().contains("claimed by agent-1"));
1047+
}
1048+
1049+
#[test]
1050+
fn test_force_release() {
1051+
let tmp = TempDir::new().unwrap();
1052+
let store = make_store(&tmp);
1053+
1054+
let t = make_thread("Force release");
1055+
store.save_thread(&t).unwrap();
1056+
1057+
store.claim_thread(&t.id, "agent-1").unwrap();
1058+
1059+
// Force release (no agent_id check) — for UI button.
1060+
let released = store.release_thread(&t.id, None).unwrap();
1061+
assert!(released.claimed_by.is_none());
1062+
}
1063+
1064+
#[test]
1065+
fn test_is_claimed() {
1066+
let t = make_thread("Check claimed");
1067+
assert!(!FeedbackStore::is_claimed(&t));
1068+
1069+
let mut t2 = t;
1070+
t2.claimed_by = Some("agent-1".into());
1071+
t2.claimed_at = Some(Utc::now());
1072+
assert!(FeedbackStore::is_claimed(&t2));
1073+
}
1074+
1075+
#[test]
1076+
fn test_claim_event_types_serde() {
1077+
let event = Event {
1078+
seq: 1,
1079+
event_type: EventType::ThreadClaimed {
1080+
thread_id: "t_abc".into(),
1081+
agent_id: "agent-1".into(),
1082+
},
1083+
timestamp: Utc::now(),
1084+
};
1085+
let json = serde_json::to_string(&event).unwrap();
1086+
assert!(json.contains(r#""event":"thread_claimed"#));
1087+
assert!(json.contains(r#""agent_id":"agent-1"#));
1088+
let _: Event = serde_json::from_str(&json).unwrap();
1089+
1090+
let event = Event {
1091+
seq: 2,
1092+
event_type: EventType::ThreadReleased {
1093+
thread_id: "t_abc".into(),
1094+
agent_id: "agent-1".into(),
1095+
},
1096+
timestamp: Utc::now(),
1097+
};
1098+
let json = serde_json::to_string(&event).unwrap();
1099+
assert!(json.contains(r#""event":"thread_released"#));
1100+
let _: Event = serde_json::from_str(&json).unwrap();
1101+
}
8921102
}

crates/veld-daemon/frontend/src/feedback-overlay.css

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,22 @@
525525
font-size: 10px; color: var(--vf-text-muted); margin-top: 3px;
526526
white-space: nowrap; overflow: hidden; text-overflow: ellipsis;
527527
}
528+
.veld-feedback-thread-card-claim-badge {
529+
font-size: 10px; color: var(--vf-primary); margin-top: 4px;
530+
display: flex; align-items: center; gap: 4px;
531+
}
532+
.veld-feedback-thread-card-claimed {
533+
border-left: 3px solid var(--vf-primary);
534+
}
535+
.veld-feedback-thread-detail-claim {
536+
display: flex; align-items: center; justify-content: space-between;
537+
padding: 6px 10px; margin-bottom: 8px;
538+
background: color-mix(in srgb, var(--vf-primary) 10%, transparent);
539+
border-radius: 6px; font-size: 11px;
540+
}
541+
.veld-feedback-thread-detail-claim-text {
542+
color: var(--vf-primary); font-weight: 500;
543+
}
528544

529545
/* --- Thread messages --- */
530546
.veld-feedback-thread-messages {
@@ -578,6 +594,7 @@
578594
align-items: center; gap: 8px;
579595
}
580596
.veld-feedback-listening-dot {
597+
display: block;
581598
width: 10px; height: 10px; border-radius: 50%;
582599
background: var(--vf-accent); flex-shrink: 0;
583600
animation: veld-feedback-pulse-dot 2s ease-in-out infinite;

crates/veld-daemon/frontend/src/feedback-overlay/panel.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,23 @@ function renderThreadDetail(thread: Thread): void {
160160

161161
refs.panelBody.appendChild(header);
162162

163+
if (thread.claimed_by) {
164+
const claimRow = mkEl("div", "thread-detail-claim");
165+
claimRow.appendChild(mkEl("span", "thread-detail-claim-text", "\u2699 Being worked on by " + thread.claimed_by));
166+
const releaseBtn = mkEl("button", "btn btn-secondary btn-sm", "Release");
167+
releaseBtn.addEventListener("click", function () {
168+
api("POST", "/threads/" + thread.id + "/release").then(function () {
169+
thread.claimed_by = null;
170+
thread.claimed_at = null;
171+
dispatch({ type: "SET_THREADS", threads: [...getState().threads] });
172+
renderPanel();
173+
toast("Thread released");
174+
});
175+
});
176+
claimRow.appendChild(releaseBtn);
177+
refs.panelBody.appendChild(claimRow);
178+
}
179+
163180
if (thread.status === "resolved") {
164181
const msgList = mkEl("div", "thread-messages-list");
165182
thread.messages.forEach(function (msg: Message) {
@@ -262,6 +279,12 @@ function makeThreadCard(thread: Thread, isResolved: boolean): HTMLElement {
262279
card.appendChild(mkEl("div", "thread-card-selector", thread.scope.selector));
263280
}
264281

282+
if (thread.claimed_by) {
283+
const claimBadge = mkEl("div", "thread-card-claim-badge", "\u2699 " + thread.claimed_by);
284+
card.appendChild(claimBadge);
285+
card.classList.add(PREFIX + "thread-card-claimed");
286+
}
287+
265288
card.addEventListener("click", function () { showThreadDetail(thread.id); });
266289
return card;
267290
}

crates/veld-daemon/frontend/src/feedback-overlay/polling.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,28 @@ function handleEvent(event: FeedbackEvent): void {
5757
dispatch({ type: "SET_LISTENING", listening: false });
5858
updateListeningModule();
5959
break;
60+
case "thread_claimed":
61+
if (event.thread_id) {
62+
const clThread = findThread(getState().threads, event.thread_id);
63+
if (clThread) {
64+
clThread.claimed_by = event.agent_id || null;
65+
clThread.claimed_at = new Date().toISOString();
66+
dispatch({ type: "SET_THREADS", threads: [...getState().threads] });
67+
if (getState().panelOpen) deps().renderPanel();
68+
}
69+
}
70+
break;
71+
case "thread_released":
72+
if (event.thread_id) {
73+
const rlThread = findThread(getState().threads, event.thread_id);
74+
if (rlThread) {
75+
rlThread.claimed_by = null;
76+
rlThread.claimed_at = null;
77+
dispatch({ type: "SET_THREADS", threads: [...getState().threads] });
78+
if (getState().panelOpen) deps().renderPanel();
79+
}
80+
}
81+
break;
6082
case "thread_created":
6183
if (event.thread && !findThread(getState().threads, event.thread.id)) {
6284
dispatch({ type: "ADD_THREAD", thread: event.thread });

0 commit comments

Comments
 (0)