diff --git a/bun.lock b/bun.lock index d5e66080d..534d72afe 100644 --- a/bun.lock +++ b/bun.lock @@ -36,6 +36,21 @@ "typescript": "~5.9.3", }, }, + "packages/appservice": { + "name": "@rocket.chat/appservice", + "version": "0.1.0", + "dependencies": { + "@rocket.chat/federation-core": "workspace:*", + "@rocket.chat/federation-room": "workspace:*", + "mongodb": "^6.16.0", + "reflect-metadata": "^0.2.2", + "tsyringe": "^4.10.0", + "yaml": "^2.7.1", + }, + "peerDependencies": { + "typescript": "~5.9.2", + }, + }, "packages/core": { "name": "@rocket.chat/federation-core", "version": "1.0.50", @@ -64,8 +79,9 @@ }, "packages/federation-sdk": { "name": "@rocket.chat/federation-sdk", - "version": "0.4.2", + "version": "0.6.3", "dependencies": { + "@rocket.chat/appservice": "workspace:*", "@rocket.chat/emitter": "^0.32.0", "@rocket.chat/federation-core": "workspace:*", "@rocket.chat/federation-crypto": "workspace:*", @@ -205,6 +221,8 @@ "@pinojs/redact": ["@pinojs/redact@0.4.0", "", {}, "sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg=="], + "@rocket.chat/appservice": ["@rocket.chat/appservice@workspace:packages/appservice"], + "@rocket.chat/emitter": ["@rocket.chat/emitter@0.32.0", "", {}, "sha512-QCXNGDm5xjJPePMeP9GknMAZne5xhNmLLD/ZVVqNgOyeWdfQKddwBUqtMiWDaFCRq1bC22kNf3WQLqt89CO6fg=="], "@rocket.chat/eslint-config": ["@rocket.chat/eslint-config@0.7.0", "", { "dependencies": { "@babel/core": "^7.20.7", "@babel/eslint-parser": "~7.23.3", "@types/eslint": "~8.44.6", "@types/prettier": "^2.6.3", "@typescript-eslint/eslint-plugin": "~5.60.1", "@typescript-eslint/parser": "~5.60.1", "eslint": "~8.45.0", "eslint-config-prettier": "~8.8.0", "eslint-plugin-anti-trojan-source": "~1.1.1", "eslint-plugin-import": "~2.26.0", "eslint-plugin-jest": "~27.2.3", "eslint-plugin-jsx-a11y": "^6.8.0", "eslint-plugin-prettier": "~4.2.1", "prettier": "~2.8.8" } }, "sha512-6AlE/MpJfITicLVNmgToK8hjp7doB4UtJdkDXm58JQZcDuU4Vt7co5/xUpg8tUhT7OouzlqdPr9keNuGm6Wi7A=="], @@ -457,7 +475,7 @@ "bson": ["bson@6.10.4", "", {}, "sha512-WIsKqkSC0ABoBJuT1LEX+2HEvNmNKKgnTAyd0fL8qzK4SH2i9NXg+t08YtdZp/V9IZ33cxe3iV4yM0qg8lMQng=="], - "bun-types": ["bun-types@1.3.10", "", { "dependencies": { "@types/node": "*" } }, "sha512-tcpfCCl6XWo6nCVnpcVrxQ+9AYN1iqMIzgrSKYMB/fjLtV2eyAVEg7AxQJuCq/26R6HpKWykQXuSOq/21RYcbg=="], + "bun-types": ["bun-types@1.3.13", "", { "dependencies": { "@types/node": "*" } }, "sha512-QXKeHLlOLqQX9LgYaHJfzdBaV21T63HhFJnvuRCcjZiaUDpbs5ED1MgxbMra71CsryN/1dAoXuJJJwIv/2drVA=="], "call-bind": ["call-bind@1.0.8", "", { "dependencies": { "call-bind-apply-helpers": "^1.0.0", "es-define-property": "^1.0.0", "get-intrinsic": "^1.2.4", "set-function-length": "^1.2.2" } }, "sha512-oKlSFMcMwpUg2ednkhQ454wfWiU/ul3CkJe/PEHcTKuiX6RpbehUiFMXu13HalGZxfUwCQzZG747YXBn1im9ww=="], @@ -605,8 +623,6 @@ "fast-levenshtein": ["fast-levenshtein@2.0.6", "", {}, "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw=="], - "fast-redact": ["fast-redact@3.5.0", "", {}, "sha512-dwsoQlS7h9hMeYUq1W++23NDcBLV4KqONnITDV9DjfS3q1SgDGVrBdvvTLUotWtPSD7asWDV9/CmsZPy8Hf70A=="], - "fast-safe-stringify": ["fast-safe-stringify@2.1.1", "", {}, "sha512-W+KJc2dmILlPplD/H4K9l9LcAHAfPtP6BY84uVLXQ6Evcz9Lcg33Y2z1IVblT6xdY54PXYVHEv+0Wpq8Io6zkA=="], "fastq": ["fastq@1.20.1", "", { "dependencies": { "reusify": "^1.0.4" } }, "sha512-GGToxJ/w1x32s/D2EKND7kTil4n8OVk/9mycTc4VDza13lOvpUZTGX3mFSCtV9ksdGBVzvsyAVLM6mHFThxXxw=="], @@ -1133,6 +1149,8 @@ "yallist": ["yallist@3.1.1", "", {}, "sha512-a4UGQaWPH59mOXUYnAG2ewncQS4i4F43Tv3JoAM+s2VDAmS9NsK8GpDMLrCHPksFT7h3K6TOoUNn2pb7RoXx4g=="], + "yaml": ["yaml@2.8.3", "", { "bin": { "yaml": "bin.mjs" } }, "sha512-AvbaCLOO2Otw/lW5bmh9d/WEdcDFdQp2Z2ZUH3pX9U2ihyUY0nvLv7J6TrWowklRGPYbB/IuIMfYgxaCPg5Bpg=="], + "yargs-parser": ["yargs-parser@20.2.9", "", {}, "sha512-y11nGElTIV+CT3Zv9t7VKl+Q3hTQoT9a1Qzezhhl6Rp21gJ/IVTW7Z3y9EWXhuUBC2Shnf+DX0antecpAwSP8w=="], "yn": ["yn@3.1.1", "", {}, "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q=="], @@ -1167,8 +1185,6 @@ "@babel/types/@babel/helper-validator-identifier": ["@babel/helper-validator-identifier@7.28.5", "", {}, "sha512-qSs4ifwzKJSV39ucNjsvc6WVHs6b7S03sOh2OcHF9UHfVPqWWALUsNUVzhSBiItjRZoLHx7nIarVjqKVusUZ1Q=="], - "@bogeychan/elysia-logger/pino": ["pino@9.7.0", "", { "dependencies": { "atomic-sleep": "^1.0.0", "fast-redact": "^3.1.1", "on-exit-leak-free": "^2.1.0", "pino-abstract-transport": "^2.0.0", "pino-std-serializers": "^7.0.0", "process-warning": "^5.0.0", "quick-format-unescaped": "^4.0.3", "real-require": "^0.2.0", "safe-stable-stringify": "^2.3.1", "sonic-boom": "^4.0.1", "thread-stream": "^3.0.0" }, "bin": { "pino": "bin.js" } }, "sha512-vnMCM6xZTb1WDmLvtG2lE/2p+t9hDEIvTWJsu6FejkE62vB7gDhvzrpFR4Cw2to+9JNQxVnkAKVPA1KPB98vWg=="], - "@emnapi/core/tslib": ["tslib@2.8.1", "", {}, "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w=="], "@emnapi/runtime/tslib": ["tslib@2.8.1", "", {}, "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w=="], @@ -1207,6 +1223,8 @@ "@tybys/wasm-util/tslib": ["tslib@2.8.1", "", {}, "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w=="], + "@types/bun/bun-types": ["bun-types@1.3.10", "", { "dependencies": { "@types/node": "*" } }, "sha512-tcpfCCl6XWo6nCVnpcVrxQ+9AYN1iqMIzgrSKYMB/fjLtV2eyAVEg7AxQJuCq/26R6HpKWykQXuSOq/21RYcbg=="], + "@typescript-eslint/typescript-estree/debug": ["debug@4.4.1", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-KcKCqiftBJcZr++7ykoDIEwSa3XWowTfNPo92BYxjXiyYEVrUQh2aLyhxBCwww+heortUFxEJYcRzosstTEBYQ=="], "@typescript-eslint/typescript-estree/semver": ["semver@7.7.2", "", { "bin": { "semver": "bin/semver.js" } }, "sha512-RF0Fw+rO5AMf9MAyaRXI4AV0Ulj5lMHqVxxdSgiVbixSCXoEmmX/jk0CuJw4+3SqroYO9VoUh+HcuJivvtJemA=="], @@ -1265,10 +1283,6 @@ "@babel/traverse/@babel/code-frame/@babel/helper-validator-identifier": ["@babel/helper-validator-identifier@7.28.5", "", {}, "sha512-qSs4ifwzKJSV39ucNjsvc6WVHs6b7S03sOh2OcHF9UHfVPqWWALUsNUVzhSBiItjRZoLHx7nIarVjqKVusUZ1Q=="], - "@bogeychan/elysia-logger/pino/pino-abstract-transport": ["pino-abstract-transport@2.0.0", "", { "dependencies": { "split2": "^4.0.0" } }, "sha512-F63x5tizV6WCh4R6RHyi2Ml+M70DNRXt/+HANowMflpgGFMAym/VKm6G7ZOQRjqN7XbGxK1Lg9t6ZrtzOaivMw=="], - - "@bogeychan/elysia-logger/pino/thread-stream": ["thread-stream@3.1.0", "", { "dependencies": { "real-require": "^0.2.0" } }, "sha512-OqyPZ9u96VohAyMfJykzmivOrY2wfMSf3C5TtFJVgN+Hm6aj+voFhlK+kZEIv2FBh1X6Xp3DlnCOfEQ3B2J86A=="], - "@nicolo-ribaudo/eslint-scope-5-internals/eslint-scope/estraverse": ["estraverse@4.3.0", "", {}, "sha512-39nnKffWz8xN1BU/2c79n9nB9HDzo0niYUqx6xyqUnyoAnQyyWpOTdZEeiCch8BBu515t4wp9ZmgVfVhn9EBpw=="], "@rocket.chat/eslint-config/@typescript-eslint/eslint-plugin/@typescript-eslint/scope-manager": ["@typescript-eslint/scope-manager@5.60.1", "", { "dependencies": { "@typescript-eslint/types": "5.60.1", "@typescript-eslint/visitor-keys": "5.60.1" } }, "sha512-Dn/LnN7fEoRD+KspEOV0xDMynEmR3iSHdgNsarlXNLGGtcUok8L4N71dxUgt3YvlO8si7E+BJ5Fe3wb5yUw7DQ=="], diff --git a/bundle.ts b/bundle.ts index 5871963ca..ae968cb22 100644 --- a/bundle.ts +++ b/bundle.ts @@ -30,7 +30,7 @@ const filterWorkspace = (deps: Record) => Object.fromEntries(Object.entries(deps || {}).filter(([, value]) => typeof value === 'string' && !value.startsWith('workspace:'))); // TODO get list of packages programmatically -const packages = ['core', 'crypto', 'federation-sdk', 'room']; +const packages = ['core', 'crypto', 'federation-sdk', 'room', 'appservice']; const localPackagesNames = getLocalPackages(packages); diff --git a/docs/bridge-architecture-guide.md b/docs/bridge-architecture-guide.md new file mode 100644 index 000000000..40e332db6 --- /dev/null +++ b/docs/bridge-architecture-guide.md @@ -0,0 +1,587 @@ +# Matrix Bridge Architecture Guide + +A comprehensive guide to how bridges work with Matrix homeservers via the Application Service (AS) API. Written as a reference for implementing bridge support in any homeserver. + +## Overview + +Bridges are external services that connect other messaging protocols (IRC, Slack, Telegram, etc.) to Matrix. They use the **Application Service API** — a standardized protocol for bidirectional event flow between a homeserver and an external service. + +--- + +## 1. Registration + +Each bridge registers with the homeserver via a YAML registration file. The homeserver admin configures which registration files to load. + +### Registration File Structure + +```yaml +id: my_irc_bridge # Unique identifier for the bridge +url: http://localhost:9999 # URL where bridge receives events from homeserver +as_token: as_token_secret_123 # Token the bridge sends to homeserver (bridge → HS auth) +hs_token: hs_token_secret_456 # Token homeserver sends to bridge (HS → bridge auth) +sender_localpart: bridge_bot # Localpart for the bridge bot user (e.g., @bridge_bot:example.com) + +# Namespaces define what users/aliases/rooms the bridge manages +namespaces: + users: + - exclusive: true # Only this bridge can create these users + regex: "@irc_.*" # Users matching this pattern belong to this bridge + aliases: + - exclusive: true + regex: "#irc_.*" # Room aliases this bridge manages + rooms: [] + +# Optional: 3rd-party protocol support for /thirdparty/* endpoints +protocols: + - irc + +# Optional feature flags +de.sorunome.msc2409.push_ephemeral: true # Receive typing, read receipts, etc. +org.matrix.msc3202: true # Receive device list changes +``` + +### Key Concepts + +- **`as_token`**: Bridge → Homeserver authentication. The bridge includes this in requests to prove its identity. +- **`hs_token`**: Homeserver → Bridge authentication. The homeserver includes this when pushing events to the bridge. +- **`sender_localpart`**: The "bridge bot" user. This is the default user the bridge acts as when not impersonating a ghost user. +- **Exclusive namespaces**: Only the owning bridge can create/manage resources matching the regex. The homeserver must reject attempts by other clients to register users or create aliases in exclusive namespaces. +- **Non-exclusive namespaces**: The bridge receives events about matching resources, but other clients can also create them. + +### Homeserver Responsibilities at Registration Load Time + +1. Parse and validate the registration YAML. +2. Store the registration in memory/database. +3. Create the bridge bot user (`sender_localpart`) if it doesn't exist. +4. Index namespace regexes for fast lookup during event routing. + +--- + +## 2. Communication: Homeserver → Bridge + +The homeserver pushes events to the bridge via HTTP. + +### Transaction Endpoint + +``` +PUT {bridge_url}/_matrix/app/v1/transactions/{txnId} +Authorization: Bearer +``` + +**Request body:** + +```json +{ + "events": [ + { + "type": "m.room.message", + "room_id": "!room123:example.com", + "sender": "@user:example.com", + "content": {"body": "Hello", "msgtype": "m.text"}, + "event_id": "$event123", + "origin_server_ts": 1234567890 + } + ], + "de.sorunome.msc2409.ephemeral": [ + { + "type": "m.typing", + "room_id": "!room123:example.com", + "content": {"user_ids": ["@user:example.com"]} + } + ], + "ephemeral": [ + { + "type": "m.typing", + "room_id": "!room123:example.com", + "content": {"user_ids": ["@user:example.com"]} + } + ], + "de.sorunome.msc2409.to_device": [], + "org.matrix.msc3202.device_one_time_key_counts": {}, + "org.matrix.msc3202.device_lists": {"changed": [], "left": []} +} +``` + +> **Ephemeral events under two keys.** Ephemeral events (typing, receipts, presence) are sent under *both* the unstable `de.sorunome.msc2409.ephemeral` key (what Synapse emits and most bridges read) and the stable `ephemeral` key (Matrix v1.13+). Sending both maximizes bridge compatibility. See `transaction-sender.service.ts`. + +**Bridge must respond:** HTTP 200 with `{}` on success. + +**Transaction ID (`txnId`)**: Monotonically increasing. Bridges should deduplicate by `txnId` in case the homeserver retries. + +### Batching + +Events are not pushed one-at-a-time. The router accumulates events and EDUs per appservice into a batch that flushes when **either** limit is hit: + +- **Batch window**: 100 ms since the first event in the batch (`BATCH_WINDOW_MS`). +- **Max batch size**: 50 events/EDUs combined (`MAX_BATCH_SIZE`). + +This keeps transaction volume low while bounding delivery latency. See `event-router.service.ts`. + +### What Events to Push + +The homeserver must determine which bridges are "interested" in each event. A bridge is interested if any of the following match: + +1. **Room ID** matches the bridge's room namespace regex. +2. **Room alias** (any alias for that room) matches the bridge's alias namespace regex. +3. **Any member in the room** matches the bridge's user namespace regex (i.e., a ghost user is in the room). +4. **The event sender** matches the bridge's user namespace regex. + +### Retry and Recovery + +If a bridge is unreachable: + +1. Persist the transaction (status `pending`) before attempting delivery, so it survives a restart. +2. On a non-2xx response or network error, mark the transaction `failed` and the bridge `down` (recording the error). +3. On a 2xx response, mark the transaction `sent` and the bridge `up`. +4. Retry eligible pending/failed transactions with **exponential backoff**: `min(1000 * 2^attempts, 60000)` ms (initial 1 s, capped at 60 s). +5. Track per-bridge state so it can resume from where it left off. + +> **Implementation caveat (retries).** This homeserver stores only the event *IDs* in the transaction record, not the full event bodies. Retries therefore send an **empty** transaction body purely to test connectivity and reset the bridge to `up`; the missed events are not re-delivered. A more complete implementation would persist enough to replay the original payload. See `transaction-sender.service.ts` (`retryPending`). + +### State Tracking (per bridge) + +The homeserver tracks per-bridge delivery state (`AppServiceState`): + +| Field | Purpose | +|-------|---------| +| `state` | `up` or `down` | +| `lastTxnId` | Last transaction ID allocated (monotonic counter) | +| `streamOrdering` | Last event stream position delivered | +| `readReceiptStreamId` | Last read receipt delivered | +| `presenceStreamId` | Last presence update delivered | +| `toDeviceStreamId` | Last to-device message delivered | +| `lastError` / `lastErrorAt` | Last delivery error and when it occurred | + +--- + +## 3. Communication: Bridge → Homeserver + +The bridge uses the standard **Matrix Client-Server API** to interact with the homeserver, with special AS authentication. + +### Authentication + +The bridge authenticates using the `as_token`: + +``` +Authorization: Bearer +``` + +Or legacy (deprecated): + +``` +?access_token= +``` + +### Acting as Another User (Impersonation) + +The bridge can act as any user within its namespace by appending: + +``` +?user_id=@irc_alice:example.com +``` + +The homeserver must: + +1. Verify the `as_token` is valid. +2. Verify the target `user_id` is within the bridge's user namespace. +3. Execute the request as if that user made it. + +### Key Operations a Bridge Performs + +#### a) Register Ghost/Virtual Users + +```http +POST /_matrix/client/v3/register +Authorization: Bearer + +{ + "auth": {"type": "m.login.application_service"}, + "username": "irc_alice" +} +``` + +**Homeserver must:** +- Validate the `as_token`. +- Check that the requested username falls within the bridge's user namespace. +- Create the user with an association to the bridge (`appservice_id` in the DB). +- Skip CAPTCHA, email verification, and other interactive auth steps. +- Ghost users have no password (empty password hash). + +#### b) Send Messages as Ghost Users + +```http +PUT /_matrix/client/v3/rooms/{roomId}/send/m.room.message/{txnId}?user_id=@irc_alice:example.com +Authorization: Bearer + +{"body": "Hello from IRC!", "msgtype": "m.text"} +``` + +#### c) Join/Leave Rooms as Ghost Users + +```http +POST /_matrix/client/v3/join/{roomIdOrAlias}?user_id=@irc_alice:example.com +Authorization: Bearer +``` + +#### d) Set Display Names and Avatars for Ghost Users + +```http +PUT /_matrix/client/v3/profile/@irc_alice:example.com/displayname?user_id=@irc_alice:example.com +Authorization: Bearer + +{"displayname": "alice (IRC)"} +``` + +#### e) Create Rooms + +```http +POST /_matrix/client/v3/createRoom +Authorization: Bearer + +{ + "room_alias_name": "irc_general", + "name": "#general (IRC)", + "visibility": "public" +} +``` + +--- + +## 4. Query Endpoints (Bridge Must Implement) + +The homeserver calls these when it encounters an unknown user or room alias that matches a bridge's namespace. + +### User Query + +``` +GET {bridge_url}/_matrix/app/v1/users/{userId} +Authorization: Bearer +``` + +**When called:** A client queries a user ID that matches the bridge's namespace but doesn't exist yet. + +**Bridge response:** +- `200 {}` — "Yes, I know this user." The homeserver should then allow the bridge to lazily create it. +- `404` — "I don't know this user." + +### Room Alias Query + +``` +GET {bridge_url}/_matrix/app/v1/rooms/{roomAlias} +Authorization: Bearer +``` + +**When called:** A client tries to join/resolve a room alias matching the bridge's namespace that doesn't exist yet. + +**Bridge response:** +- `200 {}` — "Yes, this room exists." The bridge is expected to create the room (via Client-Server API) before or shortly after responding. +- `404` — "I don't know this room." + +### Third-Party Protocol Lookup (Optional) + +``` +GET {bridge_url}/_matrix/app/v1/thirdparty/protocol/{protocol} +GET {bridge_url}/_matrix/app/v1/thirdparty/user/{protocol}?fields... +GET {bridge_url}/_matrix/app/v1/thirdparty/location/{protocol}?fields... +``` + +These enable discovery — mapping between Matrix and third-party identifiers. + +--- + +## 5. Ghost/Virtual User Management + +### User Lifecycle + +1. **Creation**: Bridge registers user via `/register` with `m.login.application_service` auth. +2. **Profile setup**: Bridge sets display name and avatar. +3. **Room participation**: Bridge joins the user to rooms, sends messages on their behalf. +4. **Cleanup** (optional): Bridge can deactivate users that are no longer needed. + +### Database Requirements + +The homeserver needs to track which users belong to which bridge: + +``` +users table: + - user_id: TEXT (e.g., @irc_alice:example.com) + - password_hash: TEXT (empty for ghost users) + - appservice_id: TEXT NULLABLE (bridge ID, NULL for normal users) +``` + +### Lookup Operations Needed + +- `get_app_service_by_user_id(user_id)` — Which bridge owns this user? +- `is_user_in_appservice_namespace(user_id)` — Does this user match any bridge's namespace? +- `get_appservices_interested_in_user(user_id)` — Which bridges care about events from this user? + +--- + +## 6. Room Bridging and Interest Detection + +### How Rooms Get Bridged + +**Method 1: Bridge creates the room proactively** +- Bridge calls `POST /createRoom` with a room alias in its namespace. +- Any user joining that alias lands in the bridged room. + +**Method 2: Lazy creation via alias query** +- User tries to join `#irc_general:example.com`. +- Homeserver doesn't find the alias, queries the bridge via room alias query endpoint. +- Bridge creates the room and responds 200. +- User joins the newly created room. + +**Method 3: Invite-based bridging** +- A real user invites a ghost user (or bridge bot) to a room. +- Bridge accepts the invite and starts bridging that room. + +### Interest Detection Algorithm + +For each event, determine which bridges should receive it: + +```python +def get_interested_bridges(event, all_bridges): + interested = [] + for bridge in all_bridges: + if bridge.matches_room_id(event.room_id): + interested.append(bridge) + elif any(bridge.matches_alias(a) for a in get_aliases(event.room_id)): + interested.append(bridge) + elif any(bridge.matches_user(m) for m in get_members(event.room_id)): + interested.append(bridge) + elif bridge.matches_user(event.sender): + interested.append(bridge) + return interested +``` + +To resolve a room's aliases and members for steps 2 and 3, the router uses an injected **room-state resolver** callback that reads the latest resolved room state (canonical aliases + members). See `event-router.service.ts` and the resolver wired up in `federation-sdk/src/index.ts`. + +> **Implementation note (member matching).** In this codebase the member-based check (step 3) is narrower than the generic algorithm: a member only triggers interest when it equals the bridge's *own* bot user ID (`@{senderLocalpart}:{serverName}`) **and** matches the user namespace regex (commit "match user id with server name"). In practice, interest is driven mainly by room ID, alias, and sender matches; the bridge bot's presence in a room counts, but other ghost members do not by themselves. See `namespace-matcher.service.ts` (`getInterestedAppServices`). + +## 7. Complete Message Flow Examples + +### External Protocol → Matrix + +``` +1. IRC user "alice" sends "Hello" in #general +2. IRC bridge receives the message +3. Bridge checks if @irc_alice:hs.example.com exists + - If not: POST /register to create ghost user + - If not in room: POST /join to join the bridged room +4. Bridge sends message: + PUT /rooms/!bridged:hs.example.com/send/m.room.message/txn1?user_id=@irc_alice:hs.example.com + Body: {"body": "Hello", "msgtype": "m.text"} +5. Homeserver stores event, delivers to room members +6. Other bridges interested in this room also receive the event via transaction push +``` + +### Matrix → External Protocol + +``` +1. Matrix user @bob:hs.example.com sends "Hi" in !bridged:hs.example.com +2. Homeserver determines IRC bridge is interested (ghost users in room) +3. Homeserver pushes transaction to bridge: + PUT http://bridge:9999/_matrix/app/v1/transactions/42 + Body: {"events": [{sender: "@bob:hs.example.com", body: "Hi", ...}]} +4. Bridge receives event, translates to IRC format +5. Bridge sends "Hi" as bob to #general on IRC +6. Bridge responds HTTP 200 to homeserver +``` + +--- + +## 8. Implementation Checklist for a Homeserver + +### Registration Management +- [ ] Parse AS registration YAML files +- [ ] Store registrations (id, url, as_token, hs_token, sender_localpart, namespaces) +- [ ] Create bridge bot user on registration load +- [ ] Index namespace regexes for fast matching + +### Authentication +- [ ] Recognize `as_token` in `Authorization: Bearer` header and `access_token` query param +- [ ] Support `?user_id=` impersonation parameter +- [ ] Validate impersonated user is within bridge's namespace +- [ ] Support `m.login.application_service` auth type in `/register` + +### Event Routing +- [ ] Detect interested bridges per event (room ID, aliases, members, sender matching) +- [ ] Queue events per bridge +- [ ] Push transactions to bridge URL with `hs_token` auth +- [ ] Handle transaction acknowledgment (HTTP 200) +- [ ] Implement retry with exponential backoff on failure +- [ ] Track per-bridge stream positions for resumption +- [ ] Deduplicate transactions by `txnId` + +### Query Protocol +- [ ] Query bridge for unknown users in its namespace (`GET /users/{userId}`) +- [ ] Query bridge for unknown room aliases in its namespace (`GET /rooms/{roomAlias}`) +- [ ] Optional: Support third-party protocol lookup endpoints + +### Ghost User Management +- [ ] Allow bridges to register users in their namespace without password/captcha +- [ ] Store `appservice_id` association on ghost users +- [ ] Allow bridges to set profiles (displayname, avatar) for ghost users +- [ ] Enforce exclusive namespace restrictions (reject non-bridge registration in exclusive namespaces) + +### Ephemeral Events (Optional but Recommended) +- [ ] Push typing notifications to interested bridges +- [ ] Push read receipts to interested bridges +- [ ] Push presence updates to interested bridges +- [ ] Push to-device messages to interested bridges +- [ ] Respect `de.sorunome.msc2409.push_ephemeral` flag +- [ ] Send ephemeral events under both the unstable (`de.sorunome.msc2409.ephemeral`) and stable (`ephemeral`) keys for compatibility +- [ ] Convert federation EDUs to client-server ephemeral shapes (coalesce typing, re-key receipts, fan out presence) + +### Database Schema + +```sql +-- Bridge registrations (or load from config into memory) +CREATE TABLE application_services ( + id TEXT PRIMARY KEY, + url TEXT, + as_token TEXT UNIQUE, + hs_token TEXT, + sender_localpart TEXT +); + +-- Namespace patterns +CREATE TABLE application_service_namespaces ( + as_id TEXT REFERENCES application_services(id), + type TEXT, -- 'users', 'aliases', 'rooms' + regex TEXT, + exclusive BOOLEAN +); + +-- Per-bridge delivery state +CREATE TABLE application_services_state ( + as_id TEXT PRIMARY KEY REFERENCES application_services(id), + state TEXT, -- 'up' or 'down' + stream_ordering BIGINT, + read_receipt_stream_id BIGINT, + presence_stream_id BIGINT, + to_device_stream_id BIGINT, + device_list_stream_id BIGINT +); + +-- Transaction queue +CREATE TABLE application_services_txns ( + as_id TEXT REFERENCES application_services(id), + txn_id BIGINT, + event_ids TEXT, -- JSON array of event IDs + PRIMARY KEY (as_id, txn_id) +); + +-- Users table needs appservice_id column +ALTER TABLE users ADD COLUMN appservice_id TEXT REFERENCES application_services(id); +``` + +--- + +## 9. Synapse-Specific Source Reference + +These files in the Synapse codebase serve as a reference implementation: + +| File | Purpose | +|------|---------| +| `synapse/appservice/__init__.py` | `ApplicationService` class, namespace/interest matching | +| `synapse/appservice/api.py` | HTTP client for calling bridges | +| `synapse/appservice/scheduler.py` | Transaction queuing, retry, backoff | +| `synapse/handlers/appservice.py` | Event routing to interested bridges | +| `synapse/config/appservice.py` | Registration file parsing | +| `synapse/handlers/register.py` | Ghost user registration (`appservice_register`) | +| `synapse/handlers/directory.py` | Room alias resolution with bridge queries | +| `synapse/api/auth/base.py` | AS token authentication and impersonation | +| `synapse/storage/databases/main/appservice.py` | Database operations for AS state | +| `synapse/rest/client/thirdparty.py` | Third-party protocol lookup endpoints | +| `docs/application_services.md` | Official Synapse AS documentation | + +--- + +## 10. Existing Bridge SDKs + +These SDKs implement the bridge-side of the AS API: + +| Language | SDK | Notes | +|----------|-----|-------| +| Python | [mautrix-python](https://github.com/mautrix/python) | Most popular, powers mautrix-telegram, mautrix-signal, etc. | +| Node.js | [matrix-appservice-bridge](https://github.com/matrix-org/matrix-appservice-bridge) | Official Matrix.org SDK | +| Go | [mautrix-go](https://github.com/mautrix/go) | Powers mautrix-whatsapp, mautrix-discord | +| Rust | [matrix-rust-sdk](https://github.com/matrix-org/matrix-rust-sdk) | Has appservice module | + +Studying how these SDKs interact with the homeserver is useful for understanding the protocol from the bridge's perspective. + +--- + +## 11. Relevant Matrix Spec Sections + +- [Application Service API](https://spec.matrix.org/latest/application-service-api/) +- [Client-Server API — Registration](https://spec.matrix.org/latest/client-server-api/#registration) +- [MSC2409 — Ephemeral events for appservices](https://github.com/matrix-org/matrix-spec-proposals/pull/2409) +- [MSC3202 — Device list changes for appservices](https://github.com/matrix-org/matrix-spec-proposals/pull/3202) + +--- + +## 12. This Homeserver's Implementation + +The Application Service support in this repository lives in the `@rocket.chat/federation-appservice` package, with supporting changes in `federation-sdk`, `federation-core`, and `federation-room`. It currently powers an **XMPP bridge**. This section maps the concepts above onto the actual code. + +### Source File Reference + +| File | Purpose | +|------|---------| +| `packages/appservice/src/models/appservice.model.ts` | `AppServiceRegistration`, `AppServiceNamespaces`, `AppServiceState`, `AppServiceTransaction`, `AppServiceEphemeralEvent`, and the in-memory `CachedAppService` (with compiled regexes) | +| `packages/appservice/src/services/registration.service.ts` | Loads registrations from config, caches them with compiled namespaces, indexes by `asToken`, tracks state | +| `packages/appservice/src/services/namespace-matcher.service.ts` | Namespace/exclusivity matching and `getInterestedAppServices()` interest detection | +| `packages/appservice/src/services/event-router.service.ts` | Routes persistent + ephemeral events to interested bridges; batching (100 ms / 50 events) | +| `packages/appservice/src/services/transaction-sender.service.ts` | `PUT /_matrix/app/v1/transactions/{txnId}` with `hs_token` auth; retry/backoff; up/down state | +| `packages/appservice/src/services/bridge-query.service.ts` | Bridge query endpoints: user, room alias, third-party protocol/user/location | +| `packages/appservice/src/utils/edu-to-appservice.ts` (+ `.spec.ts`) | Converts federation EDUs (typing/receipt/presence) into client-server ephemeral events | +| `packages/appservice/src/config-provider.ts` | `AppServiceConfigProvider` DI token — supplies `serverName` and XMPP config | +| `packages/federation-sdk/src/sdk.ts` | Public SDK surface for appservice operations (see below) | +| `packages/federation-sdk/src/services/directory.service.ts` + `repositories/room-alias.repository.ts` | Canonical room-alias storage and resolution | +| `packages/room/src/manager/room-state.ts` | `setRoomStateResolver` — exposes canonical aliases + members used for interest matching | +| `packages/core/src/utils/fetch.ts` | HTTP/HTTPS fetch used to reach bridges (see below) | + +### Config-Driven Registration (no YAML/DB) + +Unlike Synapse's YAML registration files, this homeserver builds the bridge registration **entirely from app config**. `RegistrationService.initialize()` reads `AppConfig.xmpp`: + +```ts +xmpp?: { + bridgeURL: string; // url + hsToken: string; // HS → bridge auth + asToken: string; // bridge → HS auth +} +``` + +When present, it synthesizes a single registration with fixed values: + +- `_id`: `"xmpp"`, `senderLocalpart`: `"_xmpp_bot"`, `protocols`: `["xmpp"]` +- Namespaces: users `@_xmpp_.*` (exclusive), aliases `#_xmpp_.*` (exclusive), rooms empty + +Re-running `setConfig()` re-initializes the cache and ensures the bridge bot user exists (`ensureSenderUsersForAllRegistrations()`). There is no `application_services` config-file parser yet — adding more bridges means extending the config schema and `RegistrationService`. + +### EDU → Ephemeral Event Conversion + +`eduBatchToAppServiceEphemeral()` translates federation EDUs into the client-server shapes bridges expect: + +- **`m.typing`** — coalesced per room; emits one event per room whose `content.user_ids` is the set of currently-typing users (last value per user in the batch wins). +- **`m.receipt`** — re-keyed from the federation shape (`room → user → {data, event_ids}`) to the client-server shape (`event_id → "m.read" → user → {ts, thread_id?}`); receipts for the same room merge into one event. +- **`m.presence`** — each entry in the EDU's `push` array fans out into its own `m.presence` event, hoisting `user_id` to a top-level `sender`. + +### SDK Surface (`federation-sdk`) + +The SDK exposes the appservice machinery to the rest of the homeserver and to the bridge-facing controllers: + +- Registration lookup: `getAllRegistrations()`, `getRegistrationById()`, `getRegistrationByAsToken()` +- Startup: `ensureSenderUsersForAllRegistrations()` (creates/ensures the bridge bot user) +- Queries: `getAllProtocols()`, `queryThirdPartyProtocol/User/Location()`, `pingAppService()`, `getAppServiceState()` +- Namespace checks: `isExclusiveNamespace()`, `isUserInAppServiceNamespace()` +- Room participation: `joinUser()` and `joinXMPPChatRoom()` — the latter builds an `#_xmpp_…` alias, queries the bridge, resolves the alias via the directory, and joins the user. + +The bridge bot user is created as a regular Rocket.Chat bot user on startup (commit "create appservice user as a rocketchat bot"). + +### HTTP Support for Bridges + +`packages/core/src/utils/fetch.ts` selects the transport by URL scheme (`http` vs `https`), so bridges can be reached over plain **HTTP** (e.g. a local XMPP bridge at `http://localhost:…`). For HTTPS it sets the TLS `servername` (SNI) from the `Host` header so certificate verification works on multihomed servers. diff --git a/packages/appservice/TODO.md b/packages/appservice/TODO.md new file mode 100644 index 000000000..68e273833 --- /dev/null +++ b/packages/appservice/TODO.md @@ -0,0 +1,49 @@ +# Application Service - Missing Features + +Gaps between the current implementation and a spec-compliant Matrix Application Service API homeserver. + +## 1. Retry Scheduler + +`TransactionSenderService.retryPending()` implements exponential backoff logic but has no caller. A periodic job (e.g. every 30-60s) must invoke it so that failed transactions are retried automatically. + +**Files:** `transaction-sender.service.ts` + +## 2. State-Aware Event Routing + +`EventRouterService` routes events to all namespace-matching ASes without checking their state. When an AS is marked "down", the homeserver should queue events instead of attempting delivery, and resume when the AS recovers. + +**Files:** `event-router.service.ts`, `appservice-state.repository.ts` + +## 3. Transaction Event Body Persistence + +Transactions only store `eventIds`, not the full event JSON. When `retryPending()` retries a failed transaction it sends `{ events: [] }`. Full event bodies must be persisted so retries deliver the actual events. + +**Files:** `transaction-sender.service.ts`, `appservice-txn.repository.ts`, `appservice.model.ts` + +## 4. Event Stream Position Tracking + +`AppServiceState.streamOrdering` is initialized to 0 but never updated after successful delivery. This position must be incremented so the homeserver can resume from where it left off after a crash or AS downtime. + +**Files:** `appservice-state.repository.ts`, `event-router.service.ts` + +## 5. Ephemeral Event Stream Tracking + +`readReceiptStreamId`, `presenceStreamId`, and `toDeviceStreamId` in `AppServiceState` are initialized to 0 but never read or updated. Ephemeral events (typing, presence, read receipts) are fire-and-forget. Per-AS stream cursors are needed so missed ephemeral events can be replayed. + +**Files:** `appservice-state.repository.ts`, `event-router.service.ts` + +## 6. Catch-Up / Replay on Recovery + +When an AS transitions from "down" to "up", there is no mechanism to replay events missed during downtime. This requires working stream position tracking (#4, #5) and persisted event bodies (#3). + +## 7. Rate Limiting Enforcement + +`AppServiceRegistration.rateLimited` is stored but never checked. Middleware or routing logic should enforce rate limits on requests from ASes that have this flag set to `true`. + +**Files:** `appservice.model.ts`, homeserver middleware + +## 8. Lazy Loading (Namespace Guard Integration) + +`NamespaceGuardService.lazyCreateUser()` and `lazyCreateRoomAlias()` are implemented but not wired into the homeserver controllers. When a request targets an unknown user or alias that falls in an AS exclusive namespace, the homeserver should query the bridge to lazily create the resource. + +**Files:** `namespace-guard.service.ts`, homeserver `register.controller.ts`, `directory.controller.ts` diff --git a/packages/appservice/package.json b/packages/appservice/package.json new file mode 100644 index 000000000..b2628ddc2 --- /dev/null +++ b/packages/appservice/package.json @@ -0,0 +1,32 @@ +{ + "name": "@rocket.chat/appservice", + "version": "0.1.0", + "description": "Matrix Application Service API for bridge support", + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js", + "require": "./dist/index.js" + } + }, + "scripts": { + "build": "tsc --build --force", + "test": "bun test" + }, + "dependencies": { + "@rocket.chat/federation-core": "workspace:*", + "@rocket.chat/federation-room": "workspace:*", + "mongodb": "^6.16.0", + "reflect-metadata": "^0.2.2", + "tsyringe": "^4.10.0", + "yaml": "^2.7.1" + }, + "license": "AGPL-3.0", + "author": "Rocket.Chat Technologies Corp. ", + "files": ["dist"], + "peerDependencies": { + "typescript": "~5.9.2" + } +} diff --git a/packages/appservice/src/config-provider.ts b/packages/appservice/src/config-provider.ts new file mode 100644 index 000000000..b63dccb69 --- /dev/null +++ b/packages/appservice/src/config-provider.ts @@ -0,0 +1,6 @@ +export interface AppServiceConfigProvider { + readonly serverName: string; + readonly xmpp?: { bridgeURL: string; hsToken: string; asToken: string }; +} + +export const APPSERVICE_CONFIG_PROVIDER = Symbol('AppServiceConfigProvider'); diff --git a/packages/appservice/src/index.ts b/packages/appservice/src/index.ts new file mode 100644 index 000000000..78f9afb0c --- /dev/null +++ b/packages/appservice/src/index.ts @@ -0,0 +1,24 @@ +import 'reflect-metadata'; + +export type { + AppServiceRegistration, + AppServiceNamespaces, + AppServiceState, + AppServiceTransaction, + CachedAppService, + CompiledNamespace, + Namespace, +} from './models/appservice.model'; + +export { AppServiceStateRepository } from './repositories/appservice-state.repository'; +export { AppServiceTransactionRepository } from './repositories/appservice-txn.repository'; + +export { RegistrationService } from './services/registration.service'; +export { NamespaceMatcherService } from './services/namespace-matcher.service'; +export { TransactionSenderService } from './services/transaction-sender.service'; +export { EventRouterService } from './services/event-router.service'; +export { BridgeQueryService } from './services/bridge-query.service'; +export { PingService, type PingResult, type PingError } from './services/ping.service'; +export { NamespaceGuardService } from './services/namespace-guard.service'; + +export { APPSERVICE_CONFIG_PROVIDER, type AppServiceConfigProvider } from './config-provider'; diff --git a/packages/appservice/src/models/appservice.model.ts b/packages/appservice/src/models/appservice.model.ts new file mode 100644 index 000000000..e0818989b --- /dev/null +++ b/packages/appservice/src/models/appservice.model.ts @@ -0,0 +1,73 @@ +export interface AppServiceRegistration { + _id: string; + url: string | null; + asToken: string; + hsToken: string; + senderLocalpart: string; + namespaces: AppServiceNamespaces; + protocols: string[]; + rateLimited: boolean; + receiveEphemeral: boolean; + createdAt: Date; + updatedAt: Date; +} + +export interface AppServiceNamespaces { + users: Namespace[]; + aliases: Namespace[]; + rooms: Namespace[]; +} + +export interface Namespace { + regex: string; + exclusive: boolean; +} + +export interface AppServiceState { + _id: string; + state: 'up' | 'down'; + lastTxnId: number; + streamOrdering: number; + readReceiptStreamId: number; + presenceStreamId: number; + toDeviceStreamId: number; + lastError?: string; + lastErrorAt?: Date; + updatedAt: Date; +} + +export interface AppServiceEphemeralEvent { + type: string; + room_id?: string; + sender?: string; + content: Record; +} + +export interface AppServiceTransaction { + _id: string; + asId: string; + txnId: number; + eventIds: string[]; + ephemeralEvents?: AppServiceEphemeralEvent[]; + status: 'pending' | 'sent' | 'failed'; + attempts: number; + createdAt: Date; + sentAt?: Date; +} + +/** + * In-memory cached version of a registration with compiled regexes. + */ +export interface CachedAppService { + registration: AppServiceRegistration; + compiledNamespaces: { + users: CompiledNamespace[]; + aliases: CompiledNamespace[]; + rooms: CompiledNamespace[]; + }; +} + +export interface CompiledNamespace { + regex: RegExp; + exclusive: boolean; +} diff --git a/packages/appservice/src/repositories/appservice-state.repository.ts b/packages/appservice/src/repositories/appservice-state.repository.ts new file mode 100644 index 000000000..f97b2848d --- /dev/null +++ b/packages/appservice/src/repositories/appservice-state.repository.ts @@ -0,0 +1,67 @@ +import type { Collection } from 'mongodb'; +import { inject, singleton } from 'tsyringe'; + +import type { AppServiceState } from '../models/appservice.model'; + +@singleton() +export class AppServiceStateRepository { + constructor( + @inject('AppServiceStateCollection') + private readonly collection: Collection, + ) {} + + async getState(asId: string): Promise { + return this.collection.findOne({ _id: asId }); + } + + async upsertState(asId: string, updates: Partial>): Promise { + await this.collection.updateOne( + { _id: asId }, + { + $set: { ...updates, updatedAt: new Date() }, + $setOnInsert: { _id: asId, lastTxnId: 0, streamOrdering: 0, readReceiptStreamId: 0, presenceStreamId: 0, toDeviceStreamId: 0 }, + }, + { upsert: true }, + ); + } + + async markUp(asId: string): Promise { + await this.upsertState(asId, { + state: 'up', + lastError: undefined, + lastErrorAt: undefined, + }); + } + + async markDown(asId: string, error: string): Promise { + await this.upsertState(asId, { + state: 'down', + lastError: error, + lastErrorAt: new Date(), + }); + } + + async incrementTxnId(asId: string): Promise { + const result = await this.collection.findOneAndUpdate( + { _id: asId }, + { + $inc: { lastTxnId: 1 }, + $set: { updatedAt: new Date() }, + $setOnInsert: { + _id: asId, + state: 'up' as const, + streamOrdering: 0, + readReceiptStreamId: 0, + presenceStreamId: 0, + toDeviceStreamId: 0, + }, + }, + { upsert: true, returnDocument: 'after' }, + ); + return result?.lastTxnId ?? 1; + } + + async remove(asId: string): Promise { + await this.collection.deleteOne({ _id: asId }); + } +} diff --git a/packages/appservice/src/repositories/appservice-txn.repository.ts b/packages/appservice/src/repositories/appservice-txn.repository.ts new file mode 100644 index 000000000..948d6f070 --- /dev/null +++ b/packages/appservice/src/repositories/appservice-txn.repository.ts @@ -0,0 +1,34 @@ +import type { Collection } from 'mongodb'; +import { inject, singleton } from 'tsyringe'; + +import type { AppServiceTransaction } from '../models/appservice.model'; + +@singleton() +export class AppServiceTransactionRepository { + constructor( + @inject('AppServiceTxnCollection') + private readonly collection: Collection, + ) { + this.collection.createIndex({ asId: 1, txnId: 1 }, { unique: true }); + this.collection.createIndex({ asId: 1, status: 1 }); + } + + async create(txn: AppServiceTransaction): Promise { + await this.collection.insertOne(txn); + } + + async markSent(asId: string, txnId: number): Promise { + await this.collection.updateOne({ asId, txnId }, { $set: { status: 'sent', sentAt: new Date() } }); + } + + async markFailed(asId: string, txnId: number): Promise { + await this.collection.updateOne({ asId, txnId }, { $set: { status: 'failed' }, $inc: { attempts: 1 } }); + } + + async getPending(asId: string): Promise { + return this.collection + .find({ asId, status: { $in: ['pending', 'failed'] } }) + .sort({ txnId: 1 }) + .toArray(); + } +} diff --git a/packages/appservice/src/services/bridge-query.service.ts b/packages/appservice/src/services/bridge-query.service.ts new file mode 100644 index 000000000..d42f0fec9 --- /dev/null +++ b/packages/appservice/src/services/bridge-query.service.ts @@ -0,0 +1,152 @@ +import { createLogger, fetch } from '@rocket.chat/federation-core'; +import { singleton } from 'tsyringe'; + +import { RegistrationService } from './registration.service'; +import type { CachedAppService } from '../models/appservice.model'; + +@singleton() +export class BridgeQueryService { + private readonly logger = createLogger('BridgeQueryService'); + + constructor(private readonly registrationService: RegistrationService) {} + + /** + * Query a bridge about an unknown user in its namespace. + * Returns true if the bridge claims the user (200), false otherwise. + */ + async queryUser(asId: string, userId: string): Promise { + const as = this.registrationService.getById(asId); + if (!as?.registration.url) { + return false; + } + + return this.queryBridge(as, `/_matrix/app/v1/users/${encodeURIComponent(userId)}`); + } + + /** + * Query a bridge about an unknown room alias in its namespace. + * Returns true if the bridge claims the alias (200), false otherwise. + */ + async queryRoomAlias(asId: string, roomAlias: string): Promise { + const as = this.registrationService.getById(asId); + if (!as?.registration.url) { + return false; + } + + return this.queryBridge(as, `/_matrix/app/v1/rooms/${encodeURIComponent(roomAlias)}`); + } + + /** + * Get third-party protocol metadata from a bridge. + */ + async queryThirdPartyProtocol(asId: string, protocol: string): Promise | null> { + const as = this.registrationService.getById(asId); + if (!as?.registration.url) { + return null; + } + + return this.queryBridgeJson(as, `/_matrix/app/v1/thirdparty/protocol/${encodeURIComponent(protocol)}`); + } + + /** + * Query third-party users from a bridge. + */ + async queryThirdPartyUser(asId: string, protocol: string, fields: Record): Promise[] | null> { + const as = this.registrationService.getById(asId); + if (!as?.registration.url) { + return null; + } + + const params = new URLSearchParams(fields).toString(); + const path = protocol + ? `/_matrix/app/v1/thirdparty/user/${encodeURIComponent(protocol)}?${params}` + : `/_matrix/app/v1/thirdparty/user?${params}`; + + return this.queryBridgeJson(as, path) as Promise[] | null>; + } + + /** + * Query third-party locations from a bridge. + */ + async queryThirdPartyLocation(asId: string, protocol: string, fields: Record): Promise[] | null> { + const as = this.registrationService.getById(asId); + if (!as?.registration.url) { + return null; + } + + const params = new URLSearchParams(fields).toString(); + const path = protocol + ? `/_matrix/app/v1/thirdparty/location/${encodeURIComponent(protocol)}?${params}` + : `/_matrix/app/v1/thirdparty/location?${params}`; + + return this.queryBridgeJson(as, path) as Promise[] | null>; + } + + /** + * Aggregate all third-party protocols from all registered bridges. + */ + async getAllProtocols(): Promise> { + const result: Record = {}; + + const queries = this.registrationService.getAll().flatMap((as) => + as.registration.protocols.map(async (protocol) => { + const data = await this.queryThirdPartyProtocol(as.registration._id, protocol); + if (data) { + result[protocol] = data; + } + }), + ); + await Promise.all(queries); + + return result; + } + + private async queryBridge(as: CachedAppService, path: string): Promise { + try { + const url = new URL(`${as.registration.url}${path}`); + + const response = await fetch(url, { + method: 'GET', + headers: { + Authorization: `Bearer ${as.registration.hsToken}`, + Host: url.host, + }, + }); + + return response.ok; + } catch (err) { + this.logger.error({ + msg: 'Bridge query failed', + asId: as.registration._id, + path, + err, + }); + return false; + } + } + + private async queryBridgeJson(as: CachedAppService, path: string): Promise | null> { + try { + const response = await fetch(new URL(`${as.registration.url}${path}`), { + method: 'GET', + headers: { + Authorization: `Bearer ${as.registration.hsToken}`, + }, + }); + + if (!response.ok) { + return null; + } + + return response.json() as Promise>; + } catch (err) { + this.logger.error({ + msg: 'Bridge query failed', + asId: as.registration._id, + path, + err, + }); + return null; + } + } +} diff --git a/packages/appservice/src/services/event-router.service.spec.ts b/packages/appservice/src/services/event-router.service.spec.ts new file mode 100644 index 000000000..173fb3587 --- /dev/null +++ b/packages/appservice/src/services/event-router.service.spec.ts @@ -0,0 +1,141 @@ +import 'reflect-metadata'; + +import { beforeEach, describe, expect, mock, test } from 'bun:test'; + +import type { PersistentEventBase } from '@rocket.chat/federation-room'; + +import { EventRouterService, MAX_BATCH_SIZE } from './event-router.service'; +import type { NamespaceMatcherService } from './namespace-matcher.service'; +import type { TransactionSenderService } from './transaction-sender.service'; +import type { CachedAppService } from '../models/appservice.model'; + +// Flushes a batch synchronously: afterAppend() flushes once events + ephemeral +// reach MAX_BATCH_SIZE, so no fake timers are needed. + +type Deferred = { promise: Promise; resolve: (value: T) => void; reject: (err: unknown) => void }; + +function deferred(): Deferred { + let resolve!: (value: T) => void; + let reject!: (err: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +const tick = () => new Promise((r) => setTimeout(r, 0)); + +const fakeAppService = { registration: { _id: 'xmpp', receiveEphemeral: true } } as unknown as CachedAppService; + +function makeEvent(tag: string): PersistentEventBase { + return { roomId: '!r:s', sender: '@u:s', eventId: tag, event: {} } as unknown as PersistentEventBase; +} + +// Routes a full batch tagged so the first event's id identifies which batch a +// sendTransaction call corresponds to. +async function routeBatch(router: EventRouterService, tag: string): Promise { + for (let i = 0; i < MAX_BATCH_SIZE; i++) { + // eslint-disable-next-line no-await-in-loop + await router.routeEvent(makeEvent(`${tag}:${i}`)); + } +} + +describe('EventRouterService send serialization', () => { + let router: EventRouterService; + let sends: Array<{ asId: string; tag: string; deferred: Deferred }>; + let sendTransaction: ReturnType; + + beforeEach(() => { + sends = []; + // Each call records the target appservice and batch tag (first event's id + // prefix) and returns a promise we resolve/reject manually, so we control + // ordering precisely. + sendTransaction = mock((as: CachedAppService, events: PersistentEventBase[]) => { + const tag = events[0]?.eventId.split(':')[0] ?? ''; + const d = deferred(); + sends.push({ asId: as.registration._id, tag, deferred: d }); + return d.promise; + }); + + const namespaceMatcher = { + getInterestedAppServices: () => [fakeAppService], + } as unknown as NamespaceMatcherService; + + const transactionSender = { sendTransaction } as unknown as TransactionSenderService; + + router = new EventRouterService(namespaceMatcher, transactionSender); + }); + + test('does not start the next batch until the previous send resolves', async () => { + await routeBatch(router, 'a'); + await tick(); + + // First batch sent; second not yet attempted. + expect(sendTransaction).toHaveBeenCalledTimes(1); + expect(sends[0].tag).toBe('a'); + + await routeBatch(router, 'b'); + await tick(); + + // Still only one send in flight — the chain is blocked on batch 'a'. + expect(sendTransaction).toHaveBeenCalledTimes(1); + + // Resolve 'a' → 'b' is now free to go. + sends[0].deferred.resolve(); + await tick(); + + expect(sendTransaction).toHaveBeenCalledTimes(2); + expect(sends.map((s) => s.tag)).toEqual(['a', 'b']); + }); + + test('a failed send does not break the chain for subsequent batches', async () => { + await routeBatch(router, 'a'); + await tick(); + await routeBatch(router, 'b'); + await tick(); + + expect(sendTransaction).toHaveBeenCalledTimes(1); + + // First send rejects — the chain must swallow it and continue. + sends[0].deferred.reject(new Error('bridge down')); + await tick(); + + expect(sendTransaction).toHaveBeenCalledTimes(2); + expect(sends[1].tag).toBe('b'); + + sends[1].deferred.resolve(); + await tick(); + }); + + test('a slow bridge does not block another (no cross-bridge head-of-line blocking)', async () => { + const asA = { registration: { _id: 'aaa', receiveEphemeral: true } } as unknown as CachedAppService; + const asB = { registration: { _id: 'bbb', receiveEphemeral: true } } as unknown as CachedAppService; + + const matcher = { getInterestedAppServices: () => [asA, asB] } as unknown as NamespaceMatcherService; + router = new EventRouterService(matcher, { sendTransaction } as unknown as TransactionSenderService); + + // First batch goes to both bridges; neither send is resolved yet. + await routeBatch(router, 'x'); + await tick(); + expect(sendTransaction).toHaveBeenCalledTimes(2); + + // Second batch also goes to both. Each bridge's second send is queued + // behind its own (still-pending) first send. + await routeBatch(router, 'y'); + await tick(); + expect(sendTransaction).toHaveBeenCalledTimes(2); + + // Resolve ONLY bridge A's first send. A's chain advances to its second + // batch; B stays blocked on its own still-pending first send. + const aFirst = sends.find((s) => s.asId === 'aaa' && s.tag === 'x'); + aFirst?.deferred.resolve(); + await tick(); + + expect(sendTransaction).toHaveBeenCalledTimes(3); + const aTags = sends.filter((s) => s.asId === 'aaa').map((s) => s.tag); + const bTags = sends.filter((s) => s.asId === 'bbb').map((s) => s.tag); + expect(aTags).toEqual(['x', 'y']); // A serialized its own batches, in order + expect(bTags).toEqual(['x']); // B not dragged along by A + }); +}); diff --git a/packages/appservice/src/services/event-router.service.ts b/packages/appservice/src/services/event-router.service.ts new file mode 100644 index 000000000..a4f170bc9 --- /dev/null +++ b/packages/appservice/src/services/event-router.service.ts @@ -0,0 +1,189 @@ +import { createLogger, PresenceEDU, ReceiptEDU, TypingEDU } from '@rocket.chat/federation-core'; +import type { PersistentEventBase } from '@rocket.chat/federation-room'; +import { singleton } from 'tsyringe'; + +import { NamespaceMatcherService } from './namespace-matcher.service'; +import { TransactionSenderService } from './transaction-sender.service'; +import type { CachedAppService } from '../models/appservice.model'; + +interface EventBatch { + events: PersistentEventBase[]; + ephemeral: (ReceiptEDU | TypingEDU | PresenceEDU)[]; + timer: ReturnType | null; +} + +const BATCH_WINDOW_MS = 100; +export const MAX_BATCH_SIZE = 50; + +@singleton() +export class EventRouterService { + private readonly logger = createLogger('EventRouterService'); + + private batches: Map = new Map(); + + // Tail of the in-flight send chain per appservice. Flushed batches are + // appended to this chain so transactions for a given bridge are delivered + // strictly in order (txnId allocation happens inside sendTransaction, so + // serializing the calls also keeps txnIds monotonic with delivery order). + private sendChains: Map> = new Map(); + + // Resolves the aliases and joined members of a room — needed for namespace + // interest detection. Injected from federation-sdk since the appservice + // package doesn't own room state. + private roomStateResolver?: (roomId: string) => Promise<{ aliases: string[]; members: string[] }>; + + constructor(private readonly namespaceMatcher: NamespaceMatcherService, private readonly transactionSender: TransactionSenderService) {} + + setRoomStateResolver(resolver: (roomId: string) => Promise<{ aliases: string[]; members: string[] }>): void { + this.roomStateResolver = resolver; + } + + async routeEvent(event: PersistentEventBase): Promise { + // check if event is persistent or ephemeral based on event type and route accordingly + await this.routePersistent(event); + } + + private async routePersistent(event: PersistentEventBase): Promise { + const { roomId, sender } = event; + if (!roomId || !sender) { + return; + } + + const { aliases, members } = (await this.roomStateResolver?.(roomId)) ?? { aliases: [], members: [] }; + + const interested = this.namespaceMatcher.getInterestedAppServices(roomId, sender, aliases, members); + + for (const as of interested) { + const batch = this.getOrCreateBatch(as); + batch.events.push(event); + this.afterAppend(as, batch); + } + } + + async routeEphemeral(payload: ReceiptEDU | TypingEDU | PresenceEDU): Promise { + const targets = this.extractEphemeralTargets(payload); + const interested = await this.findInterestedForTargets(targets); + + for (const as of interested) { + if (!as.registration.receiveEphemeral) { + continue; + } + const batch = this.getOrCreateBatch(as); + batch.ephemeral.push(payload); + this.afterAppend(as, batch); + } + } + + // Returns the (roomId, userId) pairs that should be checked against + // appservice namespaces for a given EDU. Each EDU shape exposes its + // room/user references differently — presence has no rooms, receipts can + // span many rooms and many users. + private extractEphemeralTargets(payload: ReceiptEDU | TypingEDU | PresenceEDU): Array<{ roomId: string; userId: string }> { + if (payload.edu_type === 'm.typing') { + return [{ roomId: payload.content.room_id, userId: payload.content.user_id }]; + } + + if (payload.edu_type === 'm.presence') { + return payload.content.push.map((update) => ({ roomId: '', userId: update.user_id })); + } + + const targets: Array<{ roomId: string; userId: string }> = []; + for (const [roomId, readByUser] of Object.entries(payload.content)) { + const userIds = Object.keys(readByUser?.['m.read'] ?? {}); + if (userIds.length === 0) { + targets.push({ roomId, userId: '' }); + continue; + } + for (const userId of userIds) { + targets.push({ roomId, userId }); + } + } + return targets; + } + + private async findInterestedForTargets(targets: Array<{ roomId: string; userId: string }>): Promise { + const emptyState = { aliases: [] as string[], members: [] as string[] }; + const uniqueRoomIds = Array.from(new Set(targets.map((t) => t.roomId).filter(Boolean))); + + const resolved = await Promise.all( + uniqueRoomIds.map(async (roomId) => [roomId, (await this.roomStateResolver?.(roomId)) ?? emptyState] as const), + ); + const stateByRoom = new Map(resolved); + + const interested = new Map(); + for (const { roomId, userId } of targets) { + const state = roomId ? stateByRoom.get(roomId) ?? emptyState : emptyState; + for (const as of this.namespaceMatcher.getInterestedAppServices(roomId, userId, state.aliases, state.members)) { + interested.set(as.registration._id, as); + } + } + + return Array.from(interested.values()); + } + + private getOrCreateBatch(appservice: CachedAppService): EventBatch { + const asId = appservice.registration._id; + let batch = this.batches.get(asId); + if (!batch) { + batch = { events: [], ephemeral: [], timer: null }; + this.batches.set(asId, batch); + } + return batch; + } + + private afterAppend(appservice: CachedAppService, batch: EventBatch): void { + if (batch.events.length + batch.ephemeral.length >= MAX_BATCH_SIZE) { + this.flushBatch(appservice); + return; + } + if (!batch.timer) { + batch.timer = setTimeout(() => { + this.flushBatch(appservice); + }, BATCH_WINDOW_MS); + } + } + + private flushBatch(appservice: CachedAppService): void { + const asId = appservice.registration._id; + const batch = this.batches.get(asId); + if (!batch) return; + + if (batch.timer) { + clearTimeout(batch.timer); + } + this.batches.delete(asId); + + if (batch.events.length === 0 && batch.ephemeral.length === 0) return; + + this.enqueueSend(appservice, batch.events, batch.ephemeral.length > 0 ? batch.ephemeral : undefined); + } + + // Appends a transaction send to the per-appservice chain so sends run one + // at a time, in flush order. A failed send is logged but does not break the + // chain — the next batch still goes out (and its delivery resets up/down + // state via the sender). + private enqueueSend( + appservice: CachedAppService, + events: PersistentEventBase[], + ephemeral: (ReceiptEDU | TypingEDU | PresenceEDU)[] | undefined, + ): void { + const asId = appservice.registration._id; + const prev = this.sendChains.get(asId) ?? Promise.resolve(); + + const next = prev + .then(() => this.transactionSender.sendTransaction(appservice, events, ephemeral)) + .catch((err) => { + this.logger.error({ msg: 'Failed to send transaction batch', asId, err }); + }); + + this.sendChains.set(asId, next); + + // Drop the chain entry once it settles and nothing newer was queued, + // so the map doesn't retain resolved promises for idle bridges. + void next.finally(() => { + if (this.sendChains.get(asId) === next) { + this.sendChains.delete(asId); + } + }); + } +} diff --git a/packages/appservice/src/services/namespace-guard.service.ts b/packages/appservice/src/services/namespace-guard.service.ts new file mode 100644 index 000000000..74ec4f82e --- /dev/null +++ b/packages/appservice/src/services/namespace-guard.service.ts @@ -0,0 +1,117 @@ +import { createLogger } from '@rocket.chat/federation-core'; +import { singleton } from 'tsyringe'; + +import { BridgeQueryService } from './bridge-query.service'; +import { NamespaceMatcherService } from './namespace-matcher.service'; +import { RegistrationService } from './registration.service'; + +/** + * Handles namespace enforcement and lazy-loading of bridged resources. + * + * Namespace enforcement: prevents non-owning clients from registering users + * or creating aliases in exclusive namespaces. + * + * Lazy loading: when an unknown user or alias is queried that matches a + * bridge namespace, queries the bridge to create the resource on-demand. + */ +@singleton() +export class NamespaceGuardService { + private readonly logger = createLogger('NamespaceGuardService'); + + constructor( + private readonly namespaceMatcher: NamespaceMatcherService, + private readonly bridgeQuery: BridgeQueryService, + private readonly registrationService: RegistrationService, + ) {} + + /** + * Check if a username can be registered by a non-appservice client. + * Returns an error object if blocked, undefined if allowed. + */ + checkUserRegistration(userId: string, requestingAsId?: string): { errcode: string; error: string } | undefined { + const exclusiveOwner = this.namespaceMatcher.isExclusive('users', userId); + if (!exclusiveOwner) return undefined; + + if (requestingAsId && exclusiveOwner.registration._id === requestingAsId) { + return undefined; // Owning appservice is allowed + } + + return { + errcode: 'M_EXCLUSIVE', + error: `User ID ${userId} is within the exclusive namespace of appservice ${exclusiveOwner.registration._id}`, + }; + } + + /** + * Check if a room alias can be created by a non-appservice client. + */ + checkAliasCreation(alias: string, requestingAsId?: string): { errcode: string; error: string } | undefined { + const exclusiveOwner = this.namespaceMatcher.isExclusive('aliases', alias); + if (!exclusiveOwner) return undefined; + + if (requestingAsId && exclusiveOwner.registration._id === requestingAsId) { + return undefined; + } + + return { + errcode: 'M_EXCLUSIVE', + error: `Alias ${alias} is within the exclusive namespace of appservice ${exclusiveOwner.registration._id}`, + }; + } + + /** + * Attempt to lazy-create a user via bridge query. + * Called when a user ID matches a bridge namespace but doesn't exist locally. + * Returns true if a bridge claimed the user (bridge will register it). + */ + async lazyCreateUser(userId: string): Promise { + const owningAs = this.namespaceMatcher.getAppServiceForUser(userId); + if (!owningAs) return false; + + this.logger.info({ + msg: 'Querying bridge for unknown user', + userId, + asId: owningAs.registration._id, + }); + + const claimed = await this.bridgeQuery.queryUser(owningAs.registration._id, userId); + + if (claimed) { + this.logger.info({ + msg: 'Bridge claimed user, waiting for registration', + userId, + asId: owningAs.registration._id, + }); + } + + return claimed; + } + + /** + * Attempt to lazy-create a room alias via bridge query. + * Called when an alias matches a bridge namespace but doesn't exist locally. + * Returns true if a bridge claimed the alias (bridge will create the room). + */ + async lazyCreateRoomAlias(roomAlias: string): Promise { + const owningAs = this.namespaceMatcher.matches('aliases', roomAlias); + if (!owningAs) return false; + + this.logger.info({ + msg: 'Querying bridge for unknown room alias', + roomAlias, + asId: owningAs.registration._id, + }); + + const claimed = await this.bridgeQuery.queryRoomAlias(owningAs.registration._id, roomAlias); + + if (claimed) { + this.logger.info({ + msg: 'Bridge claimed room alias, waiting for creation', + roomAlias, + asId: owningAs.registration._id, + }); + } + + return claimed; + } +} diff --git a/packages/appservice/src/services/namespace-matcher.service.ts b/packages/appservice/src/services/namespace-matcher.service.ts new file mode 100644 index 000000000..9305e61f1 --- /dev/null +++ b/packages/appservice/src/services/namespace-matcher.service.ts @@ -0,0 +1,134 @@ +import { inject, singleton } from 'tsyringe'; + +import { RegistrationService } from './registration.service'; +import { APPSERVICE_CONFIG_PROVIDER, type AppServiceConfigProvider } from '../config-provider'; +import type { CachedAppService } from '../models/appservice.model'; + +type NamespaceType = 'users' | 'aliases' | 'rooms'; + +@singleton() +export class NamespaceMatcherService { + constructor( + private readonly registrationService: RegistrationService, + @inject(APPSERVICE_CONFIG_PROVIDER) private readonly config: AppServiceConfigProvider, + ) {} + + /** + * Check if a value matches any appservice's namespace of the given type. + * Optionally restrict to a specific appservice. + */ + matches(type: NamespaceType, value: string, asId?: string): CachedAppService | undefined { + const appservices = asId + ? ([this.registrationService.getById(asId)].filter(Boolean) as CachedAppService[]) + : this.registrationService.getAll(); + + for (const as of appservices) { + for (const ns of as.compiledNamespaces[type]) { + if (ns.regex.test(value)) { + return as; + } + } + } + return undefined; + } + + /** + * Check if a value falls within an exclusive namespace. + * Returns the owning appservice if exclusive, undefined otherwise. + */ + isExclusive(type: NamespaceType, value: string): CachedAppService | undefined { + for (const as of this.registrationService.getAll()) { + for (const ns of as.compiledNamespaces[type]) { + if (ns.exclusive && ns.regex.test(value)) { + return as; + } + } + } + return undefined; + } + + isUserInNamespace(userId: string, asId?: string): boolean { + return this.matches('users', userId, asId) !== undefined; + } + + isAliasInNamespace(alias: string, asId?: string): boolean { + return this.matches('aliases', alias, asId) !== undefined; + } + + isRoomInNamespace(roomId: string, asId?: string): boolean { + return this.matches('rooms', roomId, asId) !== undefined; + } + + /** + * Get which appservice owns a user (if any). + */ + getAppServiceForUser(userId: string): CachedAppService | undefined { + return this.matches('users', userId); + } + + /** + * Determine which appservices are interested in an event. + * + * A bridge is interested if any of the following match: + * 1. Room ID matches the bridge's room namespace + * 2. Any room alias matches the bridge's alias namespace + * 3. Any room member matches the bridge's user namespace + * 4. The event sender matches the bridge's user namespace + */ + getInterestedAppServices(roomId: string, sender: string, roomAliases: string[], roomMembers: string[]): CachedAppService[] { + const interested = new Map(); + + for (const as of this.registrationService.getAll()) { + const asUserId = `@${as.registration.senderLocalpart}:${this.config.serverName}`; + + if (interested.has(as.registration._id)) continue; + + // 1. Room ID matches room namespace + for (const ns of as.compiledNamespaces.rooms) { + if (ns.regex.test(roomId)) { + interested.set(as.registration._id, as); + break; + } + } + if (interested.has(as.registration._id)) continue; + + // 2. Any room alias matches alias namespace + for (const alias of roomAliases) { + let found = false; + for (const ns of as.compiledNamespaces.aliases) { + if (ns.regex.test(alias)) { + interested.set(as.registration._id, as); + found = true; + break; + } + } + if (found) break; + } + if (interested.has(as.registration._id)) continue; + + // 3. Any room member matches user namespace + for (const member of roomMembers) { + let found = false; + for (const ns of as.compiledNamespaces.users) { + if (member === asUserId && ns.regex.test(member)) { + interested.set(as.registration._id, as); + found = true; + break; + } + } + if (found) break; + } + if (interested.has(as.registration._id)) continue; + + // 4. Event sender matches user namespace + for (const ns of as.compiledNamespaces.users) { + if (ns.regex.test(sender)) { + interested.set(as.registration._id, as); + break; + } + } + } + + return Array.from(interested.values()); + } +} diff --git a/packages/appservice/src/services/ping.service.ts b/packages/appservice/src/services/ping.service.ts new file mode 100644 index 000000000..548401785 --- /dev/null +++ b/packages/appservice/src/services/ping.service.ts @@ -0,0 +1,76 @@ +import { createLogger, fetch } from '@rocket.chat/federation-core'; +import { singleton } from 'tsyringe'; + +import { RegistrationService } from './registration.service'; + +export interface PingResult { + duration_ms: number; +} + +export interface PingError { + errcode: string; + error: string; +} + +@singleton() +export class PingService { + private readonly logger = createLogger('PingService'); + + constructor(private readonly registrationService: RegistrationService) {} + + /** + * Ping an appservice to check connectivity. + * The homeserver sends POST /_matrix/app/v1/ping to the bridge. + */ + async ping(asId: string, transactionId?: string): Promise { + const as = this.registrationService.getById(asId); + if (!as) { + return { errcode: 'M_NOT_FOUND', error: `Appservice ${asId} not found` }; + } + + if (!as.registration.url) { + return { errcode: 'M_URL_NOT_SET', error: 'Appservice URL is not set' }; + } + + const url = new URL(`${as.registration.url}/_matrix/app/v1/ping`); + const startTime = Date.now(); + + try { + const response = await fetch(url, { + method: 'POST', + headers: { + 'Authorization': `Bearer ${as.registration.hsToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + transaction_id: transactionId ?? `ping-${Date.now()}`, + }), + }); + + const durationMs = Date.now() - startTime; + + if (!response.ok) { + return { + errcode: 'M_BAD_STATUS', + error: `Appservice returned HTTP ${response.status}`, + }; + } + + return { duration_ms: durationMs }; + } catch (err) { + const durationMs = Date.now() - startTime; + + if (durationMs > 30_000) { + return { + errcode: 'M_CONNECTION_TIMEOUT', + error: 'Connection to appservice timed out', + }; + } + + return { + errcode: 'M_CONNECTION_FAILED', + error: err instanceof Error ? err.message : 'Connection failed', + }; + } + } +} diff --git a/packages/appservice/src/services/registration.service.ts b/packages/appservice/src/services/registration.service.ts new file mode 100644 index 000000000..e12147ecb --- /dev/null +++ b/packages/appservice/src/services/registration.service.ts @@ -0,0 +1,109 @@ +import { createLogger } from '@rocket.chat/federation-core'; +import { delay, inject, singleton } from 'tsyringe'; + +import { APPSERVICE_CONFIG_PROVIDER, type AppServiceConfigProvider } from '../config-provider'; +import type { AppServiceRegistration, CachedAppService, CompiledNamespace } from '../models/appservice.model'; +import { AppServiceStateRepository } from '../repositories/appservice-state.repository'; + +/** + * XMPP is the only supported bridge. Its registration is built entirely from + * `ConfigService` (URL + tokens); the remaining fields are fixed constants + * derived from the `_xmpp_` prefix used throughout the codebase. + */ +const XMPP_APPSERVICE_ID = 'xmpp'; +const XMPP_SENDER_LOCALPART = '_xmpp_bot'; + +@singleton() +export class RegistrationService { + private readonly logger = createLogger('RegistrationService'); + + private cache: Map = new Map(); + + private tokenIndex: Map = new Map(); // asToken -> asId + + constructor( + @inject(delay(() => AppServiceStateRepository)) + private readonly stateRepo: AppServiceStateRepository, + @inject(APPSERVICE_CONFIG_PROVIDER) + private readonly config: AppServiceConfigProvider, + ) {} + + /** + * (Re)build the in-memory registration from the current config. Safe to + * call repeatedly — clears prior cache so a config change (e.g. a later + * `setConfig`) is reflected. + */ + async initialize(): Promise { + this.cache.clear(); + this.tokenIndex.clear(); + + const { xmpp } = this.config; + if (!xmpp) { + this.logger.info({ msg: 'No bridge configured; skipping appservice registration' }); + return; + } + + const now = new Date(); + const registration: AppServiceRegistration = { + _id: XMPP_APPSERVICE_ID, + url: xmpp.bridgeURL, + asToken: xmpp.asToken, + hsToken: xmpp.hsToken, + senderLocalpart: XMPP_SENDER_LOCALPART, + namespaces: { + users: [{ regex: '@_xmpp_.*', exclusive: true }], + aliases: [{ regex: '#_xmpp_.*', exclusive: true }], + rooms: [], + }, + protocols: ['xmpp'], + rateLimited: false, + receiveEphemeral: true, + createdAt: now, + updatedAt: now, + }; + + this.cacheRegistration(registration); + await this.stateRepo.upsertState(registration._id, { state: 'up' }); + this.logger.info({ msg: `Loaded appservice registration: ${registration._id}` }); + } + + getAll(): CachedAppService[] { + return Array.from(this.cache.values()); + } + + getById(id: string): CachedAppService | undefined { + return this.cache.get(id); + } + + getByAsToken(asToken: string): CachedAppService | undefined { + const asId = this.tokenIndex.get(asToken); + if (!asId) { + return undefined; + } + return this.cache.get(asId); + } + + async getState(asId: string) { + return this.stateRepo.getState(asId); + } + + private cacheRegistration(reg: AppServiceRegistration): void { + const cached: CachedAppService = { + registration: reg, + compiledNamespaces: { + users: reg.namespaces.users.map((ns) => this.compileNamespace(ns)), + aliases: reg.namespaces.aliases.map((ns) => this.compileNamespace(ns)), + rooms: reg.namespaces.rooms.map((ns) => this.compileNamespace(ns)), + }, + }; + this.cache.set(reg._id, cached); + this.tokenIndex.set(reg.asToken, reg._id); + } + + private compileNamespace(ns: { regex: string; exclusive: boolean }): CompiledNamespace { + return { + regex: new RegExp(`^(?:${ns.regex})$`), + exclusive: ns.exclusive, + }; + } +} diff --git a/packages/appservice/src/services/transaction-sender.service.ts b/packages/appservice/src/services/transaction-sender.service.ts new file mode 100644 index 000000000..fd75f4c5e --- /dev/null +++ b/packages/appservice/src/services/transaction-sender.service.ts @@ -0,0 +1,141 @@ +import { createLogger, fetch, PresenceEDU, ReceiptEDU, TypingEDU } from '@rocket.chat/federation-core'; +import { Pdu, PersistentEventBase } from '@rocket.chat/federation-room'; +import { delay, inject, singleton } from 'tsyringe'; + +import type { AppServiceEphemeralEvent, CachedAppService } from '../models/appservice.model'; +import { AppServiceStateRepository } from '../repositories/appservice-state.repository'; +import { AppServiceTransactionRepository } from '../repositories/appservice-txn.repository'; +import { eduBatchToAppServiceEphemeral } from '../utils/edu-to-appservice'; + +const MAX_BACKOFF_MS = 60_000; +const INITIAL_BACKOFF_MS = 1_000; + +@singleton() +export class TransactionSenderService { + private readonly logger = createLogger('TransactionSenderService'); + + constructor( + @inject(delay(() => AppServiceStateRepository)) + private readonly stateRepo: AppServiceStateRepository, + @inject(delay(() => AppServiceTransactionRepository)) + private readonly txnRepo: AppServiceTransactionRepository, + ) {} + + /** + * Send a transaction to an appservice. + * Queues it first, then attempts delivery. On failure, marks the bridge as DOWN + * and schedules retries with exponential backoff. + */ + async sendTransaction( + appservice: CachedAppService, + events: PersistentEventBase[], + ephemeral?: (ReceiptEDU | TypingEDU | PresenceEDU)[], + ): Promise { + const { registration } = appservice; + + if (!registration.url) { + return; // No URL configured, skip + } + + const txnId = await this.stateRepo.incrementTxnId(registration._id); + + const eventIds = events.map((e) => e.eventId).filter(Boolean); + + const ephemeralEvents = ephemeral && ephemeral.length > 0 ? eduBatchToAppServiceEphemeral(ephemeral) : undefined; + + await this.txnRepo.create({ + _id: `${registration._id}:${txnId}`, + asId: registration._id, + txnId, + eventIds, + ...(ephemeralEvents && { ephemeralEvents }), + status: 'pending', + attempts: 0, + createdAt: new Date(), + }); + + await this.attemptDelivery(appservice, txnId, events, ephemeralEvents); + } + + /** + * Retry all pending/failed transactions for an appservice. + */ + async retryPending(appservice: CachedAppService): Promise { + const pending = await this.txnRepo.getPending(appservice.registration._id); + + const now = new Date(); + const eligible = pending.filter((txn) => { + const backoffMs = Math.min(INITIAL_BACKOFF_MS * 2 ** txn.attempts, MAX_BACKOFF_MS); + const nextAttemptAt = new Date((txn.sentAt ?? txn.createdAt).getTime() + backoffMs); + return now >= nextAttemptAt; + }); + + // We don't have the full events stored in the txn (only IDs), + // so for retries we send an empty transaction to test connectivity. + await Promise.all(eligible.map((txn) => this.attemptDeliveryRaw(appservice, txn.txnId, { events: [] }))); + } + + private async attemptDelivery( + appservice: CachedAppService, + txnId: number, + events: PersistentEventBase[], + ephemeral?: AppServiceEphemeralEvent[], + ): Promise { + const body: Record = { events: events.map((e) => ({ event_id: e.eventId, ...e.event })) }; + if (ephemeral?.length) { + // Send under both the unstable MSC2409 key (what Synapse emits and most bridges read) + // and the stable spec key (Matrix v1.13+). + body['de.sorunome.msc2409.ephemeral'] = ephemeral; + body.ephemeral = ephemeral; + } + + return this.attemptDeliveryRaw(appservice, txnId, body); + } + + private async attemptDeliveryRaw(appservice: CachedAppService, txnId: number, body: Record): Promise { + const { registration } = appservice; + + if (!registration.url) return false; + + const url = new URL(`${registration.url}/_matrix/app/v1/transactions/${txnId}`); + + try { + const response = await fetch(url, { + method: 'PUT', + headers: { + 'Authorization': `Bearer ${registration.hsToken}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }); + + if (response.ok) { + await this.txnRepo.markSent(registration._id, txnId); + await this.stateRepo.markUp(registration._id); + return true; + } + + this.logger.warn({ + msg: `Transaction delivery failed`, + asId: registration._id, + txnId, + status: response.status, + }); + + await this.txnRepo.markFailed(registration._id, txnId); + await this.stateRepo.markDown(registration._id, `HTTP ${response.status}`); + return false; + } catch (err) { + this.logger.error({ + msg: `Transaction delivery error`, + asId: registration._id, + txnId, + err, + }); + + await this.txnRepo.markFailed(registration._id, txnId); + await this.stateRepo.markDown(registration._id, err instanceof Error ? err.message : 'Unknown error'); + return false; + } + } +} diff --git a/packages/appservice/src/utils/edu-to-appservice.spec.ts b/packages/appservice/src/utils/edu-to-appservice.spec.ts new file mode 100644 index 000000000..028a85f4f --- /dev/null +++ b/packages/appservice/src/utils/edu-to-appservice.spec.ts @@ -0,0 +1,173 @@ +import { describe, expect, test } from 'bun:test'; + +import type { PresenceEDU, ReceiptEDU, TypingEDU } from '@rocket.chat/federation-core'; + +import { eduBatchToAppServiceEphemeral } from './edu-to-appservice'; + +const typingEDU = (room_id: string, user_id: string, typing: boolean): TypingEDU => ({ + edu_type: 'm.typing', + content: { room_id, user_id, typing }, +}); + +const presenceEDU = (push: PresenceEDU['content']['push']): PresenceEDU => ({ + edu_type: 'm.presence', + content: { push }, +}); + +describe('eduBatchToAppServiceEphemeral', () => { + describe('typing', () => { + test('single typing=true emits one event with one user', () => { + const out = eduBatchToAppServiceEphemeral([typingEDU('!r:s', '@u:s', true)]); + expect(out).toEqual([{ type: 'm.typing', room_id: '!r:s', content: { user_ids: ['@u:s'] } }]); + }); + + test('two users typing in same room coalesce into one event', () => { + const out = eduBatchToAppServiceEphemeral([typingEDU('!r:s', '@u1:s', true), typingEDU('!r:s', '@u2:s', true)]); + expect(out).toHaveLength(1); + expect(out[0].type).toBe('m.typing'); + expect(out[0].room_id).toBe('!r:s'); + expect((out[0].content as { user_ids: string[] }).user_ids.sort()).toEqual(['@u1:s', '@u2:s']); + }); + + test('typing=false only emits one event with empty user_ids', () => { + const out = eduBatchToAppServiceEphemeral([typingEDU('!r:s', '@u:s', false)]); + expect(out).toEqual([{ type: 'm.typing', room_id: '!r:s', content: { user_ids: [] } }]); + }); + + test('start-then-stop for same user in one batch removes them from user_ids', () => { + const out = eduBatchToAppServiceEphemeral([typingEDU('!r:s', '@u:s', true), typingEDU('!r:s', '@u:s', false)]); + expect(out).toEqual([{ type: 'm.typing', room_id: '!r:s', content: { user_ids: [] } }]); + }); + + test('typing across two rooms emits two events', () => { + const out = eduBatchToAppServiceEphemeral([typingEDU('!a:s', '@u:s', true), typingEDU('!b:s', '@u:s', true)]); + expect(out).toHaveLength(2); + const byRoom = Object.fromEntries(out.map((e) => [e.room_id, e])); + expect(byRoom['!a:s'].content).toEqual({ user_ids: ['@u:s'] }); + expect(byRoom['!b:s'].content).toEqual({ user_ids: ['@u:s'] }); + }); + }); + + describe('receipts', () => { + test('single room/user/event receipt is correctly transformed', () => { + const edu: ReceiptEDU = { + edu_type: 'm.receipt', + content: { + '!r:s': { + 'm.read': { + '@u:s': { data: { ts: 1700000000000 }, event_ids: ['$evt:s'] }, + }, + }, + }, + }; + expect(eduBatchToAppServiceEphemeral([edu])).toEqual([ + { + type: 'm.receipt', + room_id: '!r:s', + content: { '$evt:s': { 'm.read': { '@u:s': { ts: 1700000000000 } } } }, + }, + ]); + }); + + test('thread_id is preserved on the receipt user entry', () => { + const edu: ReceiptEDU = { + edu_type: 'm.receipt', + content: { + '!r:s': { + 'm.read': { + '@u:s': { data: { ts: 1700000000000, thread_id: 'main' }, event_ids: ['$evt:s'] }, + }, + }, + }, + }; + const out = eduBatchToAppServiceEphemeral([edu]); + expect(out[0].content).toEqual({ + '$evt:s': { 'm.read': { '@u:s': { ts: 1700000000000, thread_id: 'main' } } }, + }); + }); + + test('multiple event_ids for one user fan out into separate event_id keys', () => { + const edu: ReceiptEDU = { + edu_type: 'm.receipt', + content: { + '!r:s': { + 'm.read': { + '@u:s': { data: { ts: 1700000000000 }, event_ids: ['$a:s', '$b:s'] }, + }, + }, + }, + }; + const out = eduBatchToAppServiceEphemeral([edu]); + expect(out).toHaveLength(1); + expect(out[0].content).toEqual({ + '$a:s': { 'm.read': { '@u:s': { ts: 1700000000000 } } }, + '$b:s': { 'm.read': { '@u:s': { ts: 1700000000000 } } }, + }); + }); + + test('two receipts for the same room within a batch merge into one event', () => { + const eduA: ReceiptEDU = { + edu_type: 'm.receipt', + content: { + '!r:s': { 'm.read': { '@u1:s': { data: { ts: 1 }, event_ids: ['$a:s'] } } }, + }, + }; + const eduB: ReceiptEDU = { + edu_type: 'm.receipt', + content: { + '!r:s': { 'm.read': { '@u2:s': { data: { ts: 2 }, event_ids: ['$b:s'] } } }, + }, + }; + const out = eduBatchToAppServiceEphemeral([eduA, eduB]); + expect(out).toHaveLength(1); + expect(out[0].room_id).toBe('!r:s'); + expect(out[0].content).toEqual({ + '$a:s': { 'm.read': { '@u1:s': { ts: 1 } } }, + '$b:s': { 'm.read': { '@u2:s': { ts: 2 } } }, + }); + }); + }); + + describe('presence', () => { + test('two pushes fan out into two events with sender hoisted', () => { + const edu = presenceEDU([ + { user_id: '@u1:s', presence: 'online', last_active_ago: 5000 }, + { user_id: '@u2:s', presence: 'offline' }, + ]); + const out = eduBatchToAppServiceEphemeral([edu]); + expect(out).toEqual([ + { type: 'm.presence', sender: '@u1:s', content: { presence: 'online', last_active_ago: 5000 } }, + { type: 'm.presence', sender: '@u2:s', content: { presence: 'offline' } }, + ]); + }); + + test('user_id is stripped from content', () => { + const out = eduBatchToAppServiceEphemeral([ + presenceEDU([{ user_id: '@u:s', presence: 'online', last_active_ago: 100, status_msg: 'hi' }]), + ]); + expect(out[0].content).not.toHaveProperty('user_id'); + expect(out[0].content).toEqual({ presence: 'online', last_active_ago: 100, status_msg: 'hi' }); + }); + }); + + describe('mixed batches', () => { + test('returns typing + receipt + presence events from one mixed batch', () => { + const out = eduBatchToAppServiceEphemeral([ + typingEDU('!r:s', '@u:s', true), + { + edu_type: 'm.receipt', + content: { + '!r:s': { 'm.read': { '@u:s': { data: { ts: 1 }, event_ids: ['$e:s'] } } }, + }, + } satisfies ReceiptEDU, + presenceEDU([{ user_id: '@u:s', presence: 'online', last_active_ago: 0 }]), + ]); + const types = out.map((e) => e.type).sort(); + expect(types).toEqual(['m.presence', 'm.receipt', 'm.typing']); + }); + + test('empty input returns empty array', () => { + expect(eduBatchToAppServiceEphemeral([])).toEqual([]); + }); + }); +}); diff --git a/packages/appservice/src/utils/edu-to-appservice.ts b/packages/appservice/src/utils/edu-to-appservice.ts new file mode 100644 index 000000000..3199199b8 --- /dev/null +++ b/packages/appservice/src/utils/edu-to-appservice.ts @@ -0,0 +1,99 @@ +import type { PresenceEDU, ReceiptEDU, TypingEDU } from '@rocket.chat/federation-core'; + +import type { AppServiceEphemeralEvent } from '../models/appservice.model'; + +type ReceiptUser = { ts: number; thread_id?: string }; +type ReceiptRoomContent = Record }>; + +const isTyping = (e: TypingEDU | ReceiptEDU | PresenceEDU): e is TypingEDU => e.edu_type === 'm.typing'; +const isReceipt = (e: TypingEDU | ReceiptEDU | PresenceEDU): e is ReceiptEDU => e.edu_type === 'm.receipt'; +const isPresence = (e: TypingEDU | ReceiptEDU | PresenceEDU): e is PresenceEDU => e.edu_type === 'm.presence'; + +export function eduBatchToAppServiceEphemeral(edus: (TypingEDU | ReceiptEDU | PresenceEDU)[]): AppServiceEphemeralEvent[] { + const out: AppServiceEphemeralEvent[] = []; + + out.push(...transformTyping(edus.filter(isTyping))); + out.push(...transformReceipts(edus.filter(isReceipt))); + out.push(...transformPresence(edus.filter(isPresence))); + + return out; +} + +// Coalesces typing EDUs per room: within a batch, the final user_ids list for +// a room is the set of users whose last seen state in the batch was typing=true. +// A user who appears only as typing=false is removed from user_ids. +function transformTyping(edus: TypingEDU[]): AppServiceEphemeralEvent[] { + const perRoom = new Map>(); + + for (const edu of edus) { + const { room_id, user_id, typing } = edu.content; + let users = perRoom.get(room_id); + if (!users) { + users = new Map(); + perRoom.set(room_id, users); + } + users.set(user_id, typing); + } + + const events: AppServiceEphemeralEvent[] = []; + for (const [room_id, users] of perRoom) { + const user_ids = Array.from(users.entries()) + .filter(([, typing]) => typing) + .map(([user_id]) => user_id); + events.push({ type: 'm.typing', room_id, content: { user_ids } }); + } + return events; +} + +// Re-keys federation receipts (room -> user -> {data, event_ids}) into the +// client-server shape (event_id -> "m.read" -> user -> {ts, thread_id?}), +// emitting one m.receipt event per room. Multiple receipt EDUs for the same +// room within a batch are merged. +function transformReceipts(edus: ReceiptEDU[]): AppServiceEphemeralEvent[] { + const perRoom = new Map(); + + for (const edu of edus) { + for (const [room_id, roomContent] of Object.entries(edu.content)) { + const readMap = roomContent['m.read'] ?? {}; + let acc = perRoom.get(room_id); + if (!acc) { + acc = {}; + perRoom.set(room_id, acc); + } + for (const [user_id, userReceipt] of Object.entries(readMap)) { + for (const event_id of userReceipt.event_ids) { + if (!acc[event_id]) { + acc[event_id] = { 'm.read': {} }; + } + + const userEntry: ReceiptUser = { ts: userReceipt.data.ts }; + + if (userReceipt.data.thread_id) { + userEntry.thread_id = userReceipt.data.thread_id; + } + + acc[event_id]['m.read'][user_id] = userEntry; + } + } + } + } + + const events: AppServiceEphemeralEvent[] = []; + for (const [room_id, content] of perRoom) { + events.push({ type: 'm.receipt', room_id, content }); + } + return events; +} + +// Fans out federation presence (content.push[]) into one m.presence event per +// update, hoisting user_id to top-level sender and stripping it from content. +function transformPresence(edus: PresenceEDU[]): AppServiceEphemeralEvent[] { + const events: AppServiceEphemeralEvent[] = []; + for (const edu of edus) { + for (const update of edu.content.push) { + const { user_id, ...rest } = update; + events.push({ type: 'm.presence', sender: user_id, content: rest }); + } + } + return events; +} diff --git a/packages/appservice/tsconfig.json b/packages/appservice/tsconfig.json new file mode 100644 index 000000000..75c06d1c6 --- /dev/null +++ b/packages/appservice/tsconfig.json @@ -0,0 +1,17 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "./dist", + "rootDir": "./src", + "composite": true, + "noEmit": false, + "verbatimModuleSyntax": false, + "tsBuildInfoFile": "./.tsbuildinfo" + }, + "references": [ + { "path": "../core" }, + { "path": "../room" } + ], + "include": ["src/**/*"], + "exclude": ["node_modules", "dist", "**/*.spec.ts", "**/*.test.ts"] +} diff --git a/packages/core/src/events/homeserver-event-signatures.ts b/packages/core/src/events/homeserver-event-signatures.ts new file mode 100644 index 000000000..e9b71fad2 --- /dev/null +++ b/packages/core/src/events/homeserver-event-signatures.ts @@ -0,0 +1,80 @@ +import type { EventID, PduForType } from '@rocket.chat/federation-room'; + +export type HomeserverEventSignatures = { + 'homeserver.ping': { + message: string; + }; + 'homeserver.matrix.typing': { + room_id: string; + user_id: string; + typing: boolean; + origin?: string; + }; + 'homeserver.matrix.presence': { + user_id: string; + presence: 'online' | 'offline' | 'unavailable'; + last_active_ago?: number; + origin?: string; + }; + 'homeserver.matrix.receipt': { + room_id: string; + user_id: string; + event_ids: string[]; + ts: number; + thread_id?: string; + }; + 'homeserver.matrix.encryption': { + event_id: EventID; + event: PduForType<'m.room.encryption'>; + }; + 'homeserver.matrix.encrypted': { + event_id: EventID; + event: PduForType<'m.room.encrypted'>; + }; + 'homeserver.matrix.room.create': { + event: PduForType<'m.room.create'>; + event_id: EventID; + }; + 'homeserver.matrix.message': { + event_id: EventID; + event: PduForType<'m.room.message'>; + }; + 'homeserver.matrix.reaction': { + event_id: EventID; + event: PduForType<'m.reaction'>; + }; + 'homeserver.matrix.redaction': { + event_id: EventID; + event: PduForType<'m.room.redaction'>; + }; + 'homeserver.matrix.membership': { + event_id: EventID; + event: PduForType<'m.room.member'>; + }; + 'homeserver.matrix.room.name': { + event_id: EventID; + event: PduForType<'m.room.name'>; + }; + 'homeserver.matrix.room.topic': { + event_id: EventID; + event: PduForType<'m.room.topic'>; + }; + 'homeserver.matrix.room.server_acl': { + event_id: EventID; + event: PduForType<'m.room.server_acl'>; + }; + 'homeserver.matrix.room.power_levels': { + event_id: EventID; + event: PduForType<'m.room.power_levels'>; + }; + 'homeserver.matrix.room.role': { + sender_id: string; + user_id: string; + room_id: string; + role: 'moderator' | 'owner' | 'user'; + }; + 'homeserver.matrix.membership.rejected': { + event: PduForType<'m.room.member'>; + reason: string; + }; +}; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 03f7328ac..13aa7c151 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -82,3 +82,5 @@ export type { FetchResponse, MultipartResult } from './utils/fetch'; export { fetch } from './utils/fetch'; export * from './AsyncDispatcher'; + +export type { HomeserverEventSignatures } from './events/homeserver-event-signatures'; diff --git a/packages/core/src/utils/fetch.ts b/packages/core/src/utils/fetch.ts index f5477b38b..24b1288be 100644 --- a/packages/core/src/utils/fetch.ts +++ b/packages/core/src/utils/fetch.ts @@ -1,4 +1,4 @@ -import { type IncomingHttpHeaders } from 'node:http'; +import http, { type IncomingHttpHeaders, type IncomingMessage } from 'node:http'; import https from 'node:https'; type RequestOptions = Parameters[1]; @@ -116,9 +116,77 @@ export type FetchResponse = { body: () => Promise; }; -// this fetch is used when connecting to a multihome server, same server hosting multiple homeservers, and we need to verify the cert with the right SNI (hostname), or else, cert check will fail due to connecting through ip and not hostname (due to matrix spec). +// lazily reads the full response body once, enforcing a size limit and cleaning up listeners +function readBody(res: IncomingMessage): () => Promise { + let body: Promise; + + return () => { + if (!body) { + body = new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + + // TODO: Make @hs/core fetch size limit configurable + let total = 0; + const MAX_RESPONSE_BYTES = 50 * 1024 * 1024; // 50 MB + + const onData = (chunk: Buffer) => { + total += chunk.length; + if (total > MAX_RESPONSE_BYTES) { + const err = new Error('Response exceeds size limit'); + res.destroy(err); + cleanup(); + reject(err); + return; + } + chunks.push(chunk); + }; + const onEnd = () => { + cleanup(); + resolve(Buffer.concat(chunks)); + }; + const onErr = (err: Error) => { + cleanup(); + reject(err); + }; + const onAborted = () => onErr(new Error('Response aborted')); + const cleanup = () => { + res.off('data', onData); + res.off('end', onEnd); + res.off('error', onErr); + res.off('aborted', onAborted); + }; + res.on('data', onData); + res.once('end', onEnd); + res.once('error', onErr); + res.once('aborted', onAborted); + res.resume(); + }); + } + + return body; + }; +} + +// fallback response returned when the request never produced a usable response +function errorResponse(reason: string): FetchResponse { + return { + ok: false, + status: undefined, + headers: {}, + buffer: () => Promise.reject(reason), + json: () => Promise.reject(reason), + text: () => Promise.reject(reason), + multipart: () => Promise.reject(reason), + body: () => Promise.reject(reason), + }; +} + +// works for both http and https. for https on a multihomed server (same server hosting +// multiple homeservers) we must verify the cert with the right SNI (hostname), or else the +// cert check fails because we connect through the ip and not the hostname (due to matrix spec). export async function fetch(url: URL, options: RequestInit): Promise> { - const serverName = new URL(`http://${(options.headers as IncomingHttpHeaders).Host}` as string).hostname; + const isHttps = url.protocol === 'https:'; + const transport = isHttps ? https : http; const requestParams: RequestOptions = { // for ipv6 remove square brackets as they come due to url standard @@ -127,70 +195,29 @@ export async function fetch(url: URL, options: RequestInit): Promise Promise; headers: IncomingHttpHeaders; } = await new Promise((resolve, reject) => { - const request = https.request(requestParams, (res) => { - const chunks: Buffer[] = []; - + const request = transport.request(requestParams, (res) => { res.once('error', reject); - res.pause(); - let body: Promise; - resolve({ statusCode: res.statusCode, headers: res.headers, - body() { - if (!body) { - body = new Promise((resBody, rejBody) => { - // TODO: Make @hs/core fetch size limit configurable - let total = 0; - const MAX_RESPONSE_BYTES = 50 * 1024 * 1024; // 50 MB - - const onData = (chunk: Buffer) => { - total += chunk.length; - if (total > MAX_RESPONSE_BYTES) { - const err = new Error('Response exceeds size limit'); - res.destroy(err); - cleanup(); - rejBody(err); - return; - } - chunks.push(chunk); - }; - const onEnd = () => { - cleanup(); - resBody(Buffer.concat(chunks)); - }; - const onErr = (err: Error) => { - cleanup(); - rejBody(err); - }; - const onAborted = () => onErr(new Error('Response aborted')); - const cleanup = () => { - res.off('data', onData); - res.off('end', onEnd); - res.off('error', onErr); - res.off('aborted', onAborted); - }; - res.on('data', onData); - res.once('end', onEnd); - res.once('error', onErr); - res.once('aborted', onAborted); - res.resume(); - }); - } - - return body; - }, + body: readBody(res), }); }); @@ -228,15 +255,6 @@ export async function fetch(url: URL, options: RequestInit): Promise Promise.reject(reason), - json: () => Promise.reject(reason), - text: () => Promise.reject(reason), - multipart: () => Promise.reject(reason), - body: () => Promise.reject(reason), - }; + return errorResponse(reason); } } diff --git a/packages/federation-sdk/package.json b/packages/federation-sdk/package.json index 790b6c02a..291b5248e 100644 --- a/packages/federation-sdk/package.json +++ b/packages/federation-sdk/package.json @@ -1,6 +1,6 @@ { "name": "@rocket.chat/federation-sdk", - "version": "0.6.3", + "version": "0.7.0-beta.1", "description": "Matrix Federation SDK for server-to-server communication", "main": "./dist/index.js", "types": "./dist/index.d.ts", @@ -16,6 +16,7 @@ "test": "bun test" }, "dependencies": { + "@rocket.chat/appservice": "workspace:*", "@rocket.chat/emitter": "^0.32.0", "@rocket.chat/federation-core": "workspace:*", "@rocket.chat/federation-crypto": "workspace:*", @@ -28,6 +29,10 @@ }, "license": "AGPL-3.0", "author": "Rocket.Chat Technologies Corp. ", + "repository": { + "type": "git", + "url": "https://github.com/RocketChat/homeserver" + }, "files": ["dist"], "peerDependencies": { "typescript": "~5.9.2" diff --git a/packages/federation-sdk/src/index.ts b/packages/federation-sdk/src/index.ts index e0120229a..7db9b38c8 100644 --- a/packages/federation-sdk/src/index.ts +++ b/packages/federation-sdk/src/index.ts @@ -1,24 +1,49 @@ import 'reflect-metadata'; +import { + APPSERVICE_CONFIG_PROVIDER, + type AppServiceConfigProvider, + type AppServiceState, + type AppServiceTransaction, + EventRouterService, +} from '@rocket.chat/appservice'; import type { EventStagingStore } from '@rocket.chat/federation-core'; -import type { EventID, EventStore, PduForType } from '@rocket.chat/federation-room'; +import type { EventStore, RoomID } from '@rocket.chat/federation-room'; import { Collection } from 'mongodb'; import { container } from 'tsyringe'; import { StagingAreaListener } from './listeners/staging-area.listener'; import { Key } from './repositories/key.repository'; import { Lock } from './repositories/lock.repository'; +import { RoomAlias } from './repositories/room-alias.repository'; import { Room } from './repositories/room.repository'; import { Server } from './repositories/server.repository'; import { StateGraphStore } from './repositories/state-graph.repository'; import { Upload } from './repositories/upload.repository'; import { User } from './repositories/user.repository'; import { FederationSDK } from './sdk'; +import { ConfigService } from './services/config.service'; import { DatabaseConnectionService } from './services/database-connection.service'; import { EventEmitterService } from './services/event-emitter.service'; import { EventService } from './services/event.service'; +import { StateService } from './services/state.service'; + +container.register(APPSERVICE_CONFIG_PROVIDER, { + useValue: { + get serverName() { + return container.resolve(ConfigService).serverName; + }, + get xmpp() { + return container.resolve(ConfigService).getConfig('xmpp'); + }, + }, +}); export { FederationRequestError } from './services/federation-request.service'; +export { EventEmitterService } from './services/event-emitter.service'; + +export type { CachedAppService, AppServiceRegistration, AppServiceState } from '@rocket.chat/appservice'; +export type { PingResult, PingError } from '@rocket.chat/appservice'; export type { Pdu, @@ -52,84 +77,7 @@ export { errCodes } from './utils/response-codes'; export { NotAllowedError } from './services/invite.service'; export { FederationValidationService, FederationValidationError } from './services/federation-validation.service'; -export type HomeserverEventSignatures = { - 'homeserver.ping': { - message: string; - }; - 'homeserver.matrix.typing': { - room_id: string; - user_id: string; - typing: boolean; - origin?: string; - }; - 'homeserver.matrix.presence': { - user_id: string; - presence: 'online' | 'offline' | 'unavailable'; - last_active_ago?: number; - origin?: string; - }; - 'homeserver.matrix.receipt': { - room_id: string; - user_id: string; - event_ids: string[]; - ts: number; - thread_id?: string; - }; - 'homeserver.matrix.encryption': { - event_id: EventID; - event: PduForType<'m.room.encryption'>; - }; - 'homeserver.matrix.encrypted': { - event_id: EventID; - event: PduForType<'m.room.encrypted'>; - }; - 'homeserver.matrix.room.create': { - event: PduForType<'m.room.create'>; - event_id: EventID; - }; - 'homeserver.matrix.message': { - event_id: EventID; - event: PduForType<'m.room.message'>; - }; - 'homeserver.matrix.reaction': { - event_id: EventID; - event: PduForType<'m.reaction'>; - }; - 'homeserver.matrix.redaction': { - event_id: EventID; - event: PduForType<'m.room.redaction'>; - }; - 'homeserver.matrix.membership': { - event_id: EventID; - event: PduForType<'m.room.member'>; - }; - 'homeserver.matrix.room.name': { - event_id: EventID; - event: PduForType<'m.room.name'>; - }; - 'homeserver.matrix.room.topic': { - event_id: EventID; - event: PduForType<'m.room.topic'>; - }; - 'homeserver.matrix.room.server_acl': { - event_id: EventID; - event: PduForType<'m.room.server_acl'>; - }; - 'homeserver.matrix.room.power_levels': { - event_id: EventID; - event: PduForType<'m.room.power_levels'>; - }; - 'homeserver.matrix.room.role': { - sender_id: string; // who changed - user_id: string; // whose changed - room_id: string; // room where the change happened - role: 'moderator' | 'owner' | 'user'; // 50, 100, 0 - }; - 'homeserver.matrix.membership.rejected': { - event: PduForType<'m.room.member'>; - reason: string; - }; -}; +export type { HomeserverEventSignatures } from '@rocket.chat/federation-core'; export { roomIdSchema, userIdSchema, eventIdSchema, extractDomainFromId } from '@rocket.chat/federation-room'; @@ -164,6 +112,10 @@ export async function init({ useValue: db.collection('rocketchat_federation_rooms'), }); + container.register>('RoomAliasCollection', { + useValue: db.collection('rocketchat_federation_room_aliases'), + }); + container.register>('ServerCollection', { useValue: db.collection('rocketchat_federation_servers'), }); @@ -184,9 +136,30 @@ export async function init({ useValue: db.collection('users'), }); + container.register>('AppServiceStateCollection', { + useValue: db.collection('rocketchat_federation_appservices_state'), + }); + + container.register>('AppServiceTxnCollection', { + useValue: db.collection('rocketchat_federation_appservices_txns'), + }); + // this is required to initialize the listener and register the queue handler container.resolve(StagingAreaListener); + // Wire the event router into the homeserver event emitter so appservices + // receive transactions for events in their namespaces. + const eventRouter = container.resolve(EventRouterService); + const stateService = container.resolve(StateService); + eventRouter.setRoomStateResolver(async (roomId) => { + try { + const state = await stateService.getLatestRoomState2(roomId as RoomID); + return { aliases: state.getCanonicalAliases(), members: state.members }; + } catch { + return { aliases: [], members: [] }; + } + }); + // once the db is initialized we look for old staged events and try to process them setTimeout(async () => { const eventService = container.resolve(EventService); diff --git a/packages/federation-sdk/src/repositories/room-alias.repository.ts b/packages/federation-sdk/src/repositories/room-alias.repository.ts new file mode 100644 index 000000000..256ce5ac2 --- /dev/null +++ b/packages/federation-sdk/src/repositories/room-alias.repository.ts @@ -0,0 +1,29 @@ +import type { Collection } from 'mongodb'; +import { inject, singleton } from 'tsyringe'; + +export type RoomAlias = { + _id: string; + roomId: string; +}; + +@singleton() +export class RoomAliasRepository { + constructor(@inject('RoomAliasCollection') private readonly collection: Collection) {} + + async findByAlias(alias: string): Promise { + return this.collection.findOne({ _id: alias }); + } + + async findByRoomId(roomId: string): Promise { + return this.collection.find({ roomId }).toArray(); + } + + async upsert(alias: string, roomId: string): Promise { + await this.collection.updateOne({ _id: alias }, { $set: { _id: alias, roomId } }, { upsert: true }); + } + + async delete(alias: string): Promise { + const result = await this.collection.deleteOne({ _id: alias }); + return result.deletedCount > 0; + } +} diff --git a/packages/federation-sdk/src/repositories/user.repository.ts b/packages/federation-sdk/src/repositories/user.repository.ts index 744d89f0d..f51d5c143 100644 --- a/packages/federation-sdk/src/repositories/user.repository.ts +++ b/packages/federation-sdk/src/repositories/user.repository.ts @@ -1,3 +1,5 @@ +import crypto from 'node:crypto'; + import type { Collection } from 'mongodb'; import { inject, singleton } from 'tsyringe'; @@ -13,6 +15,7 @@ export type User = { mui?: string; origin?: string; avatarUrl?: string; + asId?: string; }; createdAt: Date; _updatedAt: Date; @@ -26,7 +29,7 @@ export class UserRepository { return this.collection.findOne( { username, - $or: [{ federated: { $exists: false } }, { federated: false }], + $or: [{ federated: { $exists: false } }, { federated: false }, { 'federation.asId': { $exists: true } }], }, { projection: { @@ -43,4 +46,42 @@ export class UserRepository { }, ); } + + /** + * Idempotently create the bot user that represents an appservice + * (its `sender_localpart`). Safe to call on every load / boot — it + * preserves `createdAt` and only refreshes `_updatedAt` and the + * `appserviceId` linkage on subsequent calls. + */ + async ensureSenderUser(localpart: string, serverName: string, appserviceId: string): Promise { + const now = new Date(); + const username = `@${localpart}:${serverName}`; + await this.collection.updateOne( + { 'federation.asId': appserviceId }, + { + $set: { + username, + name: username, + type: 'bot' as const, + status: 'offline' as const, + active: true, + roles: ['federated-external'], + requirePasswordChange: false, + federated: true, + federation: { + version: 1, + mui: username, + origin: serverName, + asId: appserviceId, + }, + _updatedAt: new Date(), + }, + $setOnInsert: { + _id: crypto.randomUUID(), + createdAt: now, + }, + }, + { upsert: true }, + ); + } } diff --git a/packages/federation-sdk/src/sdk.ts b/packages/federation-sdk/src/sdk.ts index 8af34ae04..b81dfd07d 100644 --- a/packages/federation-sdk/src/sdk.ts +++ b/packages/federation-sdk/src/sdk.ts @@ -1,8 +1,17 @@ +import { + type AppServiceRegistration, + BridgeQueryService, + NamespaceMatcherService, + PingService, + RegistrationService, +} from '@rocket.chat/appservice'; import type { EventStore } from '@rocket.chat/federation-core'; -import type { PduForType, PduType, UserID } from '@rocket.chat/federation-room'; -import { singleton } from 'tsyringe'; +import type { PduForType, PduType, RoomID, UserID } from '@rocket.chat/federation-room'; +import { delay, inject, singleton } from 'tsyringe'; +import { UserRepository } from './repositories/user.repository'; import { AppConfig, ConfigService } from './services/config.service'; +import { DirectoryService } from './services/directory.service'; import { EduService } from './services/edu.service'; import { EventAuthorizationService } from './services/event-authorization.service'; import { EventEmitterService } from './services/event-emitter.service'; @@ -41,8 +50,25 @@ export class FederationSDK { private readonly federationService: FederationService, public readonly eventEmitterService: EventEmitterService, private readonly federationValidationService: FederationValidationService, + private readonly registrationService: RegistrationService, + private readonly bridgeQueryService: BridgeQueryService, + private readonly namespaceMatcherService: NamespaceMatcherService, + private readonly pingService: PingService, + public readonly directoryService: DirectoryService, + @inject(delay(() => UserRepository)) + private readonly userRepository: UserRepository, ) {} + /** + * Ensure the bot user (`sender_localpart`) backing an appservice + * exists. Called from every path that brings a registration into the + * cache so the bot user is always materialised — load from YAML, + * admin-API register, and boot-time rehydrate from the DB. + */ + private async ensureSenderUser(registration: AppServiceRegistration): Promise { + await this.userRepository.ensureSenderUser(registration.senderLocalpart, this.configService.serverName, registration._id); + } + /** * @deprecated use createDirectMessage instead */ @@ -61,6 +87,10 @@ export class FederationSDK { return this.roomService.createRoom(...args); } + createRoomV2(...args: Parameters) { + return this.roomService.createRoomV2(...args); + } + inviteUserToRoom(...args: Parameters) { return this.inviteService.inviteUserToRoom(...args); } @@ -259,8 +289,13 @@ export class FederationSDK { return this.profilesService.eventAuth(...args); } - setConfig(...args: Parameters) { - return this.configService.setConfig(...args); + async setConfig(...args: Parameters) { + this.configService.setConfig(...args); + // Config is the sole source of bridge configuration, so rebuild the + // appservice registration whenever it changes — this also covers the + // boot path, where `init()` runs before the first `setConfig`. + await this.registrationService.initialize(); + await this.ensureSenderUsersForAllRegistrations(); } queryKeys(...args: Parameters) { @@ -282,4 +317,82 @@ export class FederationSDK { updateRoomMembership(...args: Parameters) { return this.roomService.updateRoomMembership(...args); } + + // --- Application Service --- + + getAllRegistrations(...args: Parameters) { + return this.registrationService.getAll(...args); + } + + getRegistrationById(...args: Parameters) { + return this.registrationService.getById(...args); + } + + getRegistrationByAsToken(...args: Parameters) { + return this.registrationService.getByAsToken(...args); + } + + /** + * Walk every cached registration and ensure its sender user exists. + * Called at boot once the registration is built from config; idempotent. + */ + async ensureSenderUsersForAllRegistrations(): Promise { + const registrations = this.registrationService.getAll(); + await Promise.all(registrations.map((cached) => this.ensureSenderUser(cached.registration))); + } + + getAllProtocols(...args: Parameters) { + return this.bridgeQueryService.getAllProtocols(...args); + } + + queryThirdPartyProtocol(...args: Parameters) { + return this.bridgeQueryService.queryThirdPartyProtocol(...args); + } + + queryThirdPartyUser(...args: Parameters) { + return this.bridgeQueryService.queryThirdPartyUser(...args); + } + + queryThirdPartyLocation(...args: Parameters) { + return this.bridgeQueryService.queryThirdPartyLocation(...args); + } + + isExclusiveNamespace(...args: Parameters) { + return this.namespaceMatcherService.isExclusive(...args); + } + + isUserInAppServiceNamespace(...args: Parameters) { + return this.namespaceMatcherService.isUserInNamespace(...args); + } + + pingAppService(...args: Parameters) { + return this.pingService.ping(...args); + } + + getAppServiceState(...args: Parameters) { + return this.registrationService.getState(...args); + } + + joinUser(...args: Parameters) { + return this.roomService.joinUser(...args); + } + + async joinXMPPChatRoom(roomAlias: string, sender: UserID) { + const localAlias = `_xmpp_${roomAlias}`; + + const fullRoomAlias = `#${localAlias}:${this.configService.serverName}`; + + const interested = this.namespaceMatcherService.getInterestedAppServices('', sender, [fullRoomAlias], []); + + for await (const as of interested) { + await this.bridgeQueryService.queryRoomAlias(as.registration._id, fullRoomAlias); + + const resolved = await this.directoryService.resolveAlias(localAlias); + if (!resolved) { + throw new Error(`Failed to resolve room alias ${roomAlias} after bridge query response`); + } + + await this.roomService.joinUser(resolved.roomId as RoomID, sender); + } + } } diff --git a/packages/federation-sdk/src/services/config.service.ts b/packages/federation-sdk/src/services/config.service.ts index 83e660fcf..2eaddb6a7 100644 --- a/packages/federation-sdk/src/services/config.service.ts +++ b/packages/federation-sdk/src/services/config.service.ts @@ -32,6 +32,11 @@ export interface AppConfig { }; userCheckTimeoutMs?: number; networkCheckTimeoutMs?: number; + xmpp?: { + bridgeURL: string; + hsToken: string; + asToken: string; + }; } export const AppConfigSchema = z.object({ @@ -62,6 +67,13 @@ export const AppConfigSchema = z.object({ processPresence: z.boolean(), processReceipt: z.boolean().optional(), }), + xmpp: z + .object({ + bridgeURL: z.string().min(1, 'Bridge URL is required'), + hsToken: z.string().min(1, 'hs_token is required'), + asToken: z.string().min(1, 'as_token is required'), + }) + .optional(), networkCheckTimeoutMs: z.number().int().min(1000, 'Network check timeout must be at least 1000ms').default(5000).optional(), userCheckTimeoutMs: z.number().int().min(1000, 'User check timeout must be at least 1000ms').default(10000).optional(), }); diff --git a/packages/federation-sdk/src/services/directory.service.ts b/packages/federation-sdk/src/services/directory.service.ts new file mode 100644 index 000000000..b5b21330b --- /dev/null +++ b/packages/federation-sdk/src/services/directory.service.ts @@ -0,0 +1,27 @@ +import { delay, inject, singleton } from 'tsyringe'; + +import { RoomAliasRepository } from '../repositories/room-alias.repository'; + +@singleton() +export class DirectoryService { + constructor( + @inject(delay(() => RoomAliasRepository)) + private readonly roomAliasRepository: RoomAliasRepository, + ) {} + + async resolveAlias(alias: string) { + return this.roomAliasRepository.findByAlias(alias); + } + + async setAlias(alias: string, roomId: string) { + return this.roomAliasRepository.upsert(alias, roomId); + } + + async deleteAlias(alias: string) { + return this.roomAliasRepository.delete(alias); + } + + async getAliasesForRoom(roomId: string) { + return this.roomAliasRepository.findByRoomId(roomId); + } +} diff --git a/packages/federation-sdk/src/services/edu.service.ts b/packages/federation-sdk/src/services/edu.service.ts index 3fe6bf5af..6e9bd7f5e 100644 --- a/packages/federation-sdk/src/services/edu.service.ts +++ b/packages/federation-sdk/src/services/edu.service.ts @@ -1,3 +1,4 @@ +import { EventRouterService } from '@rocket.chat/appservice'; import type { PresenceUpdate, ReceiptEDU } from '@rocket.chat/federation-core'; import { createPresenceEDU, createTypingEDU, createLogger } from '@rocket.chat/federation-core'; import { RoomID } from '@rocket.chat/federation-room'; @@ -15,6 +16,7 @@ export class EduService { private readonly configService: ConfigService, private readonly federationService: FederationService, private readonly stateService: StateService, + private readonly eventRouterService: EventRouterService, ) {} async sendTypingNotification(roomId: RoomID, userId: string, typing: boolean): Promise { @@ -29,6 +31,7 @@ export class EduService { const uniqueServers = Array.from(servers).filter((server) => server !== origin); await this.federationService.sendEDUToServers([typingEDU], uniqueServers); + void this.eventRouterService.routeEphemeral(typingEDU); this.logger.debug(`Sent typing notification to ${uniqueServers.length} unique servers for room ${roomId}`); } catch (error) { @@ -61,6 +64,7 @@ export class EduService { ); await this.federationService.sendEDUToServers([presenceEDU], Array.from(uniqueServers)); + void this.eventRouterService.routeEphemeral(presenceEDU); this.logger.debug(`Sent presence updates to ${uniqueServers.size} unique servers for ${roomIds.length} rooms`); } catch (error) { @@ -111,6 +115,7 @@ export class EduService { const uniqueServers = Array.from(servers).filter((server) => server !== origin); await this.federationService.sendEDUToServers([receiptEDU], uniqueServers); + void this.eventRouterService.routeEphemeral(receiptEDU); this.logger.debug(`Sent read receipt to ${uniqueServers.length} unique servers for room ${roomId}`); } catch (error) { diff --git a/packages/federation-sdk/src/services/event-emitter.service.ts b/packages/federation-sdk/src/services/event-emitter.service.ts index 3077ec803..ca37eace3 100644 --- a/packages/federation-sdk/src/services/event-emitter.service.ts +++ b/packages/federation-sdk/src/services/event-emitter.service.ts @@ -1,9 +1,7 @@ import { Emitter } from '@rocket.chat/emitter'; -import { AsyncDispatcher, type EventHandlerOf, type EventOf, logger } from '@rocket.chat/federation-core'; +import { AsyncDispatcher, type EventHandlerOf, type EventOf, type HomeserverEventSignatures, logger } from '@rocket.chat/federation-core'; import { singleton } from 'tsyringe'; -import type { HomeserverEventSignatures } from '..'; - @singleton() export class EventEmitterService { private emitter: AsyncDispatcher = new AsyncDispatcher(); diff --git a/packages/federation-sdk/src/services/message.service.ts b/packages/federation-sdk/src/services/message.service.ts index bfc3d45be..a87a73065 100644 --- a/packages/federation-sdk/src/services/message.service.ts +++ b/packages/federation-sdk/src/services/message.service.ts @@ -1,3 +1,4 @@ +import { EventRouterService } from '@rocket.chat/appservice'; import { ForbiddenError, createLogger } from '@rocket.chat/federation-core'; import { type EventID, type PersistentEventBase, RoomID, UserID } from '@rocket.chat/federation-room'; import { singleton } from 'tsyringe'; @@ -52,6 +53,7 @@ export class MessageService { private readonly federationService: FederationService, private readonly roomService: RoomService, private readonly stateService: StateService, + private readonly eventRouterService: EventRouterService, ) {} private buildReplyContent(reply: Reply) { @@ -126,6 +128,7 @@ export class MessageService { } void this.federationService.sendEventToAllServersInRoom(event); + void this.eventRouterService.routeEvent(event); return event; } @@ -180,6 +183,7 @@ export class MessageService { } void this.federationService.sendEventToAllServersInRoom(event); + void this.eventRouterService.routeEvent(event); return event; } @@ -260,6 +264,7 @@ export class MessageService { await this.stateService.handlePdu(reactionEvent); void this.federationService.sendEventToAllServersInRoom(reactionEvent); + void this.eventRouterService.routeEvent(reactionEvent); return reactionEvent.eventId; } @@ -287,6 +292,7 @@ export class MessageService { await this.stateService.handlePdu(redactionEvent); void this.federationService.sendEventToAllServersInRoom(redactionEvent); + void this.eventRouterService.routeEvent(redactionEvent); return redactionEvent.eventId; } @@ -332,6 +338,7 @@ export class MessageService { await this.stateService.handlePdu(redactionEvent); void this.federationService.sendEventToAllServersInRoom(redactionEvent); + void this.eventRouterService.routeEvent(redactionEvent); return redactionEvent.eventId; } @@ -370,6 +377,7 @@ export class MessageService { await this.stateService.handlePdu(redactionEvent); void this.federationService.sendEventToAllServersInRoom(redactionEvent); + void this.eventRouterService.routeEvent(redactionEvent); return redactionEvent.eventId; } diff --git a/packages/federation-sdk/src/services/profiles.service.ts b/packages/federation-sdk/src/services/profiles.service.ts index 8e5d0bca5..98f56aacf 100644 --- a/packages/federation-sdk/src/services/profiles.service.ts +++ b/packages/federation-sdk/src/services/profiles.service.ts @@ -23,9 +23,16 @@ export class ProfilesService { return null; } - const username = userId.split(':')[0]?.slice(1); - - const user = await this.userRepository.findByUsername(username); + const user = await (async () => { + // try to find with full userId first, then fallback to localpart only + const found = await this.userRepository.findByUsername(userId); + if (found) { + return found; + } + + // this is for querying local users which does not have the server name in the username field + return this.userRepository.findByUsername(userId.split(':')[0]?.slice(1)); + })(); if (!user) { // this.logger.debug(`Local user ${userId} not found in repository`); diff --git a/packages/federation-sdk/src/services/room.service.ts b/packages/federation-sdk/src/services/room.service.ts index 1fd6eb80b..5a0246777 100644 --- a/packages/federation-sdk/src/services/room.service.ts +++ b/packages/federation-sdk/src/services/room.service.ts @@ -28,6 +28,7 @@ import { import { delay, inject, singleton } from 'tsyringe'; import { ConfigService } from './config.service'; +import { DirectoryService } from './directory.service'; import { EventAuthorizationService } from './event-authorization.service'; import { EventEmitterService } from './event-emitter.service'; import { EventFetcherService } from './event-fetcher.service'; @@ -61,6 +62,7 @@ export class RoomService { @inject(delay(() => EventStagingRepository)) private readonly eventStagingRepository: EventStagingRepository, private readonly federationValidationService: FederationValidationService, + private readonly directoryService: DirectoryService, ) {} private validatePowerLevelChange( @@ -171,21 +173,18 @@ export class RoomService { } } - /** - * Create a new room with the given sender and username - */ - async createRoom( - username: UserID, - name: string, - joinRule: PduJoinRuleEventContent['join_rule'], - powers: { + async createRoomV2(data: { + name: string; + owner: UserID; + joinRule: PduJoinRuleEventContent['join_rule']; + powersLevels?: { users?: Record; events?: Record; - } = { - users: {}, - events: {}, - }, - ) { + }; + alias?: string; + }) { + const { name, owner: username, joinRule, powersLevels: powers = {}, alias } = data; + logger.debug(`Creating room for ${username} with ${name} join_rule: ${joinRule}`); const roomCreateEvent = PersistentEventFactory.newCreateEvent(username, PersistentEventFactory.defaultRoomVersion); @@ -285,11 +284,20 @@ export class RoomService { await stateService.handlePdu(joinRuleEvent); + if (alias) { + const existing = await this.directoryService.resolveAlias(alias); + if (existing) { + throw new Error(`Alias ${alias} already exists, cannot create room with this alias.`); + } + + await this.directoryService.setAlias(alias, roomCreateEvent.roomId); + } + const canonicalAliasEvent = await stateService.buildEvent<'m.room.canonical_alias'>( { type: 'm.room.canonical_alias', content: { - alias: `#${name}:${this.configService.serverName}`, + alias: `#${alias || name}:${this.configService.serverName}`, alt_aliases: [], }, room_id: roomCreateEvent.roomId, @@ -311,6 +319,29 @@ export class RoomService { }; } + /** + * Create a new room with the given sender and username + */ + async createRoom( + username: UserID, + name: string, + joinRule: PduJoinRuleEventContent['join_rule'], + powers: { + users?: Record; + events?: Record; + } = { + users: {}, + events: {}, + }, + ) { + return this.createRoomV2({ + name, + owner: username, + joinRule, + powersLevels: powers, + }); + } + async updateRoomName(roomId: RoomID, name: string, senderId: UserID) { logger.info(`Updating room name for ${roomId} to \"${name}\" by ${senderId}`); diff --git a/packages/federation-sdk/tsconfig.json b/packages/federation-sdk/tsconfig.json index 91ec6e497..e257192f9 100644 --- a/packages/federation-sdk/tsconfig.json +++ b/packages/federation-sdk/tsconfig.json @@ -9,5 +9,10 @@ "tsBuildInfoFile": "./.tsbuildinfo" }, "include": ["src/**/*"], + "references": [ + { "path": "../appservice" }, + { "path": "../core" }, + { "path": "../room" } + ], "exclude": ["node_modules", "dist", "**/*.spec.ts", "**/*.test.ts"] } diff --git a/packages/homeserver/src/controllers/client/directory.controller.ts b/packages/homeserver/src/controllers/client/directory.controller.ts new file mode 100644 index 000000000..6bf80733f --- /dev/null +++ b/packages/homeserver/src/controllers/client/directory.controller.ts @@ -0,0 +1,89 @@ +import type { Elysia } from 'elysia'; +import { t } from 'elysia'; + +import { resolveAppServiceAuth } from '../../middlewares/appserviceAuth'; + +/** + * Client-Server API endpoints for room directory management. + */ +export const clientDirectoryPlugin = (serverName: string) => (app: Elysia) => { + return app + .put( + '/_matrix/client/v3/directory/room/:roomAlias', + async ({ params, body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Create room alias -> room_id mapping + return {}; + }, + { + params: t.Object({ roomAlias: t.String() }), + body: t.Object({ room_id: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Create a room alias' }, + }, + ) + .delete( + '/_matrix/client/v3/directory/room/:roomAlias', + async ({ params, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Remove room alias mapping + return {}; + }, + { + params: t.Object({ roomAlias: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Delete a room alias' }, + }, + ) + .get( + '/_matrix/client/v3/directory/room/:roomAlias', + async ({ params, set }) => { + // TODO: Resolve alias to room_id + // If alias matches a bridge namespace, query bridge first + set.status = 404; + return { errcode: 'M_NOT_FOUND', error: 'Room alias not found' }; + }, + { + params: t.Object({ roomAlias: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Resolve a room alias' }, + }, + ) + .put( + '/_matrix/client/v3/directory/list/appservice/:networkId/:roomId', + async ({ params, body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Update room visibility in directory + return {}; + }, + { + params: t.Object({ networkId: t.String(), roomId: t.String() }), + body: t.Object({ visibility: t.Union([t.Literal('public'), t.Literal('private')]) }), + detail: { tags: ['Client-Server'], summary: 'Set room directory visibility (appservice)' }, + }, + ); +}; diff --git a/packages/homeserver/src/controllers/client/events.controller.ts b/packages/homeserver/src/controllers/client/events.controller.ts new file mode 100644 index 000000000..70abb76a0 --- /dev/null +++ b/packages/homeserver/src/controllers/client/events.controller.ts @@ -0,0 +1,81 @@ +import type { Elysia } from 'elysia'; +import { t } from 'elysia'; + +import { resolveAppServiceAuth } from '../../middlewares/appserviceAuth'; + +/** + * Client-Server API endpoints for sending events. + * Used by bridges to send messages and state events as ghost users. + */ +export const clientEventsPlugin = (serverName: string) => (app: Elysia) => { + return app + .put( + '/_matrix/client/v3/rooms/:roomId/send/:eventType/:txnId', + async ({ params, body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + const { roomId, eventType, txnId } = params; + + // TODO: Create the event using the federation SDK + // - Use auth.actingUserId as the sender + // - Use auth.timestampOverride for origin_server_ts if provided + // - Deduplicate by txnId per sender + + return { event_id: `$stub_${txnId}` }; + }, + { + params: t.Object({ roomId: t.String(), eventType: t.String(), txnId: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Send a message event' }, + }, + ) + .put( + '/_matrix/client/v3/rooms/:roomId/state/:eventType/:stateKey', + async ({ params, body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Create the state event using the federation SDK + + return { event_id: `$stub_state_${params.eventType}_${params.stateKey}` }; + }, + { + params: t.Object({ roomId: t.String(), eventType: t.String(), stateKey: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Send a state event' }, + }, + ) + .put( + '/_matrix/client/v3/rooms/:roomId/state/:eventType', + async ({ params, body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + return { event_id: `$stub_state_${params.eventType}` }; + }, + { + params: t.Object({ roomId: t.String(), eventType: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Send a state event (empty state key)' }, + }, + ); +}; diff --git a/packages/homeserver/src/controllers/client/ping.controller.ts b/packages/homeserver/src/controllers/client/ping.controller.ts new file mode 100644 index 000000000..4107be4c2 --- /dev/null +++ b/packages/homeserver/src/controllers/client/ping.controller.ts @@ -0,0 +1,49 @@ +import { federationSDK } from '@rocket.chat/federation-sdk'; +import type { Elysia } from 'elysia'; +import { t } from 'elysia'; + +import { resolveAppServiceAuth } from '../../middlewares/appserviceAuth'; + +/** + * POST /_matrix/client/v1/appservice/:appserviceId/ping + */ +export const clientAppservicePingPlugin = (serverName: string) => (app: Elysia) => { + return app.post( + '/_matrix/client/v1/appservice/:appserviceId/ping', + async ({ params, body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + if (auth.appservice.registration._id !== params.appserviceId) { + set.status = 403; + return { errcode: 'M_FORBIDDEN', error: 'Cannot ping a different appservice' }; + } + + const result = await federationSDK.pingAppService(params.appserviceId, body?.transaction_id); + + if ('errcode' in result) { + if (result.errcode === 'M_URL_NOT_SET') { + set.status = 400; + } else if (result.errcode === 'M_CONNECTION_TIMEOUT') { + set.status = 504; + } else { + set.status = 502; + } + return result; + } + return result; + }, + { + params: t.Object({ appserviceId: t.String() }), + body: t.Optional(t.Object({ transaction_id: t.Optional(t.String()) })), + detail: { tags: ['Client-Server'], summary: 'Ping an appservice' }, + }, + ); +}; diff --git a/packages/homeserver/src/controllers/client/profile.controller.ts b/packages/homeserver/src/controllers/client/profile.controller.ts new file mode 100644 index 000000000..2feb6345f --- /dev/null +++ b/packages/homeserver/src/controllers/client/profile.controller.ts @@ -0,0 +1,85 @@ +import type { Elysia } from 'elysia'; +import { t } from 'elysia'; + +import { resolveAppServiceAuth } from '../../middlewares/appserviceAuth'; + +/** + * Client-Server API endpoints for user profiles. + */ +export const clientProfilePlugin = (serverName: string) => (app: Elysia) => { + return app + .put( + '/_matrix/client/v3/profile/:userId/displayname', + async ({ params, body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Update user display name + return {}; + }, + { + params: t.Object({ userId: t.String() }), + body: t.Object({ displayname: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Set display name' }, + }, + ) + .put( + '/_matrix/client/v3/profile/:userId/avatar_url', + async ({ params, body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Update user avatar URL + return {}; + }, + { + params: t.Object({ userId: t.String() }), + body: t.Object({ avatar_url: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Set avatar URL' }, + }, + ) + .get( + '/_matrix/client/v3/profile/:userId', + async ({ params }) => { + // TODO: Look up user profile from the database + return { displayname: params.userId }; + }, + { + params: t.Object({ userId: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Get user profile' }, + }, + ) + .get( + '/_matrix/client/v3/account/whoami', + async ({ headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + return { user_id: auth.actingUserId, is_guest: false }; + }, + { + detail: { tags: ['Client-Server'], summary: 'Who am I' }, + }, + ); +}; diff --git a/packages/homeserver/src/controllers/client/register.controller.ts b/packages/homeserver/src/controllers/client/register.controller.ts new file mode 100644 index 000000000..dac04510b --- /dev/null +++ b/packages/homeserver/src/controllers/client/register.controller.ts @@ -0,0 +1,63 @@ +import { federationSDK } from '@rocket.chat/federation-sdk'; +import type { Elysia } from 'elysia'; +import { t } from 'elysia'; + +import { resolveAppServiceAuth } from '../../middlewares/appserviceAuth'; + +/** + * POST /_matrix/client/v3/register + * + * Supports m.login.application_service auth type for ghost user registration. + */ +export const clientRegisterPlugin = (serverName: string) => (app: Elysia) => { + return app.post( + '/_matrix/client/v3/register', + async ({ body, headers, request, set }) => { + const authType = body.type || body.auth?.type; + if (authType !== 'm.login.application_service') { + set.status = 403; + return { errcode: 'M_FORBIDDEN', error: 'Only m.login.application_service registration is supported' }; + } + + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + const { username } = body; + if (!username) { + set.status = 400; + return { errcode: 'M_MISSING_PARAM', error: 'username is required' }; + } + + const userId = `@${username}:${serverName}`; + + // Check exclusive namespace + const owningAs = federationSDK.isExclusiveNamespace('users', userId); + if (owningAs && owningAs.registration._id !== auth.appservice.registration._id) { + set.status = 400; + return { errcode: 'M_EXCLUSIVE', error: 'Username is within an exclusive namespace of another appservice' }; + } + + // TODO: Check if user already exists, create user with appserviceId + return { user_id: userId }; + }, + { + body: t.Object({ + auth: t.Optional(t.Object({ type: t.String() })), + type: t.Optional(t.String()), + username: t.Optional(t.String()), + password: t.Optional(t.String()), + device_id: t.Optional(t.String()), + initial_device_display_name: t.Optional(t.String()), + inhibit_login: t.Optional(t.Boolean()), + }), + detail: { tags: ['Client-Server'], summary: 'Register a new user (appservice)' }, + }, + ); +}; diff --git a/packages/homeserver/src/controllers/client/rooms.controller.ts b/packages/homeserver/src/controllers/client/rooms.controller.ts new file mode 100644 index 000000000..58ff3ecb0 --- /dev/null +++ b/packages/homeserver/src/controllers/client/rooms.controller.ts @@ -0,0 +1,109 @@ +import type { Elysia } from 'elysia'; +import { t } from 'elysia'; + +import { resolveAppServiceAuth } from '../../middlewares/appserviceAuth'; + +/** + * Client-Server API endpoints for room operations. + */ +export const clientRoomsPlugin = (serverName: string) => (app: Elysia) => { + return app + .post( + '/_matrix/client/v3/createRoom', + async ({ body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Create room via federation SDK as auth.actingUserId + return { room_id: `!stub:${serverName}` }; + }, + { + body: t.Object({ + room_alias_name: t.Optional(t.String()), + name: t.Optional(t.String()), + topic: t.Optional(t.String()), + visibility: t.Optional(t.Union([t.Literal('public'), t.Literal('private')])), + invite: t.Optional(t.Array(t.String())), + preset: t.Optional(t.Union([t.Literal('private_chat'), t.Literal('public_chat'), t.Literal('trusted_private_chat')])), + is_direct: t.Optional(t.Boolean()), + creation_content: t.Optional(t.Record(t.String(), t.Unknown())), + initial_state: t.Optional( + t.Array(t.Object({ type: t.String(), state_key: t.Optional(t.String()), content: t.Record(t.String(), t.Unknown()) })), + ), + power_level_content_override: t.Optional(t.Record(t.String(), t.Unknown())), + }), + detail: { tags: ['Client-Server'], summary: 'Create a room' }, + }, + ) + .post( + '/_matrix/client/v3/join/:roomIdOrAlias', + async ({ params, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Join room via federation SDK as auth.actingUserId + return { room_id: params.roomIdOrAlias.startsWith('!') ? params.roomIdOrAlias : `!stub:${serverName}` }; + }, + { + params: t.Object({ roomIdOrAlias: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Join a room' }, + }, + ) + .post( + '/_matrix/client/v3/rooms/:roomId/leave', + async ({ params, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Leave room + return {}; + }, + { + params: t.Object({ roomId: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Leave a room' }, + }, + ) + .post( + '/_matrix/client/v3/rooms/:roomId/invite', + async ({ params, body, headers, request, set }) => { + const auth = resolveAppServiceAuth(request, headers, serverName); + if (auth.error) { + set.status = auth.error.status; + return { errcode: auth.error.errcode, error: auth.error.error }; + } + if (!auth.appservice) { + set.status = 401; + return { errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid or missing application service token' }; + } + + // TODO: Invite user + return {}; + }, + { + params: t.Object({ roomId: t.String() }), + body: t.Object({ user_id: t.String(), reason: t.Optional(t.String()) }), + detail: { tags: ['Client-Server'], summary: 'Invite a user to a room' }, + }, + ); +}; diff --git a/packages/homeserver/src/controllers/client/thirdparty.controller.ts b/packages/homeserver/src/controllers/client/thirdparty.controller.ts new file mode 100644 index 000000000..5f20f5d92 --- /dev/null +++ b/packages/homeserver/src/controllers/client/thirdparty.controller.ts @@ -0,0 +1,79 @@ +import type { CachedAppService } from '@rocket.chat/federation-sdk'; +import { federationSDK } from '@rocket.chat/federation-sdk'; +import type { Elysia } from 'elysia'; +import { t } from 'elysia'; + +async function findFirstResult( + appservices: CachedAppService[], + protocol: string, + query: (asId: string, protocol: string) => Promise, +): Promise { + for (const as of appservices) { + if (as.registration.protocols.includes(protocol)) { + // eslint-disable-next-line no-await-in-loop + const result = await query(as.registration._id, protocol); + if (result) return result; + } + } + return null; +} + +/** + * Client-Server API endpoints for third-party protocol lookups. + */ +export const clientThirdPartyPlugin = (_serverName: string) => (app: Elysia) => { + return app + .get( + '/_matrix/client/v3/thirdparty/protocols', + async () => { + return federationSDK.getAllProtocols(); + }, + { detail: { tags: ['Client-Server'], summary: 'List all third-party protocols' } }, + ) + .get( + '/_matrix/client/v3/thirdparty/protocol/:protocol', + async ({ params, set }) => { + const result = await findFirstResult(federationSDK.getAllRegistrations(), params.protocol, (asId, protocol) => + federationSDK.queryThirdPartyProtocol(asId, protocol), + ); + if (result) return result; + + set.status = 404; + return { errcode: 'M_NOT_FOUND', error: 'Protocol not found' }; + }, + { + params: t.Object({ protocol: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Get third-party protocol metadata' }, + }, + ) + .get( + '/_matrix/client/v3/thirdparty/user/:protocol', + async ({ params, query }) => { + const fields = { ...query } as Record; + + const result = await findFirstResult(federationSDK.getAllRegistrations(), params.protocol, (asId, protocol) => + federationSDK.queryThirdPartyUser(asId, protocol, fields), + ); + return result ?? []; + }, + { + params: t.Object({ protocol: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Query third-party users' }, + }, + ) + .get( + '/_matrix/client/v3/thirdparty/location/:protocol', + async ({ params, query }) => { + const fields = { ...query } as Record; + + const result = await findFirstResult(federationSDK.getAllRegistrations(), params.protocol, (asId, protocol) => + federationSDK.queryThirdPartyLocation(asId, protocol, fields), + ); + return result ?? []; + }, + { + params: t.Object({ protocol: t.String() }), + detail: { tags: ['Client-Server'], summary: 'Query third-party locations' }, + }, + ); +}; diff --git a/packages/homeserver/src/homeserver.module.ts b/packages/homeserver/src/homeserver.module.ts index 4a79f1d94..d44aea6c0 100644 --- a/packages/homeserver/src/homeserver.module.ts +++ b/packages/homeserver/src/homeserver.module.ts @@ -5,11 +5,17 @@ import * as fs from 'node:fs'; import * as path from 'node:path'; import { swagger } from '@elysiajs/swagger'; -import type { Emitter } from '@rocket.chat/emitter'; -import { type HomeserverEventSignatures, federationSDK, init } from '@rocket.chat/federation-sdk'; +import { federationSDK, init } from '@rocket.chat/federation-sdk'; import * as dotenv from 'dotenv'; import Elysia from 'elysia'; +import { clientDirectoryPlugin } from './controllers/client/directory.controller'; +import { clientEventsPlugin } from './controllers/client/events.controller'; +import { clientAppservicePingPlugin } from './controllers/client/ping.controller'; +import { clientProfilePlugin } from './controllers/client/profile.controller'; +import { clientRegisterPlugin } from './controllers/client/register.controller'; +import { clientRoomsPlugin } from './controllers/client/rooms.controller'; +import { clientThirdPartyPlugin } from './controllers/client/thirdparty.controller'; import { invitePlugin } from './controllers/federation/invite.controller'; import { mediaPlugin } from './controllers/federation/media.controller'; import { profilesPlugin } from './controllers/federation/profiles.controller'; @@ -33,16 +39,21 @@ export async function setup() { dotenv.config({ path: envPath }); } + const dbUri = process.env.MONGO_URL || 'mongodb://localhost:27017/matrix'; + const dbPoolSize = Number.parseInt(process.env.DATABASE_POOL_SIZE || '10', 10); + await init({ dbConfig: { - uri: process.env.MONGO_URL || 'mongodb://localhost:27017/matrix', - poolSize: Number.parseInt(process.env.DATABASE_POOL_SIZE || '10', 10), + uri: dbUri, + poolSize: dbPoolSize, }, }); - federationSDK.setConfig({ + const serverName = process.env.SERVER_NAME || 'rc1'; + + await federationSDK.setConfig({ instanceId: crypto.randomUUID(), - serverName: process.env.SERVER_NAME || 'rc1', + serverName, port: Number.parseInt(process.env.SERVER_PORT || '8080', 10), matrixDomain: process.env.MATRIX_DOMAIN || 'rc1', keyRefreshInterval: Number.parseInt(process.env.MATRIX_KEY_REFRESH_INTERVAL || '60', 10), @@ -88,17 +99,19 @@ export async function setup() { info: { title: 'Matrix Homeserver API', version: '1.0.0', - description: 'Matrix Protocol Implementation - Federation and Internal APIs', + description: 'Matrix Protocol Implementation - Federation, Client-Server, and Application Service APIs', }, }, }), ) + // Federation API .use(invitePlugin) .use(statePlugin) .use(profilesPlugin) .use(sendJoinPlugin) .use(transactionsPlugin) .use(versionsPlugin) + // Internal API .use(internalDirectMessagePlugin) .use(internalInvitePlugin) .use(internalMessagePlugin) @@ -108,7 +121,15 @@ export async function setup() { .use(wellKnownPlugin) .use(roomPlugin) .use(mediaPlugin) - .use(internalRequestPlugin); + .use(internalRequestPlugin) + // Client-Server API (for bridges) + .use(clientRegisterPlugin(serverName)) + .use(clientEventsPlugin(serverName)) + .use(clientRoomsPlugin(serverName)) + .use(clientProfilePlugin(serverName)) + .use(clientDirectoryPlugin(serverName)) + .use(clientThirdPartyPlugin(serverName)) + .use(clientAppservicePingPlugin(serverName)); return { app }; } diff --git a/packages/homeserver/src/middlewares/appserviceAuth.ts b/packages/homeserver/src/middlewares/appserviceAuth.ts new file mode 100644 index 000000000..0298631bc --- /dev/null +++ b/packages/homeserver/src/middlewares/appserviceAuth.ts @@ -0,0 +1,79 @@ +import type { CachedAppService } from '@rocket.chat/federation-sdk'; +import { federationSDK } from '@rocket.chat/federation-sdk'; + +export interface AppServiceAuthResult { + appservice: CachedAppService | undefined; + actingUserId: string | undefined; + timestampOverride: number | undefined; + error?: { status: number; errcode: string; error: string }; +} + +/** + * Resolve Application Service authentication from a request. + * This is a utility function used by client-server API controllers. + */ +export function resolveAppServiceAuth( + request: Request, + headers: Record, + serverName: string, +): AppServiceAuthResult { + let asToken: string | undefined; + + // Extract token from Authorization: Bearer + const authHeader = headers.authorization; + if (authHeader?.startsWith('Bearer ')) { + asToken = authHeader.slice(7); + } + + // Fallback: legacy ?access_token= query param + if (!asToken) { + const url = new URL(request.url); + asToken = url.searchParams.get('access_token') ?? undefined; + } + + if (!asToken) { + return { appservice: undefined, actingUserId: undefined, timestampOverride: undefined }; + } + + const appservice = federationSDK.getRegistrationByAsToken(asToken); + + if (!appservice) { + return { + appservice: undefined, + actingUserId: undefined, + timestampOverride: undefined, + error: { status: 401, errcode: 'M_UNKNOWN_TOKEN', error: 'Invalid application service token' }, + }; + } + + // Resolve acting user from ?user_id= param + const url = new URL(request.url); + const userIdParam = url.searchParams.get('user_id'); + let actingUserId = `@${appservice.registration.senderLocalpart}:${serverName}`; + + if (userIdParam) { + const isInNamespace = federationSDK.isUserInAppServiceNamespace(userIdParam, appservice.registration._id); + + const senderUserId = `@${appservice.registration.senderLocalpart}:${serverName}`; + if (!isInNamespace && userIdParam !== senderUserId) { + return { + appservice: undefined, + actingUserId: undefined, + timestampOverride: undefined, + error: { status: 403, errcode: 'M_EXCLUSIVE', error: 'User is not within the appservice namespace' }, + }; + } + + actingUserId = userIdParam; + } + + // Extract optional ?ts= for timestamp massaging + const tsParam = url.searchParams.get('ts'); + const timestampOverride = tsParam ? Number.parseInt(tsParam, 10) : undefined; + + return { + appservice, + actingUserId, + timestampOverride: timestampOverride && !Number.isNaN(timestampOverride) ? timestampOverride : undefined, + }; +} diff --git a/packages/homeserver/tsconfig.json b/packages/homeserver/tsconfig.json index 050cae67f..da938513c 100644 --- a/packages/homeserver/tsconfig.json +++ b/packages/homeserver/tsconfig.json @@ -12,7 +12,8 @@ { "path": "../core" }, { "path": "../crypto" }, { "path": "../federation-sdk" }, - { "path": "../room" } + { "path": "../room" }, + { "path": "../appservice" } ], "include": ["src/**/*"], "exclude": ["node_modules", "dist", "**/*.spec.ts", "**/*.test.ts"] diff --git a/packages/room/src/manager/room-state.ts b/packages/room/src/manager/room-state.ts index 83794c73e..a3c951a5d 100644 --- a/packages/room/src/manager/room-state.ts +++ b/packages/room/src/manager/room-state.ts @@ -157,6 +157,29 @@ export class RoomState { return users; } + // Aliases this room advertises via m.room.canonical_alias (canonical + alternates). + // Does not include aliases registered only in a homeserver's local directory. + getCanonicalAliases(): string[] { + const event = getStateByMapKey(this.stateMap, { + type: 'm.room.canonical_alias', + }); + + if (!event?.isCanonicalAliasEvent()) { + return []; + } + + const content = event.getContent(); + const aliases: string[] = []; + + if (content.alias) { + aliases.push(content.alias); + } + if (content.alt_aliases?.length) { + aliases.push(...content.alt_aliases); + } + return aliases; + } + getMemberJoinEvents() { const events = [] as PersistentEventBase[]; for (const event of this.stateMap.values()) { diff --git a/tsconfig.base.json b/tsconfig.base.json index 0a0c7c19b..77f157a5f 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -51,6 +51,9 @@ ], "@rocket.chat/federation-room/*": [ "packages/room/src/*" + ], + "@rocket.chat/appservice/*": [ + "packages/appservice/src/*" ] } }, diff --git a/tsconfig.json b/tsconfig.json index 748ed619b..06a14dd8b 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -12,6 +12,7 @@ { "path": "./packages/crypto" }, { "path": "./packages/federation-sdk" }, { "path": "./packages/homeserver" }, - { "path": "./packages/room" } + { "path": "./packages/room" }, + { "path": "./packages/appservice" } ] }