diff --git a/CHANGELOG.md b/CHANGELOG.md index 5236f11..89b48a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,25 @@ All notable changes to `@honcho-ai/openclaw-honcho` will be documented in this file. +## [1.4.0] - 2026-04-27 + +Per-sender participant peers when the channel emits `sender_id` (group chats +and many 1:1s — see README). No sender metadata → `owner` peer (operator/system +fallback: CLI runs, webchat-direct, cron, etc.). Routing controlled by +`defaultUnknownPolicy` in `~/.honcho/openclaw-peers.json`: existing installs +default to `"owner"` (pre-1.4.0 merge behavior); fresh installs default to +`"per-sender"`. + +### Added +- **Multi-peer**: `extractSenderId` parses `sender_id` from each inbound `Conversation info` block; both capture and `before_prompt_build` resolve a peer per sender from the current message, so the right participant is targeted on every turn — including when the speaker changes between turns. Peer IDs derive from the channel ID, sanitized to `[A-Za-z0-9_-]` and truncated to Honcho's 100-char limit. Switching `defaultUnknownPolicy` after the fact only affects new messages — already-captured messages stay where they originally landed. +- **`-p, --peer ` on `honcho ask` and `honcho search`**: query memory from a specific participant's perspective. +- **Peers file `~/.honcho/openclaw-peers.json`**: `sender_id` → peer ID map, auto-seeded by the plugin and hand-editable. Top-level `defaultUnknownPolicy` (`per-sender` for fresh installs, `owner` for legacy files without the field) controls auto-seeding; auto-seeded peers get `autoSeeded: true` metadata. Override path via `OPENCLAW_HONCHO_PEERS_FILE`. +- **Session metadata `participantSenderId`**: last active sender, used by tools to resolve the session's current participant peer. + +### Fixed +- **`ensureInitialized()` race could corrupt workspace metadata**: now guarded by a shared init promise. +- **`crossSessionSearch` rejected by OpenClaw config validation**: option existed in code since 1.3.0 but was missing from the plugin manifest's `configSchema`/`uiHints`. Now declared. + ## [1.3.3] - 2026-04-16 ### Fixed diff --git a/README.md b/README.md index 2d348fd..7770480 100644 --- a/README.md +++ b/README.md @@ -110,13 +110,47 @@ Honcho's `observeOthers` controls whether a peer forms representations of other Set `ownerObserveOthers: true` to let the owner peer also observe agent messages. This gives Honcho perspective-aware memory: the owner stores conclusions about the agent based only on what it witnessed, enabling the user's representation to reflect the full conversational context rather than just their own side of it. +### Peer Mappings + +Map `sender_id` → Honcho peer ID in `~/.honcho/openclaw-peers.json` (override with `OPENCLAW_HONCHO_PEERS_FILE`). New senders are added automatically; edit `peers` to alias or merge identities, then **`openclaw gateway restart`** so the gateway reloads the file. + +```json +{ + "version": 1, + "defaultUnknownPolicy": "per-sender", + "peers": { + "U0EXAMPLE01": "user", + "telegram-1234567890": "user" + } +} +``` + +- **`defaultUnknownPolicy`** controls how unknown `sender_id`s are seeded into `peers`: + - `per-sender` — default for fresh installs. Each new sender becomes its own peer; the seeded peer ID is the `sender_id` sanitized to `[A-Za-z0-9_-]` and truncated to Honcho's 100-char limit. + - `owner` — default for pre-existing files missing the field (preserves legacy behavior). All unknown senders merge into the owner peer. +- **Auto-seeded, manually overridable.** The plugin only adds entries for senders not already in the map. +- **Adding a mapping after messages exist splits history.** Messages already stored under the original peer stay there; new messages land under the new peer. Remap before the peer accumulates history. + +### Multi-Peer Participants + +In group chats (Discord, Slack, etc.), the plugin extracts the sender's platform ID from each inbound message and uses it directly as the Honcho peer ID. This gives every participant — humans and any other bots in the room — their own memory and representation in Honcho, rather than attributing all non-agent messages to a single generic peer. + +**How it works:** +- The plugin reads the `sender_id` field from OpenClaw's "Conversation info (untrusted metadata):" block, which OpenClaw injects on every inbound message that has a known sender — including 1-on-1 DMs on platforms like Telegram, not just group chats. +- Each distinct sender ID becomes its own Honcho peer (e.g., `U07KX7DG002` becomes the Honcho peer ID directly, sanitized to `[A-Za-z0-9_-]`). You can alias a sender to a friendlier peer ID by editing the [peers file](#peer-mappings). +- The default `owner` peer is used as a fallback when a message has no sender metadata at all (e.g., synthetic/system messages, or channel integrations that don't emit a `Conversation info` block), and — on legacy installs whose peers file uses `defaultUnknownPolicy: "owner"` — for any unknown sender. On fresh installs (`per-sender` policy) and platforms like Telegram, even DMs are attributed to the sender's own peer, not `owner`. +- Each OpenClaw agent gets its own Honcho peer (default `agent-{id}`, e.g., `agent-main`). +- All tools (`honcho_context`, `honcho_ask`, etc.) automatically resolve the correct peer for the current session. + +Both message *attribution* (capture) and *context injection* (`before_prompt_build`) read `sender_id` directly from the current inbound message's metadata block, so the right participant peer is used from the very first turn — and on every turn in group chats, even when the speaker changes between turns. Sessions whose channel never emits sender metadata (no `Conversation info` block) stay attributed to `owner`. + ## How it works Once installed, the plugin works automatically: - **Message Observation** — After every AI turn, the conversation is persisted to Honcho. Both user and agent messages are observed, allowing Honcho to build and refine its models. Message capture starts when the plugin is active for a session, and preserves original timestamps for captured messages. Messages are also flushed before session compaction and `/new`/`/reset`, so no conversation data is lost. - **Tool-Based Context Access** — The AI can query Honcho mid-conversation using tools like `honcho_context`, `honcho_search_conclusions`, and `honcho_ask` to retrieve relevant context about the user. Context is injected during OpenClaw's `before_prompt_build` phase, ensuring accurate turn boundaries. -- **Dual Peer Model** — Honcho maintains separate representations: one for the user (preferences, facts, communication style) and one for the agent (personality, learned behaviors). Each OpenClaw agent gets its own Honcho peer (`agent-{id}`), so multi-agent workspaces maintain isolated memory. +- **Multi-Peer Model** — Honcho maintains separate representations for each participant. Whenever an inbound message carries a `sender_id` (group chats, and DMs on platforms like Telegram), that sender gets their own peer, using their platform ID directly as the Honcho peer ID (or aliased via the [peers file](#peer-mappings) if configured). Each OpenClaw agent gets its own Honcho peer (default `agent-{id}`). The default `owner` peer is used as a fallback when a channel emits no sender metadata, and — on legacy installs whose peers file uses `defaultUnknownPolicy: "owner"` — for any unknown sender. **Migration boundary:** historical turns already attributed to `owner` (or to any prior peer ID) are not retroactively re-attributed when the plugin upgrades or when `peers` / `defaultUnknownPolicy` change. Only new inbound `sender_id`s create per-sender peers, so pre-existing sessions may show mixed attribution across the rollout. This gives every participant isolated, personalized memory going forward. - **Clean Persistence** — Platform metadata (conversation info, sender headers, thread context, forwarded messages) is stripped before saving to Honcho, ensuring only meaningful content is persisted. Noise messages (heartbeat acks, cron boilerplate, startup commands) are dropped entirely via configurable pattern filters. Honcho handles all reasoning and synthesis in the cloud. diff --git a/commands/cli.ts b/commands/cli.ts index cc62bd8..426f85b 100644 --- a/commands/cli.ts +++ b/commands/cli.ts @@ -451,11 +451,13 @@ export function registerCli(api: OpenClawPluginApi, state: PluginState): void { .command("ask ") .description("Ask Honcho about the user") .option("-a, --agent ", "Agent ID to query as (default: primary agent)") - .action(async (question: string, options: { agent?: string }) => { + .option("-p, --peer ", "Channel peer ID or Honcho peer ID to target (default: owner)") + .action(async (question: string, options: { agent?: string; peer?: string }) => { try { await state.ensureInitialized(); const agentPeer = await state.getAgentPeer(options.agent ?? state.resolveDefaultAgentId()); - const answer = await agentPeer.chat(question, { target: state.ownerPeer! }); + const participantPeer = await state.getParticipantPeer(options.peer); + const answer = await agentPeer.chat(question, { target: participantPeer }); console.log(answer ?? "No information available."); } catch (error) { console.error(`Failed to query: ${error}`); @@ -467,10 +469,12 @@ export function registerCli(api: OpenClawPluginApi, state: PluginState): void { .description("Semantic search over Honcho memory") .option("-k, --top-k ", "Number of results to return", "10") .option("-d, --max-distance ", "Maximum semantic distance (0-1)", "0.5") - .action(async (query: string, options: { topK: string; maxDistance: string }) => { + .option("-p, --peer ", "Channel peer ID or Honcho peer ID to target (default: owner)") + .action(async (query: string, options: { topK: string; maxDistance: string; peer?: string }) => { try { await state.ensureInitialized(); - const representation = await state.ownerPeer!.representation({ + const participantPeer = await state.getParticipantPeer(options.peer); + const representation = await participantPeer.representation({ searchQuery: query, searchTopK: parseInt(options.topK, 10), searchMaxDistance: parseFloat(options.maxDistance), diff --git a/helpers.ts b/helpers.ts index 0f4c200..2d030fb 100644 --- a/helpers.ts +++ b/helpers.ts @@ -4,6 +4,26 @@ import type { Peer, MessageInput } from "@honcho-ai/sdk"; +type ContentBlock = { type?: string; text?: unknown }; +type RawMessage = { role?: string; content?: string | ContentBlock[]; timestamp?: number }; + +/** + * Extract plain text from a message's `content` (string or array of content blocks). + * Returns "" for non-message inputs or messages with no text blocks. + */ +export function getRawContent(msg: unknown): string { + if (!msg || typeof msg !== "object") return ""; + const { content } = msg as RawMessage; + if (typeof content === "string") return content; + if (!Array.isArray(content)) return ""; + return content + .filter((b): b is ContentBlock & { text: string } => + !!b && b.type === "text" && typeof b.text === "string", + ) + .map((b) => b.text) + .join("\n"); +} + /** * Build a Honcho session key from OpenClaw context. * Combines sessionKey + messageProvider to create unique sessions per platform. @@ -142,6 +162,49 @@ export function cleanMessageContent(content: string): string { return cleaned.trim(); } +const CONVERSATION_INFO_SENTINEL = "Conversation info (untrusted metadata):"; + +/** + * Extract the sender_id from a raw message's "Conversation info (untrusted metadata):" + * metadata block. Must be called BEFORE cleanMessageContent() which strips these blocks. + * Returns undefined for DMs (no metadata block) or on parse failure. + * + * Only considers the FIRST occurrence of the sentinel to prevent user-pasted or quoted + * metadata blocks from poisoning sender attribution. + */ +export function extractSenderId(content: string): string | undefined { + if (!content || !content.includes(CONVERSATION_INFO_SENTINEL)) return undefined; + + const lines = content.split("\n"); + let found = false; + for (let i = 0; i < lines.length; i++) { + if (lines[i].trim() !== CONVERSATION_INFO_SENTINEL) continue; + if (found) return undefined; // Ignore duplicate sentinels (likely user-pasted content) + found = true; + if (lines[i + 1]?.trim() !== "```json") continue; + + // Collect JSON lines between ```json and ``` + const jsonLines: string[] = []; + for (let j = i + 2; j < lines.length; j++) { + if (lines[j].trim() === "```") break; + jsonLines.push(lines[j]); + } + + try { + const parsed = JSON.parse(jsonLines.join("\n")); + // Try sender_id first, fall back to sender + const id = parsed.sender_id ?? parsed.sender; + if (typeof id === "string" && id.length > 0) { + return id; + } + } catch { + // Malformed JSON — return undefined + } + return undefined; + } + return undefined; +} + /** * Returns true if the message should be dropped entirely. * Patterns starting with "/" are treated as anchored regexes (e.g. "/^HEARTBEAT/i"). @@ -167,9 +230,10 @@ export function shouldSkipMessage(content: string, noisePatterns: string[]): boo export function extractMessages( rawMessages: unknown[], - ownerPeer: Peer, + defaultParticipantPeer: Peer, agentPeer: Peer, - noisePatterns: string[] = [] + noisePatterns: string[] = [], + resolvePeer?: (senderId: string) => Peer | undefined, ): MessageInput[] { const result: MessageInput[] = []; @@ -180,33 +244,25 @@ export function extractMessages( if (role !== "user" && role !== "assistant") continue; - let content = ""; - if (typeof m.content === "string") { - content = m.content; - } else if (Array.isArray(m.content)) { - content = m.content - .filter( - (block: unknown) => - typeof block === "object" && - block !== null && - (block as Record).type === "text" - ) - .map((block: unknown) => (block as Record).text) - .filter((t): t is string => typeof t === "string") - .join("\n"); + const rawContent = getRawContent(msg); + + // For user messages, extract sender ID before cleaning strips metadata + let peer: Peer; + if (role === "user") { + const senderId = extractSenderId(rawContent); + peer = (senderId && resolvePeer?.(senderId)) || defaultParticipantPeer; + } else { + peer = agentPeer; } - content = cleanMessageContent(content); + let content = cleanMessageContent(rawContent); content = content.trim(); if (!content) continue; if (shouldSkipMessage(content, noisePatterns)) continue; - if (content) { - const peer = role === "user" ? ownerPeer : agentPeer; - const ts = typeof m.timestamp === "number" ? new Date(m.timestamp) : undefined; - result.push(peer.message(content, ts ? { createdAt: ts } : undefined)); - } + const ts = typeof m.timestamp === "number" ? new Date(m.timestamp) : undefined; + result.push(peer.message(content, ts ? { createdAt: ts } : undefined)); } return result; diff --git a/hooks/capture.ts b/hooks/capture.ts index f080d7c..4453eb0 100644 --- a/hooks/capture.ts +++ b/hooks/capture.ts @@ -6,6 +6,8 @@ import { buildSessionKey, isSubagentSession, extractMessages, + extractSenderId, + getRawContent, } from "../helpers.js"; import { subagentParentMap } from "./subagent.js"; @@ -55,30 +57,90 @@ async function flushMessages( const lastSavedIndex = Math.min(Math.max(rawLastSavedIndex, 0), messages.length); const startIndex = Math.max(turnStartIndex, lastSavedIndex); - const peerConfigs: Array<[string, { observeMe: boolean; observeOthers: boolean }]> = [ - [OWNER_ID, { observeMe: true, observeOthers: state.cfg.ownerObserveOthers }], - [agentPeer.id, { observeMe: true, observeOthers: true }], - ]; + if (messages.length <= startIndex) { + return 0; + } + + const newRawMessages = messages.slice(startIndex); + + // Pre-resolve participant peers for all unique sender IDs in this batch + const senderIds = new Set(); + let lastSenderId: string | undefined; + let userMsgCount = 0; + for (const msg of newRawMessages) { + if (!msg || typeof msg !== "object") continue; + const m = msg as Record; + if (m.role !== "user") continue; + userMsgCount++; + const rawContent = getRawContent(msg); + const senderId = extractSenderId(rawContent); + if (senderId) { + senderIds.add(senderId); + lastSenderId = senderId; + } else { + const hasConvInfo = rawContent.includes("Conversation info (untrusted metadata):"); + api.logger.debug?.(`[honcho] User message without sender_id (hasConvInfo=${hasConvInfo}, contentLen=${rawContent.length})`); + } + } + if (senderIds.size > 0) { + api.logger.debug?.(`[honcho] Resolved ${senderIds.size} unique sender(s) from ${userMsgCount} user message(s)`); + } + + // Parallel peer resolution — avoids sequential await bottleneck in group chats. + const resolvedPeers = new Map>>(); + const senderIdArray = [...senderIds]; + const peers = await Promise.all(senderIdArray.map((id) => state.getParticipantPeer(id))); + for (let i = 0; i < senderIdArray.length; i++) { + resolvedPeers.set(senderIdArray[i], peers[i]); + } + + const defaultParticipantPeer = await state.getParticipantPeer(); + + // Build peer configs: default owner + all resolved participant peers + agent + parent + const peerConfigMap = new Map(); + peerConfigMap.set(OWNER_ID, { observeMe: true, observeOthers: state.cfg.ownerObserveOthers }); + for (const [, peer] of resolvedPeers) { + if (peer.id !== OWNER_ID) { + peerConfigMap.set(peer.id, { observeMe: true, observeOthers: state.cfg.ownerObserveOthers }); + } + } + peerConfigMap.set(agentPeer.id, { observeMe: true, observeOthers: true }); if (parentPeer) { - peerConfigs.push([parentPeer.id, { observeMe: false, observeOthers: true }]); + peerConfigMap.set(parentPeer.id, { observeMe: false, observeOthers: true }); } + const peerConfigs = Array.from(peerConfigMap.entries()) as Array< + [string, { observeMe: boolean; observeOthers: boolean }] + >; await session.addPeers(peerConfigs); - if (messages.length <= startIndex) { - return 0; - } + const extracted = extractMessages( + newRawMessages, + defaultParticipantPeer, + agentPeer, + state.cfg.noisePatterns, + (senderId) => resolvedPeers.get(senderId), + ); - const newRawMessages = messages.slice(startIndex); - const extracted = extractMessages(newRawMessages, state.ownerPeer!, agentPeer, state.cfg.noisePatterns); + // participantSenderId = last active sender, used by tools to resolve the + // session's current participant peer. Named "sender" (not "peer") to + // distinguish raw channel IDs from resolved Honcho peer IDs. + const updatedMeta: Record = { + ...existingMeta, + ...sessionMeta, + lastSavedIndex: messages.length, + }; + if (lastSenderId) { + updatedMeta.participantSenderId = lastSenderId; + } if (extracted.length === 0) { - await session.setMetadata({ ...existingMeta, ...sessionMeta, lastSavedIndex: messages.length }); + await session.setMetadata(updatedMeta); return 0; } await session.addMessages(extracted); - await session.setMetadata({ ...existingMeta, ...sessionMeta, lastSavedIndex: messages.length }); + await session.setMetadata(updatedMeta); return extracted.length; } diff --git a/hooks/context.ts b/hooks/context.ts index c76dd50..7b762a8 100644 --- a/hooks/context.ts +++ b/hooks/context.ts @@ -1,7 +1,7 @@ // @ts-ignore - resolved by openclaw runtime import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import type { PluginState } from "../state.js"; -import { buildSessionKey, isSubagentSession } from "../helpers.js"; +import { buildSessionKey, extractSenderId, isSubagentSession } from "../helpers.js"; export function registerContextHook(api: OpenClawPluginApi, state: PluginState): void { api.on("before_prompt_build", async (event, ctx) => { @@ -16,12 +16,20 @@ export function registerContextHook(api: OpenClawPluginApi, state: PluginState): try { await state.ensureInitialized(); const agentPeer = await state.getAgentPeer(agentId); + // Prefer the sender of the current inbound message — capture has not + // run yet for this turn, so session metadata still reflects the previous + // speaker. In group chats this would otherwise build context against the + // prior participant's representation whenever the speaker changes. + const currentSenderId = extractSenderId(event.prompt); + const participantPeer = currentSenderId + ? await state.getParticipantPeer(currentSenderId) + : await state.resolveSessionParticipantPeer(sessionKey); const sections: string[] = []; if (isSubagent) { try { - const peerCtx = await agentPeer.context({ target: state.ownerPeer! }); + const peerCtx = await agentPeer.context({ target: participantPeer }); if (peerCtx.peerCard?.length) { sections.push(`Key facts:\n${peerCtx.peerCard.map((f: string) => `• ${f}`).join("\n")}`); } @@ -43,7 +51,7 @@ export function registerContextHook(api: OpenClawPluginApi, state: PluginState): context = await session.context({ summary: true, tokens: 2000, - peerTarget: state.ownerPeer!, + peerTarget: participantPeer, peerPerspective: agentPeer, }); } catch (e: unknown) { diff --git a/hooks/gateway.ts b/hooks/gateway.ts index f672a95..b9df356 100644 --- a/hooks/gateway.ts +++ b/hooks/gateway.ts @@ -7,7 +7,12 @@ export function registerGatewayHook(api: OpenClawPluginApi, state: PluginState): api.logger.info("Initializing Honcho memory..."); try { await state.ensureInitialized(); - api.logger.info("Honcho memory ready"); + const { filePath, peers } = state.peersPersister; + api.logger.info( + `Honcho memory ready — peer map: ${filePath} (${Object.keys(peers).length} known sender${ + Object.keys(peers).length === 1 ? "" : "s" + })`, + ); } catch (error) { api.logger.error(`Failed to initialize Honcho: ${error}`); } diff --git a/openclaw.plugin.json b/openclaw.plugin.json index 9f5a36c..5bb8d75 100644 --- a/openclaw.plugin.json +++ b/openclaw.plugin.json @@ -36,6 +36,11 @@ "type": "boolean", "default": false, "description": "When true, built-in noise patterns are not applied — only noisePatterns entries are used." + }, + "crossSessionSearch": { + "type": "boolean", + "default": true, + "description": "When true (default), memory_search/memory_get can return results from any session in the workspace. When false, results are scoped to the active session and its child sessions only." } } }, @@ -76,6 +81,11 @@ "label": "Disable Default Noise Patterns", "advanced": true, "help": "When enabled, only custom noisePatterns entries are used — built-in defaults are skipped." + }, + "crossSessionSearch": { + "label": "Cross-Session Search", + "advanced": true, + "help": "When enabled (default), memory tools can return results from any session in the workspace. Disable to scope results to the active session only." } } } diff --git a/package.json b/package.json index de38373..2af9c34 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@honcho-ai/openclaw-honcho", - "version": "1.3.3", + "version": "1.4.0", "type": "module", "description": "Honcho AI-native memory integration for OpenClaw", "main": "dist/index.js", diff --git a/peers.ts b/peers.ts new file mode 100644 index 0000000..f9ce2ba --- /dev/null +++ b/peers.ts @@ -0,0 +1,222 @@ +/** + * Sender_id → Honcho peer_id map, persisted at ~/.honcho/openclaw-peers.json. + * + * Plugin auto-seeds unknown senders per `defaultUnknownPolicy`: + * "owner" → strangers merge into the OWNER_ID peer (legacy contract). + * "per-sender" → each stranger gets a peer derived from its sender_id + * (sanitized + truncated to satisfy Honcho's RESOURCE_NAME_PATTERN). + * Fresh installs (file missing) default to "per-sender"; pre-existing files + * (no policy field) keep "owner" so legacy users see no behavior change. + * + * Restart reloads from disk. Flush merges disk before write (disk wins conflicts) + * so concurrent file edits are not overwritten. OPENCLAW_HONCHO_PEERS_FILE overrides path. + */ + +import { promises as fs, readFileSync } from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +export const PEERS_FILE_VERSION = 1; +/** Honcho enforces RESOURCE_NAME_PATTERN = ^[a-zA-Z0-9_-]+$ with 1..100 length on peer IDs. */ +const HONCHO_PEER_ID_MAX_LEN = 100; + +export type DefaultUnknownPolicy = "owner" | "per-sender"; + +export type PeersFile = { + version: typeof PEERS_FILE_VERSION; + defaultUnknownPolicy: DefaultUnknownPolicy; + peers: Record; +}; + +export function resolvePeersFilePath(): string { + const envPath = process.env.OPENCLAW_HONCHO_PEERS_FILE; + if (envPath && envPath.trim().length > 0) return envPath.trim(); + return path.join(os.homedir(), ".honcho", "openclaw-peers.json"); +} + +function parsePeersJson(raw: string, filePath: string): PeersFile { + try { + const parsed = JSON.parse(raw); + if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { + const obj = parsed as Record; + const peers = + obj.peers && typeof obj.peers === "object" && !Array.isArray(obj.peers) + ? coerceStringMap(obj.peers as Record) + : {}; + return { + version: PEERS_FILE_VERSION, + defaultUnknownPolicy: obj.defaultUnknownPolicy === "per-sender" ? "per-sender" : "owner", + peers, + }; + } + console.warn( + `peers file ${filePath}: top-level value is not an object; falling back to legacy owner policy with empty map`, + ); + } catch (err) { + console.warn( + `peers file ${filePath}: ${(err as Error).message}; falling back to legacy owner policy with empty map`, + ); + } + // Existing-but-malformed file → preserve legacy contract. + return { version: PEERS_FILE_VERSION, defaultUnknownPolicy: "owner", peers: {} }; +} + +/** Read the peers file. Missing → per-sender (fresh install); anything else → owner (legacy). */ +export async function loadPeersFile(filePath: string): Promise { + try { + return parsePeersJson(await fs.readFile(filePath, "utf8"), filePath); + } catch (err) { + return (err as NodeJS.ErrnoException)?.code === "ENOENT" + ? { version: PEERS_FILE_VERSION, defaultUnknownPolicy: "per-sender", peers: {} } + : { version: PEERS_FILE_VERSION, defaultUnknownPolicy: "owner", peers: {} }; + } +} + +export function loadPeersFileSync(filePath: string): PeersFile { + try { + return parsePeersJson(readFileSync(filePath, "utf8"), filePath); + } catch (err) { + return (err as NodeJS.ErrnoException)?.code === "ENOENT" + ? { version: PEERS_FILE_VERSION, defaultUnknownPolicy: "per-sender", peers: {} } + : { version: PEERS_FILE_VERSION, defaultUnknownPolicy: "owner", peers: {} }; + } +} + +function coerceStringMap(value: Record): Record { + const out: Record = {}; + for (const [k, v] of Object.entries(value)) { + if (typeof v === "string" && v.length > 0) out[k] = v; + } + return out; +} + +/** + * Resolve an inbound sender_id to a Honcho peer ID. Known senders return their + * mapping; unknown senders auto-seed under the persister's policy and enqueue + * for persistence. Per-sender peer IDs are derived from the sender_id — + * sanitized to satisfy Honcho's RESOURCE_NAME_PATTERN and truncated to its + * 100-char limit. + */ +export function resolveParticipantPeerId( + senderId: string, + persister: PeersPersister, + ownerPeerId = "owner", +): string { + const mapped = persister.peers[senderId]; + if (mapped !== undefined) return mapped; + const seedPeerId = + persister.defaultUnknownPolicy === "per-sender" + ? senderId.replace(/[^A-Za-z0-9_-]/g, "_").slice(0, HONCHO_PEER_ID_MAX_LEN) + : ownerPeerId; + persister.enqueue(senderId, seedPeerId); + return seedPeerId; +} + +export type PeersPersisterOptions = { + /** Flush debounce window in milliseconds. Default 1000. */ + debounceMs?: number; +}; + +/** + * Debounced, serialized writer for the peers file. + * + * enqueue() is synchronous — it mutates the in-memory map and schedules a + * background flush. Multiple enqueues within the debounce window coalesce + * into a single write. Flushes are chained via a single promise so writes + * cannot interleave. + * + * Each flush() re-reads the file and merges `{ ...memory, ...disk }` so disk + * wins on conflicting sender_ids; keys only on disk or only in memory are + * kept. enqueue() never overwrites an existing mapping. + */ +export class PeersPersister { + public readonly peers: Record; + public readonly filePath: string; + public defaultUnknownPolicy: DefaultUnknownPolicy; + private readonly debounceMs: number; + private dirty = false; + private timer: ReturnType | null = null; + private chain: Promise = Promise.resolve(); + + constructor(filePath: string, initial: PeersFile, opts: PeersPersisterOptions = {}) { + this.filePath = filePath; + this.peers = { ...initial.peers }; + this.defaultUnknownPolicy = initial.defaultUnknownPolicy; + this.debounceMs = opts.debounceMs ?? 1000; + } + + /** Record a sender_id → peer_id mapping if absent. Schedules a debounced flush. */ + enqueue(senderId: string, peerId = "owner"): void { + if (!senderId) return; + if (this.peers[senderId] !== undefined) return; + this.peers[senderId] = peerId; + this.dirty = true; + if (this.timer) return; + this.timer = setTimeout(() => { + this.timer = null; + this.chain = this.chain.then(() => this.flush()).catch(() => undefined); + }, this.debounceMs); + // unref so the timer doesn't block process exit in short-lived runs. + this.timer.unref?.(); + } + + /** Flush any pending changes immediately. Safe to call concurrently with enqueue(). */ + async flushNow(): Promise { + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + const flushed = this.chain.then(() => this.flush()); + this.chain = flushed.catch(() => undefined); + await flushed; + } + + private async flush(): Promise { + if (!this.dirty) return; + while (this.dirty) { + this.dirty = false; + try { + await fs.mkdir(path.dirname(this.filePath), { recursive: true }); + const onDisk = await loadPeersFileForMerge(this.filePath, this.defaultUnknownPolicy); + const mergedPeers = { ...this.peers, ...onDisk.peers }; + this.defaultUnknownPolicy = onDisk.defaultUnknownPolicy; + replaceRecordInPlace(this.peers, mergedPeers); + + const body: PeersFile = { + version: PEERS_FILE_VERSION, + defaultUnknownPolicy: this.defaultUnknownPolicy, + peers: mergedPeers, + }; + await fs.writeFile(this.filePath, JSON.stringify(body, null, 2) + "\n"); + } catch (err) { + this.dirty = true; + throw err; + } + } + } +} + +/** Sync `target` to equal `source` (same keys and values). */ +function replaceRecordInPlace(target: Record, source: Record): void { + for (const k of Object.keys(target)) { + if (!(k in source)) delete target[k]; + } + Object.assign(target, source); +} + +/** + * Same as reading the peers file for parsing, except a missing file uses + * `memoryPolicy` instead of the fresh-install default (`per-sender`), so the + * first write does not clobber the policy loaded at boot from an in-memory-only state. + */ +async function loadPeersFileForMerge(filePath: string, memoryPolicy: DefaultUnknownPolicy): Promise { + try { + return parsePeersJson(await fs.readFile(filePath, "utf8"), filePath); + } catch (err) { + const code = (err as NodeJS.ErrnoException)?.code; + if (code === "ENOENT") { + return { version: PEERS_FILE_VERSION, defaultUnknownPolicy: memoryPolicy, peers: {} }; + } + return { version: PEERS_FILE_VERSION, defaultUnknownPolicy: "owner", peers: {} }; + } +} diff --git a/runtime.ts b/runtime.ts index 55c97e0..9dd35b4 100644 --- a/runtime.ts +++ b/runtime.ts @@ -35,17 +35,14 @@ async function buildSessionTranscript( sessionId: string ): Promise { await state.ensureInitialized(); - const ownerPeer = state.ownerPeer; - if (!ownerPeer) { - throw new Error("Honcho owner peer not initialized"); - } + const participantPeer = await state.resolveSessionParticipantPeer(sessionId); const agentPeer = await state.getAgentPeer(agentId); const session = await state.honcho.session(sessionId, { metadata: { agentId } }); const context = await session.context({ summary: true, tokens: 20000, - peerTarget: ownerPeer, + peerTarget: participantPeer, peerPerspective: agentPeer, }); @@ -57,11 +54,13 @@ async function buildSessionTranscript( for (const msg of context.messages ?? []) { const speaker = - msg.peerId === ownerPeer.id + msg.peerId === participantPeer.id ? "User" : msg.peerId === agentPeer.id ? `Agent(${agentId})` - : `Peer(${msg.peerId})`; + : state.isParticipantPeerId(msg.peerId) + ? `User(${msg.peerId})` + : `Peer(${msg.peerId})`; const ts = msg.createdAt ? ` ${msg.createdAt}` : ""; lines.push(`## ${speaker}${ts}`, msg.content ?? "", ""); } @@ -126,10 +125,9 @@ export async function getHonchoMemorySearchManager( manager: { async search(query: string, opts: { maxResults?: number; sessionKey?: string } = {}) { await state.ensureInitialized(); - const ownerPeer = state.ownerPeer; - if (!ownerPeer) { - throw new Error("Honcho owner peer not initialized"); - } + const participantPeer = activeSessionKey + ? await state.resolveSessionParticipantPeer(activeSessionKey) + : await state.getParticipantPeer(); const requested = Number.isFinite(opts.maxResults) ? Number(opts.maxResults) : DEFAULT_SEARCH_RESULTS; @@ -175,7 +173,7 @@ export async function getHonchoMemorySearchManager( collect(await exactSession.search(query, { limit })); if (filtered.length < limit) { - const sessions = await ownerPeer.sessions(); + const sessions = await participantPeer.sessions(); for await (const session of sessions) { if (filtered.length >= limit) break; if ( @@ -189,7 +187,7 @@ export async function getHonchoMemorySearchManager( } } } else { - collect(await ownerPeer.search(query, { limit })); + collect(await participantPeer.search(query, { limit })); } return Promise.all( diff --git a/state.ts b/state.ts index b54644b..615de64 100644 --- a/state.ts +++ b/state.ts @@ -8,6 +8,12 @@ import { Honcho, type Peer } from "@honcho-ai/sdk"; // @ts-ignore - resolved by openclaw runtime import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import { honchoConfigSchema, type HonchoConfig } from "./config.js"; +import { + PeersPersister, + loadPeersFileSync, + resolvePeersFilePath, + resolveParticipantPeerId, +} from "./peers.js"; export const OWNER_ID = "owner"; export const LEGACY_PEER_ID = "openclaw"; @@ -29,7 +35,10 @@ export function isLocalHonchoBaseUrl(baseUrl?: string): boolean { export type PluginState = { honcho: Honcho; cfg: HonchoConfig; - ownerPeer: Peer | null; + /** Cache of resolved participant peers, keyed by channel peer ID (or OWNER_ID for default). + * "Participant" intentionally generalizes over humans AND non-agent bots/agents in group + * chats — anyone in the conversation who isn't the local OpenClaw agent peer. */ + participantPeers: Map; agentPeers: Map; agentPeerMap: Record; /** Message count recorded at before_prompt_build time, keyed by Honcho session key. @@ -40,6 +49,17 @@ export type PluginState = { api: OpenClawPluginApi; ensureInitialized: () => Promise; getAgentPeer: (agentId?: string) => Promise; + /** Sender_id → Honcho peer_id map, backed by ~/.honcho/openclaw-peers.json. + * Unknown senders are auto-seeded to OWNER_ID; the user hand-edits the file + * to split specific senders off to their own peer IDs. */ + peersPersister: PeersPersister; + /** Resolve a participant peer by channel peer ID. Returns default "owner" peer if no ID given. */ + getParticipantPeer: (channelPeerId?: string) => Promise; + /** Resolve the participant peer for a session by reading participantSenderId from session metadata. + * Falls back to default "owner" peer if no metadata found. */ + resolveSessionParticipantPeer: (sessionKey: string) => Promise; + /** Returns true if the given honcho peer ID belongs to a known participant peer. */ + isParticipantPeerId: (peerId: string) => boolean; resolveDefaultAgentId: () => string; }; @@ -61,17 +81,32 @@ export function createPluginState(api: OpenClawPluginApi): PluginState { timeout: cfg.timeoutMs, }); + const peersFilePath = resolvePeersFilePath(); + const peersPersister = new PeersPersister( + peersFilePath, + loadPeersFileSync(peersFilePath), + ); + + // Promise-based init lock to prevent concurrent ensureInitialized() races. + // Without this, two concurrent hooks entering init simultaneously can corrupt + // workspace metadata. Errors propagate to all waiters. + let initPromise: Promise | null = null; + const state: PluginState = { honcho, cfg, - ownerPeer: null, + participantPeers: new Map(), agentPeers: new Map(), agentPeerMap: {}, turnStartIndex: new Map(), initialized: false, api, + peersPersister, ensureInitialized, getAgentPeer, + getParticipantPeer, + resolveSessionParticipantPeer, + isParticipantPeerId, resolveDefaultAgentId, }; @@ -84,6 +119,18 @@ export function createPluginState(api: OpenClawPluginApi): PluginState { async function ensureInitialized(): Promise { if (state.initialized) return; + if (initPromise) return initPromise; + initPromise = doInit(); + try { + await initPromise; + } catch (err) { + // Reset so next caller retries instead of getting a stale rejection. + initPromise = null; + throw err; + } + } + + async function doInit(): Promise { const wsMeta = await honcho.getMetadata(); state.agentPeerMap = (wsMeta.agentPeerMap as Record) ?? {}; @@ -97,10 +144,66 @@ export function createPluginState(api: OpenClawPluginApi): PluginState { await honcho.setMetadata({ ...wsMeta, agentPeerMap: state.agentPeerMap }); } - state.ownerPeer = await honcho.peer(OWNER_ID, { metadata: {} }); + // Create default "owner" peer + const defaultPeer = await honcho.peer(OWNER_ID, { metadata: {} }); + state.participantPeers.set(OWNER_ID, defaultPeer); + state.initialized = true; } + async function ensureOwnerPeer(): Promise { + let peer = state.participantPeers.get(OWNER_ID); + if (peer) return peer; + peer = await honcho.peer(OWNER_ID, { metadata: {} }); + state.participantPeers.set(OWNER_ID, peer); + return peer; + } + + async function getParticipantPeer(channelPeerId?: string): Promise { + if (!channelPeerId) return ensureOwnerPeer(); + + // Known senders resolve via the peers file. Unknown senders auto-seed + // per the persister's defaultUnknownPolicy: legacy installs merge into + // OWNER_ID; fresh installs mint a distinct participant- peer. + let peer = state.participantPeers.get(channelPeerId); + if (peer) return peer; + + const wasInFile = channelPeerId in peersPersister.peers; + const resolvedPeerId = resolveParticipantPeerId(channelPeerId, peersPersister, OWNER_ID); + const autoSeeded = !wasInFile && resolvedPeerId !== OWNER_ID; + + if (resolvedPeerId === OWNER_ID) { + peer = await ensureOwnerPeer(); + } else { + const metadata: Record = { channelPeerId }; + if (autoSeeded) metadata.autoSeeded = true; + peer = await honcho.peer(resolvedPeerId, { metadata }); + } + state.participantPeers.set(channelPeerId, peer); + return peer; + } + + async function resolveSessionParticipantPeer(sessionKey: string): Promise { + const session = await honcho.session(sessionKey); + const meta = await session.getMetadata(); + if (meta && typeof meta === "object") { + const senderId = (meta as Record).participantSenderId; + if (typeof senderId === "string" && senderId.length > 0) { + return await getParticipantPeer(senderId); + } + } + return await getParticipantPeer(); + } + + function isParticipantPeerId(peerId: string): boolean { + if (peerId === OWNER_ID) return true; + // Check if this peer ID is a known participant peer + for (const [, peer] of state.participantPeers) { + if (peer.id === peerId) return true; + } + return false; + } + async function getAgentPeer(agentId?: string): Promise { const id = (agentId || resolveDefaultAgentId()).toLowerCase().trim() || "main"; diff --git a/test/helpers.test.ts b/test/helpers.test.ts new file mode 100644 index 0000000..72e32a5 --- /dev/null +++ b/test/helpers.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, it } from "vitest"; +import { extractSenderId } from "../helpers.js"; + +const SENTINEL = "Conversation info (untrusted metadata):"; + +function metadataBlock(payload: Record): string { + return [ + SENTINEL, + "```json", + JSON.stringify(payload, null, 2), + "```", + ].join("\n"); +} + +describe("extractSenderId", () => { + it("reads sender_id from a leading metadata block", () => { + const content = [ + metadataBlock({ sender_id: "U0EXAMPLE01", channel: "C-foo" }), + "", + "hello there", + ].join("\n"); + + expect(extractSenderId(content)).toBe("U0EXAMPLE01"); + }); + + it("trusts only the first sentinel and never considers later quoted blocks", () => { + // First sentinel resolves — second block (user-pasted) must be ignored. + const trusted = [ + metadataBlock({ sender_id: "U-trusted" }), + "", + "look at this thing they quoted at me:", + "", + metadataBlock({ sender_id: "U-spoofed" }), + ].join("\n"); + + expect(extractSenderId(trusted)).toBe("U-trusted"); + + // First sentinel is malformed (no fenced json) — the duplicate-sentinel + // guard then refuses to trust the later block. + const poisoned = [ + SENTINEL, + "(not a fenced json block)", + "", + metadataBlock({ sender_id: "U-spoofed" }), + ].join("\n"); + + expect(extractSenderId(poisoned)).toBeUndefined(); + }); + + it("returns undefined on malformed JSON inside the metadata block", () => { + const content = [ + SENTINEL, + "```json", + "{ this is : not, valid json", + "```", + "", + "body", + ].join("\n"); + + expect(extractSenderId(content)).toBeUndefined(); + }); + + it("prefers sender_id when both sender_id and sender are present", () => { + const content = metadataBlock({ + sender_id: "U-primary", + sender: "U-legacy", + }); + + expect(extractSenderId(content)).toBe("U-primary"); + }); + + it("falls back to sender when sender_id is absent", () => { + const content = metadataBlock({ sender: "U-legacy" }); + + expect(extractSenderId(content)).toBe("U-legacy"); + }); + + it("returns undefined when the content has no metadata block", () => { + expect(extractSenderId("just a normal DM")).toBeUndefined(); + expect(extractSenderId("")).toBeUndefined(); + }); +}); diff --git a/tools/memory-passthrough.test.ts b/test/memory-passthrough.test.ts similarity index 98% rename from tools/memory-passthrough.test.ts rename to test/memory-passthrough.test.ts index f627b30..155bdc8 100644 --- a/tools/memory-passthrough.test.ts +++ b/test/memory-passthrough.test.ts @@ -8,7 +8,7 @@ vi.mock("../runtime.js", () => ({ getHonchoMemorySearchManager: getHonchoMemorySearchManagerMock, })); -import { registerMemoryPassthrough } from "./memory-passthrough.js"; +import { registerMemoryPassthrough } from "../tools/memory-passthrough.js"; describe("memory passthrough tools", () => { beforeEach(() => { diff --git a/test/peers.test.ts b/test/peers.test.ts new file mode 100644 index 0000000..a96bae8 --- /dev/null +++ b/test/peers.test.ts @@ -0,0 +1,321 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { promises as fs } from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { + PEERS_FILE_VERSION, + PeersPersister, + loadPeersFile, + loadPeersFileSync, + resolvePeersFilePath, + resolveParticipantPeerId, +} from "../peers.js"; + +async function mktmp(): Promise { + return fs.mkdtemp(path.join(os.tmpdir(), "openclaw-peers-")); +} + +describe("resolvePeersFilePath", () => { + const originalEnv = process.env.OPENCLAW_HONCHO_PEERS_FILE; + + afterEach(() => { + if (originalEnv === undefined) delete process.env.OPENCLAW_HONCHO_PEERS_FILE; + else process.env.OPENCLAW_HONCHO_PEERS_FILE = originalEnv; + }); + + it("defaults to ~/.honcho/openclaw-peers.json", () => { + delete process.env.OPENCLAW_HONCHO_PEERS_FILE; + expect(resolvePeersFilePath()).toBe(path.join(os.homedir(), ".honcho", "openclaw-peers.json")); + }); + + it("respects OPENCLAW_HONCHO_PEERS_FILE override", () => { + process.env.OPENCLAW_HONCHO_PEERS_FILE = "/tmp/custom-peers.json"; + expect(resolvePeersFilePath()).toBe("/tmp/custom-peers.json"); + }); + + it("trims whitespace on the override", () => { + process.env.OPENCLAW_HONCHO_PEERS_FILE = " /tmp/ws.json "; + expect(resolvePeersFilePath()).toBe("/tmp/ws.json"); + }); + + it("ignores empty-string overrides", () => { + process.env.OPENCLAW_HONCHO_PEERS_FILE = ""; + expect(resolvePeersFilePath()).toBe(path.join(os.homedir(), ".honcho", "openclaw-peers.json")); + }); +}); + +describe("loadPeersFile", () => { + it("returns a per-sender seed when the file is missing (fresh install)", async () => { + const dir = await mktmp(); + const missing = path.join(dir, "does", "not", "exist.json"); + await expect(loadPeersFile(missing)).resolves.toEqual({ + version: PEERS_FILE_VERSION, + defaultUnknownPolicy: "per-sender", + peers: {}, + }); + expect(loadPeersFileSync(missing)).toEqual({ + version: PEERS_FILE_VERSION, + defaultUnknownPolicy: "per-sender", + peers: {}, + }); + }); + + it("reads a legacy file (no policy field) as owner-policy", async () => { + const dir = await mktmp(); + const file = path.join(dir, "peers.json"); + await fs.writeFile( + file, + JSON.stringify({ version: 1, peers: { "slack:U1": "owner", "slack:U2": "alice" } }), + ); + await expect(loadPeersFile(file)).resolves.toEqual({ + version: 1, + defaultUnknownPolicy: "owner", + peers: { "slack:U1": "owner", "slack:U2": "alice" }, + }); + }); + + it("respects an explicit defaultUnknownPolicy field", async () => { + const dir = await mktmp(); + const file = path.join(dir, "peers.json"); + await fs.writeFile( + file, + JSON.stringify({ version: 1, defaultUnknownPolicy: "per-sender", peers: {} }), + ); + await expect(loadPeersFile(file)).resolves.toEqual({ + version: 1, + defaultUnknownPolicy: "per-sender", + peers: {}, + }); + }); + + it("treats malformed JSON in an existing file as legacy (owner) — never silently upgrades", async () => { + const dir = await mktmp(); + const file = path.join(dir, "peers.json"); + await fs.writeFile(file, "{ not valid json"); + const warn = vi.spyOn(console, "warn").mockImplementation(() => {}); + await expect(loadPeersFile(file)).resolves.toEqual({ + version: PEERS_FILE_VERSION, + defaultUnknownPolicy: "owner", + peers: {}, + }); + expect(warn).toHaveBeenCalledWith(expect.stringContaining(file)); + warn.mockRestore(); + }); +}); + +describe("resolveParticipantPeerId", () => { + function persister( + initial: Record = {}, + defaultUnknownPolicy: "owner" | "per-sender" = "owner", + ) { + return new PeersPersister("/dev/null", { + version: PEERS_FILE_VERSION, + defaultUnknownPolicy, + peers: { ...initial }, + }); + } + + it("returns the mapped peer for a known sender", () => { + const p = persister({ "slack:U1": "alice" }); + expect(resolveParticipantPeerId("slack:U1", p)).toBe("alice"); + }); + + it("returns owner (no enqueue) for a sender already mapped to owner", () => { + const p = persister({ "slack:U2": "owner" }); + const enqueueSpy = vi.spyOn(p, "enqueue"); + expect(resolveParticipantPeerId("slack:U2", p)).toBe("owner"); + expect(enqueueSpy).not.toHaveBeenCalled(); + }); + + it("under owner policy: enqueues unknown senders as owner", () => { + const p = persister({}, "owner"); + expect(resolveParticipantPeerId("slack:U3", p)).toBe("owner"); + expect(p.peers["slack:U3"]).toBe("owner"); + }); + + it("under per-sender policy: derives a sanitized peer ID from the sender_id", () => { + const p = persister({}, "per-sender"); + expect(resolveParticipantPeerId("slack:U07A.bot@team", p)).toBe("slack_U07A_bot_team"); + expect(p.peers["slack:U07A.bot@team"]).toBe("slack_U07A_bot_team"); + }); + + it("under per-sender policy: hand-mapped owner mappings still resolve to owner", () => { + const p = persister({ "slack:U5": "owner" }, "per-sender"); + expect(resolveParticipantPeerId("slack:U5", p)).toBe("owner"); + }); + + it("under per-sender policy: truncates to fit Honcho's 100-char peer ID limit", () => { + const p = persister({}, "per-sender"); + const long = "x".repeat(200); + const id = resolveParticipantPeerId(long, p); + expect(id.length).toBeLessThanOrEqual(100); + expect(/^[a-zA-Z0-9_-]+$/.test(id)).toBe(true); + }); +}); + +describe("PeersPersister", () => { + function emptyFile(defaultUnknownPolicy: "owner" | "per-sender" = "owner") { + return { version: PEERS_FILE_VERSION, defaultUnknownPolicy, peers: {} } as const; + } + + it("enqueue is idempotent per sender", async () => { + const dir = await mktmp(); + const file = path.join(dir, "peers.json"); + const p = new PeersPersister(file, emptyFile()); + p.enqueue("slack:U1"); + p.enqueue("slack:U1"); + p.enqueue("slack:U1"); + expect(Object.keys(p.peers)).toEqual(["slack:U1"]); + }); + + it("does not overwrite an existing mapping on enqueue", () => { + const p = new PeersPersister("/dev/null", { + version: PEERS_FILE_VERSION, + defaultUnknownPolicy: "owner", + peers: { "slack:U1": "alice" }, + }); + p.enqueue("slack:U1"); + expect(p.peers["slack:U1"]).toBe("alice"); + }); + + it("coalesces 3 enqueues within the debounce window into one file write", async () => { + const dir = await mktmp(); + const file = path.join(dir, "peers.json"); + const writeSpy = vi.spyOn(fs, "writeFile"); + const p = new PeersPersister(file, emptyFile("owner"), { debounceMs: 50 }); + + p.enqueue("slack:U1"); + p.enqueue("slack:U2"); + p.enqueue("slack:U3"); + + expect(writeSpy).not.toHaveBeenCalled(); + + await p.flushNow(); + expect(writeSpy).toHaveBeenCalledTimes(1); + + await p.flushNow(); + expect(writeSpy).toHaveBeenCalledTimes(1); + writeSpy.mockRestore(); + + const body = JSON.parse(await fs.readFile(file, "utf8")); + expect(body).toEqual({ + version: 1, + defaultUnknownPolicy: "owner", + peers: { + "slack:U1": "owner", + "slack:U2": "owner", + "slack:U3": "owner", + }, + }); + }); + + it("creates the peers file on first flush when missing at boot (fresh install → per-sender)", async () => { + const dir = await mktmp(); + const file = path.join(dir, "nested", "peers.json"); + const loaded = loadPeersFileSync(file); + expect(loaded).toEqual({ + version: PEERS_FILE_VERSION, + defaultUnknownPolicy: "per-sender", + peers: {}, + }); + + const p = new PeersPersister(file, loaded, { debounceMs: 10 }); + p.enqueue("slack:Unew", "slack_Unew"); + await p.flushNow(); + + const body = JSON.parse(await fs.readFile(file, "utf8")); + expect(body).toEqual({ + version: 1, + defaultUnknownPolicy: "per-sender", + peers: { "slack:Unew": "slack_Unew" }, + }); + }); + + it("flushNow is a no-op when nothing is dirty", async () => { + const dir = await mktmp(); + const file = path.join(dir, "peers.json"); + const p = new PeersPersister(file, emptyFile()); + await p.flushNow(); + await expect(fs.access(file)).rejects.toBeTruthy(); + }); + + it("merge preserves hand-edited keys on disk not present in memory", async () => { + const dir = await mktmp(); + const file = path.join(dir, "peers.json"); + await fs.writeFile( + file, + JSON.stringify({ + version: 1, + defaultUnknownPolicy: "owner", + peers: { "slack:U99": "alice", "slack:U1": "owner" }, + }) + "\n", + ); + + const loaded = loadPeersFileSync(file); + const p = new PeersPersister(file, loaded, { debounceMs: 10 }); + p.enqueue("slack:Unew", "slack_Unew"); + await p.flushNow(); + + const body = JSON.parse(await fs.readFile(file, "utf8")); + expect(body.peers["slack:U99"]).toBe("alice"); + expect(body.peers["slack:Unew"]).toBe("slack_Unew"); + expect(body.peers["slack:U1"]).toBe("owner"); + }); + + it("merge prefers on-disk mapping when the same sender exists in memory", async () => { + const dir = await mktmp(); + const file = path.join(dir, "peers.json"); + await fs.writeFile( + file, + JSON.stringify({ + version: 1, + defaultUnknownPolicy: "owner", + peers: { "slack:U1": "from_disk" }, + }) + "\n", + ); + + const loaded = loadPeersFileSync(file); + const p = new PeersPersister(file, loaded, { debounceMs: 10 }); + expect(p.peers["slack:U1"]).toBe("from_disk"); + (p.peers as Record)["slack:U1"] = "stale_memory"; + p.enqueue("slack:U2", "owner"); + await p.flushNow(); + + const body = JSON.parse(await fs.readFile(file, "utf8")); + expect(body.peers["slack:U1"]).toBe("from_disk"); + expect(p.peers["slack:U1"]).toBe("from_disk"); + }); + + it("reloads defaultUnknownPolicy from disk on flush", async () => { + const dir = await mktmp(); + const file = path.join(dir, "peers.json"); + await fs.writeFile( + file, + JSON.stringify({ + version: 1, + defaultUnknownPolicy: "owner", + peers: {}, + }) + "\n", + ); + + const loaded = loadPeersFileSync(file); + const p = new PeersPersister(file, loaded, { debounceMs: 10 }); + expect(p.defaultUnknownPolicy).toBe("owner"); + + await fs.writeFile( + file, + JSON.stringify({ + version: 1, + defaultUnknownPolicy: "per-sender", + peers: {}, + }) + "\n", + ); + + p.enqueue("slack:Ux", "derived"); + await p.flushNow(); + + expect(p.defaultUnknownPolicy).toBe("per-sender"); + const body = JSON.parse(await fs.readFile(file, "utf8")); + expect(body.defaultUnknownPolicy).toBe("per-sender"); + }); +}); diff --git a/runtime.test.ts b/test/runtime.test.ts similarity index 82% rename from runtime.test.ts rename to test/runtime.test.ts index 7fd58c5..eb7c329 100644 --- a/runtime.test.ts +++ b/test/runtime.test.ts @@ -1,8 +1,16 @@ import { describe, expect, it, vi } from "vitest"; -import { getHonchoMemorySearchManager, resolveHonchoMemoryBackendConfig } from "./runtime.js"; -import type { PluginState } from "./state.js"; +import { getHonchoMemorySearchManager, resolveHonchoMemoryBackendConfig } from "../runtime.js"; +import type { PluginState } from "../state.js"; -function createState(baseUrl = "https://api.honcho.dev", { crossSessionSearch = true }: { crossSessionSearch?: boolean } = {}): PluginState { +type TestState = PluginState & { + participantPeer: { + id: string; + search: ReturnType; + sessions: ReturnType; + } | null; +}; + +function createState(baseUrl = "https://api.honcho.dev", { crossSessionSearch = true }: { crossSessionSearch?: boolean } = {}): TestState { const contexts = new Map> }>([ [ "session-1", @@ -94,7 +102,21 @@ function createState(baseUrl = "https://api.honcho.dev", { crossSessionSearch = const childSession = createSession("session-1-child"); - return { + const participantPeer = { + id: "owner", + search: vi.fn(async () => [ + { sessionId: "session-1", content: "Need to remember this" }, + { sessionId: "session-1-child", content: "Child transcript hit" }, + { sessionId: "other-session", content: "Other result" }, + ]), + sessions: vi.fn(async () => ({ + async *[Symbol.asyncIterator]() { + yield childSession; + }, + })), + }; + + const state = { cfg: { workspaceId: "openclaw", baseUrl, @@ -106,19 +128,8 @@ function createState(baseUrl = "https://api.honcho.dev", { crossSessionSearch = honcho: { session: vi.fn(async (sessionId: string) => createSession(sessionId)), } as never, - ownerPeer: { - id: "owner", - search: vi.fn(async () => [ - { sessionId: "session-1", content: "Need to remember this" }, - { sessionId: "session-1-child", content: "Child transcript hit" }, - { sessionId: "other-session", content: "Other result" }, - ]), - sessions: vi.fn(async () => ({ - async *[Symbol.asyncIterator]() { - yield childSession; - }, - })), - } as never, + participantPeer, + participantPeers: new Map(), agentPeers: new Map(), agentPeerMap: {}, turnStartIndex: new Map(), @@ -126,8 +137,18 @@ function createState(baseUrl = "https://api.honcho.dev", { crossSessionSearch = api: {} as never, ensureInitialized: vi.fn(async () => {}), getAgentPeer: vi.fn(async (agentId = "main") => ({ id: `agent-${agentId}` })), + getParticipantPeer: vi.fn(async () => { + if (!participantPeer) throw new Error("Honcho owner peer not initialized"); + return participantPeer; + }), + resolveSessionParticipantPeer: vi.fn(async () => { + if (!state.participantPeer) throw new Error("Honcho owner peer not initialized"); + return state.participantPeer; + }), + isParticipantPeerId: vi.fn((peerId: string) => peerId === "owner"), resolveDefaultAgentId: vi.fn(() => "main"), - } as unknown as PluginState; + } as unknown as TestState; + return state; } describe("Honcho memory runtime", () => { @@ -151,7 +172,7 @@ describe("Honcho memory runtime", () => { expect(results[0]?.snippet).toBe("Need to remember this"); expect(results[0]?.startLine).toBeGreaterThan(0); expect(results[0]?.endLine).toBeGreaterThanOrEqual(results[0]?.startLine ?? 0); - expect((state.ownerPeer.search as unknown as ReturnType)).not.toHaveBeenCalled(); + expect(state.participantPeer?.search as ReturnType).not.toHaveBeenCalled(); const implicitScopeResults = await manager.search("remember", { maxResults: 10, @@ -236,9 +257,9 @@ describe("Honcho memory runtime", () => { expect(result?.endLine).toBe(9); }); - it("fails cleanly when ownerPeer is unavailable after initialization", async () => { + it("fails cleanly when the participant peer is unavailable after initialization", async () => { const state = createState(); - state.ownerPeer = null; + state.participantPeer = null; const { manager } = await getHonchoMemorySearchManager(state, { agentId: "main", diff --git a/tools/ask.ts b/tools/ask.ts index e86de5c..ad975a5 100644 --- a/tools/ask.ts +++ b/tools/ask.ts @@ -2,6 +2,7 @@ import { Type } from "@sinclair/typebox"; // @ts-ignore - resolved by openclaw runtime import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import type { PluginState } from "../state.js"; +import { buildSessionKey } from "../helpers.js"; export function registerAskTool(api: OpenClawPluginApi, state: PluginState): void { api.registerTool( @@ -22,21 +23,31 @@ export function registerAskTool(api: OpenClawPluginApi, state: PluginState): voi description: "Reasoning depth: 'quick' for simple facts (default), 'thorough' for synthesis and analysis.", }) ), + about: Type.Optional( + Type.String({ + description: + "Sender ID of the user to ask about. Defaults to the last active sender. Pass a specific sender_id to ask about a different participant.", + }) + ), }, { additionalProperties: false } ), async execute(_toolCallId, params) { - const { query, depth = "quick" } = params as { + const { query, depth = "quick", about } = params as { query: string; depth?: "quick" | "thorough"; + about?: string; }; await state.ensureInitialized(); const agentPeer = await state.getAgentPeer(toolCtx.agentId); + const participantPeer = about + ? await state.getParticipantPeer(about) + : await state.resolveSessionParticipantPeer(buildSessionKey(toolCtx)); const reasoningLevel = depth === "thorough" ? "high" : "low"; const answer = await agentPeer.chat(query, { - target: state.ownerPeer!, + target: participantPeer, reasoningLevel, }); diff --git a/tools/context.ts b/tools/context.ts index 779380f..5d3c0d8 100644 --- a/tools/context.ts +++ b/tools/context.ts @@ -2,10 +2,11 @@ import { Type } from "@sinclair/typebox"; // @ts-ignore - resolved by openclaw runtime import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import type { PluginState } from "../state.js"; +import { buildSessionKey } from "../helpers.js"; export function registerContextTool(api: OpenClawPluginApi, state: PluginState): void { api.registerTool( - { + (toolCtx) => ({ name: "honcho_context", label: "Get User Context", description: @@ -19,16 +20,25 @@ export function registerContextTool(api: OpenClawPluginApi, state: PluginState): description: "Detail level: 'card' for key facts (default, fast), 'full' for broad representation.", }) ), + about: Type.Optional( + Type.String({ + description: + "Sender ID of the user to query about. Defaults to the last active sender. Pass a specific sender_id to get context about a different participant.", + }) + ), }, { additionalProperties: false } ), async execute(_toolCallId, params) { - const { detail = "card" } = params as { detail?: "card" | "full" }; + const { detail = "card", about } = params as { detail?: "card" | "full"; about?: string }; await state.ensureInitialized(); + const participantPeer = about + ? await state.getParticipantPeer(about) + : await state.resolveSessionParticipantPeer(buildSessionKey(toolCtx)); if (detail === "card") { - const card = await state.ownerPeer!.card().catch((err) => { + const card = await participantPeer.card().catch((err) => { // Only treat NotFoundError as empty; re-throw others or log if (err?.name === "NotFoundError") return null; // Optionally log unexpected errors for debugging @@ -60,7 +70,7 @@ export function registerContextTool(api: OpenClawPluginApi, state: PluginState): } // detail === "full" - const representation = await state.ownerPeer!.representation({ + const representation = await participantPeer.representation({ includeMostFrequent: true, }); @@ -81,7 +91,7 @@ export function registerContextTool(api: OpenClawPluginApi, state: PluginState): details: { detail, representationLength: representation.length }, }; }, - }, + }), { name: "honcho_context" } ); } diff --git a/tools/message-search.ts b/tools/message-search.ts index 1f84ccc..b3b71a1 100644 --- a/tools/message-search.ts +++ b/tools/message-search.ts @@ -3,6 +3,7 @@ import { Type } from "@sinclair/typebox"; import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import type { Message } from "@honcho-ai/sdk"; import type { PluginState } from "../state.js"; +import { buildSessionKey } from "../helpers.js"; export function registerMessageSearchTool(api: OpenClawPluginApi, state: PluginState): void { api.registerTool( @@ -24,6 +25,12 @@ export function registerMessageSearchTool(api: OpenClawPluginApi, state: PluginS "Filter by sender: 'user' for user messages, 'agent' for this agent's messages, 'all' for everything (default: 'all').", }) ), + about: Type.Optional( + Type.String({ + description: + "Sender ID of the participant whose messages to search. Only used when from='user'. Defaults to the last active sender.", + }) + ), metadata: Type.Optional( Type.Record(Type.String(), Type.Unknown(), { description: @@ -54,6 +61,7 @@ export function registerMessageSearchTool(api: OpenClawPluginApi, state: PluginS const { query, from = "all", + about, metadata, created_after, created_before, @@ -61,6 +69,7 @@ export function registerMessageSearchTool(api: OpenClawPluginApi, state: PluginS } = params as { query: string; from?: "user" | "agent" | "all"; + about?: string; metadata?: Record; created_after?: string; created_before?: string; @@ -90,7 +99,10 @@ export function registerMessageSearchTool(api: OpenClawPluginApi, state: PluginS // Route to the appropriate search method based on `from` let messages: Message[]; if (from === "user") { - messages = await state.ownerPeer!.search(query, searchOpts); + const participantPeer = about + ? await state.getParticipantPeer(about) + : await state.resolveSessionParticipantPeer(buildSessionKey(toolCtx)); + messages = await participantPeer.search(query, searchOpts); } else if (from === "agent") { const agentPeer = await state.getAgentPeer(toolCtx.agentId); messages = await agentPeer.search(query, searchOpts); @@ -111,7 +123,7 @@ export function registerMessageSearchTool(api: OpenClawPluginApi, state: PluginS } const results = messages.map((msg) => { - const speaker = msg.peerId === state.ownerPeer!.id ? "User" : "Agent"; + const speaker = state.isParticipantPeerId(msg.peerId) ? "User" : "Agent"; return { id: msg.id, content: msg.content, diff --git a/tools/search.ts b/tools/search.ts index 59292cf..3258cd5 100644 --- a/tools/search.ts +++ b/tools/search.ts @@ -2,10 +2,11 @@ import { Type } from "@sinclair/typebox"; // @ts-ignore - resolved by openclaw runtime import type { OpenClawPluginApi } from "openclaw/plugin-sdk"; import type { PluginState } from "../state.js"; +import { buildSessionKey } from "../helpers.js"; export function registerSearchTool(api: OpenClawPluginApi, state: PluginState): void { api.registerTool( - { + (toolCtx) => ({ name: "honcho_search_conclusions", label: "Search Honcho conclusions", description: @@ -29,19 +30,29 @@ export function registerSearchTool(api: OpenClawPluginApi, state: PluginState): maximum: 1, }) ), + about: Type.Optional( + Type.String({ + description: + "Sender ID of the user to query about. Defaults to the last active sender. Pass a specific sender_id to search conclusions about a different participant.", + }) + ), }, { additionalProperties: false } ), async execute(_toolCallId, params) { - const { query, topK, maxDistance } = params as { + const { query, topK, maxDistance, about } = params as { query: string; topK?: number; maxDistance?: number; + about?: string; }; await state.ensureInitialized(); + const participantPeer = about + ? await state.getParticipantPeer(about) + : await state.resolveSessionParticipantPeer(buildSessionKey(toolCtx)); - const representation = await state.ownerPeer!.representation({ + const representation = await participantPeer.representation({ searchQuery: query, searchTopK: topK ?? 10, searchMaxDistance: maxDistance ?? 0.5, @@ -64,7 +75,7 @@ export function registerSearchTool(api: OpenClawPluginApi, state: PluginState): details: { query, resultCount: representation.split("\n").filter(Boolean).length }, }; }, - }, + }), { name: "honcho_search_conclusions" } ); } diff --git a/tools/session.ts b/tools/session.ts index b6c1dc2..9fa6373 100644 --- a/tools/session.ts +++ b/tools/session.ts @@ -35,6 +35,12 @@ export function registerSessionTool(api: OpenClawPluginApi, state: PluginState): maximum: 32000, }) ), + about: Type.Optional( + Type.String({ + description: + "Sender ID of the user to get session context for. Defaults to the last active sender. Pass a specific sender_id to get session context about a different participant.", + }) + ), }, { additionalProperties: false } ), @@ -44,16 +50,21 @@ export function registerSessionTool(api: OpenClawPluginApi, state: PluginState): includeSummary = true, searchQuery, messageLimit = 4000, + about, } = params as { includeMessages?: boolean; includeSummary?: boolean; searchQuery?: string; messageLimit?: number; + about?: string; }; await state.ensureInitialized(); const agentPeer = await state.getAgentPeer(toolCtx.agentId); const sessionKey = buildSessionKey(toolCtx); + const participantPeer = about + ? await state.getParticipantPeer(about) + : await state.resolveSessionParticipantPeer(sessionKey); try { const session = await state.honcho.session(sessionKey); @@ -61,7 +72,7 @@ export function registerSessionTool(api: OpenClawPluginApi, state: PluginState): const context = await session.context({ summary: includeSummary, tokens: messageLimit, - peerTarget: state.ownerPeer!, + peerTarget: participantPeer, peerPerspective: agentPeer, searchQuery: searchQuery, }); @@ -88,7 +99,7 @@ export function registerSessionTool(api: OpenClawPluginApi, state: PluginState): if (includeMessages && context.messages.length > 0) { const messageLines = context.messages.map((msg) => { - const speaker = msg.peerId === state.ownerPeer!.id ? "User" : "OpenClaw"; + const speaker = state.isParticipantPeerId(msg.peerId) ? "User" : "OpenClaw"; const timestamp = msg.createdAt ? new Date(msg.createdAt).toLocaleString() : "";