From f4130b438c7a6fc9ef8f9a034d207bf9de988a56 Mon Sep 17 00:00:00 2001 From: "Raju Roopani (RAJU)" Date: Mon, 18 May 2026 09:36:01 -0700 Subject: [PATCH] Adding socket mode support in Teams SDK --- docs/proposals/socket-mode.md | 463 +++++++++++++++++ package-lock.json | 133 ++++- packages/apps/package.json | 1 + packages/apps/src/index.ts | 3 + packages/apps/src/socket-mode/backoff.ts | 45 ++ packages/apps/src/socket-mode/envelope.ts | 24 + packages/apps/src/socket-mode/index.ts | 11 + packages/apps/src/socket-mode/negotiate.ts | 83 +++ .../src/socket-mode/socket-mode-app.spec.ts | 472 ++++++++++++++++++ .../apps/src/socket-mode/socket-mode-app.ts | 466 +++++++++++++++++ .../src/socket-mode/socket-mode-client.ts | 126 +++++ .../apps/src/socket-mode/synthesize-token.ts | 32 ++ 12 files changed, 1858 insertions(+), 1 deletion(-) create mode 100644 docs/proposals/socket-mode.md create mode 100644 packages/apps/src/socket-mode/backoff.ts create mode 100644 packages/apps/src/socket-mode/envelope.ts create mode 100644 packages/apps/src/socket-mode/index.ts create mode 100644 packages/apps/src/socket-mode/negotiate.ts create mode 100644 packages/apps/src/socket-mode/socket-mode-app.spec.ts create mode 100644 packages/apps/src/socket-mode/socket-mode-app.ts create mode 100644 packages/apps/src/socket-mode/socket-mode-client.ts create mode 100644 packages/apps/src/socket-mode/synthesize-token.ts diff --git a/docs/proposals/socket-mode.md b/docs/proposals/socket-mode.md new file mode 100644 index 000000000..1337f0b0e --- /dev/null +++ b/docs/proposals/socket-mode.md @@ -0,0 +1,463 @@ +# Teams SDK (teams.ts) — Socket Mode Support + +> **Status:** Draft proposal — for review before implementation. +> **Author:** rroopani +> **Last updated:** 2026-05-17 + +--- + +## 1. Goal + +Let a bot developer opt into **Socket Mode** (APX → bot delivery over Azure SignalR WSS) with a single entry point. The developer picks **one** of two start calls; never both: + +```ts +// HTTP delivery (today's behavior, unchanged) +const app = new App(); +await app.start(); + +// Socket Mode delivery (new) +const app = new App(); +const socketModeApp = new SocketModeApp(app); +await socketModeApp.start(); // internally calls app.start() + opens the WSS +``` + +`SocketModeApp.start()` is a **drop-in replacement** for `app.start()`. It owns the full lifecycle: + +1. `app.initialize()` — plugins init, server route registration. +2. Plugin `onStart` callbacks fire (same as `app.start()`). +3. `app.server.start(port)` binds the HTTP listener for **invokes, OAuth callbacks, tabs, and remote functions** (this part is unchanged from the HTTP path). +4. `POST /v3/websockets/connect` negotiates an Azure SignalR session. +5. Opens the WSS and registers the `"activity"` handler. + +Result: event-style activities arrive over the socket; invokes / OAuth / tabs continue to use HTTP. Replies always go over the existing `/v3/conversations/...` HTTPS APIs (a platform-side v1 limitation, not an SDK choice). The developer makes **one** `start()` call regardless of which transport they pick. + +## 2. Platform contract recap (what the SDK has to speak) + +| Concern | Value | Source | +|---|---|---| +| Negotiate endpoint | `POST /v3/websockets/connect` (also `{cloud}/...`, `{cloud}/{tenantId}/...`) | [`WebSocketConnectController.cs`](file:///c:/Work6/Git/async_messaging_botapiservice/BotFrontEnd.Library/Controllers/WebSocketConnectController.cs) | +| Auth | `Authorization: Bearer {BF JWT}` — same MSA token used for other `/v3/...` calls | [`WebSocketConnectService.cs`](file:///c:/Work6/Git/async_messaging_botapiservice/Library/Services/WebSocketConnectService.cs) | +| Response | `{ url, accessToken, sessionId, expiresIn }` (`expiresIn` in seconds) | `WebSocketConnectController.cs` | +| Transport | Azure SignalR Service Default protocol. Reference JS client: `@microsoft/signalr` | +| Hub method | `"activity"` (server → client) | [`SocketModeDispatcher.cs:40`](file:///c:/Work6/Git/async_messaging_botapiservice/Library/Services/SocketModeDispatcher.cs#L40) | +| Envelope | `{ type: "activity", envelopeId, cv, payload: }` | `SocketModeDispatcher.cs:18-31` | +| Direction | One-way, APX → bot. No bot→APX SignalR frames in v1. | `apx.dev.md` §D5 | +| Invokes | Always HTTPS — never on socket. SDK must keep HTTP path live. | `apx.dev.md` §D5, dev guide §Limitations | +| Token rotation | Bot's responsibility. Re-negotiate before `expiresIn`. Recommended at `0.8 × expiresIn`. | Dev guide §Operational | +| Failure mode | `503` from negotiate ⇒ socket mode unavailable, bot must continue on HTTP. APX dispatcher auto-falls-back to HTTPS POST per request when no socket session exists. | + +> ⚠️ The .NET `SocketModeTestClient/Program.cs` registers `connection.On("ReceiveActivity", …)`. That string is **stale in the test client**. The authoritative method name from `SocketModeDispatcher.cs` is `"activity"`. The SDK MUST handler-bind to `"activity"`. We should also file a fix on the test client. + +## 3. Surface design + +### 3.1 Public API (TypeScript) + +```ts +// Top-level export from @microsoft/teams.apps +export { SocketModeApp, SocketModeOptions, SocketModeEvents, ISocketModeClient } from '@microsoft/teams.apps'; +``` + +```ts +export interface SocketModeOptions { + /** Cloud route variant. Default { kind: 'global' } ⇒ POST /v3/websockets/connect. */ + readonly route?: + | { kind: 'global' } + | { kind: 'regional'; cloud: 'amer' | 'apac' | 'emea' | 'eudb' } + | { kind: 'regional-tenant'; cloud: string; tenantId: string }; + /** Fraction of expiresIn at which to re-negotiate. Default 0.8. */ + readonly renegotiateAt?: number; + /** Backoff config for reconnect/negotiate retries. */ + readonly backoff?: { minMs?: number; maxMs?: number; factor?: number; jitter?: boolean }; + /** Deduplicate envelopes by envelopeId (helpful in blue/green). Default false. */ + readonly dedupe?: boolean; + /** + * Continue running if /v3/websockets/connect returns 503. + * When true, SocketModeApp.start() resolves without an active socket and the + * bot relies on HTTPS callback delivery. Default true. + */ + readonly fallbackOn503?: boolean; + /** Test seam: override the SignalR client. */ + readonly client?: ISocketModeClient; +} + +export class SocketModeApp { + constructor(app: App, options?: SocketModeOptions); + + /** + * Start the App (HTTP server, plugins) AND open the Socket Mode WebSocket. + * Drop-in replacement for app.start() — do NOT also call app.start(). + */ + start(port?: number | string): Promise; + /** Close socket AND stop the App. */ + stop(): Promise; + + /** The wrapped App. Convenience accessor: socketModeApp.app.on(...) === app.on(...). */ + readonly app: App; + /** Current session id (negotiate result). Useful for logging. */ + readonly sessionId?: string; + /** Last cv observed on an inbound envelope. */ + readonly lastCv?: string; + + /** Lifecycle events for diagnostics. */ + on(name: K, cb: SocketModeEvents[K]): this; +} + +export interface SocketModeEvents { + connected: (info: { sessionId: string; connectionId?: string }) => void; + reconnecting: (err?: Error) => void; + reconnected: (info: { connectionId?: string }) => void; + closed: (err?: Error) => void; + renegotiated: (info: { sessionId: string; expiresIn: number }) => void; + envelope: (env: { envelopeId: string; cv?: string; type: string }) => void; + unavailable: (info: { status: number; message: string }) => void; +} +``` + +### 3.2 Internal contract with `App` + +`SocketModeApp` reuses what `App` already exposes — no new public method is required on the App class: + +| Needs | Already on `App`? | +|---|---| +| Inject an inbound activity through the same routing pipeline | ✅ `app.onActivity({ body, token })` (already public, returns `InvokeResponse`) | +| Get the bot's MSA bearer token for negotiate | ⚠️ `app.getBotToken()` is currently `protected`. Promote to `public` (or add a thin public `getBotFrameworkToken()` wrapper). | +| Resolve `serviceUrl` for synthesized `IToken` | ✅ `app.api.serviceUrl` | +| Discover `clientId` for synthesized `IToken.appId` | ✅ `app.credentials?.clientId` (via getter) | +| Logger | ✅ `app.log` | +| Ensure init/start runs once | ✅ `app.initialize()` | + +Required change: **make `App.getBotToken()` public** (currently `app.ts:704`). It is the one minimal new surface on `App` itself. Alternative: expose `app.tokenManager.getBotToken()` directly (`tokenManager` is already public on `App`), which avoids changing `App` at all. **Preferred: use `app.tokenManager.getBotToken()` and leave `App` untouched.** + +## 4. File-by-file change list + +### 4.1 New files (under `packages/apps/src/socket-mode/`) + +| File | Purpose | +|---|---| +| `socket-mode-app.ts` | The `SocketModeApp` class. Owns lifecycle, re-negotiate timer, event emitter, envelope routing into `app.onActivity`. | +| `socket-mode-client.ts` | Thin wrapper around `@microsoft/signalr.HubConnection`. Handles `WithUrl(...)`, `accessTokenFactory`, `WithAutomaticReconnect`, `.on('activity', ...)`, `.onclose(...)`. Mockable via `ISocketModeClient` for unit tests. | +| `negotiate.ts` | `negotiate(app, options): Promise`. Calls `POST /v3/websockets/connect[/{cloud}[/{tenantId}]]` with the BF JWT. Surfaces `503` distinctly so the caller can decide fallback. | +| `envelope.ts` | `SocketActivityEnvelope` type + `isActivityEnvelope` guard. Matches the platform shape exactly: `{ type, envelopeId, cv, payload }`. | +| `synthesize-token.ts` | Builds an `IToken` from an inbound socket payload so it can flow through `App.onActivity({ body, token })` unchanged. Marks `from: 'azure'`, `isExpired: () => false`, fills `appId`/`serviceUrl` from `app.credentials` and `activity.serviceUrl`. | +| `backoff.ts` | Tiny jittered exponential backoff (min 2 s, max 30 s, factor 2) used for negotiate and reconnect retries. | +| `index.ts` | Re-exports the public surface above. | +| `socket-mode-app.spec.ts` | Unit tests with a mock `ISocketModeClient` and `nock`/`msw` for the negotiate HTTP call. Cases below. | + +### 4.2 Modified files + +| File | Change | +|---|---| +| `packages/apps/package.json` | Add `"@microsoft/signalr": "^8.0.7"` to `dependencies` (it's the official JS SignalR client and works against Azure SignalR Default protocol per the dev guide). | +| `packages/apps/src/index.ts` | Re-export `./socket-mode` so consumers can `import { SocketModeApp } from '@microsoft/teams.apps'` (or via the deeper specifier shown in §3.1). | +| `packages/apps/src/app.ts` | **No source changes required** if we route via `app.tokenManager.getBotToken()`. (If we instead choose to promote `getBotToken` to public, the single edit is removing `protected` on line 704.) | +| `packages/apps/README.md` | Add a "Socket Mode" section pointing at this proposal + a minimal example. | +| `examples/` | New `examples/socket-mode/` directory mirroring `examples/echo` but starting `SocketModeApp` alongside `app.start()`. | + +### 4.3 No-change components (for the reviewer's sanity check) + +- `packages/api` — the negotiate endpoint is a one-off and lives inside socket-mode rather than as a generic `client.websockets.connect()`. Adding it as a first-class `Client` route is possible but not required for v1, and would bloat the public API surface for one endpoint with a niche use case. +- `packages/apps/src/http/*` — HTTP server, adapter, middleware, JWT validator unchanged. Socket mode bypasses them entirely on the inbound path. +- `packages/apps/src/router/*`, `routes/*`, `app.process.ts` — unchanged. Socket-mode activities re-enter the same `App.onActivity → App.process(event)` pipeline, so all existing handlers (`app.on('message', …)`, message routing, OAuth verify-state, etc.) work without modification. + +## 5. Behavior in detail + +### 5.1 Connect sequence + +```mermaid +sequenceDiagram + autonumber + participant Dev as Bot Process + participant App as App + participant SMA as SocketModeApp + participant TM as TokenManager + participant APX as APX /v3/websockets/connect + participant SR as Azure SignalR + + Dev->>App: new App() + Dev->>SMA: new SocketModeApp(app) + Dev->>SMA: start() + SMA->>App: app.initialize() + SMA->>TM: getBotToken() + TM-->>SMA: BF JWT + SMA->>APX: POST /v3/websockets/connect (Bearer) + APX-->>SMA: 200 { url, accessToken, sessionId, expiresIn } + SMA->>SR: HubConnection.start() (WSS, accessToken) + SR-->>SMA: connected + SMA-->>Dev: start() resolves + SMA->>SMA: schedule re-negotiate at 0.8 × expiresIn +``` + +### 5.2 Activity delivery (event-style) + +```mermaid +sequenceDiagram + participant SR as Azure SignalR + participant SMA as SocketModeApp + participant App as App + participant Router as App.process + + SR-->>SMA: "activity"({ type:"activity", envelopeId, cv, payload: }) + SMA->>SMA: (optional) dedupe by envelopeId + SMA->>SMA: synthesize IToken from payload + app.credentials + SMA->>App: app.onActivity({ body: payload, token }) + App->>Router: process(event) + Router-->>App: InvokeResponse (status 200 for non-invoke) + App-->>SMA: InvokeResponse + Note over SMA: status is ignored (no ack frame in v1) +``` + +The bot's `app.on('message', …)`, etc., never knows the activity came from a socket. Any reply goes through `app.send(...)` → `ActivitySender` → HTTPS POST against `/v3/conversations/...`, unchanged. + +### 5.3 Token rotation + +```mermaid +sequenceDiagram + participant SMA as SocketModeApp + participant APX as /v3/websockets/connect + participant SR as Azure SignalR + + Note over SMA: at 0.8 × expiresIn + SMA->>APX: POST /v3/websockets/connect (fresh BF JWT) + APX-->>SMA: 200 { url, accessToken, sessionId, expiresIn } + SMA->>SR: build NEW HubConnection + SMA->>SR: new connection start() + SR-->>SMA: connected + SMA->>SR: old connection stop() + SMA->>SMA: emit('renegotiated', …) +``` + +Make-before-break to keep zero-gap delivery during the swap. If start of the new connection fails, keep the old one and back off — never tear down what works. + +### 5.4 Reconnect / failure handling + +| Trigger | Action | +|---|---| +| `HubConnection.onclose(err)` with no transport error | Treat as normal reconnect path: `WithAutomaticReconnect` handles it. Emit `reconnecting`/`reconnected`. | +| `HubConnection` permanently closes | Re-negotiate from scratch with jittered backoff (2 s → 30 s). | +| `1008` close from SignalR | Token expired earlier than estimated. Force immediate re-negotiate. | +| Negotiate `401` | Refresh BF JWT once via `tokenManager.getBotToken({ forceRefresh: true })` (extend TokenManager if not already supported), then retry once. If still 401, surface to caller and back off. | +| Negotiate `503 "Socket mode is not available."` | Emit `unavailable`. If `fallbackOn503` (default true): resolve `start()` without an active socket and stay quiet — APX will deliver to the HTTPS endpoint, which is still running. If `fallbackOn503: false`: reject `start()`. | +| Negotiate `503 "Unable to allocate WebSocket session."` | Retry with jittered backoff (transient). | +| Stopped via `stop()` | Cancel timers, close hub connection. `start()` is callable again. | + +### 5.5 Synthesized inbound `IToken` + +The platform validates the bearer token once at `/v3/websockets/connect`. Subsequent activity frames carry no per-activity JWT. The SDK synthesizes one so the activity can re-enter `App.onActivity` without changing `HttpServer`/router types. Every field on `IToken` (see [packages/api/src/auth/token.ts](packages/api/src/auth/token.ts)) is populated so any downstream consumer behaves identically to the HTTP path: + +```ts +const token: IToken = { + appId: app.credentials?.clientId ?? '', + appDisplayName: app.name, // best-effort, matches HTTP path + tenantId: payload.conversation?.tenantId + ?? app.credentials?.tenantId, + serviceUrl: payload.serviceUrl ?? app.api.serviceUrl, + from: 'azure', // service-to-service caller type + fromId: app.credentials?.clientId ?? '', + expiration: Date.now() + sessionExpiresIn * 1000, // tracks the SignalR access token + isExpired: (bufferMs = 5 * 60_000) => + Date.now() + bufferMs >= this.tokenExpiry, + toString: () => '', // never used by app.process +}; +``` + +This is safe because: +- The HTTP `JwtValidator` / `ServiceTokenValidator` middleware is **not** in the socket path — we never feed the synthesized token to JWT validation. +- [`app.process`](packages/apps/src/app.process.ts) reads `activity.serviceUrl || token.serviceUrl` and otherwise builds the `ConversationReference` from the **activity body**, not the token. Both fields are present on socket-delivered envelopes exactly as on HTTP. + +## 5.6 Activity-type parity — what flows through, what doesn't + +**Guarantee:** every activity type that today reaches a handler via the HTTP `/api/messages` endpoint will reach the same handler unchanged when SocketModeApp is active. SocketModeApp does not filter, transform, or special-case activities by type — it hands the envelope's `payload` to `app.onActivity({ body, token })` and the existing router does the rest. + +Concretely, this means each of the following keeps working with zero new code: + +| Activity type / `name` | Existing entry point (handler API) | Socket-mode behavior | +| --- | --- | --- | +| `message` | `app.on('message', …)`, `app.message(/regex/, …)` | ✅ Delivered | +| `messageReaction` | `app.on('messageReaction', …)` | ✅ Delivered | +| `messageUpdate` (edit) | `app.on('messageUpdate', …)` ([routes/message-update.ts](packages/apps/src/routes/message-update.ts)) | ✅ Delivered | +| `messageDelete` (soft delete) | `app.on('messageDelete', …)` ([routes/message-delete.ts](packages/apps/src/routes/message-delete.ts)) | ✅ Delivered | +| `conversationUpdate` (members added/removed, channel created/renamed/deleted, team renamed, etc.) | `app.on('conversationUpdate', …)` and the sub-routes in [routes/conversation-update.ts](packages/apps/src/routes/conversation-update.ts) | ✅ Delivered | +| `installationUpdate` (`add`, `remove`) | `app.on('install.*', …)` ([routes/install.ts](packages/apps/src/routes/install.ts)) | ✅ Delivered | +| `event` (system events e.g. `application/vnd.microsoft.meetingStart`, `meetingEnd`, `readReceipt`) | `app.on('event', …)` ([routes/event.ts](packages/apps/src/routes/event.ts)) | ✅ Delivered | +| `typing` | `app.on('typing', …)` | ✅ Delivered | +| `endOfConversation` | `app.on('endOfConversation', …)` | ✅ Delivered | +| Mention dispatch | `app.on('mention', …)` (post-routing, derived from activity entities) | ✅ Delivered — the dispatch is driven off the body, not the transport | +| Catch-all | `app.on('activity', …)` | ✅ Delivered | +| **`invoke` (all `name`s — `signin/tokenExchange`, `signin/verifyState`, `composeExtension/*`, `task/fetch`, `task/submit`, `adaptiveCard/action`, `fileConsent/invoke`, …)** | [routes/invoke/*](packages/apps/src/routes/invoke/) | ⛔ **Stays on HTTPS by platform design.** The bot's `/api/messages` endpoint continues to receive invokes regardless of socket mode. This is enforced **on the APX side**, not in the SDK — APX's `SocketModeDispatcher` guards `!(botActivity is BotInvokeActivity)`. See [`apx.dev.md`](file:///c:/Work6/Git/teams-conv-platform-specs/features/socket-mode/apx.dev.md) §D5. | + +**Defensive behavior in the SDK**: if for any reason an `invoke` frame *does* arrive on the socket (e.g. future platform change, mis-flighted dispatcher), the SDK still routes it through `app.onActivity` so existing invoke handlers fire. The resulting `InvokeResponse` is discarded — there is no v1 wire frame to send it back over the socket. This is a `log.warn` event but not an error. **The bot developer should always keep the HTTP server running for invokes.** + +**Why this works automatically (no per-type code in SocketModeApp):** + +- The HTTP path goes `HttpServer.handleRequest → onRequest(event) → App.onActivity → App.process → router.select(activity)`. +- The socket path goes `HubConnection.on('activity') → SocketModeApp.dispatch → App.onActivity → App.process → router.select(activity)`. +- The two paths merge at the same `App.onActivity({ body, token })` call. From that line onward — including all of [app.process.ts](packages/apps/src/app.process.ts) — the code does not see, query, or branch on which transport delivered the activity. Adding a new activity type or invoke `name` in the future automatically works for both paths. + +### 5.7 Concurrent connections — why and how many + +#### TL;DR — why the Teams SDK does NOT need 10 connections + +The Slack ecosystem expects bots to open up to 10 simultaneous WSS connections. **For Teams bots on APX, the right default is 1.** The Slack rationale doesn't transfer because the two platforms have opposite dispatch models: + +| Slack rationale for many connections | Why it doesn't move the needle on APX | +| --- | --- | +| **Load distribution** — Slack shards events across the open sockets, so 10 sockets ≈ 10× throughput. | APX dispatches via Azure SignalR group fan-out — **every session for the same `botKey` receives every event** ([`apx.dev.md`](file:///c:/Work6/Git/teams-conv-platform-specs/features/socket-mode/apx.dev.md) §D5). Opening more sockets does NOT add throughput; it just multiplies the bandwidth/CPU per event by `N`. | +| **Graceful restart** — overlap a fresh connection with the draining old one to avoid event loss during a scheduled disconnect. | The SDK already does **make-before-break** on every re-negotiate (§5.3). A single connection survives every scheduled token rotation without a delivery gap. | +| **Active-active redundancy** — keep multiple parallel connections so a single socket drop doesn't pause the app. | The supported APX answer is **horizontal pods**, not intra-process sockets — the platform dev guide's blue/green section explicitly says "Run one session per process." Two pods × one socket each gives the same fault tolerance with `1×` the per-bot bandwidth, not `2×`. | +| **High event volume from a single workspace** — Slack apps can saturate a single socket. | Not a concern for Teams in v1: APX's per-bot event rates are nowhere near a single-socket bottleneck (Azure SignalR Standard tier alone bursts at ~30k msg/s per unit, far above any single bot's event rate). | + +In short: **APX's fan-out semantics make extra sockets a cost without a benefit for any of the reasons Slack uses them.** The one APX-specific use case where `connections > 1` *does* help — surviving a single-process socket drop without waiting for the reconnect backoff — is niche enough to be an opt-in knob rather than the default. + +**Concrete recommendation:** + +| Deployment | Recommended `connections` | +| --- | --- | +| Single pod, accept brief delivery pauses during reconnect | `1` (default) | +| Single pod, want belt-and-suspenders against a single socket drop | `2` | +| Two or more pods | `1` per pod — pod count already gives active-active redundancy | +| Anything chasing throughput | `1` per pod, scale by **adding pods**, not sockets | + +The SDK still allows up to `10` to match Slack ecosystem muscle memory, but nothing in the APX model rewards going above `2` from a single process. + +#### Slack's model (for context) + +Slack's Socket Mode lets an app open up to 10 simultaneous WSS connections to its gateway ([Slack docs: Connections](https://docs.slack.dev/apis/events-api/using-socket-mode/#connections)). Slack's documented reasons: + +1. **Graceful restarts** — pre-warm a new connection before a scheduled disconnect. +2. **Load distribution** — Slack distributes events *across* the open connections so a single slow consumer doesn't bottleneck event delivery. +3. **Active-active redundancy** — zero-downtime app restarts by overlapping a new fleet of connections with the draining old one. + +#### APX's model is different + +APX dispatches via Azure SignalR group `bot_{botKey}` and the platform spec ([`apx.dev.md`](file:///c:/Work6/Git/teams-conv-platform-specs/features/socket-mode/apx.dev.md) §D5) explicitly states: + +> Per-bot fanout. **All sessions for the same `botKey` receive every event for that bot.** Run one session per process; do not shard events by session. + +That means: + +| Reason for multi-connection | Slack | APX | Verdict for Teams SDK | +| --- | --- | --- | --- | +| Graceful restart (pre-warm next connection before disconnect) | ✅ Applies | ✅ Applies — but the SDK already does **make-before-break** on every re-negotiate (§5.3), so a single connection already gets this benefit. | Low marginal value; useful as belt-and-suspenders for hostile networks. | +| Load distribution across connections | ✅ Applies — Slack shards events | ⛔ **Does not apply** — APX fans out; every connection gets every event. Adding connections does NOT raise per-bot throughput. | Inapplicable. | +| Active-active redundancy across processes/pods | ✅ Applies | ✅ Applies — multiple pods opening sockets is the documented blue/green story (§Operational guidance in the dev guide). | Already supported by deploying multiple pods. | +| Active-active redundancy **within a single process** | ✅ Applies | ✅ Applies — survives a transient drop of one socket without a delivery gap. | Net new value for single-process bots. | + +#### SDK decision + +Support `connections: N` with `N ∈ [1, 10]`, default `1`. + +- The cap of 10 matches the Slack ecosystem expectation; for APX, 10 is a soft upper bound because each session gets the full event stream (so `N=10` is ~10× the bandwidth/CPU per bot vs `N=1`). +- The default of `1` matches the platform spec's "one session per process" recommendation. +- When `connections > 1`, the SDK **auto-enables `dedupe`** — APX fan-out guarantees that every envelope arrives `N` times, and the bot must see each one exactly once. Auto-enable removes the foot-gun of forgetting to set this. +- Each connection has independent: negotiate, access token, re-negotiate timer, reconnect backoff. They share the dedup set so `app.onActivity` is invoked exactly once per `envelopeId` across the fleet of connections. +- Activity routing is unchanged — `app.onActivity({ body, token })` runs once per unique envelope; the slot that delivered it is internal. + +**When should a Teams bot developer set `connections > 1`?** + +- They need belt-and-suspenders redundancy inside a single process (e.g., a single-pod hosted bot where a momentary socket drop would otherwise pause event delivery for the duration of the reconnect backoff). +- They are coming from the Slack ecosystem and expect the same knob. +- For higher event throughput per-bot: **no — increase pod count instead.** Multiple APX sessions for the same bot don't add throughput because every session gets every event. + +**When should they NOT?** + +- They're already running ≥ 2 pods. The pod count already gives them active-active redundancy; adding intra-process connections just multiplies the dedup work without adding resilience. +- They are bandwidth-constrained or running on a small instance. `N` sockets = `N×` the inbound bandwidth per event. + +### 5.8 Plugin parity + +Plugins registered on the App expose `onActivity`, `onActivityResponse`, `onActivitySent`, and `onError`. Each of these is invoked from inside `App.process` — i.e. *downstream* of where the socket and HTTP paths merge. Concretely: + +- `onActivity` is invoked for every socket-delivered activity exactly as for HTTP-delivered activities ([app.process.ts:73-91](packages/apps/src/app.process.ts#L73-L91)). +- `onActivitySent` fires when the bot replies. Replies still go over HTTPS via `app.send` / `ActivitySender` — no change. +- `onActivityResponse` fires after routing completes. For socket-delivered activities the response is built locally and not sent over the wire (v1 has no ack frame), but the plugin hook still fires. + +Plugins that subclass `IPlugin` and rely on these hooks need no awareness of socket mode. + +## 6. Sample bot (TypeScript) + +```ts +import { App, SocketModeApp } from '@microsoft/teams.apps'; + +const app = new App({ + // clientId / clientSecret picked up from env, same as today +}); + +// Regular handlers — no change. +app.on('message', async ({ activity, send }) => { + await send({ type: 'message', text: `echo: ${activity.text}` }); +}); + +const sm = new SocketModeApp(app, { + renegotiateAt: 0.8, + dedupe: true, // safe with blue/green deploys + fallbackOn503: true, // happy to fall back to HTTPS +}); + +sm.on('connected', ({ sessionId }) => console.log('socket up', sessionId)); +sm.on('renegotiated', ({ sessionId }) => console.log('rotated', sessionId)); +sm.on('unavailable', ({ message }) => console.warn('socket unavailable:', message)); + +await sm.start(); // <-- single entry point. Starts the App AND opens the WSS. +``` + +For HTTP-only delivery (the existing model), the developer just uses `app.start()` and doesn't construct a `SocketModeApp` at all. + +## 7. Test plan + +Unit tests in `socket-mode-app.spec.ts` against a mock `ISocketModeClient`: + +1. `start()` calls negotiate with the BF JWT, then `HubConnection.start()`. (`nock` the POST.) +2. Inbound `"activity"` frame → `app.onActivity` called once with `body=payload` and a synthesized token. Handlers fire. +3. Re-negotiate fires at `0.8 × expiresIn`, swaps connection make-before-break. +4. `stop()` cancels the re-negotiate timer and closes the hub. +5. `503 "Socket mode is not available"` with `fallbackOn503: true` resolves `start()`, emits `unavailable`, does NOT throw. +6. Dedupe: two frames with the same `envelopeId` route the activity exactly once. +7. Token expired mid-stream (`1008` close) → immediate re-negotiate, not waited-out backoff. +8. Activity with `type: 'invoke'` never appears on the socket — but if it does (test only), it still routes through `app.onActivity` and produces an `InvokeResponse` that we discard (no ack frame in v1). + +Integration smoke test (manual against APX dev tenant): +- Echo bot + `SocketModeApp` connected to `https://smba.trafficmanager.net`. +- Confirm `cv` and `sessionId` appear in logs alongside each replied message. +- Kill the socket; confirm APX falls back to HTTPS POST automatically. + +## 8. Open questions for review + +1. **`getBotToken({ forceRefresh })`** — does `TokenManager` already support forced refresh? If not, we'll need to extend it (small, but worth flagging now since the negotiate-401 retry path depends on it). +2. **Cloud routing** — should `SocketModeApp` derive the regional route automatically from `app.cloud`, or always default to global and require explicit opt-in? I'm leaning "always derive from `app.cloud`" for ergonomic parity with the existing HTTP path; not a blocker either way. +3. **CV propagation** — the platform spec asks bots to log the envelope `cv` on every frame and on every outbound reply. Should `SocketModeApp` push the `cv` into `app.log` context (via a child logger) so all downstream logs in that activity carry it automatically? I think yes, but it's the kind of choice that warrants an explicit decision. +4. **Multi-region** — the dev guide notes one socket per bot process. If a customer runs N pods, every pod opens one socket and APX fans out the event to all of them. Should the SDK ship a simple "join-only-N-pods" gate, or leave that to the deployer? I'd leave it to the deployer — out of scope for v1. +5. **Package import path** — `import { SocketModeApp } from '@microsoft/teams.apps/socket-mode'` requires a deep export entry in `packages/apps/package.json`. Easier alternative: top-level export from `@microsoft/teams.apps`. Both work; flagging the package.json change either way. + +## 9. Out of scope (v1) + +- Bot→APX SignalR frames (acks, invoke responses). Awaits APX v2. +- `ack` / `ping` / `disconnect` frame handling beyond logging. +- Standalone npm package — v1 ships inside `@microsoft/teams.apps`. We can extract later if `@microsoft/signalr` size becomes a concern. +- Replacement of the HTTP path for OAuth/tabs/invokes/remote functions. +- Mooncake / Airgap clouds — APX itself isn't there yet. + +--- + +## Appendix A — Quick reference: the wire envelope + +```jsonc +{ + "type": "activity", + "envelopeId": "32-hex-chars", // unique per frame + "cv": "base.extension", // APX correlation vector + "payload": { /* Bot Framework Activity JSON */ } +} +``` + +SignalR hub method name: `"activity"` (server → client). + +## Appendix B — Mapping from .NET reference to TypeScript + +| .NET (`SocketModeTestClient/Program.cs`) | teams.ts equivalent | +|---|---| +| `HubConnectionBuilder().WithUrl(url, o => o.AccessTokenProvider = …)` | `new HubConnectionBuilder().withUrl(url, { accessTokenFactory: () => accessToken })` (`@microsoft/signalr`) | +| `.WithAutomaticReconnect()` | `.withAutomaticReconnect()` | +| `connection.On("activity", OnActivityAsync)` | `connection.on('activity', envelope => …)` | +| `connection.Closed += …` | `connection.onclose(err => …)` | +| `Task.Delay(0.8 × expiresIn)` + dispose + restart | `setTimeout(renegotiate, 0.8 × expiresIn × 1000)` with make-before-break | +| `JObject.Value("payload")` | `(envelope as SocketActivityEnvelope).payload` | diff --git a/package-lock.json b/package-lock.json index 231f7b4c9..4ee47db0c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5011,6 +5011,69 @@ "react-dom": "^18 || ^19" } }, + "node_modules/@microsoft/signalr": { + "version": "8.0.17", + "resolved": "https://registry.npmjs.org/@microsoft/signalr/-/signalr-8.0.17.tgz", + "integrity": "sha512-5pM6xPtKZNJLO0Tq5nQasVyPFwi/WBY3QB5uc/v3dIPTpS1JXQbaXAQAPxFoQ5rTBFE094w8bbqkp17F9ReQvA==", + "license": "MIT", + "dependencies": { + "abort-controller": "^3.0.0", + "eventsource": "^2.0.2", + "fetch-cookie": "^2.0.3", + "node-fetch": "^2.6.7", + "ws": "^7.5.10" + } + }, + "node_modules/@microsoft/signalr/node_modules/eventsource": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz", + "integrity": "sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==", + "license": "MIT", + "engines": { + "node": ">=12.0.0" + } + }, + "node_modules/@microsoft/signalr/node_modules/node-fetch": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz", + "integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==", + "license": "MIT", + "dependencies": { + "whatwg-url": "^5.0.0" + }, + "engines": { + "node": "4.x || >=6.0.0" + }, + "peerDependencies": { + "encoding": "^0.1.0" + }, + "peerDependenciesMeta": { + "encoding": { + "optional": true + } + } + }, + "node_modules/@microsoft/signalr/node_modules/ws": { + "version": "7.5.10", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.10.tgz", + "integrity": "sha512-+dbF1tHwZpXcbOJdVOkzLDxZP1ailvSxM6ZweXTegylPny803bFhA+vqBYw4s31NSAk4S2Qz+AKXK9a4wkdjcQ==", + "license": "MIT", + "engines": { + "node": ">=8.3.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/@microsoft/teams-js": { "version": "2.49.0", "license": "MIT", @@ -11620,6 +11683,16 @@ "node": "^12.20 || >= 14.13" } }, + "node_modules/fetch-cookie": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/fetch-cookie/-/fetch-cookie-2.2.0.tgz", + "integrity": "sha512-h9AgfjURuCgA2+2ISl8GbavpUdR+WGAM2McW/ovn4tVccegp8ZqCKWSBR8uRdM8dDNlx5WdKRWxBYUwteLDCNQ==", + "license": "Unlicense", + "dependencies": { + "set-cookie-parser": "^2.4.8", + "tough-cookie": "^4.0.0" + } + }, "node_modules/file-entry-cache": { "version": "8.0.0", "dev": true, @@ -16239,6 +16312,18 @@ "version": "1.1.0", "license": "MIT" }, + "node_modules/psl": { + "version": "1.15.0", + "resolved": "https://registry.npmjs.org/psl/-/psl-1.15.0.tgz", + "integrity": "sha512-JZd3gMVBAVQkSs6HdNZo9Sdo0LNcQeMNP3CozBJb3JYC/QUYZTnKxP+f8oWRX4rHP5EurWxqAHTSwUCjlNKa1w==", + "license": "MIT", + "dependencies": { + "punycode": "^2.3.1" + }, + "funding": { + "url": "https://github.com/sponsors/lupomontero" + } + }, "node_modules/pstree.remy": { "version": "1.1.8", "dev": true, @@ -16262,7 +16347,6 @@ }, "node_modules/punycode": { "version": "2.3.1", - "dev": true, "license": "MIT", "engines": { "node": ">=6" @@ -16296,6 +16380,12 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==", + "license": "MIT" + }, "node_modules/quick-format-unescaped": { "version": "4.0.4", "license": "MIT" @@ -16813,6 +16903,12 @@ "node": ">=0.10.0" } }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==", + "license": "MIT" + }, "node_modules/resolve": { "version": "1.22.11", "dev": true, @@ -18401,6 +18497,30 @@ "nodetouch": "bin/nodetouch.js" } }, + "node_modules/tough-cookie": { + "version": "4.1.4", + "resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-4.1.4.tgz", + "integrity": "sha512-Loo5UUvLD9ScZ6jh8beX1T6sO1w2/MpCRpEP7V280GKMVUQ0Jzar2U3UJPsrdbziLEMMhu3Ujnq//rhiFuIeag==", + "license": "BSD-3-Clause", + "dependencies": { + "psl": "^1.1.33", + "punycode": "^2.1.1", + "universalify": "^0.2.0", + "url-parse": "^1.5.3" + }, + "engines": { + "node": ">=6" + } + }, + "node_modules/tough-cookie/node_modules/universalify": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.2.0.tgz", + "integrity": "sha512-CJ1QgKmNg3CwvAv/kOFmtnEN05f0D/cn9QntgNOQlQF9dgvVTHj3t+8JPdjqawCHk7V/KA+fbUqzZ9XWhcqPUg==", + "license": "MIT", + "engines": { + "node": ">= 4.0.0" + } + }, "node_modules/tr46": { "version": "0.0.3", "license": "MIT" @@ -19157,6 +19277,16 @@ "dev": true, "license": "MIT" }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "license": "MIT", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "node_modules/use-callback-ref": { "version": "1.3.3", "dev": true, @@ -19876,6 +20006,7 @@ "license": "MIT", "dependencies": { "@azure/msal-node": "^3.8.1", + "@microsoft/signalr": "^8.0.7", "@microsoft/teams.api": "*", "@microsoft/teams.common": "*", "@microsoft/teams.graph": "*", diff --git a/packages/apps/package.json b/packages/apps/package.json index 42563f6ab..468ba12a1 100644 --- a/packages/apps/package.json +++ b/packages/apps/package.json @@ -39,6 +39,7 @@ }, "dependencies": { "@azure/msal-node": "^3.8.1", + "@microsoft/signalr": "^8.0.7", "@microsoft/teams.api": "*", "@microsoft/teams.common": "*", "@microsoft/teams.graph": "*", diff --git a/packages/apps/src/index.ts b/packages/apps/src/index.ts index db94068e7..db01b40c9 100644 --- a/packages/apps/src/index.ts +++ b/packages/apps/src/index.ts @@ -9,5 +9,8 @@ export * as manifest from './manifest'; // HTTP infrastructure - public API export * from './http'; +// Socket Mode (APX → bot delivery over Azure SignalR) +export * from './socket-mode'; + // Threading utilities export { toThreadedConversationId } from './utils/thread'; diff --git a/packages/apps/src/socket-mode/backoff.ts b/packages/apps/src/socket-mode/backoff.ts new file mode 100644 index 000000000..b225c5a2d --- /dev/null +++ b/packages/apps/src/socket-mode/backoff.ts @@ -0,0 +1,45 @@ +export type BackoffOptions = { + readonly minMs?: number; + readonly maxMs?: number; + readonly factor?: number; + readonly jitter?: boolean; +}; + +const DEFAULT_MIN_MS = 2000; +const DEFAULT_MAX_MS = 30000; +const DEFAULT_FACTOR = 2; + +/** + * Jittered exponential backoff. Defaults match the dev-guide recommendation + * (≥ 2s, max 30s, factor 2, with jitter to avoid thundering herd on a fleet of pods). + */ +export class Backoff { + private attempt = 0; + private readonly minMs: number; + private readonly maxMs: number; + private readonly factor: number; + private readonly jitter: boolean; + + constructor(opts: BackoffOptions = {}) { + this.minMs = opts.minMs ?? DEFAULT_MIN_MS; + this.maxMs = opts.maxMs ?? DEFAULT_MAX_MS; + this.factor = opts.factor ?? DEFAULT_FACTOR; + this.jitter = opts.jitter ?? true; + } + + reset(): void { + this.attempt = 0; + } + + /** Returns the next delay in ms and advances the attempt counter. */ + next(): number { + const base = Math.min(this.maxMs, this.minMs * Math.pow(this.factor, this.attempt)); + this.attempt++; + if (!this.jitter) return base; + return Math.floor(base * (0.5 + Math.random() * 0.5)); + } +} + +export function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/packages/apps/src/socket-mode/envelope.ts b/packages/apps/src/socket-mode/envelope.ts new file mode 100644 index 000000000..e20f8916f --- /dev/null +++ b/packages/apps/src/socket-mode/envelope.ts @@ -0,0 +1,24 @@ +/** + * Wire envelope sent from APX over the Azure SignalR socket. + * Method name on the hub is `"activity"` — see SocketModeDispatcher.cs on the platform side. + */ +export interface ISocketActivityEnvelope { + /** Frame discriminator. v1 only emits `"activity"`. Future: `ping`, `ack`, `disconnect`. */ + readonly type: string; + /** Unique per frame; the bot should log this and use it for dedup. */ + readonly envelopeId: string; + /** APX correlation vector — log on every frame for cross-service traceability. */ + readonly cv?: string; + /** The Bot Framework Activity JSON. */ + readonly payload: unknown; +} + +export function isActivityEnvelope(v: unknown): v is ISocketActivityEnvelope { + if (v == null || typeof v !== 'object') return false; + const e = v as Record; + return ( + typeof e.type === 'string' && + typeof e.envelopeId === 'string' && + e.payload != null + ); +} diff --git a/packages/apps/src/socket-mode/index.ts b/packages/apps/src/socket-mode/index.ts new file mode 100644 index 000000000..29d37a201 --- /dev/null +++ b/packages/apps/src/socket-mode/index.ts @@ -0,0 +1,11 @@ +export { SocketModeApp, SocketModeOptions, SocketModeEvents } from './socket-mode-app'; +export { ISocketModeClient, SocketModeClient, ConnectionState } from './socket-mode-client'; +export { ISocketActivityEnvelope, isActivityEnvelope } from './envelope'; +export { + NegotiateRoute, + NegotiateResult, + NegotiateUnavailableError, + negotiate, +} from './negotiate'; +export { synthesizeToken } from './synthesize-token'; +export { Backoff, BackoffOptions } from './backoff'; diff --git a/packages/apps/src/socket-mode/negotiate.ts b/packages/apps/src/socket-mode/negotiate.ts new file mode 100644 index 000000000..3a3abc569 --- /dev/null +++ b/packages/apps/src/socket-mode/negotiate.ts @@ -0,0 +1,83 @@ +import { Client as HttpClient } from '@microsoft/teams.common'; + +/** + * Route variant for POST /v3/websockets/connect. + * + * - `global`: `POST /v3/websockets/connect` + * - `regional`: `POST /{cloud}/v3/websockets/connect` + * - `regional-tenant`: `POST /{cloud}/{tenantId}/v3/websockets/connect` + */ +export type NegotiateRoute = + | { kind: 'global' } + | { kind: 'regional'; cloud: string } + | { kind: 'regional-tenant'; cloud: string; tenantId: string }; + +export type NegotiateResult = { + /** Opaque WSS URL returned by Azure SignalR via APX. */ + readonly url: string; + /** Client access token for the WSS connect. */ + readonly accessToken: string; + /** APX-generated session id — include in logs for support correlation. */ + readonly sessionId: string; + /** Token lifetime in seconds — re-negotiate before this expires. */ + readonly expiresIn: number; +}; + +/** Returned when APX explicitly signals socket mode is unavailable (503). */ +export class NegotiateUnavailableError extends Error { + readonly status: number; + constructor(status: number, message: string) { + super(message); + this.name = 'NegotiateUnavailableError'; + this.status = status; + } +} + +export type NegotiateOptions = { + readonly client: HttpClient; + readonly serviceUrl: string; + readonly route: NegotiateRoute; + /** Bot Framework JWT as a raw string. */ + readonly bearerToken: string; +}; + +function routePath(route: NegotiateRoute): string { + switch (route.kind) { + case 'global': + return '/v3/websockets/connect'; + case 'regional': + return `/${encodeURIComponent(route.cloud)}/v3/websockets/connect`; + case 'regional-tenant': + return `/${encodeURIComponent(route.cloud)}/${encodeURIComponent(route.tenantId)}/v3/websockets/connect`; + } +} + +function trimTrailingSlash(s: string): string { + return s.replace(/\/+$/, ''); +} + +/** + * Calls APX's negotiate endpoint and returns the SignalR connect coordinates. + * + * Throws `NegotiateUnavailableError` when APX returns 503 — the caller can decide + * whether to fall back to HTTP delivery or rethrow. + */ +export async function negotiate(opts: NegotiateOptions): Promise { + const { client, serviceUrl, route, bearerToken } = opts; + const url = `${trimTrailingSlash(serviceUrl)}${routePath(route)}`; + + try { + const res = await client.post(url, undefined, { + token: bearerToken, + }); + return res.data; + } catch (err: unknown) { + const e = err as { response?: { status?: number; data?: { error?: string } }; message?: string }; + const status = e.response?.status; + const message = e.response?.data?.error ?? e.message ?? 'Negotiate failed'; + if (status === 503) { + throw new NegotiateUnavailableError(503, message); + } + throw err; + } +} diff --git a/packages/apps/src/socket-mode/socket-mode-app.spec.ts b/packages/apps/src/socket-mode/socket-mode-app.spec.ts new file mode 100644 index 000000000..874595569 --- /dev/null +++ b/packages/apps/src/socket-mode/socket-mode-app.spec.ts @@ -0,0 +1,472 @@ +import jwt from 'jsonwebtoken'; + +import { App } from '../app'; +import { createTestApp } from '../test-utils'; + +import { ISocketActivityEnvelope } from './envelope'; +import { NegotiateUnavailableError } from './negotiate'; +import { SocketModeApp } from './socket-mode-app'; +import { ConnectionState, ISocketModeClient } from './socket-mode-client'; + +class FakeSocketClient implements ISocketModeClient { + readonly id: string; + connectionId: string; + state: ConnectionState = 'disconnected'; + connectCalls: Array<{ url: string; accessToken: string }> = []; + disconnectCalls = 0; + activityHandler?: (env: unknown) => void | Promise; + closeHandler?: (err?: Error) => void; + reconnectingHandler?: (err?: Error) => void; + reconnectedHandler?: (id?: string) => void; + shouldFailConnect = false; + + constructor(id = 'test-conn-1') { + this.id = id; + this.connectionId = id; + } + + async connect(url: string, accessToken: string): Promise { + this.connectCalls.push({ url, accessToken }); + if (this.shouldFailConnect) { + this.state = 'disconnected'; + throw new Error('fake connect failed'); + } + this.state = 'connected'; + } + async disconnect(): Promise { + this.disconnectCalls++; + this.state = 'disconnected'; + } + onActivity(h: (e: unknown) => void | Promise) { this.activityHandler = h; } + onClose(h: (err?: Error) => void) { this.closeHandler = h; } + onReconnecting(h: (err?: Error) => void) { this.reconnectingHandler = h; } + onReconnected(h: (id?: string) => void) { this.reconnectedHandler = h; } + + async deliver(env: ISocketActivityEnvelope) { + if (this.activityHandler) await this.activityHandler(env); + } + fireClose(err?: Error) { + if (this.closeHandler) this.closeHandler(err); + } +} + +function mockNegotiateOnApp( + app: App, + responses: Array<{ status?: number; body?: unknown; error?: Error }>, +) { + const post = jest.spyOn(app.client, 'post'); + for (const r of responses) { + if (r.error) { + post.mockRejectedValueOnce(r.error); + } else { + post.mockResolvedValueOnce({ + data: r.body, + status: r.status ?? 200, + statusText: 'OK', + headers: {}, + config: {} as never, + }); + } + } + return post; +} + +function mockBotToken(app: App, value: string | null) { + jest.spyOn(app.tokenManager, 'getBotToken').mockResolvedValue( + value == null + ? null + : { toString: () => value, isExpired: () => false } as never, + ); +} + +const sampleMessageEnvelope: ISocketActivityEnvelope = { + type: 'activity', + envelopeId: 'env-1', + cv: 'cv.1', + payload: { + type: 'message', + id: 'act-1', + text: 'hello', + serviceUrl: 'https://smba.trafficmanager.net/teams', + channelId: 'msteams', + from: { id: 'user-1', name: 'Alice' }, + recipient: { id: 'bot-1', name: 'Bot' }, + conversation: { id: 'conv-1', tenantId: 't1' }, + }, +}; + +function envelopeOfType(type: string, name?: string, id = `env-${Math.random()}`): ISocketActivityEnvelope { + return { + type: 'activity', + envelopeId: id, + cv: 'cv.x', + payload: { + type, + ...(name ? { name } : {}), + id: `act-${id}`, + serviceUrl: 'https://smba.trafficmanager.net/teams', + channelId: 'msteams', + from: { id: 'user-1' }, + recipient: { id: 'bot-1' }, + conversation: { id: 'conv-1', tenantId: 't1' }, + }, + }; +} + +describe('SocketModeApp', () => { + let app: App; + let fake: FakeSocketClient; + const fakeJwt = jwt.sign( + { exp: Math.floor((Date.now() + 3600000) / 1000) }, + 'test-secret', + ); + + beforeEach(() => { + app = createTestApp({ + clientId: 'test-client-id', + clientSecret: 'test-client-secret', + tenantId: 'test-tenant-id', + }); + fake = new FakeSocketClient(); + mockBotToken(app, fakeJwt); + }); + + afterEach(async () => { + jest.restoreAllMocks(); + // Best-effort: app.stop is idempotent + try { await app.stop(); } catch { /* */ } + }); + + it('start() boots the App and opens the socket', async () => { + const appStart = jest.spyOn(app, 'start'); + mockNegotiateOnApp(app, [{ + body: { url: 'wss://sr/?h=hub', accessToken: 'tok', sessionId: 'sess-1', expiresIn: 60 }, + }]); + + const sm = new SocketModeApp(app, { client: fake }); + await sm.start(); + + expect(appStart).toHaveBeenCalledTimes(1); + expect(fake.connectCalls).toHaveLength(1); + expect(fake.connectCalls[0]).toEqual({ url: 'wss://sr/?h=hub', accessToken: 'tok' }); + expect(sm.sessionId).toBe('sess-1'); + expect(sm.sessionIds).toEqual(['sess-1']); + expect(fake.state).toBe('connected'); + }); + + it('emits "connected" event with slot=0', async () => { + mockNegotiateOnApp(app, [{ + body: { url: 'wss://x', accessToken: 't', sessionId: 'sess-2', expiresIn: 60 }, + }]); + const sm = new SocketModeApp(app, { client: fake }); + const cb = jest.fn(); + sm.on('connected', cb); + + await sm.start(); + + expect(cb).toHaveBeenCalledWith({ sessionId: 'sess-2', connectionId: 'test-conn-1', slot: 0 }); + }); + + it('routes inbound activities through app.onActivity for all activity types', async () => { + mockNegotiateOnApp(app, [{ + body: { url: 'wss://x', accessToken: 't', sessionId: 'sess-3', expiresIn: 3600 }, + }]); + const onActivity = jest.spyOn(app, 'onActivity') + .mockResolvedValue({ status: 200 }); + + const sm = new SocketModeApp(app, { client: fake }); + await sm.start(); + + // Parity check: every activity type that flows on HTTP should flow on socket. + const types = [ + 'message', + 'messageReaction', + 'messageUpdate', + 'messageDelete', + 'conversationUpdate', + 'installationUpdate', + 'event', + 'typing', + 'endOfConversation', + ]; + for (const t of types) { + await fake.deliver(envelopeOfType(t)); + } + + expect(onActivity).toHaveBeenCalledTimes(types.length); + for (let i = 0; i < types.length; i++) { + const call = onActivity.mock.calls[i][0]; + expect((call.body as { type?: string }).type).toBe(types[i]); + expect(call.token.appId).toBe('test-client-id'); + expect(call.token.from).toBe('azure'); + expect(call.token.serviceUrl).toBe('https://smba.trafficmanager.net/teams'); + expect(call.token.tenantId).toBe('t1'); + expect(call.token.isExpired()).toBe(false); + } + }); + + it('routes inbound invoke activities through app.onActivity (defensive fallback)', async () => { + mockNegotiateOnApp(app, [{ + body: { url: 'wss://x', accessToken: 't', sessionId: 'sess-4', expiresIn: 3600 }, + }]); + const onActivity = jest.spyOn(app, 'onActivity').mockResolvedValue({ status: 200 }); + + const sm = new SocketModeApp(app, { client: fake }); + await sm.start(); + await fake.deliver(envelopeOfType('invoke', 'composeExtension/query')); + + // SocketModeApp does not filter by activity.type — even invokes (which platform v1 says + // never come on the socket) are routed if they arrive. Response is discarded. + expect(onActivity).toHaveBeenCalledTimes(1); + expect((onActivity.mock.calls[0][0].body as { type?: string }).type).toBe('invoke'); + }); + + it('emits an envelope event carrying envelopeId + cv for every frame', async () => { + mockNegotiateOnApp(app, [{ + body: { url: 'wss://x', accessToken: 't', sessionId: 's', expiresIn: 60 }, + }]); + jest.spyOn(app, 'onActivity').mockResolvedValue({ status: 200 }); + + const sm = new SocketModeApp(app, { client: fake }); + const env = jest.fn(); + sm.on('envelope', env); + await sm.start(); + + await fake.deliver(sampleMessageEnvelope); + expect(env).toHaveBeenCalledWith({ + envelopeId: 'env-1', + cv: 'cv.1', + type: 'activity', + slot: 0, + duplicate: false, + }); + expect(sm.lastCv).toBe('cv.1'); + }); + + it('dedupe drops a repeated envelopeId', async () => { + mockNegotiateOnApp(app, [{ + body: { url: 'wss://x', accessToken: 't', sessionId: 's', expiresIn: 60 }, + }]); + const onActivity = jest.spyOn(app, 'onActivity').mockResolvedValue({ status: 200 }); + + const sm = new SocketModeApp(app, { client: fake, dedupe: true }); + await sm.start(); + + await fake.deliver(sampleMessageEnvelope); + await fake.deliver(sampleMessageEnvelope); + await fake.deliver(sampleMessageEnvelope); + + expect(onActivity).toHaveBeenCalledTimes(1); + }); + + it('non-envelope frames are discarded without throwing', async () => { + mockNegotiateOnApp(app, [{ + body: { url: 'wss://x', accessToken: 't', sessionId: 's', expiresIn: 60 }, + }]); + const onActivity = jest.spyOn(app, 'onActivity').mockResolvedValue({ status: 200 }); + + const sm = new SocketModeApp(app, { client: fake }); + await sm.start(); + + await fake.deliver(null as never); + await fake.deliver({ type: 'activity', envelopeId: 'x' } as never); // payload missing + expect(onActivity).not.toHaveBeenCalled(); + }); + + it('503 from negotiate with fallbackOn503=true resolves start() and emits "unavailable"', async () => { + const err = new Error('boom') as Error & { response?: { status?: number; data?: { error?: string } } }; + err.response = { status: 503, data: { error: 'Socket mode is not available.' } }; + mockNegotiateOnApp(app, [{ error: err }]); + + const sm = new SocketModeApp(app, { client: fake, fallbackOn503: true }); + const unavailable = jest.fn(); + sm.on('unavailable', unavailable); + + await expect(sm.start()).resolves.toBeUndefined(); + expect(unavailable).toHaveBeenCalledWith({ status: 503, message: 'Socket mode is not available.', slot: 0 }); + expect(fake.connectCalls).toHaveLength(0); + }); + + it('503 from negotiate with fallbackOn503=false rejects start()', async () => { + const err = new Error('boom') as Error & { response?: { status?: number; data?: { error?: string } } }; + err.response = { status: 503, data: { error: 'down' } }; + mockNegotiateOnApp(app, [{ error: err }]); + + const sm = new SocketModeApp(app, { client: fake, fallbackOn503: false }); + await expect(sm.start()).rejects.toBeInstanceOf(NegotiateUnavailableError); + }); + + it('stop() cancels reconnect, disconnects socket, and stops the App', async () => { + mockNegotiateOnApp(app, [{ + body: { url: 'wss://x', accessToken: 't', sessionId: 's', expiresIn: 60 }, + }]); + const appStop = jest.spyOn(app, 'stop'); + + const sm = new SocketModeApp(app, { client: fake }); + await sm.start(); + await sm.stop(); + + expect(fake.disconnectCalls).toBe(1); + expect(appStop).toHaveBeenCalledTimes(1); + }); + + it('handleClose triggers a fresh negotiate+connect cycle', async () => { + mockNegotiateOnApp(app, [ + { body: { url: 'wss://x', accessToken: 'a', sessionId: 's1', expiresIn: 60 } }, + { body: { url: 'wss://y', accessToken: 'b', sessionId: 's2', expiresIn: 60 } }, + ]); + + const sm = new SocketModeApp(app, { client: fake }); + await sm.start(); + expect(fake.connectCalls).toHaveLength(1); + + fake.fireClose(new Error('transport closed')); + + // Wait a tick for the async reconnect path + await new Promise((r) => setImmediate(r)); + + expect(fake.connectCalls).toHaveLength(2); + expect(sm.sessionId).toBe('s2'); + }); + + // --------------------------------------------------------------------------- + // Multi-connection (the Slack-style "up to 10 connections" surface) + // --------------------------------------------------------------------------- + + describe('multi-connection', () => { + it('rejects connections outside [1, 10]', () => { + expect(() => new SocketModeApp(app, { connections: 0 })).toThrow(/\[1, 10\]/); + expect(() => new SocketModeApp(app, { connections: 11 })).toThrow(/\[1, 10\]/); + expect(() => new SocketModeApp(app, { connections: 1.5 })).toThrow(/\[1, 10\]/); + }); + + it('opens N concurrent sockets when connections > 1', async () => { + const fakes = [ + new FakeSocketClient('c-0'), + new FakeSocketClient('c-1'), + new FakeSocketClient('c-2'), + ]; + mockNegotiateOnApp(app, [ + { body: { url: 'wss://a', accessToken: 'a', sessionId: 'sess-a', expiresIn: 3600 } }, + { body: { url: 'wss://b', accessToken: 'b', sessionId: 'sess-b', expiresIn: 3600 } }, + { body: { url: 'wss://c', accessToken: 'c', sessionId: 'sess-c', expiresIn: 3600 } }, + ]); + + const sm = new SocketModeApp(app, { + connections: 3, + clientFactory: (i) => fakes[i], + }); + const connected = jest.fn(); + sm.on('connected', connected); + + await sm.start(); + + expect(fakes[0].connectCalls).toHaveLength(1); + expect(fakes[1].connectCalls).toHaveLength(1); + expect(fakes[2].connectCalls).toHaveLength(1); + expect(connected).toHaveBeenCalledTimes(3); + expect(sm.sessionIds.sort()).toEqual(['sess-a', 'sess-b', 'sess-c']); + // sessionId mirrors slot 0 + expect(sm.sessionId).toBe('sess-a'); + }); + + it('routes a fan-out envelope exactly once across all slots (dedupe auto-enabled)', async () => { + const fakes = [new FakeSocketClient('c-0'), new FakeSocketClient('c-1')]; + mockNegotiateOnApp(app, [ + { body: { url: 'wss://a', accessToken: 'a', sessionId: 's-a', expiresIn: 3600 } }, + { body: { url: 'wss://b', accessToken: 'b', sessionId: 's-b', expiresIn: 3600 } }, + ]); + const onActivity = jest.spyOn(app, 'onActivity').mockResolvedValue({ status: 200 }); + + const sm = new SocketModeApp(app, { + connections: 2, + clientFactory: (i) => fakes[i], + // dedupe omitted intentionally — must auto-enable when connections > 1 + }); + const envelopeEvt = jest.fn(); + sm.on('envelope', envelopeEvt); + + await sm.start(); + + // APX dispatches via SignalR group fan-out: every session receives every event. + // The SDK must call app.onActivity exactly once per envelopeId. + await fakes[0].deliver(sampleMessageEnvelope); + await fakes[1].deliver(sampleMessageEnvelope); + + expect(onActivity).toHaveBeenCalledTimes(1); + // The envelope event still fires for BOTH frames so observability stays complete, + // but the second is flagged duplicate=true. + expect(envelopeEvt).toHaveBeenCalledTimes(2); + expect(envelopeEvt.mock.calls[0][0].duplicate).toBe(false); + expect(envelopeEvt.mock.calls[1][0].duplicate).toBe(true); + expect(envelopeEvt.mock.calls[0][0].slot).toBe(0); + expect(envelopeEvt.mock.calls[1][0].slot).toBe(1); + }); + + it('stop() drains all slots', async () => { + const fakes = [new FakeSocketClient('c-0'), new FakeSocketClient('c-1')]; + mockNegotiateOnApp(app, [ + { body: { url: 'wss://a', accessToken: 'a', sessionId: 's-a', expiresIn: 3600 } }, + { body: { url: 'wss://b', accessToken: 'b', sessionId: 's-b', expiresIn: 3600 } }, + ]); + + const sm = new SocketModeApp(app, { + connections: 2, + clientFactory: (i) => fakes[i], + }); + await sm.start(); + await sm.stop(); + + expect(fakes[0].disconnectCalls).toBe(1); + expect(fakes[1].disconnectCalls).toBe(1); + }); + + it('close on one slot reconnects only that slot, others stay live', async () => { + const fakes = [new FakeSocketClient('c-0'), new FakeSocketClient('c-1')]; + mockNegotiateOnApp(app, [ + { body: { url: 'wss://a', accessToken: 'a', sessionId: 's-a', expiresIn: 3600 } }, + { body: { url: 'wss://b', accessToken: 'b', sessionId: 's-b', expiresIn: 3600 } }, + { body: { url: 'wss://b2', accessToken: 'b2', sessionId: 's-b2', expiresIn: 3600 } }, + ]); + + const sm = new SocketModeApp(app, { + connections: 2, + clientFactory: (i) => fakes[i], + }); + await sm.start(); + expect(fakes[0].connectCalls).toHaveLength(1); + expect(fakes[1].connectCalls).toHaveLength(1); + + // Slot 1 drops. Slot 0 stays connected. Slot 1's client reconnects in place. + fakes[1].fireClose(new Error('peer reset')); + await new Promise((r) => setImmediate(r)); + + expect(fakes[0].connectCalls).toHaveLength(1); // slot 0 untouched + expect(fakes[0].disconnectCalls).toBe(0); // slot 0 untouched + expect(fakes[1].connectCalls).toHaveLength(2); // slot 1 reconnected + expect(sm.sessionIds[0]).toBe('s-a'); + expect(sm.sessionIds[1]).toBe('s-b2'); + }); + + it('user-set dedupe=false is honored even when connections > 1', async () => { + const fakes = [new FakeSocketClient('c-0'), new FakeSocketClient('c-1')]; + mockNegotiateOnApp(app, [ + { body: { url: 'wss://a', accessToken: 'a', sessionId: 's-a', expiresIn: 3600 } }, + { body: { url: 'wss://b', accessToken: 'b', sessionId: 's-b', expiresIn: 3600 } }, + ]); + const onActivity = jest.spyOn(app, 'onActivity').mockResolvedValue({ status: 200 }); + + const sm = new SocketModeApp(app, { + connections: 2, + dedupe: false, // explicit override + clientFactory: (idx) => fakes[idx], + }); + await sm.start(); + await fakes[0].deliver(sampleMessageEnvelope); + await fakes[1].deliver(sampleMessageEnvelope); + + // With dedupe explicitly off, both deliveries route through. + expect(onActivity).toHaveBeenCalledTimes(2); + }); + }); +}); diff --git a/packages/apps/src/socket-mode/socket-mode-app.ts b/packages/apps/src/socket-mode/socket-mode-app.ts new file mode 100644 index 000000000..e663b55c0 --- /dev/null +++ b/packages/apps/src/socket-mode/socket-mode-app.ts @@ -0,0 +1,466 @@ +import { ILogger } from '@microsoft/teams.common'; + +import { App } from '../app'; +import { IPlugin } from '../types'; + +import { Backoff, BackoffOptions, sleep } from './backoff'; +import { ISocketActivityEnvelope, isActivityEnvelope } from './envelope'; +import { + NegotiateResult, + NegotiateRoute, + NegotiateUnavailableError, + negotiate, +} from './negotiate'; +import { ISocketModeClient, SocketModeClient } from './socket-mode-client'; +import { synthesizeToken } from './synthesize-token'; + +export type SocketModeOptions = { + /** Cloud route variant. Default `{ kind: 'global' }`. */ + readonly route?: NegotiateRoute; + /** + * Number of concurrent WSS connections per process. + * Default `1`. Range `[1, 10]`. + * + * APX dispatches via Azure SignalR group fan-out — every session for the same + * `botKey` receives every event. So extra connections give active-active redundancy + * inside a single process, NOT load distribution (see §5.7 of the proposal doc). + * + * When > 1, the SDK automatically enables `dedupe` because duplicates are guaranteed. + */ + readonly connections?: number; + /** Fraction of `expiresIn` at which to re-negotiate the SignalR token. Default 0.8. */ + readonly renegotiateAt?: number; + /** Backoff config for negotiate/reconnect retries. */ + readonly backoff?: BackoffOptions; + /** + * Deduplicate envelopes by envelopeId across all connections. + * Auto-enabled when `connections > 1`. Default `false` for a single connection. + */ + readonly dedupe?: boolean; + /** + * Continue running if `/v3/websockets/connect` returns 503 (socket mode unavailable). + * When true, `start()` resolves without an active socket; the App's HTTP path still works. + * When false, `start()` rejects. Default true. + */ + readonly fallbackOn503?: boolean; + /** Test seam: inject a fake `ISocketModeClient` as slot 0's client (only honored when `connections === 1`). */ + readonly client?: ISocketModeClient; + /** Test seam: build the `ISocketModeClient` for each slot. Takes precedence over `client`. */ + readonly clientFactory?: (slotIndex: number) => ISocketModeClient; +}; + +export type SocketModeEvents = { + connected: (info: { sessionId: string; connectionId?: string; slot: number }) => void; + reconnecting: (info: { slot: number; err?: Error }) => void; + reconnected: (info: { slot: number; connectionId?: string }) => void; + closed: (info: { slot: number; err?: Error }) => void; + renegotiated: (info: { sessionId: string; expiresIn: number; slot: number }) => void; + envelope: (env: { envelopeId: string; cv?: string; type: string; slot: number; duplicate: boolean }) => void; + unavailable: (info: { status: number; message: string; slot: number }) => void; +}; + +const DEFAULT_RENEGOTIATE_AT = 0.8; +const DEDUPE_MAX = 1024; +const MIN_CONNECTIONS = 1; +const MAX_CONNECTIONS = 10; + +type Listener = SocketModeEvents[K]; + +/** + * One concurrent WSS connection. Each slot has its own client, session id, expiry, + * re-negotiate timer, and reconnect backoff. The parent SocketModeApp coordinates + * the slots and shares dedup state across them. + */ +type Slot = { + readonly index: number; + client: ISocketModeClient; + sessionId?: string; + expiry: number; + renegotiateTimer?: ReturnType; + backoff: Backoff; +}; + +/** + * Drop-in replacement for `app.start()` that adds Azure SignalR Socket Mode delivery + * for event-style activities. The App's HTTP server still runs (for invokes, OAuth + * callbacks, tabs, and remote functions). + * + * @example + * const app = new App(); + * app.on('message', async ({ activity, send }) => { + * await send({ type: 'message', text: `echo: ${activity.text}` }); + * }); + * const sm = new SocketModeApp(app); + * await sm.start(); + */ +export class SocketModeApp { + readonly app: App; + + /** + * The most recently negotiated session id (slot 0 in single-connection mode). + * For multi-connection mode, see `sessionIds`. + */ + sessionId?: string; + /** Session ids indexed by slot. Mirrors `sessionId` for slot 0. */ + sessionIds: Array = []; + /** Last `cv` (correlation vector) observed on any inbound envelope. */ + lastCv?: string; + + private readonly opts: { + readonly route: NegotiateRoute; + readonly connections: number; + readonly renegotiateAt: number; + readonly backoff?: BackoffOptions; + readonly dedupe: boolean; + readonly fallbackOn503: boolean; + }; + private readonly log: ILogger; + private readonly clientFactory: (slotIndex: number) => ISocketModeClient; + private slots: Slot[] = []; + private started = false; + private stopping = false; + private seenEnvelopeIds = new Set(); + private envelopeOrder: string[] = []; + private listeners: { [K in keyof SocketModeEvents]?: Array> } = {}; + + constructor(app: App, options: SocketModeOptions = {}) { + this.app = app; + this.log = app.log.child('SocketMode'); + + const connections = options.connections ?? 1; + if ( + !Number.isInteger(connections) || + connections < MIN_CONNECTIONS || + connections > MAX_CONNECTIONS + ) { + throw new Error( + `SocketModeApp: connections must be an integer in [${MIN_CONNECTIONS}, ${MAX_CONNECTIONS}], got ${String(connections)}`, + ); + } + + this.opts = { + route: options.route ?? { kind: 'global' }, + connections, + renegotiateAt: options.renegotiateAt ?? DEFAULT_RENEGOTIATE_AT, + backoff: options.backoff, + // Auto-enable dedup when running multiple sockets — APX fan-out delivers each + // envelope to every session, so duplicates are guaranteed. + dedupe: options.dedupe ?? (connections > 1), + fallbackOn503: options.fallbackOn503 ?? true, + }; + + // Resolve client factory. `client` is honored only for the single-connection case + // to preserve back-compat with existing tests; `clientFactory` is the multi-conn path. + if (options.clientFactory) { + this.clientFactory = options.clientFactory; + } else if (options.client && connections === 1) { + const injected = options.client; + let used = false; + this.clientFactory = () => { + if (!used) { + used = true; + return injected; + } + return new SocketModeClient(); + }; + } else { + this.clientFactory = () => new SocketModeClient(); + } + } + + on(name: K, cb: Listener): this { + const list = (this.listeners[name] ?? []) as Array>; + list.push(cb); + (this.listeners as Record)[name as string] = list; + return this; + } + + /** + * Boot the App (initialize plugins, start HTTP server) AND open the Socket Mode WebSocket(s). + * Drop-in replacement for `app.start()` — do NOT also call `app.start()`. + */ + async start(port?: number | string): Promise { + if (this.started) return; + this.started = true; + this.stopping = false; + + // Boot the App: plugin onInit, plugin onStart, HTTP server bind. + // The HTTP server keeps handling invokes / OAuth callbacks / tabs / remote functions. + await this.app.start(port); + + // Create slots up front so handlers can reference them. + this.slots = []; + this.sessionIds = new Array(this.opts.connections); + for (let i = 0; i < this.opts.connections; i++) { + const slot: Slot = { + index: i, + client: this.clientFactory(i), + expiry: 0, + backoff: new Backoff(this.opts.backoff), + }; + this.wireClient(slot); + this.slots.push(slot); + } + + // Open all slots in parallel. We don't rethrow per-slot failures here — each slot's + // own reconnect/backoff loop handles transient errors. If `fallbackOn503` is false + // and ALL slots get a 503, surface that to the caller. + const results = await Promise.allSettled( + this.slots.map((s) => this.negotiateAndConnect(s)), + ); + + if (!this.opts.fallbackOn503) { + const allUnavailable = + results.length > 0 && + results.every( + (r) => + r.status === 'rejected' && r.reason instanceof NegotiateUnavailableError, + ); + if (allUnavailable) { + const firstReason = (results[0] as PromiseRejectedResult).reason as NegotiateUnavailableError; + throw firstReason; + } + } + } + + /** Close all sockets and stop the underlying App. */ + async stop(): Promise { + if (!this.started) return; + this.stopping = true; + this.started = false; + + for (const slot of this.slots) { + if (slot.renegotiateTimer) { + clearTimeout(slot.renegotiateTimer); + slot.renegotiateTimer = undefined; + } + } + + await Promise.allSettled( + this.slots.map(async (s) => { + try { + await s.client.disconnect(); + } catch (err) { + this.log.warn(`socket disconnect failed (slot ${s.index})`, err); + } + }), + ); + + this.slots = []; + await this.app.stop(); + } + + // ----------------------------------------------------------------------------------------- + // Internals + // ----------------------------------------------------------------------------------------- + + private wireClient(slot: Slot): void { + slot.client.onActivity((env) => this.handleEnvelope(slot, env)); + slot.client.onClose((err) => this.handleClose(slot, err)); + slot.client.onReconnecting((err) => this.emit('reconnecting', { slot: slot.index, err })); + slot.client.onReconnected((id) => this.emit('reconnected', { slot: slot.index, connectionId: id })); + } + + private async negotiateAndConnect(slot: Slot): Promise { + while (!this.stopping) { + try { + const result = await this.negotiateOnce(); + await slot.client.connect(result.url, result.accessToken); + slot.sessionId = result.sessionId; + slot.expiry = Date.now() + result.expiresIn * 1000; + slot.backoff.reset(); + this.sessionIds[slot.index] = result.sessionId; + if (slot.index === 0) this.sessionId = result.sessionId; + this.emit('connected', { + sessionId: result.sessionId, + connectionId: slot.client.connectionId, + slot: slot.index, + }); + this.scheduleRenegotiate(slot, result.expiresIn); + return; + } catch (err: unknown) { + if (err instanceof NegotiateUnavailableError) { + this.log.warn(`socket mode unavailable (slot ${slot.index}): ${err.message}`); + this.emit('unavailable', { status: err.status, message: err.message, slot: slot.index }); + if (this.opts.fallbackOn503) return; + throw err; + } + const delayMs = slot.backoff.next(); + this.log.warn( + `negotiate failed (slot ${slot.index}), retrying in ${delayMs}ms`, + (err as Error)?.message ?? err, + ); + await sleep(delayMs); + } + } + } + + private async negotiateOnce(): Promise { + const tokenObj = await this.app.tokenManager.getBotToken(); + if (!tokenObj) { + throw new Error('SocketModeApp: App has no credentials configured'); + } + return negotiate({ + client: this.app.client, + serviceUrl: this.app.api.serviceUrl, + route: this.opts.route, + bearerToken: tokenObj.toString(), + }); + } + + private scheduleRenegotiate(slot: Slot, expiresInSec: number): void { + if (slot.renegotiateTimer) clearTimeout(slot.renegotiateTimer); + const delayMs = Math.max( + 1000, + Math.floor(expiresInSec * 1000 * this.opts.renegotiateAt), + ); + slot.renegotiateTimer = setTimeout(() => { + void this.renegotiate(slot); + }, delayMs); + // Don't hold the event loop open just for the timer. + (slot.renegotiateTimer as unknown as { unref?: () => void }).unref?.(); + } + + private async renegotiate(slot: Slot): Promise { + if (this.stopping) return; + try { + const result = await this.negotiateOnce(); + // Make-before-break: bring up the new connection, then tear down the old one. + const oldClient = slot.client; + const newClient = this.clientFactory(slot.index); + slot.client = newClient; + this.wireClient(slot); + await newClient.connect(result.url, result.accessToken); + slot.sessionId = result.sessionId; + slot.expiry = Date.now() + result.expiresIn * 1000; + this.sessionIds[slot.index] = result.sessionId; + if (slot.index === 0) this.sessionId = result.sessionId; + this.emit('renegotiated', { + sessionId: result.sessionId, + expiresIn: result.expiresIn, + slot: slot.index, + }); + this.scheduleRenegotiate(slot, result.expiresIn); + try { + await oldClient.disconnect(); + } catch (err) { + this.log.debug(`old socket disconnect failed (slot ${slot.index})`, err); + } + } catch (err: unknown) { + this.log.warn( + `renegotiate failed (slot ${slot.index}); will retry via close handler`, + (err as Error)?.message ?? err, + ); + // The existing connection will drop on token expiry; handleClose triggers full reconnect. + } + } + + private async handleEnvelope(slot: Slot, env: unknown): Promise { + if (!isActivityEnvelope(env)) { + this.log.warn(`discarding non-activity envelope (slot ${slot.index})`, env); + return; + } + const envelope: ISocketActivityEnvelope = env; + this.lastCv = envelope.cv; + + const duplicate = this.opts.dedupe && this.envelopeAlreadySeen(envelope.envelopeId); + this.emit('envelope', { + envelopeId: envelope.envelopeId, + cv: envelope.cv, + type: envelope.type, + slot: slot.index, + duplicate, + }); + + if (duplicate) { + this.log.debug(`dedup: dropping duplicate envelope ${envelope.envelopeId} (slot ${slot.index})`); + return; + } + + const activity = envelope.payload as Record; + if (!activity || typeof activity !== 'object') { + this.log.warn( + `envelope payload missing or non-object; discarding ${envelope.envelopeId} (slot ${slot.index})`, + ); + return; + } + + if (activity.type === 'invoke') { + // Per platform v1, invokes always arrive on HTTPS — this branch shouldn't fire. + // If it does (future platform change, mis-flighted dispatcher), route it anyway; + // the response can't be sent back over the socket in v1, so it's just discarded. + this.log.warn( + `invoke received over socket mode (envelopeId=${envelope.envelopeId}). ` + + 'Invokes should arrive on HTTP. Routing anyway; response will be discarded.', + ); + } + + const conv = activity.conversation as { tenantId?: string } | undefined; + const serviceUrl = + typeof activity.serviceUrl === 'string' + ? activity.serviceUrl + : this.app.api.serviceUrl; + + const token = synthesizeToken({ + clientId: this.app.credentials?.clientId, + tenantId: conv?.tenantId ?? this.app.credentials?.tenantId, + appDisplayName: this.app.name, + serviceUrl, + expirationMs: slot.expiry || Date.now() + 60 * 60 * 1000, + }); + + try { + await this.app.onActivity({ + body: activity as { type?: string; id?: string; serviceUrl?: string }, + token, + }); + } catch (err) { + this.log.error( + `error routing socket-delivered activity (envelopeId=${envelope.envelopeId})`, + err, + ); + } + } + + private envelopeAlreadySeen(id: string): boolean { + if (this.seenEnvelopeIds.has(id)) return true; + this.seenEnvelopeIds.add(id); + this.envelopeOrder.push(id); + if (this.envelopeOrder.length > DEDUPE_MAX) { + const evict = this.envelopeOrder.shift(); + if (evict) this.seenEnvelopeIds.delete(evict); + } + return false; + } + + private handleClose(slot: Slot, err?: Error): void { + this.emit('closed', { slot: slot.index, err }); + if (this.stopping) return; + // Close after SignalR's automatic-reconnect attempts have exhausted, or + // immediately on a 1008 (token expired). Negotiate fresh and reconnect just this slot. + void this.fullReconnect(slot); + } + + private async fullReconnect(slot: Slot): Promise { + if (slot.renegotiateTimer) { + clearTimeout(slot.renegotiateTimer); + slot.renegotiateTimer = undefined; + } + await this.negotiateAndConnect(slot); + } + + private emit( + name: K, + ...args: Parameters> + ): void { + const list = this.listeners[name]; + if (!list) return; + for (const cb of list) { + try { + (cb as (...a: unknown[]) => void)(...(args as unknown[])); + } catch (err) { + this.log.error(`event listener for "${String(name)}" threw`, err); + } + } + } +} diff --git a/packages/apps/src/socket-mode/socket-mode-client.ts b/packages/apps/src/socket-mode/socket-mode-client.ts new file mode 100644 index 000000000..06aff4890 --- /dev/null +++ b/packages/apps/src/socket-mode/socket-mode-client.ts @@ -0,0 +1,126 @@ +import { + HubConnection, + HubConnectionBuilder, + HubConnectionState, + IRetryPolicy, +} from '@microsoft/signalr'; + +export type ConnectionState = 'disconnected' | 'connecting' | 'connected' | 'reconnecting'; + +/** + * Thin abstraction over `@microsoft/signalr.HubConnection` so SocketModeApp can be unit-tested + * with a fake client. + */ +export interface ISocketModeClient { + /** Connect to the given Azure SignalR URL using the provided client access token. */ + connect(url: string, accessToken: string): Promise; + /** Stop the connection. Safe to call multiple times. */ + disconnect(): Promise; + + onActivity(handler: (envelope: unknown) => void | Promise): void; + onClose(handler: (err?: Error) => void): void; + onReconnecting(handler: (err?: Error) => void): void; + onReconnected(handler: (connectionId?: string) => void): void; + + readonly connectionId?: string; + readonly state: ConnectionState; +} + +const DEFAULT_RETRY_POLICY: IRetryPolicy = { + // Reconnect attempts at 0s, 2s, 10s, 30s. Matches @microsoft/signalr defaults + // but exposed here so we can tune later without changing call sites. + nextRetryDelayInMilliseconds: (ctx) => { + if (ctx.previousRetryCount === 0) return 0; + if (ctx.previousRetryCount === 1) return 2_000; + if (ctx.previousRetryCount === 2) return 10_000; + return 30_000; + }, +}; + +/** + * Production implementation backed by `@microsoft/signalr`. + */ +export class SocketModeClient implements ISocketModeClient { + private connection?: HubConnection; + private activityHandler?: (envelope: unknown) => void | Promise; + private closeHandler?: (err?: Error) => void; + private reconnectingHandler?: (err?: Error) => void; + private reconnectedHandler?: (connectionId?: string) => void; + + get connectionId(): string | undefined { + return this.connection?.connectionId ?? undefined; + } + + get state(): ConnectionState { + switch (this.connection?.state) { + case HubConnectionState.Connected: return 'connected'; + case HubConnectionState.Connecting: return 'connecting'; + case HubConnectionState.Reconnecting: return 'reconnecting'; + default: return 'disconnected'; + } + } + + async connect(url: string, accessToken: string): Promise { + const conn = new HubConnectionBuilder() + .withUrl(url, { accessTokenFactory: () => accessToken }) + .withAutomaticReconnect(DEFAULT_RETRY_POLICY) + .build(); + + if (this.activityHandler) { + conn.on('activity', this.activityHandler); + } + if (this.closeHandler) { + conn.onclose(this.closeHandler); + } + if (this.reconnectingHandler) { + conn.onreconnecting(this.reconnectingHandler); + } + if (this.reconnectedHandler) { + conn.onreconnected(this.reconnectedHandler); + } + + await conn.start(); + this.connection = conn; + } + + async disconnect(): Promise { + const conn = this.connection; + this.connection = undefined; + if (conn) { + try { + await conn.stop(); + } catch { + // signalr.stop() can throw if the transport is already torn down — that's fine. + } + } + } + + onActivity(handler: (envelope: unknown) => void | Promise): void { + this.activityHandler = handler; + if (this.connection) { + this.connection.off('activity'); + this.connection.on('activity', handler); + } + } + + onClose(handler: (err?: Error) => void): void { + this.closeHandler = handler; + if (this.connection) { + this.connection.onclose(handler); + } + } + + onReconnecting(handler: (err?: Error) => void): void { + this.reconnectingHandler = handler; + if (this.connection) { + this.connection.onreconnecting(handler); + } + } + + onReconnected(handler: (connectionId?: string) => void): void { + this.reconnectedHandler = handler; + if (this.connection) { + this.connection.onreconnected(handler); + } + } +} diff --git a/packages/apps/src/socket-mode/synthesize-token.ts b/packages/apps/src/socket-mode/synthesize-token.ts new file mode 100644 index 000000000..5dec6a2d2 --- /dev/null +++ b/packages/apps/src/socket-mode/synthesize-token.ts @@ -0,0 +1,32 @@ +import { IToken } from '@microsoft/teams.api'; + +export type SynthesizeTokenOptions = { + readonly clientId?: string; + readonly tenantId?: string; + readonly appDisplayName?: string; + readonly serviceUrl: string; + /** Epoch-ms expiry derived from the SignalR access token's expiresIn. */ + readonly expirationMs: number; +}; + +/** + * Builds an IToken for activities arriving over Socket Mode. + * + * The platform validates the bot's bearer token once at POST /v3/websockets/connect. + * Subsequent activity frames carry no per-activity JWT — we synthesize one so the + * activity can re-enter App.onActivity unchanged. + */ +export function synthesizeToken(opts: SynthesizeTokenOptions): IToken { + const { clientId, tenantId, appDisplayName, serviceUrl, expirationMs } = opts; + return { + appId: clientId ?? '', + appDisplayName, + tenantId, + serviceUrl, + from: 'azure', + fromId: clientId ?? '', + expiration: expirationMs, + isExpired: (bufferMs = 5 * 60_000) => Date.now() + bufferMs >= expirationMs, + toString: () => '', + }; +}