diff --git a/biome.jsonc b/biome.jsonc index c9464dc7e..8c7e79810 100644 --- a/biome.jsonc +++ b/biome.jsonc @@ -42,7 +42,7 @@ "noAssignInExpressions": "error", "noAsyncPromiseExecutor": "error", "noDoubleEquals": "error", - "noExplicitAny": "warn", + "noExplicitAny": "error", "noFocusedTests": "error", "noImplicitAnyLet": "error", "noShadowRestrictedNames": "error", diff --git a/build.js b/build.js index f00a62cfd..d588642b8 100644 --- a/build.js +++ b/build.js @@ -1,8 +1,39 @@ // biome-ignore lint/style/noCommonJs: build script runs as CommonJS const { build } = require('esbuild') +// biome-ignore lint/style/noCommonJs: build script runs as CommonJS +const { readdirSync, rmSync } = require('node:fs') +// biome-ignore lint/style/noCommonJs: build script runs as CommonJS +const { join, sep } = require('node:path') + +function collectEntryPoints(dir) { + const entryPoints = [] + + for (const entry of readdirSync(dir, { withFileTypes: true })) { + const fullPath = join(dir, entry.name) + + if (entry.isDirectory()) { + if (fullPath === join('src', 'typecheck')) { + continue + } + + entryPoints.push(...collectEntryPoints(fullPath)) + continue + } + + if (entry.isFile() && entry.name.endsWith('.ts')) { + entryPoints.push(`./${fullPath.split(sep).join('/')}`) + } + } + + return entryPoints +} + +// Typecheck sentinels are enforced by `tsc -noEmit` +// keep them out of emitted build output. +rmSync('dist/typecheck', { recursive: true, force: true }) build({ - entryPoints: ['./src/**/*.ts'], + entryPoints: collectEntryPoints('src').sort(), bundle: false, outdir: 'dist', platform: 'node', diff --git a/src/http/error-handler.ts b/src/http/error-handler.ts index a99cd87cd..e39318565 100644 --- a/src/http/error-handler.ts +++ b/src/http/error-handler.ts @@ -14,7 +14,7 @@ export const setErrorHandler = ( app: FastifyInstance, options?: { respectStatusCode?: boolean - formatter?: (error: StorageError) => Record + formatter?: (error: StorageError) => unknown } ) => { app.setErrorHandler(function (error, request, reply) { diff --git a/src/http/plugins/log-request.ts b/src/http/plugins/log-request.ts index 56e8e7a69..77f31b784 100644 --- a/src/http/plugins/log-request.ts +++ b/src/http/plugins/log-request.ts @@ -1,4 +1,5 @@ import { logger, logSchema, redactQueryParamFromRequest } from '@internal/monitoring' +import { RouteGenericInterface } from 'fastify' import { FastifyReply } from 'fastify/types/reply' import { FastifyRequest } from 'fastify/types/request' import fastifyPlugin from 'fastify-plugin' @@ -7,6 +8,11 @@ interface RequestLoggerOptions { excludeUrls?: string[] } +type RawRequestMetadata = FastifyRequest['raw'] & { + executionError?: Error + resources?: string[] +} + declare module 'fastify' { interface FastifyRequest { executionError?: Error @@ -18,8 +24,8 @@ declare module 'fastify' { interface FastifyContextConfig { operation?: { type: string } - resources?: (req: FastifyRequest) => string[] - logMetadata?: (req: FastifyRequest) => Record + resources?(req: FastifyRequest): string[] + logMetadata?(req: FastifyRequest): Record } } @@ -64,30 +70,11 @@ export const logRequest = (options: RequestLoggerOptions) => } if (resources === undefined) { - resources = (req.raw as any).resources + resources = getRawRequest(req).resources } if (resources === undefined) { - const params = req.params as Record | undefined - let resourceFromParams = '' - - if (params) { - let first = true - for (const key in params) { - if (!Object.prototype.hasOwnProperty.call(params, key)) { - continue - } - - if (!first) { - resourceFromParams += '/' - } - - const value = params[key] - resourceFromParams += value == null ? '' : String(value) - first = false - } - } - + const resourceFromParams = getResourceFromParams(req.params) resources = resourceFromParams ? [resourceFromParams] : [] } @@ -150,7 +137,7 @@ function doRequestLog(req: FastifyRequest, options: LogRequestOptions) { const rId = req.id const cIP = req.ip const statusCode = options.statusCode - const error = (req.raw as any).executionError || req.executionError + const error = getRawRequest(req).executionError || req.executionError const tenantId = req.tenantId let reqMetadata: Record = {} @@ -197,3 +184,32 @@ function doRequestLog(req: FastifyRequest, options: LogRequestOptions) { serverTimes: req.serverTimings, }) } + +function getRawRequest(req: FastifyRequest): RawRequestMetadata { + return req.raw as RawRequestMetadata +} + +function getResourceFromParams(params: unknown): string { + if (!params || typeof params !== 'object') { + return '' + } + + let resource = '' + let first = true + + for (const key in params) { + if (!Object.prototype.hasOwnProperty.call(params, key)) { + continue + } + + if (!first) { + resource += '/' + } + + const value = (params as Record)[key] + resource += value == null ? '' : String(value) + first = false + } + + return resource +} diff --git a/src/http/routes-helper.ts b/src/http/routes-helper.ts index 1cfb821fd..215a3fa79 100644 --- a/src/http/routes-helper.ts +++ b/src/http/routes-helper.ts @@ -1,4 +1,5 @@ type BucketResponseType = { message: string; statusCode?: string; error?: string } +type SchemaObject = Record /** * Create generic response for all buckets @@ -23,7 +24,10 @@ function createResponse(message: string, status?: string, error?: string): Bucke return response } -function createDefaultSchema(successResponseSchema: any, properties: any): any { +function createDefaultSchema( + successResponseSchema: SchemaObject, + properties: SchemaObject +): SchemaObject { return { headers: { $ref: 'authSchema#' }, response: { diff --git a/src/http/routes/admin/migrations.ts b/src/http/routes/admin/migrations.ts index de9bb63f8..ec55a2baa 100644 --- a/src/http/routes/admin/migrations.ts +++ b/src/http/routes/admin/migrations.ts @@ -13,6 +13,15 @@ import apiKey from '../../plugins/apikey' const { pgQueueEnable } = getConfig() const migrationQueueName = RunMigrationsOnTenants.getQueueName() +type ResetFleetBody = { + untilMigration?: unknown + markCompletedTillMigration?: unknown +} + +type FailedQuery = { + cursor?: string +} + export default async function routes(fastify: FastifyInstance) { fastify.register(apiKey) @@ -31,7 +40,7 @@ export default async function routes(fastify: FastifyInstance) { return reply.status(400).send({ message: 'Queue is not enabled' }) } - const { untilMigration, markCompletedTillMigration } = req.body as Record + const { untilMigration, markCompletedTillMigration } = req.body as ResetFleetBody if (!isDBMigrationName(untilMigration)) { return reply.status(400).send({ message: 'Invalid migration' }) @@ -96,7 +105,8 @@ export default async function routes(fastify: FastifyInstance) { if (!pgQueueEnable) { return reply.code(400).send({ message: 'Queue is not enabled' }) } - const offset = (req.query as any).cursor ? Number((req.query as any).cursor) : 0 + const { cursor } = req.query as FailedQuery + const offset = cursor ? Number(cursor) : 0 const failed = await multitenantKnex .table('tenants') diff --git a/src/http/routes/admin/queue.ts b/src/http/routes/admin/queue.ts index 2af65c583..5a8a05dd4 100644 --- a/src/http/routes/admin/queue.ts +++ b/src/http/routes/admin/queue.ts @@ -5,6 +5,11 @@ import { getConfig } from '../../../config' import apiKey from '../../plugins/apikey' const { pgQueueEnable } = getConfig() +// Empty ref/host mark system-scoped events and intentionally skip per-tenant shouldSend lookups. +const systemTenant = { + ref: '', + host: '', +} as const const moveJobsSchema = { body: { @@ -37,7 +42,9 @@ export default async function routes(fastify: FastifyInstance) { return reply.status(400).send({ message: 'Queue is not enabled' }) } - await UpgradePgBossV10.send({}) + await UpgradePgBossV10.send({ + tenant: systemTenant, + }) return reply.send({ message: 'Migration scheduled' }) }) @@ -58,6 +65,7 @@ export default async function routes(fastify: FastifyInstance) { fromQueue, toQueue, deleteJobsFromOriginalQueue, + tenant: systemTenant, }) return reply.send({ message: 'Move jobs scheduled' }) diff --git a/src/http/routes/iceberg/table.ts b/src/http/routes/iceberg/table.ts index a752f0f74..4f0c3603c 100644 --- a/src/http/routes/iceberg/table.ts +++ b/src/http/routes/iceberg/table.ts @@ -518,12 +518,10 @@ export default async function routes(fastify: FastifyInstance) { try { if (typeof payload === 'string') return done(null, JSONBigint.parse(payload)) if (Buffer.isBuffer(payload)) return done(null, JSONBigint.parse(payload.toString('utf8'))) - if (payload && typeof (payload as any).on === 'function') { + if (isReadablePayload(payload)) { const chunks: Buffer[] = [] - ;(payload as NodeJS.ReadableStream).on('data', (c) => - chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(String(c))) - ) - ;(payload as NodeJS.ReadableStream).on('end', () => { + payload.on('data', (c) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(String(c)))) + payload.on('end', () => { try { done(null, JSONBigint.parse(Buffer.concat(chunks).toString('utf8'))) } catch (err) { @@ -574,3 +572,9 @@ export default async function routes(fastify: FastifyInstance) { ) }) } + +function isReadablePayload(payload: unknown): payload is NodeJS.ReadableStream { + return ( + !!payload && typeof payload === 'object' && 'on' in payload && typeof payload.on === 'function' + ) +} diff --git a/src/http/routes/render/rate-limiter.ts b/src/http/routes/render/rate-limiter.ts index 05fedade2..92d9be7a3 100644 --- a/src/http/routes/render/rate-limiter.ts +++ b/src/http/routes/render/rate-limiter.ts @@ -13,7 +13,7 @@ const { rateLimiterRenderPathMaxReqSec, } = getConfig() -export const rateLimiter = fp((fastify: FastifyInstance, ops: any, done: () => void) => { +export const rateLimiter = fp((fastify: FastifyInstance, _ops: unknown, done: () => void) => { fastify.register(fastifyRateLimit, { global: true, max: rateLimiterRenderPathMaxReqSec * 4, diff --git a/src/http/routes/s3/error-handler.ts b/src/http/routes/s3/error-handler.ts index df9e40f45..1e8914c8f 100644 --- a/src/http/routes/s3/error-handler.ts +++ b/src/http/routes/s3/error-handler.ts @@ -5,12 +5,18 @@ import { FastifyReply } from 'fastify/types/reply' import { FastifyRequest } from 'fastify/types/request' import { DatabaseError } from 'pg' +type ValidationIssue = { + instancePath?: string + message?: string +} + export const s3ErrorHandler = ( error: FastifyError | Error, request: FastifyRequest, reply: FastifyReply ) => { request.executionError = error + const validation = getValidationIssues(error) const resource = request.url .split('?')[0] @@ -19,12 +25,12 @@ export const s3ErrorHandler = ( .filter((e) => e) .join('/') - if ('validation' in error) { + if (validation) { return reply.status(400).send({ Error: { Resource: resource, Code: ErrorCode.InvalidRequest, - Message: formatValidationError(error.validation).message, + Message: formatValidationError(validation).message, }, }) } @@ -91,7 +97,20 @@ export const s3ErrorHandler = ( }) } -function formatValidationError(errors: any) { +function isValidationIssueArray(value: unknown): value is ValidationIssue[] { + return Array.isArray(value) +} + +function getValidationIssues(error: FastifyError | Error): ValidationIssue[] | undefined { + if (!('validation' in error)) { + return undefined + } + + const value = error.validation + return isValidationIssueArray(value) ? value : undefined +} + +function formatValidationError(errors: readonly ValidationIssue[]) { let text = '' const separator = ', ' diff --git a/src/http/routes/s3/index.ts b/src/http/routes/s3/index.ts index 0594c0edd..90c89d67c 100644 --- a/src/http/routes/s3/index.ts +++ b/src/http/routes/s3/index.ts @@ -66,12 +66,12 @@ export default async function routes(fastify: FastifyInstance) { req.opentelemetry()?.span?.setAttribute('http.operation', req.operation.type) } - const data: RequestInput = { + const data = { Params: req.params, Body: req.body, Headers: req.headers, Querystring: req.query, - } + } as unknown as RequestInput const compiler = route.compiledSchema() const isValid = compiler(data) diff --git a/src/http/routes/s3/router.ts b/src/http/routes/s3/router.ts index 172bd7ec3..ecff7017e 100644 --- a/src/http/routes/s3/router.ts +++ b/src/http/routes/s3/router.ts @@ -122,7 +122,7 @@ type Route = { compiledSchema: () => ValidateFunction> } -interface RouteOptions { +interface RouteOptions { disableContentTypeParser?: boolean allowEmptyJsonBody?: boolean acceptMultiformData?: boolean @@ -143,11 +143,11 @@ export class Router { allErrors: false, }) - registerRoute( + registerRoute( method: HTTPMethod, url: string, - options: RouteOptions, - handler: Handler + options: RouteOptions, + handler: Handler ) { const { query, headers } = this.parseRequestInfo(url) const normalizedUrl = url.split('?')[0].split('|')[0] @@ -204,14 +204,15 @@ export class Router { ) } - const newRoute: Route = { + const newRoute: Route = { method: method as HTTPMethod, path: normalizedUrl, querystringMatches: query, headersMatches: headers, schema, - compiledSchema: () => this.ajv.getSchema(method + url) as ValidateFunction>, - handler: handler as Handler, + compiledSchema: () => + this.ajv.getSchema(method + url) as ValidateFunction>, + handler, disableContentTypeParser, allowEmptyJsonBody, acceptMultiformData, @@ -228,24 +229,59 @@ export class Router { this._routes.set(normalizedUrl, existingPath) } - get(url: string, options: RouteOptions, handler: Handler) { - this.registerRoute('get', url, options, handler as any) + // Route storage is schema-erased; the public helpers preserve per-route inference for callers. + private registerRouteErased( + method: HTTPMethod, + url: string, + options: RouteOptions, + handler: Handler + ) { + this.registerRoute(method, url, options, handler) + } + + get(url: string, options: RouteOptions, handler: Handler) { + this.registerRouteErased( + 'get', + url, + options as unknown as RouteOptions, + handler as unknown as Handler + ) } - post(url: string, options: RouteOptions, handler: Handler) { - this.registerRoute('post', url, options, handler as any) + post(url: string, options: RouteOptions, handler: Handler) { + this.registerRouteErased( + 'post', + url, + options as unknown as RouteOptions, + handler as unknown as Handler + ) } - put(url: string, options: RouteOptions, handler: Handler) { - this.registerRoute('put', url, options, handler as any) + put(url: string, options: RouteOptions, handler: Handler) { + this.registerRouteErased( + 'put', + url, + options as unknown as RouteOptions, + handler as unknown as Handler + ) } - delete(url: string, options: RouteOptions, handler: Handler) { - this.registerRoute('delete', url, options, handler as any) + delete(url: string, options: RouteOptions, handler: Handler) { + this.registerRouteErased( + 'delete', + url, + options as unknown as RouteOptions, + handler as unknown as Handler + ) } - head(url: string, options: RouteOptions, handler: Handler) { - this.registerRoute('head', url, options, handler as any) + head(url: string, options: RouteOptions, handler: Handler) { + this.registerRouteErased( + 'head', + url, + options as unknown as RouteOptions, + handler as unknown as Handler + ) } parseQueryMatch(query: string): QuerystringMatch { diff --git a/src/http/routes/tus/lifecycle.ts b/src/http/routes/tus/lifecycle.ts index e8c4f497a..114054bab 100644 --- a/src/http/routes/tus/lifecycle.ts +++ b/src/http/routes/tus/lifecycle.ts @@ -31,6 +31,7 @@ function getNodeRequest(rawReq: Request): MultiPartRequest { return req } export type MultiPartRequest = http.IncomingMessage & { + executionError?: Error log: BaseLogger upload: { storage: Storage @@ -326,8 +327,9 @@ export async function onUploadFinish(rawReq: Request, upload: Upload) { } } catch (e) { if (isRenderableError(e)) { - ;(e as any).status_code = parseInt(e.render().statusCode, 10) - ;(e as any).body = e.render().message + const renderableError = e as unknown as Error & Partial + renderableError.status_code = parseInt(e.render().statusCode, 10) + renderableError.body = e.render().message } throw e } @@ -342,9 +344,9 @@ export function onResponseError(rawReq: Request, e: TusError | Error) { const req = getNodeRequest(rawReq) if (e instanceof Error) { - ;(req as any).executionError = e + req.executionError = e } else { - ;(req as any).executionError = ERRORS.TusError(e.body, e.status_code).withMetadata(e) + req.executionError = ERRORS.TusError(e.body, e.status_code).withMetadata(e) } if (isRenderableError(e)) { diff --git a/src/http/routes/vector/query-vectors.ts b/src/http/routes/vector/query-vectors.ts index d9d4276b3..2b072bd40 100644 --- a/src/http/routes/vector/query-vectors.ts +++ b/src/http/routes/vector/query-vectors.ts @@ -112,7 +112,7 @@ export default async function routes(fastify: FastifyInstance) { coerceTypes: false, }) - const perRouteValidator: FastifySchemaCompiler = ({ schema }) => { + const perRouteValidator: FastifySchemaCompiler = ({ schema }) => { const validate = ajvNoRemoval.compile(schema as object) return (data) => { const ok = validate(data) diff --git a/src/internal/auth/jwks/manager.ts b/src/internal/auth/jwks/manager.ts index a46c25a7d..cec6a62ff 100644 --- a/src/internal/auth/jwks/manager.ts +++ b/src/internal/auth/jwks/manager.ts @@ -15,6 +15,10 @@ import { JWKSManagerStore } from './store' const JWK_KIND_STORAGE_URL_SIGNING = 'storage-url-signing-key' const JWK_KID_SEPARATOR = '_' +function isTenantCacheKeyMessage(message: unknown): message is string { + return typeof message === 'string' +} + const tenantJwksMutex = createMutexByKey() export const TENANT_JWKS_CACHE_MAX_ITEMS = 16384 export const TENANT_JWKS_CACHE_MAX_SIZE_BYTES = 1024 * 1024 * 50 // 50 MiB @@ -50,6 +54,10 @@ export class JWKSManager { */ async listenForTenantUpdate(pubSub: PubSubAdapter): Promise { await pubSub.subscribe(TENANTS_JWKS_UPDATE_CHANNEL, (cacheKey) => { + if (!isTenantCacheKeyMessage(cacheKey)) { + return + } + tenantJwksConfigCache.delete(cacheKey) }) } diff --git a/src/internal/cluster/cluster.ts b/src/internal/cluster/cluster.ts index 106c75726..e462ebd07 100644 --- a/src/internal/cluster/cluster.ts +++ b/src/internal/cluster/cluster.ts @@ -5,11 +5,18 @@ import { logger } from '@internal/monitoring' const clusterEvent = new EventEmitter() +interface ClusterEvents { + change: { size: number } +} + export class Cluster { static size: number = 0 protected static watcher?: NodeJS.Timeout = undefined - static on(event: string, listener: (...args: any[]) => void) { + static on( + event: E, + listener: (payload: ClusterEvents[E]) => void + ) { clusterEvent.on(event, listener) } diff --git a/src/internal/concurrency/async-abort-controller.ts b/src/internal/concurrency/async-abort-controller.ts index d25ec2695..be387a14d 100644 --- a/src/internal/concurrency/async-abort-controller.ts +++ b/src/internal/concurrency/async-abort-controller.ts @@ -2,31 +2,44 @@ * This special AbortController is used to wait for all the abort handlers to finish before resolving the promise. */ export class AsyncAbortController extends AbortController { - protected promises: Promise[] = [] + protected promises: Promise[] = [] protected _nextGroup?: AsyncAbortController constructor() { super() - const originalEventListener = this.signal.addEventListener + const originalEventListener = this.signal.addEventListener.bind(this.signal) // Patch event addEventListener to keep track of listeners and their promises - this.signal.addEventListener = (type: string, listener: any, options: any) => { + this.signal.addEventListener = ( + type: string, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions + ) => { + if (!listener) { + return + } + if (type !== 'abort') { - return originalEventListener.call(this.signal, type, listener, options) + return originalEventListener(type, listener, options) } - let resolving: undefined | (() => Promise) = undefined + let resolving: ((event: Event) => Promise) | undefined const promise = new Promise((resolve, reject) => { - resolving = async (): Promise => { - return Promise.resolve() - .then(() => listener()) - .then(() => { + resolving = (event: Event): Promise => { + try { + const result = + typeof listener === 'function' + ? listener.call(this.signal, event) + : listener.handleEvent(event) + + return Promise.resolve(result).then(() => { resolve() - }) - .catch((error) => { - reject(error) - }) + }, reject) + } catch (error) { + reject(error) + return Promise.resolve() + } } }) this.promises.push(promise) @@ -35,7 +48,7 @@ export class AsyncAbortController extends AbortController { throw new Error('resolve is undefined') } - return originalEventListener.call(this.signal, type, resolving, options) + return originalEventListener(type, resolving as EventListener, options) } } diff --git a/src/internal/concurrency/merge-async-itertor.ts b/src/internal/concurrency/merge-async-itertor.ts index 8912c6853..e279cc90d 100644 --- a/src/internal/concurrency/merge-async-itertor.ts +++ b/src/internal/concurrency/merge-async-itertor.ts @@ -1,8 +1,8 @@ -type MergedYield>> = { +type MergedYield>> = { [K in keyof Gens]: Gens[K] extends AsyncGenerator ? { type: K; value: V } : never }[keyof Gens] -export async function* mergeAsyncGenerators>>( +export async function* mergeAsyncGenerators>>( gens: Gens ): AsyncGenerator> { // Convert the input object into an array of [name, generator] tuples diff --git a/src/internal/database/migrations/migrate.ts b/src/internal/database/migrations/migrate.ts index b4a3c7d40..d762c63b0 100644 --- a/src/internal/database/migrations/migrate.ts +++ b/src/internal/database/migrations/migrate.ts @@ -682,15 +682,18 @@ function runMigrations({ for (const migration of migrationsToRun) { try { const ignore = migration.sql.includes('-- postgres-migrations ignore') + const migrationToRun = ignore + ? { + ...migration, + sql: 'SELECT 1;', + contents: 'SELECT 1;', + } + : migration - if (ignore) { - ;(migration as any).sql = 'SELECT 1;' - ;(migration as any).contents = 'SELECT 1;' - } const result = await runMigration( migrationTableName, client - )(runMigrationTransformers(migration, transformers)) + )(runMigrationTransformers(migrationToRun, transformers)) completedMigrations.push(result) } catch (e) { throw ERRORS.DatabaseError( diff --git a/src/internal/database/tenant.ts b/src/internal/database/tenant.ts index 74f3dd36f..e4be452f9 100644 --- a/src/internal/database/tenant.ts +++ b/src/internal/database/tenant.ts @@ -402,11 +402,21 @@ export async function getFeatures(tenantId: string): Promise { const TENANTS_UPDATE_CHANNEL = 'tenants_update' +function isTenantCacheKeyMessage(message: unknown): message is string { + return typeof message === 'string' +} + /** * Keeps the in memory config cache up to date */ export async function listenForTenantUpdate(pubSub: PubSubAdapter): Promise { - await pubSub.subscribe(TENANTS_UPDATE_CHANNEL, onTenantConfigChange) + await pubSub.subscribe(TENANTS_UPDATE_CHANNEL, (cacheKey) => { + if (!isTenantCacheKeyMessage(cacheKey)) { + return + } + + void onTenantConfigChange(cacheKey) + }) await s3CredentialsManager.listenForTenantUpdate(pubSub) await jwksManager.listenForTenantUpdate(pubSub) } diff --git a/src/internal/errors/codes.ts b/src/internal/errors/codes.ts index 59d6c49e3..491616b7e 100644 --- a/src/internal/errors/codes.ts +++ b/src/internal/errors/codes.ts @@ -550,15 +550,16 @@ export const ERRORS = { }, } -export function isStorageError(errorType: ErrorCode, error: any): error is StorageBackendError { +export function isStorageError(errorType: ErrorCode, error: unknown): error is StorageBackendError { return error instanceof StorageBackendError && error.code === errorType } function hasStatusCode(error: Error): error is Error & { statusCode: number } { - return 'statusCode' in error && typeof (error as any).statusCode === 'number' + const statusCode = (error as Error & { statusCode?: unknown }).statusCode + return typeof statusCode === 'number' } -export function normalizeRawError(error: any) { +export function normalizeRawError(error: unknown) { if (error instanceof Error) { let statusCode = 0 if (error instanceof StorageBackendError && error.httpStatusCode) { diff --git a/src/internal/errors/storage-error.ts b/src/internal/errors/storage-error.ts index 53d80b28d..ccc20070f 100644 --- a/src/internal/errors/storage-error.ts +++ b/src/internal/errors/storage-error.ts @@ -13,7 +13,7 @@ export class StorageBackendError extends Error implements RenderableError { userStatusCode: number resource?: string code: ErrorCode - metadata?: Record = {} + metadata?: Record = {} error?: string // backwards compatible error constructor(options: StorageErrorOptions) { @@ -71,7 +71,7 @@ export class StorageBackendError extends Error implements RenderableError { return this } - withMetadata(metadata: Record) { + withMetadata(metadata: Record) { this.metadata = metadata return this } diff --git a/src/internal/monitoring/otel-class-instrumentations.ts b/src/internal/monitoring/otel-class-instrumentations.ts index c27c01b3a..8e867fd12 100644 --- a/src/internal/monitoring/otel-class-instrumentations.ts +++ b/src/internal/monitoring/otel-class-instrumentations.ts @@ -60,8 +60,9 @@ export const classInstrumentations = [ enabled: true, methodsToInstrument: ['send', 'batchSend'], setName: (name, attrs, eventClass) => { - if (attrs.constructor.name) { - return name + '.' + eventClass.constructor.name + const eventName = eventClass.constructor?.name + if (eventName) { + return name + '.' + eventName } return name }, @@ -101,7 +102,7 @@ export const classInstrumentations = [ enabled: true, methodsToInstrument: ['runQuery'], setName: (name, attrs) => { - if (attrs.queryName) { + if (typeof attrs.queryName === 'string') { return name + '.' + attrs.queryName } return name @@ -109,7 +110,7 @@ export const classInstrumentations = [ setAttributes: { runQuery: (queryName) => { return { - queryName, + queryName: String(queryName), } }, }, @@ -139,17 +140,18 @@ export const classInstrumentations = [ targetClass: StreamSplitter, enabled: true, methodsToInstrument: ['emitEvent'], - setName: (name: string, attrs: any) => { - if (attrs.event) { + setName: (name, attrs) => { + if (typeof attrs.event === 'string') { return name + '.' + attrs.event } return name }, setAttributes: { - emitEvent(event) { + emitEvent(this: unknown, event) { + const splitter = this as unknown as StreamSplitter return { - part: this.part as any, - event, + part: splitter.part, + event: String(event), } }, }, @@ -176,11 +178,12 @@ export const classInstrumentations = [ setAttributes: { send: (command) => { return { - operation: command.constructor.name as string, + operation: getConstructorName(command), } }, }, - setName: (name, attrs) => 'S3.' + attrs.operation, + setName: (name, attrs) => + typeof attrs.operation === 'string' ? 'S3.' + attrs.operation : name, }), new ClassInstrumentation({ targetClass: Upload, @@ -194,6 +197,14 @@ export const classInstrumentations = [ }), ] +function getConstructorName(value: unknown): string { + if (value && typeof value === 'object' && value.constructor?.name) { + return value.constructor.name + } + + return 'unknown' +} + export async function loadClassInstrumentations() { return classInstrumentations } diff --git a/src/internal/monitoring/otel-instrumentation.ts b/src/internal/monitoring/otel-instrumentation.ts index de8ea6c85..995b754b6 100644 --- a/src/internal/monitoring/otel-instrumentation.ts +++ b/src/internal/monitoring/otel-instrumentation.ts @@ -1,4 +1,5 @@ import { + Attributes, Context, MeterProvider, Span, @@ -9,6 +10,17 @@ import { import { Instrumentation, InstrumentationConfig } from '@opentelemetry/instrumentation' import { ReadableSpan, Span as SdkSpan, SpanProcessor } from '@opentelemetry/sdk-trace-base' +type InstrumentedMethod = ((...args: unknown[]) => unknown) & { + __original?: (...args: unknown[]) => unknown +} + +type InstrumentedPrototype = Record + +type InstrumentedClass = { + name: string + prototype: object +} + export class TenantSpanProcessor implements SpanProcessor { private readonly attributesToPropagate: string[] @@ -49,13 +61,10 @@ export class TenantSpanProcessor implements SpanProcessor { } interface GenericInstrumentationConfig extends InstrumentationConfig { - targetClass: new (...args: any[]) => any + targetClass: InstrumentedClass methodsToInstrument: string[] - setName?: (name: string, attrs: Record, targetClass: new () => any) => string - setAttributes?: Record< - GenericInstrumentationConfig['methodsToInstrument'][number], - (...args: any[]) => Record - > + setName?: (name: string, attrs: Attributes, targetClass: object) => string + setAttributes?: Partial Attributes>> } class ClassInstrumentation implements Instrumentation { @@ -101,7 +110,7 @@ class ClassInstrumentation implements Instrumentation { private patchMethods(): void { const { targetClass, methodsToInstrument } = this._config - const proto = targetClass.prototype + const proto = targetClass.prototype as InstrumentedPrototype methodsToInstrument.forEach((methodName) => { if (methodName in proto && typeof proto[methodName] === 'function') { @@ -112,21 +121,26 @@ class ClassInstrumentation implements Instrumentation { private unpatchMethods(): void { const { targetClass, methodsToInstrument } = this._config - const proto = targetClass.prototype + const proto = targetClass.prototype as InstrumentedPrototype methodsToInstrument.forEach((methodName) => { - if (methodName in proto && proto[methodName].__original) { - proto[methodName] = proto[methodName].__original + const method = proto[methodName] + if (methodName in proto && isInstrumentedMethod(method) && method.__original) { + proto[methodName] = method.__original } }) } - private patchMethod(proto: any, methodName: string): void { + private patchMethod(proto: InstrumentedPrototype, methodName: string): void { const original = proto[methodName] + if (!isInstrumentedMethod(original)) { + return + } + const instrumentationName = this.instrumentationName const instrumentation = this - proto[methodName] = function (...args: any[]) { + const wrappedMethod: InstrumentedMethod = function (this: object, ...args: unknown[]) { const tracer = trace.getTracer(instrumentationName) return tracer.startActiveSpan( @@ -176,8 +190,13 @@ class ClassInstrumentation implements Instrumentation { ) } - proto[methodName].__original = original + wrappedMethod.__original = original + proto[methodName] = wrappedMethod } } +function isInstrumentedMethod(value: unknown): value is InstrumentedMethod { + return typeof value === 'function' +} + export { ClassInstrumentation, GenericInstrumentationConfig } diff --git a/src/internal/monitoring/otel-tracing.ts b/src/internal/monitoring/otel-tracing.ts index 2ed4fd97a..aeed62f92 100644 --- a/src/internal/monitoring/otel-tracing.ts +++ b/src/internal/monitoring/otel-tracing.ts @@ -36,7 +36,7 @@ const exporterHeaders = headersEnv all[name] = value return all }, - {} as Record + {} as Record ) const grpcMetadata = new grpc.Metadata() diff --git a/src/internal/pubsub/adapter.ts b/src/internal/pubsub/adapter.ts index 62330cdb4..1a77d1722 100644 --- a/src/internal/pubsub/adapter.ts +++ b/src/internal/pubsub/adapter.ts @@ -1,8 +1,9 @@ export interface PubSubAdapter { start(): Promise - publish(channel: string, message: any): Promise - subscribe(channel: string, cb: (message: any) => void): Promise - unsubscribe(channel: string, cb: (message: any) => void): Promise + // PubSub payloads cross a runtime boundary; subscribers must validate them locally. + publish(channel: string, message: unknown): Promise + subscribe(channel: string, cb: (message: unknown) => void): Promise + unsubscribe(channel: string, cb: (message: unknown) => void): Promise close(): Promise on(event: 'error', listener: (error: Error) => void): this } diff --git a/src/internal/pubsub/postgres.ts b/src/internal/pubsub/postgres.ts index 7221a0bc2..d6363bdb5 100644 --- a/src/internal/pubsub/postgres.ts +++ b/src/internal/pubsub/postgres.ts @@ -67,7 +67,7 @@ export class PostgresPubSub extends EventEmitter implements PubSubAdapter { await this.subscriber.notify(channel, payload) } - async subscribe(channel: string, cb: (payload: any) => void): Promise { + async subscribe(channel: string, cb: (payload: unknown) => void): Promise { const listenerCount = this.subscriber.notifications.listenerCount(channel) this.subscriber.notifications.on(channel, cb) @@ -76,7 +76,7 @@ export class PostgresPubSub extends EventEmitter implements PubSubAdapter { } } - async unsubscribe(channel: string, cb: (payload: any) => void): Promise { + async unsubscribe(channel: string, cb: (payload: unknown) => void): Promise { this.subscriber.notifications.removeListener(channel, cb) const isListening = this.subscriber.notifications.listenerCount(channel) > 0 diff --git a/src/internal/queue/database.ts b/src/internal/queue/database.ts index f4b343cb3..15adb110e 100644 --- a/src/internal/queue/database.ts +++ b/src/internal/queue/database.ts @@ -79,7 +79,7 @@ export class QueueDB extends EventEmitter implements Db { } } - async executeSql(text: string, values: any[]) { + async executeSql(...[text, values]: Parameters): ReturnType { if (this.opened && this.pool) { return this.useTransaction((client) => client.query(text, values)) } @@ -97,9 +97,9 @@ export class KnexQueueDB extends EventEmitter implements Db { super() } - async executeSql(text: string, values: any[]): Promise<{ rows: any[] }> { + async executeSql(...[text, values]: Parameters): ReturnType { const knexQuery = text.replaceAll('$', ':') - const params: Record = {} + const params: Record = {} values.forEach((value, index) => { const key = (index + 1).toString() diff --git a/src/internal/queue/event.ts b/src/internal/queue/event.ts index 671fd0397..c1071358e 100644 --- a/src/internal/queue/event.ts +++ b/src/internal/queue/event.ts @@ -30,17 +30,22 @@ function withPayloadVersion( $version: payload.$version ?? version, } } +export type EventPayload = Omit +type EventInstance = Event +type EventClass = abstract new (...args: never[]) => EventInstance +type EventInput = Omit & { + $version?: string +} +export type StaticThis = BaseEventConstructor -export type StaticThis> = BaseEventConstructor - -interface BaseEventConstructor> { +interface BaseEventConstructor { version: string - new (...args: any): Base + new (...args: never[]): Event send( - this: StaticThis, - payload: Omit + this: StaticThis, + payload: EventInput> ): Promise eventName(): string @@ -50,7 +55,7 @@ interface BaseEventConstructor> { /** * Base class for all events that are sent to the queue */ -export class Event> { +export class Event { public static readonly version: string = 'v1' protected static queueName = '' protected static allowSync = true @@ -77,7 +82,7 @@ export class Event> { return undefined } - static getSendOptions>(payload: T['payload']): SendOptions | undefined { + static getSendOptions(payload: unknown): SendOptions | undefined { return undefined } @@ -93,15 +98,20 @@ export class Event> { // no-op } - static batchSend[]>(messages: T) { + static batchSend( + this: TThis & { version: string }, + messages: Array> + ) { + const eventClass = this as unknown as typeof Event + if (!pgQueueEnable) { - if (this.allowSync) { + if (eventClass.allowSync) { return Promise.all(messages.map((message) => message.send())) } else { logger.warn( { type: 'queue', - eventType: this.eventName(), + eventType: eventClass.eventName(), }, '[Queue] skipped sending batch messages' ) @@ -111,11 +121,9 @@ export class Event> { return Queue.getInstance().insert( messages.map((message) => { - const payloadWithVersion = withPayloadVersion( - message.payload as (typeof message)['payload'], - this.version - ) - const sendOptions = (this.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {} + const payloadWithVersion = withPayloadVersion(message.payload, this.version) + const sendOptions = + (eventClass.getSendOptions(payloadWithVersion) as PgBoss.JobInsert) || {} if (payloadWithVersion.scheduleAt) { sendOptions.startAfter = new Date(payloadWithVersion.scheduleAt) @@ -123,48 +131,66 @@ export class Event> { return { ...sendOptions, - name: this.getQueueName(), + name: eventClass.getQueueName(), data: payloadWithVersion, - deadLetter: this.deadLetterQueueName(), + deadLetter: eventClass.deadLetterQueueName(), } }) ) } - static send>( - this: StaticThis, - payload: Omit, + static send( + this: TThis & { version: string }, + payload: EventInput>, opts?: SendOptions & { tnx?: Knex } ) { - const that = new this(withPayloadVersion(payload as T['payload'], this.version)) + const payloadWithVersion = withPayloadVersion( + payload as InstanceType['payload'], + this.version + ) as InstanceType['payload'] + const EventCtor = this as unknown as new ( + payload: InstanceType['payload'] + ) => InstanceType + const that = new EventCtor(payloadWithVersion) return that.send(opts) } - static invoke>( - this: StaticThis, - payload: Omit + static invoke( + this: TThis & { version: string }, + payload: EventInput> ) { - const that = new this(withPayloadVersion(payload as T['payload'], this.version)) + const payloadWithVersion = withPayloadVersion( + payload as InstanceType['payload'], + this.version + ) as InstanceType['payload'] + const EventCtor = this as unknown as new ( + payload: InstanceType['payload'] + ) => InstanceType + const that = new EventCtor(payloadWithVersion) return that.invoke() } - static invokeOrSend>( - this: StaticThis, - payload: Omit, + static invokeOrSend( + this: TThis & { version: string }, + payload: EventInput>, options?: SendOptions & { sendWhenError?: (error: unknown) => boolean } ) { - const that = new this(withPayloadVersion(payload as T['payload'], this.version)) + const payloadWithVersion = withPayloadVersion( + payload as InstanceType['payload'], + this.version + ) as InstanceType['payload'] + const EventCtor = this as unknown as new ( + payload: InstanceType['payload'] + ) => InstanceType + const that = new EventCtor(payloadWithVersion) return that.invokeOrSend(options) } - static handle( - job: Job['payload']> | Job['payload']>[], - opts?: { signal?: AbortSignal } - ) { + static handle(job: Job | Job[], opts?: { signal?: AbortSignal }) { throw new Error('not implemented') } - static async shouldSend(payload: any) { + static async shouldSend(payload: { tenant?: { ref?: string } } | null | undefined) { if (isMultitenant && payload?.tenant?.ref) { // Do not send an event if disabled for this specific tenant const tenant = await getTenantConfig(payload.tenant.ref) diff --git a/src/internal/queue/queue.ts b/src/internal/queue/queue.ts index 102fc7009..9d66fe214 100644 --- a/src/internal/queue/queue.ts +++ b/src/internal/queue/queue.ts @@ -5,11 +5,11 @@ import PgBoss, { Db, Job, JobWithMetadata } from 'pg-boss' import { getConfig } from '../../config' import { logger, logSchema } from '../monitoring' import { queueJobCompleted, queueJobError, queueJobRetryFailed } from '../monitoring/metrics' -import { Event } from './event' +import { Event, EventPayload } from './event' -type SubclassOfBaseClass = (new ( - payload: any -) => Event) & { +type SubclassOfBaseClass = (abstract new ( + ...args: never[] +) => Event) & { [K in keyof typeof Event]: (typeof Event)[K] } diff --git a/src/internal/sharding/knex.ts b/src/internal/sharding/knex.ts index ff93730b2..67f85294b 100644 --- a/src/internal/sharding/knex.ts +++ b/src/internal/sharding/knex.ts @@ -35,11 +35,24 @@ export class KnexShardStoreFactory implements ShardStoreFactory = { + rows: Row[] + rowCount?: number +} + +type ShardStatsRow = { + shard_id: string | number + shard_key: string + capacity: string | number + used: string | number + free: string | number +} + class KnexShardStore implements ShardStore { constructor(private db: Knex | Knex.Transaction) {} - private q(sql: string, params?: any[]) { - return this.db.raw(sql, params as any) + private q(sql: string, params: readonly Knex.Value[] = []) { + return this.db.raw>(sql, params) } async findShardById(shardId: number): Promise { @@ -90,7 +103,7 @@ class KnexShardStore implements ShardStore { } async findShardWithLeastFreeCapacity(kind: ResourceKind): Promise { - const result = await this.q<{ rows: ShardRow[] }>( + const result = await this.q( ` WITH candidates AS ( SELECT s.*, @@ -160,7 +173,7 @@ class KnexShardStore implements ShardStore { */ async reserveOneSlotOnShard(shardId: string | number, tenantId: string): Promise { // 1) Try to claim a free existing row - const claimed = await this.q<{ rows: { slot_no: number }[] }>( + const claimed = await this.q<{ slot_no: number }>( ` WITH pick AS ( SELECT ss.slot_no @@ -189,7 +202,7 @@ class KnexShardStore implements ShardStore { if (claimed.rows.length) return claimed.rows[0].slot_no // 2) Mint a fresh slot_no by bumping shard.next_slot (bounded by capacity) - const minted = await this.q<{ rows: { slot_no: number }[] }>( + const minted = await this.q<{ slot_no: number }>( ` WITH ok AS ( SELECT id, capacity, next_slot @@ -219,8 +232,8 @@ class KnexShardStore implements ShardStore { tenant_id: tenantId, }) return slotNo - } catch (e: any) { - if (e?.code === '23505') { + } catch (e: unknown) { + if (hasErrorCode(e, '23505')) { // Extremely rare race if another tx inserted the same slot first. Let caller try another shard/attempt. return null } @@ -248,13 +261,13 @@ class KnexShardStore implements ShardStore { shard_id: data.shardId, slot_no: data.slotNo, status: 'pending', - lease_expires_at: (this.db as any).raw(`now() + interval '${data.leaseMs} milliseconds'`), + lease_expires_at: this.db.raw(`now() + interval '${data.leaseMs} milliseconds'`), }) .returning(['lease_expires_at']) return row[0] - } catch (e: any) { - if (e?.code === '23505') throw new UniqueViolationError() + } catch (e: unknown) { + if (hasErrorCode(e, '23505')) throw new UniqueViolationError() throw e } } @@ -265,7 +278,7 @@ class KnexShardStore implements ShardStore { resourceId: string, tenantId: string ): Promise { - const res = await this.q( + const res = await this.q( ` WITH ok AS ( SELECT r.shard_id, r.slot_no @@ -291,7 +304,7 @@ class KnexShardStore implements ShardStore { `, [reservationId, tenantId, resourceId, reservationId] ) - return (res as any).rowCount ?? (res as any).rows.length + return res.rowCount ?? res.rows.length } async updateReservationStatus( @@ -379,7 +392,7 @@ class KnexShardStore implements ShardStore { } async shardStats(kind?: ResourceKind) { - const res = await this.q( + const res = await this.q( ` SELECT s.id AS shard_id, s.shard_key, s.capacity, s.next_slot, -- confirmed allocations @@ -409,7 +422,7 @@ class KnexShardStore implements ShardStore { kind ? [kind] : [] ) - return (res as any).rows.map((r: any) => ({ + return res.rows.map((r: ShardStatsRow) => ({ shardId: String(r.shard_id), shardKey: r.shard_key, capacity: Number(r.capacity), @@ -418,3 +431,7 @@ class KnexShardStore implements ShardStore { })) } } + +function hasErrorCode(error: unknown, code: string): error is { code: string } { + return !!error && typeof error === 'object' && 'code' in error && error.code === code +} diff --git a/src/start/server.ts b/src/start/server.ts index c3365b25b..896532df0 100644 --- a/src/start/server.ts +++ b/src/start/server.ts @@ -244,5 +244,12 @@ async function httpAdminServer( } async function upgrades() { - return Promise.all([SyncCatalogIds.invoke({})]) + return Promise.all([ + SyncCatalogIds.invoke({ + tenant: { + ref: '', + host: '', + }, + }), + ]) } diff --git a/src/storage/backend/adapter.ts b/src/storage/backend/adapter.ts index b4cb61e40..1516cae94 100644 --- a/src/storage/backend/adapter.ts +++ b/src/storage/backend/adapter.ts @@ -16,7 +16,7 @@ export interface BrowserCacheHeaders { export type ObjectResponse = { metadata: ObjectMetadata httpStatusCode: number - body?: ReadableStream | Readable | Blob | Buffer + body?: ReadableStream | Readable | Blob | Buffer } /** @@ -48,7 +48,7 @@ export type UploadPart = { * A generic storage Adapter to interact with files */ export abstract class StorageBackendAdapter { - client: any + client: unknown constructor() { this.client = null } diff --git a/src/storage/backend/file.ts b/src/storage/backend/file.ts index 13d236b9b..50839923b 100644 --- a/src/storage/backend/file.ts +++ b/src/storage/backend/file.ts @@ -203,7 +203,7 @@ export class FileBackend implements StorageBackendAdapter { ...metadata, httpStatusCode: 200, } - } catch (err: any) { + } catch (err: unknown) { if (err instanceof StorageBackendError) { throw err } @@ -226,7 +226,7 @@ export class FileBackend implements StorageBackendAdapter { await this.cleanupEmptyDirectories(path.dirname(file)) } catch (e) { if (e instanceof Error && 'code' in e) { - if ((e as any).code === 'ENOENT') { + if (e.code === 'ENOENT') { return } throw e diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index d90b0beaf..dc673d595 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -64,12 +64,12 @@ export interface Database { asSuperUser(): Database - withTransaction Promise>( - fn: T, + withTransaction( + fn: (db: Database) => Promise, transactionOptions?: TransactionOptions - ): Promise> + ): Promise - testPermission any>(fn: T): Promise>> + testPermission(fn: (db: Database) => T | Promise): Promise> createBucket( data: Pick< @@ -149,7 +149,6 @@ export interface Database { name: string, data: Pick ): Promise - createObject( data: Pick ): Promise diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index 5e50fa510..a192e2078 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -33,7 +33,7 @@ export function escapeLike(str: string) { } class TestPermissionRollbackError extends Error { - constructor() { + constructor(readonly result: unknown) { super('Rollback test permission transaction') this.name = 'TestPermissionRollbackError' Object.setPrototypeOf(this, TestPermissionRollbackError.prototype) @@ -62,10 +62,10 @@ export class StorageKnexDB implements Database { this.latestMigration = options.latestMigration } - async withTransaction Promise>( - fn: T, + async withTransaction( + fn: (db: Database) => Promise, opts?: TransactionOptions - ) { + ): Promise { const tnx = await this.connection.transactionProvider(this.options.tnx, opts)() try { @@ -78,7 +78,7 @@ export class StorageKnexDB implements Database { const opts = { ...this.options, tnx } const storageWithTnx = new StorageKnexDB(this.connection, opts) - const result: Awaited> = await fn(storageWithTnx) + const result = await fn(storageWithTnx) await tnx.commit() return result } catch (e) { @@ -102,19 +102,17 @@ export class StorageKnexDB implements Database { }) } - async testPermission any>(fn: T) { - let result: any - try { - await this.withTransaction(async (db) => { - result = await fn(db) - throw new TestPermissionRollbackError() - }) - } catch (e) { + async testPermission(fn: (db: Database) => T | Promise): Promise> { + return this.withTransaction(async (db) => { + const result = await fn(db) + throw new TestPermissionRollbackError(result) + }).catch((e) => { if (e instanceof TestPermissionRollbackError) { - return result + return e.result as Awaited } + throw e - } + }) } deleteAnalyticsBucket(id: string, opts?: { soft: boolean }): Promise { @@ -1044,7 +1042,7 @@ export class StorageKnexDB implements Database { * @param columns * @protected */ - protected normalizeColumns>(columns: T): T { + protected normalizeColumns>(columns: T): T { const latestMigration = this.latestMigration if (!latestMigration) { @@ -1066,7 +1064,7 @@ export class StorageKnexDB implements Database { if (typeof columns === 'object') { value.forEach((column: string) => { - delete (columns as Record)[column] + delete (columns as Record)[column] }) } } @@ -1075,9 +1073,10 @@ export class StorageKnexDB implements Database { return columns } - protected async runQuery< - T extends (...args: [db: Knex.Transaction, signal?: AbortSignal]) => Promise, - >(queryName: string, fn: T): Promise>> { + protected async runQuery( + queryName: string, + fn: (db: Knex.Transaction, signal?: AbortSignal) => Promise + ): Promise { const startTime = process.hrtime.bigint() const recordDuration = () => { const duration = Number(process.hrtime.bigint() - startTime) / 1e9 @@ -1109,7 +1108,7 @@ export class StorageKnexDB implements Database { await this.connection.setScope(tnx) } - const result: Awaited> = await fn(tnx, abortSignal) + const result = await fn(tnx, abortSignal) if (needsNewTransaction) { await tnx.commit() diff --git a/src/storage/events/base-event.ts b/src/storage/events/base-event.ts index e882d5b4e..a03e3f113 100644 --- a/src/storage/events/base-event.ts +++ b/src/storage/events/base-event.ts @@ -1,7 +1,7 @@ import { getPostgresConnection, getServiceKeyUser } from '@internal/database' import { createAgent } from '@internal/http' import { logger } from '@internal/monitoring' -import { BasePayload, Event, Event as QueueBaseEvent, StaticThis } from '@internal/queue' +import { BasePayload, Event, EventPayload, Event as QueueBaseEvent } from '@internal/queue' import { TenantLocation } from '@storage/locator' import { getConfig } from '../../config' import { createStorageBackend, StorageBackendAdapter } from '../backend' @@ -12,7 +12,7 @@ const { storageS3Bucket, storageS3MaxSockets, storageBackendType, region } = get let storageBackend: StorageBackendAdapter | undefined = undefined -export abstract class BaseEvent> extends QueueBaseEvent { +export abstract class BaseEvent extends QueueBaseEvent { static onStart() { this.getOrCreateStorageBackend() } @@ -25,9 +25,9 @@ export abstract class BaseEvent> extends * Sends a message as a webhook * @param payload */ - static async sendWebhook>( - this: StaticThis, - payload: Omit + static async sendWebhook Event>( + this: TThis & { version: string; eventName(): string }, + payload: Omit['payload'], '$version'> ) { // biome-ignore lint/style/noCommonJs: build script runs as CommonJS const { Webhook } = require('./lifecycle/webhook') diff --git a/src/storage/events/lifecycle/object-created.ts b/src/storage/events/lifecycle/object-created.ts index aa1cc1477..fdd6dd80f 100644 --- a/src/storage/events/lifecycle/object-created.ts +++ b/src/storage/events/lifecycle/object-created.ts @@ -1,43 +1,53 @@ import { BasePayload } from '@internal/queue' import { ObjectMetadata } from '../../backend' +import type { Obj } from '../../schemas' import { BaseEvent } from '../base-event' -import { ObjectRemovedEvent } from './object-removed' -interface ObjectCreatedEvent extends BasePayload { +interface ObjectCreatedEventBase extends BasePayload { name: string version: string bucketId: string metadata: ObjectMetadata +} + +interface ObjectCreatedUploadEvent extends ObjectCreatedEventBase { uploadType: 'standard' | 'resumable' | 's3' } -abstract class ObjectCreated extends BaseEvent { +interface ObjectCreatedMoveSource { + name: string + bucketId: string + version: Obj['version'] | null + reqId?: string +} + +interface ObjectCreatedMoveEvent extends ObjectCreatedEventBase { + oldObject: ObjectCreatedMoveSource +} + +abstract class ObjectCreated extends BaseEvent { protected static queueName = 'object:created' } -export class ObjectCreatedPutEvent extends ObjectCreated { +export class ObjectCreatedPutEvent extends ObjectCreated { static eventName() { return `ObjectCreated:Put` } } -export class ObjectCreatedPostEvent extends ObjectCreated { +export class ObjectCreatedPostEvent extends ObjectCreated { static eventName() { return `ObjectCreated:Post` } } -export class ObjectCreatedCopyEvent extends ObjectCreated { +export class ObjectCreatedCopyEvent extends ObjectCreated { static eventName() { return `ObjectCreated:Copy` } } -export interface ObjectedCreatedMove extends ObjectCreatedEvent { - oldObject: Omit -} - -export class ObjectCreatedMove extends BaseEvent { +export class ObjectCreatedMove extends ObjectCreated { static eventName() { return `ObjectCreated:Move` } diff --git a/src/storage/events/lifecycle/object-removed.ts b/src/storage/events/lifecycle/object-removed.ts index 7e7b3c2e9..0f1ba8a75 100644 --- a/src/storage/events/lifecycle/object-removed.ts +++ b/src/storage/events/lifecycle/object-removed.ts @@ -1,12 +1,12 @@ import { BasePayload } from '@internal/queue' -import { ObjectMetadata } from '@storage/backend' +import type { Obj } from '../../schemas' import { BaseEvent } from '../base-event' export interface ObjectRemovedEvent extends BasePayload { name: string bucketId: string - version: string - metadata?: ObjectMetadata + version: Obj['version'] | null + metadata: Obj['metadata'] } export class ObjectRemoved extends BaseEvent { diff --git a/src/storage/events/lifecycle/object-updated.ts b/src/storage/events/lifecycle/object-updated.ts index a94f0b672..89ba612c3 100644 --- a/src/storage/events/lifecycle/object-updated.ts +++ b/src/storage/events/lifecycle/object-updated.ts @@ -1,11 +1,12 @@ import { BasePayload } from '@internal/queue' import { ObjectMetadata } from '../../backend' +import type { Obj } from '../../schemas' import { BaseEvent } from '../base-event' interface ObjectUpdatedMetadataEvent extends BasePayload { name: string bucketId: string - version: string + version: Obj['version'] | null metadata: ObjectMetadata } diff --git a/src/storage/events/migrations/reset-migrations.ts b/src/storage/events/migrations/reset-migrations.ts index c54eda611..312af64cd 100644 --- a/src/storage/events/migrations/reset-migrations.ts +++ b/src/storage/events/migrations/reset-migrations.ts @@ -59,6 +59,7 @@ export class ResetMigrationsOnTenant extends BaseEvent { tenantId, tenant: { ref: tenantId, + host: job.data.tenant.host, }, singletonKey: tenantId, }) diff --git a/src/storage/events/objects/object-admin-delete-all-before.ts b/src/storage/events/objects/object-admin-delete-all-before.ts index c0b4ef852..ac0c4b064 100644 --- a/src/storage/events/objects/object-admin-delete-all-before.ts +++ b/src/storage/events/objects/object-admin-delete-all-before.ts @@ -99,7 +99,7 @@ export class ObjectAdminDeleteAllBefore extends BaseEvent { - const obj = await db.asSuperUser().findObject(this.bucketId, objectName, 'id,version', { - forUpdate: true, - }) + const obj = await db + .asSuperUser() + .findObject(this.bucketId, objectName, 'id,version,metadata', { + forUpdate: true, + }) const deleted = await db.deleteObject(this.bucketId, objectName) @@ -393,7 +395,9 @@ export class ObjectStorage { if (existingDestObject) { await ObjectAdminDelete.send({ name: existingDestObject.name, - bucketId: existingDestObject.bucket_id, + // The generated Obj type leaves selected columns optional; this lookup is scoped + // to destinationBucket, so fall back only when the narrowed field is missing. + bucketId: existingDestObject.bucket_id ?? destinationBucket, tenant: this.db.tenant(), version: existingDestObject.version, reqId: this.db.reqId, diff --git a/src/storage/protocols/iceberg/catalog/rest-catalog-client.ts b/src/storage/protocols/iceberg/catalog/rest-catalog-client.ts index bcc5a42e4..528d23103 100644 --- a/src/storage/protocols/iceberg/catalog/rest-catalog-client.ts +++ b/src/storage/protocols/iceberg/catalog/rest-catalog-client.ts @@ -470,7 +470,7 @@ export class RestCatalogClient { .then((response) => { const data = response.data - const overrides: Record = { + const overrides: Record = { prefix: params.warehouse, } diff --git a/src/storage/protocols/iceberg/knex.ts b/src/storage/protocols/iceberg/knex.ts index 8938fbaab..d9550b5d1 100644 --- a/src/storage/protocols/iceberg/knex.ts +++ b/src/storage/protocols/iceberg/knex.ts @@ -93,7 +93,7 @@ export interface Metastore { opts?: { isolationLevel?: Knex.IsolationLevels } ): Promise - assignCatalog(param: { bucketName: string; tenantId: string }): Promise + assignCatalog(param: { bucketName: string; tenantId: string }): Promise countCatalogs(params: { tenantId: string; limit: number }): Promise countNamespaces(param: { tenantId: string; limit: number }): Promise countTables(params: { namespaceId: string; tenantId?: string; limit: number }): Promise diff --git a/src/storage/protocols/s3/credentials/manager.ts b/src/storage/protocols/s3/credentials/manager.ts index 2a294ad0e..8e417d3a1 100644 --- a/src/storage/protocols/s3/credentials/manager.ts +++ b/src/storage/protocols/s3/credentials/manager.ts @@ -19,6 +19,10 @@ export const TENANT_S3_CREDENTIALS_CACHE_MAX_ITEMS = 16384 export const TENANT_S3_CREDENTIALS_CACHE_MAX_SIZE_BYTES = 1024 * 1024 * 50 // 50 MiB export const TENANT_S3_CREDENTIALS_CACHE_TTL_MS = 1000 * 60 * 60 // 1h +function isTenantCacheKeyMessage(message: unknown): message is string { + return typeof message === 'string' +} + const tenantS3CredentialsCache = createLruCache( TENANT_S3_CREDENTIALS_CACHE_NAME, { @@ -47,6 +51,10 @@ export class S3CredentialsManager { */ async listenForTenantUpdate(pubSub: PubSubAdapter): Promise { await pubSub.subscribe(TENANTS_S3_CREDENTIALS_UPDATE_CHANNEL, (cacheKey) => { + if (!isTenantCacheKeyMessage(cacheKey)) { + return + } + tenantS3CredentialsCache.delete(cacheKey) }) } diff --git a/src/storage/protocols/s3/signature-v4.ts b/src/storage/protocols/s3/signature-v4.ts index 13f0ca429..52e674b06 100644 --- a/src/storage/protocols/s3/signature-v4.ts +++ b/src/storage/protocols/s3/signature-v4.ts @@ -49,6 +49,9 @@ interface Credentials { service: string } +type SignatureHeaders = Record +type SignatureQuery = Record + export interface Policy { expiration: string conditions: PolicyConditions @@ -111,8 +114,8 @@ export class SignatureV4 { this.publicUrl = options.publicUrl } - static parseAuthorizationHeader(headers: Record) { - const clientSignature = headers.authorization + static parseAuthorizationHeader(headers: SignatureHeaders) { + const clientSignature = getSingleValue(headers.authorization, 'authorization') if (typeof clientSignature !== 'string') { throw ERRORS.InvalidSignature('Missing authorization header') } @@ -126,16 +129,18 @@ export class SignatureV4 { const credentialPart = params.get('Credential') const signedHeadersPart = params.get('SignedHeaders') const signature = params.get('Signature') - const longDate = headers['x-amz-date'] - const contentSha = headers['x-amz-content-sha256'] - const sessionToken = headers['x-amz-security-token'] + const longDate = getSingleValue(headers['x-amz-date'], 'x-amz-date') + const contentSha = getSingleValue(headers['x-amz-content-sha256'], 'x-amz-content-sha256') + const sessionToken = getSingleValue(headers['x-amz-security-token'], 'x-amz-security-token') if (!validateTypeOfStrings(credentialPart, signedHeadersPart, signature, longDate)) { throw ERRORS.InvalidSignature('Invalid signature format') } + const credential = credentialPart as string + const longDateValue = longDate as string const signedHeaders = signedHeadersPart?.split(';') || [] - const credentialsPart = credentialPart?.split('/') || [] + const credentialsPart = credential.split('/') if (credentialsPart.length !== 5) { throw ERRORS.InvalidSignature('Invalid credentials') @@ -146,13 +151,13 @@ export class SignatureV4 { credentials: { accessKey, shortDate, region, service }, signedHeaders, signature: signature as string, - longDate, + longDate: longDateValue, contentSha, sessionToken, } } - static isChunkedUpload(headers: Record): boolean { + static isChunkedUpload(headers: SignatureHeaders): boolean { const sha = headers['x-amz-content-sha256'] if (typeof sha !== 'string') return false // If it exactly matches or starts with streaming prefix... @@ -162,24 +167,28 @@ export class SignatureV4 { ) } - static parseQuerySignature(query: Record) { + static parseQuerySignature(query: SignatureQuery) { const credentialPart = query['X-Amz-Credential'] - const signedHeaders: string = query['X-Amz-SignedHeaders'] - const signature: string = query['X-Amz-Signature'] - const longDate: string = query['X-Amz-Date'] - const contentSha: string = query['X-Amz-Content-Sha256'] - const sessionToken: string | undefined = query['X-Amz-Security-Token'] + const signedHeaders = query['X-Amz-SignedHeaders'] + const signature = query['X-Amz-Signature'] + const longDate = query['X-Amz-Date'] + const contentSha = query['X-Amz-Content-Sha256'] + const sessionToken = query['X-Amz-Security-Token'] const expires = query['X-Amz-Expires'] if (!validateTypeOfStrings(credentialPart, signedHeaders, signature, longDate)) { throw ERRORS.InvalidSignature('Invalid signature format') } + const credential = credentialPart as string + const signedHeadersValue = signedHeaders as string + const signatureValue = signature as string + const longDateValue = longDate as string if (expires) { - this.checkExpiration(longDate, expires) + this.checkExpiration(longDateValue, expires) } - const credentialsPart = credentialPart.split('/') as string[] + const credentialsPart = credential.split('/') if (credentialsPart.length !== 5) { throw ERRORS.InvalidSignature('Invalid credentials') } @@ -187,9 +196,9 @@ export class SignatureV4 { const [accessKey, shortDate, region, service] = credentialsPart return { credentials: { accessKey, shortDate, region, service }, - signedHeaders: signedHeaders.split(';'), - signature, - longDate, + signedHeaders: signedHeadersValue.split(';'), + signature: signatureValue, + longDate: longDateValue, contentSha, sessionToken, } @@ -611,6 +620,21 @@ export class SignatureV4 { } } -function validateTypeOfStrings(...values: any[]) { +function validateTypeOfStrings(...values: unknown[]) { return values.every((value) => typeof value === 'string') } + +function getSingleValue( + value: string | string[] | undefined, + headerName: string +): string | undefined { + if (!Array.isArray(value)) { + return value + } + + if (value.length <= 1) { + return value[0] + } + + throw ERRORS.InvalidSignature(`Multiple ${headerName} headers are not supported`) +} diff --git a/src/storage/protocols/tus/postgres-locker.ts b/src/storage/protocols/tus/postgres-locker.ts index bf96fa92e..3ffa5d04c 100644 --- a/src/storage/protocols/tus/postgres-locker.ts +++ b/src/storage/protocols/tus/postgres-locker.ts @@ -8,11 +8,23 @@ import { UploadId } from './upload-id' const REQUEST_LOCK_RELEASE_MESSAGE = 'REQUEST_LOCK_RELEASE' +function isRequestLockReleaseMessage(payload: unknown): payload is { id: string } { + if (!payload || typeof payload !== 'object') { + return false + } + + return typeof (payload as { id?: unknown }).id === 'string' +} + export class LockNotifier { protected events = new EventEmitter() - handler = ({ id }: { id: string }) => { - this.events.emit(`release:${id}`) + handler = (payload: unknown) => { + if (!isRequestLockReleaseMessage(payload)) { + return + } + + this.events.emit(`release:${payload.id}`) } constructor(private readonly pubSub: PubSubAdapter) {} diff --git a/src/storage/protocols/tus/s3-locker.ts b/src/storage/protocols/tus/s3-locker.ts index 7e75a4901..c3d8cb30a 100644 --- a/src/storage/protocols/tus/s3-locker.ts +++ b/src/storage/protocols/tus/s3-locker.ts @@ -29,6 +29,14 @@ interface LockMetadata { renewedAt: number } +type LockerError = { + name?: string + message?: string + $metadata?: { + httpStatusCode?: number + } +} + export class S3Locker implements Locker { private readonly s3Client: S3Client private readonly bucket: string @@ -91,13 +99,13 @@ export class S3Locker implements Locker { }) ) return true - } catch (error: any) { + } catch (error: unknown) { if (signal.aborted) { return false } // If lock already exists, check if it's expired or zombie - if (error.name === 'PreconditionFailed' || error.$metadata?.httpStatusCode === 412) { + if (hasErrorName(error, 'PreconditionFailed') || getHttpStatusCode(error) === 412) { const isExpired = await this.checkAndCleanupExpiredLock(lockKey, signal) await this.notifier.release(id) @@ -112,7 +120,7 @@ export class S3Locker implements Locker { continue } - this.logger.error(`Lock acquisition failed for ${id}:`, error.message) + this.logger.error(`Lock acquisition failed for ${id}:`, getErrorMessage(error)) throw error } } @@ -165,8 +173,8 @@ export class S3Locker implements Locker { }) ) return true - } catch (error: any) { - if (error.name === 'NoSuchKey' || error.name === 'PreconditionFailed') { + } catch (error: unknown) { + if (hasErrorName(error, 'NoSuchKey') || hasErrorName(error, 'PreconditionFailed')) { return false } throw error @@ -184,9 +192,9 @@ export class S3Locker implements Locker { Key: lockKey, }) ) - } catch (error: any) { + } catch (error: unknown) { // If lock doesn't exist, that's fine - if (error.name !== 'NoSuchKey') { + if (!hasErrorName(error, 'NoSuchKey')) { throw error } } @@ -230,12 +238,15 @@ export class S3Locker implements Locker { expiredLocks.push(object.Key) } } - } catch (error: any) { + } catch (error: unknown) { // If we can't read the lock, it might be corrupted - clean it up - if (error.name === 'NoSuchKey') { + if (hasErrorName(error, 'NoSuchKey')) { continue } - console.warn(`Failed to read lock ${object.Key}, marking for cleanup:`, error.message) + console.warn( + `Failed to read lock ${object.Key}, marking for cleanup:`, + getErrorMessage(error) + ) expiredLocks.push(object.Key) } } @@ -315,8 +326,8 @@ export class S3Locker implements Locker { } return false // Lock is still valid - } catch (error: any) { - if (error.name === 'NoSuchKey') { + } catch (error: unknown) { + if (hasErrorName(error, 'NoSuchKey')) { return true // Lock doesn't exist } throw error @@ -341,6 +352,30 @@ export class S3Locker implements Locker { } } +function hasErrorName(error: unknown, name: string): error is LockerError { + return !!error && typeof error === 'object' && 'name' in error && error.name === name +} + +function getHttpStatusCode(error: unknown): number | undefined { + if (!error || typeof error !== 'object') { + return undefined + } + + return (error as LockerError).$metadata?.httpStatusCode +} + +function getErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message + } + + if (error && typeof error === 'object' && typeof (error as LockerError).message === 'string') { + return (error as LockerError).message as string + } + + return String(error) +} + export class S3Lock implements Lock { private renewalTimer?: NodeJS.Timeout private isLocked = false diff --git a/src/storage/renderer/head.ts b/src/storage/renderer/head.ts index 8978d37e8..2f56ffbd0 100644 --- a/src/storage/renderer/head.ts +++ b/src/storage/renderer/head.ts @@ -28,8 +28,8 @@ export class HeadRenderer extends Renderer { } protected handleCacheControl( - request: FastifyRequest, - response: FastifyReply, + request: FastifyRequest, + response: FastifyReply, metadata: ObjectMetadata ) { const etag = this.findEtagHeader(request) diff --git a/src/storage/renderer/image.ts b/src/storage/renderer/image.ts index 2a434df20..dc025f23d 100644 --- a/src/storage/renderer/image.ts +++ b/src/storage/renderer/image.ts @@ -172,7 +172,7 @@ export class ImageRenderer extends Renderer { const params = transformations.split(',') this.transformOptions = params.reduce((all, param) => { - const [name, value] = param.split(':') as [keyof TransformOptions, any] + const [name, value] = param.split(':') as [keyof TransformOptions, string] switch (name) { case 'height': all.height = parseInt(value, 10) @@ -181,10 +181,10 @@ export class ImageRenderer extends Renderer { all.width = parseInt(value, 10) break case 'resize': - all.resize = value + all.resize = value as TransformOptions['resize'] break case 'format': - all.format = value + all.format = value as TransformOptions['format'] break case 'quality': all.quality = parseInt(value, 10) diff --git a/src/storage/renderer/info.ts b/src/storage/renderer/info.ts index 9d6daa161..610e8fcf0 100644 --- a/src/storage/renderer/info.ts +++ b/src/storage/renderer/info.ts @@ -34,8 +34,8 @@ export class InfoRenderer extends HeadRenderer { } protected setHeaders( - request: FastifyRequest, - response: FastifyReply, + request: FastifyRequest, + response: FastifyReply, data: AssetResponse, options: RenderOptions ) { diff --git a/src/storage/renderer/renderer.ts b/src/storage/renderer/renderer.ts index d79ea2ac3..cc8d54b2e 100644 --- a/src/storage/renderer/renderer.ts +++ b/src/storage/renderer/renderer.ts @@ -17,11 +17,17 @@ export interface RenderOptions { } export interface AssetResponse { - body?: Readable | ReadableStream | Blob | Buffer | Record + body?: Readable | ReadableStream | Blob | Buffer | Record metadata: ObjectMetadata transformations?: string[] } +type HttpMetadataError = { + $metadata?: { + httpStatusCode?: number + } +} + const { requestEtagHeaders, responseSMaxAge } = getConfig() /** @@ -40,7 +46,7 @@ export abstract class Renderer { * @param response * @param options */ - async render(request: FastifyRequest, response: FastifyReply, options: RenderOptions) { + async render(request: FastifyRequest, response: FastifyReply, options: RenderOptions) { try { if (options.signal?.aborted) { return response.send({ error: 'Request aborted', statusCode: '499' }) @@ -51,12 +57,12 @@ export abstract class Renderer { this.setHeaders(request, response, data, options) return response.send(data.body) - } catch (err: any) { - if (err.$metadata?.httpStatusCode === 304) { + } catch (err: unknown) { + if (hasMetadataStatusCode(err, 304)) { return response.status(304).send() } - if (err.$metadata?.httpStatusCode === 404) { + if (hasMetadataStatusCode(err, 404)) { response.header('cache-control', 'no-store') return response.status(400).send({ error: 'Not found', @@ -70,8 +76,8 @@ export abstract class Renderer { } protected setHeaders( - request: FastifyRequest, - response: FastifyReply, + request: FastifyRequest, + response: FastifyReply, data: AssetResponse, options: RenderOptions ) { @@ -110,7 +116,7 @@ export abstract class Renderer { this.handleDownload(response, options.download) } - protected handleDownload(response: FastifyReply, download?: string) { + protected handleDownload(response: FastifyReply, download?: string) { if (typeof download !== 'undefined') { if (download === '') { response.header('Content-Disposition', 'attachment;') @@ -126,8 +132,8 @@ export abstract class Renderer { } protected handleCacheControl( - request: FastifyRequest, - response: FastifyReply, + request: FastifyRequest, + response: FastifyReply, metadata: ObjectMetadata ) { const etag = this.findEtagHeader(request) @@ -150,7 +156,7 @@ export abstract class Renderer { response.header('Cache-Control', cacheControl.join(', ')) } - protected findEtagHeader(request: FastifyRequest) { + protected findEtagHeader(request: FastifyRequest) { for (const header of requestEtagHeaders) { const etag = request.headers[header] if (etag) { @@ -166,3 +172,11 @@ function normalizeContentType(contentType: string | undefined): string | undefin } return contentType } + +function hasMetadataStatusCode(error: unknown, statusCode: number): error is HttpMetadataError { + if (!error || typeof error !== 'object') { + return false + } + + return (error as HttpMetadataError).$metadata?.httpStatusCode === statusCode +} diff --git a/src/storage/storage.ts b/src/storage/storage.ts index 8ce9f5023..0e6440149 100644 --- a/src/storage/storage.ts +++ b/src/storage/storage.ts @@ -295,7 +295,7 @@ export class Storage { // use queue to recursively delete all objects created before the specified time await ObjectAdminDeleteAllBefore.send({ - before, + before: before.toISOString(), bucketId, tenant: this.db.tenant(), reqId: this.db.reqId, diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts index 20a53bb2d..4831816a2 100644 --- a/src/storage/uploader.ts +++ b/src/storage/uploader.ts @@ -118,6 +118,7 @@ export class Uploader { */ async upload(request: UploadRequest) { const file = request.file + const uploadType = request.uploadType ?? 'standard' const version = await this.prepareUpload({ bucketId: request.bucketId, objectName: request.objectName, @@ -128,7 +129,7 @@ export class Uploader { mimetype: file.mimeType, contentLength: file.declaredContentLength ?? file.contentLength, }, - uploadType: request.uploadType, + uploadType, }) try { @@ -204,6 +205,7 @@ export class Uploader { }) { try { const db = this.db.asSuperUser() + const effectiveUploadType = uploadType ?? 'standard' // Since we have finished uploading the file, // even if the request is aborted now, we want to complete the DB transaction const abController = new AbortController() @@ -257,7 +259,7 @@ export class Uploader { bucketId, metadata: objectMetadata, reqId: this.db.reqId, - uploadType, + uploadType: effectiveUploadType, }) .catch((e) => { logSchema.error(logger, 'Failed to send webhook', { @@ -269,7 +271,7 @@ export class Uploader { bucketId, metadata: objectMetadata, reqId: this.db.reqId, - uploadType, + uploadType: effectiveUploadType, }), }) }) @@ -278,7 +280,7 @@ export class Uploader { await Promise.all(events) fileUploadedSuccess.add(1, { - uploadType, + uploadType: effectiveUploadType, tenantId: this.db.tenantId, }) diff --git a/src/test/async-abort-controller.test.ts b/src/test/async-abort-controller.test.ts index 335933f09..9bf80e12f 100644 --- a/src/test/async-abort-controller.test.ts +++ b/src/test/async-abort-controller.test.ts @@ -69,4 +69,29 @@ describe('AsyncAbortController', () => { expect(order).toEqual(['root:start', 'root:end', 'child', 'grandchild']) }) + + it('forwards the real abort event to listeners', async () => { + const controller = new AsyncAbortController() + const seen: { + target: EventTarget | null + currentTarget: EventTarget | null + context: unknown + } = { + target: null, + currentTarget: null, + context: undefined, + } + + controller.signal.addEventListener('abort', function (event) { + seen.target = event.target + seen.currentTarget = event.currentTarget + seen.context = this + }) + + await controller.abortAsync() + + expect(seen.target).toBe(controller.signal) + expect(seen.currentTarget).toBe(controller.signal) + expect(seen.context).toBe(controller.signal) + }) }) diff --git a/src/test/jwt.test.ts b/src/test/jwt.test.ts index 86779c775..cdce9a763 100644 --- a/src/test/jwt.test.ts +++ b/src/test/jwt.test.ts @@ -5,6 +5,10 @@ import { SignJWT } from 'jose' import { JwksConfigKey } from '../config' import { generateHS512JWK, signJWT, verifyJWT, verifyJWTWithCache } from '../internal/auth' +type JwtTestPublicKey = { + export: (options?: { format: 'jwk' }) => crypto.JsonWebKey | Record +} + describe('JWT', () => { describe('verifyJWT with JWKS', () => { afterEach(() => { @@ -17,8 +21,8 @@ describe('JWT', () => { options?: object alg: string kid?: string - publicKey: any - privateKey: any + publicKey: JwtTestPublicKey + privateKey: Buffer | crypto.KeyObject }[] = [ { type: 'rsa', options: { modulusLength: 2048 }, alg: 'RS256' }, { type: 'ec', options: { namedCurve: 'P-256' }, alg: 'ES256' }, @@ -64,7 +68,7 @@ describe('JWT', () => { keys: keys.map( ({ publicKey, kid, alg }) => ({ - ...(publicKey as unknown as crypto.KeyObject).export({ format: 'jwk' }), + ...publicKey.export({ format: 'jwk' }), kid, alg, }) as JwksConfigKey diff --git a/src/test/log-request.test.ts b/src/test/log-request.test.ts index 26cb53903..96aa870bd 100644 --- a/src/test/log-request.test.ts +++ b/src/test/log-request.test.ts @@ -56,4 +56,39 @@ describe('log-request plugin', () => { resources: ['/bucket/demo', '/object/demo'], }) }) + + it('does not evaluate configured resources when request resources already exist', async () => { + const configuredResources = jest.fn(() => { + throw new Error('resources config should not run') + }) + + app.addHook('onRequest', async (request) => { + request.resources = ['/preset/resource'] + }) + + app.get( + '/bucket/:bucket', + { + config: { + resources: configuredResources, + }, + }, + async (request) => { + return { + resources: request.resources, + } + } + ) + + const response = await app.inject({ + method: 'GET', + url: '/bucket/demo', + }) + + expect(response.statusCode).toBe(200) + expect(configuredResources).not.toHaveBeenCalled() + expect(response.json()).toEqual({ + resources: ['/preset/resource'], + }) + }) }) diff --git a/src/test/migrations-memoize.test.ts b/src/test/migrations-memoize.test.ts index 155661a51..537a5853e 100644 --- a/src/test/migrations-memoize.test.ts +++ b/src/test/migrations-memoize.test.ts @@ -4,7 +4,7 @@ describe('memoizePromise helper', () => { it('should cache promise results correctly', async () => { // Create a simple memoized function for testing - const memoizePromise = ( + const memoizePromise = ( fn: (...args: T) => Promise ): ((...args: T) => Promise) => { const cache = new Map>() @@ -48,7 +48,7 @@ describe('memoizePromise helper', () => { }) it('should handle different argument combinations', async () => { - const memoizePromise = ( + const memoizePromise = ( fn: (...args: T) => Promise ): ((...args: T) => Promise) => { const cache = new Map>() @@ -86,7 +86,7 @@ describe('memoizePromise helper', () => { }) it('should generate keys for objects and primitives', async () => { - const memoizePromise = ( + const memoizePromise = ( fn: (...args: T) => Promise ): ((...args: T) => Promise) => { const cache = new Map>() @@ -125,7 +125,7 @@ describe('memoizePromise helper', () => { }) it('should handle promise rejections correctly', async () => { - const memoizePromise = ( + const memoizePromise = ( fn: (...args: T) => Promise ): ((...args: T) => Promise) => { const cache = new Map>() diff --git a/src/test/pool-cache.test.ts b/src/test/pool-cache.test.ts index 52c9773a6..25d28fac6 100644 --- a/src/test/pool-cache.test.ts +++ b/src/test/pool-cache.test.ts @@ -1,10 +1,13 @@ 'use strict' -type TestPool = { - acquire: jest.Mock - rebalance: jest.Mock - destroy: jest.Mock, []> - getPoolStats: jest.Mock +import type { CacheName } from '../internal/cache' +import type { PoolStrategy, TenantConnectionOptions } from '../internal/database/pool' + +type TestPool = { + pool: TPool + rebalance: jest.SpiedFunction + destroy: jest.SpiedFunction + getPoolStats: jest.SpiedFunction } type PoolModule = typeof import('../internal/database/pool') @@ -19,12 +22,17 @@ function createPoolSettings(tenantId: string) { } } -function createTestPool(stats: { used: number; total: number } | null = null): TestPool { +function createTestPool( + pool: TPool, + stats: { used: number; total: number } | null = null +): TestPool { + const strategyPool: PoolStrategy = pool + return { - acquire: jest.fn(), - rebalance: jest.fn(), - destroy: jest.fn().mockResolvedValue(undefined), - getPoolStats: jest.fn().mockReturnValue(stats), + pool, + rebalance: jest.spyOn(strategyPool, 'rebalance'), + destroy: jest.spyOn(strategyPool, 'destroy').mockResolvedValue(undefined), + getPoolStats: jest.spyOn(strategyPool, 'getPoolStats').mockReturnValue(stats), } } @@ -42,19 +50,16 @@ async function loadPoolModule(ttlMs: number): Promise { ...actual, createTtlCache: ((optionsOrName: unknown, maybeOptions?: Record) => { if (typeof optionsOrName === 'string') { - return actual.createTtlCache( - optionsOrName as never, - { - ...(maybeOptions || {}), - ttl: ttlMs, - } as never - ) + return actual.createTtlCache(optionsOrName as CacheName, { + ...(maybeOptions || {}), + ttl: ttlMs, + }) } return actual.createTtlCache({ ...(optionsOrName as Record), ttl: ttlMs, - } as never) + }) }) as typeof actual.createTtlCache, } }) @@ -80,15 +85,10 @@ describe('PoolManager cache lifecycle', () => { class TestPoolManager extends poolModule.PoolManager { created: TestPool[] = [] - protected newPool(_settings: any): any { - const pool: TestPool = { - acquire: jest.fn(), - rebalance: jest.fn(), - destroy: jest.fn().mockResolvedValue(undefined), - getPoolStats: jest.fn().mockReturnValue(null), - } + protected newPool(settings: TenantConnectionOptions) { + const pool = createTestPool(super.newPool(settings)) this.created.push(pool) - return pool + return pool.pool } } @@ -117,15 +117,10 @@ describe('PoolManager cache lifecycle', () => { class TestPoolManager extends poolModule.PoolManager { created: TestPool[] = [] - protected newPool(_settings: any): any { - const pool: TestPool = { - acquire: jest.fn(), - rebalance: jest.fn(), - destroy: jest.fn().mockResolvedValue(undefined), - getPoolStats: jest.fn().mockReturnValue(null), - } + protected newPool(settings: TenantConnectionOptions) { + const pool = createTestPool(super.newPool(settings)) this.created.push(pool) - return pool + return pool.pool } } @@ -163,20 +158,20 @@ describe('PoolManager cache lifecycle', () => { ) let batchObserver: ((observer: { observe: (...args: unknown[]) => void }) => void) | undefined - addBatchObservableCallbackSpy.mockImplementation((callback) => { + addBatchObservableCallbackSpy.mockImplementation((callback): void => { batchObserver = callback as typeof batchObserver - return undefined as never }) class TestPoolManager extends poolModule.PoolManager { created: Record = {} - protected newPool(settings: any): any { + protected newPool(settings: TenantConnectionOptions) { const pool = createTestPool( + super.newPool(settings), settings.tenantId === 'tenant-a' ? { used: 2, total: 5 } : { used: 3, total: 7 } ) this.created[settings.tenantId] = pool - return pool + return pool.pool } } @@ -203,26 +198,26 @@ describe('PoolManager cache lifecycle', () => { class TestPoolManager extends poolModule.PoolManager { created: Record = {} - protected newPool(settings: any): any { - const pool = createTestPool() + protected newPool(settings: TenantConnectionOptions) { + const pool = createTestPool(super.newPool(settings)) this.created[settings.tenantId] = pool - return pool + return pool.pool } } const poolManager = new TestPoolManager() const first = poolManager.getPool(createPoolSettings('tenant-c')) - const second = poolManager.getPool(createPoolSettings('tenant-d')) + poolManager.getPool(createPoolSettings('tenant-d')) poolManager.rebalanceAll({ clusterSize: 4 }) - expect(first.rebalance).toHaveBeenCalledWith({ clusterSize: 4 }) - expect(second.rebalance).toHaveBeenCalledWith({ clusterSize: 4 }) + expect(poolManager.created['tenant-c'].rebalance).toHaveBeenCalledWith({ clusterSize: 4 }) + expect(poolManager.created['tenant-d'].rebalance).toHaveBeenCalledWith({ clusterSize: 4 }) await poolManager.destroyAll() - expect(first.destroy).toHaveBeenCalledTimes(1) - expect(second.destroy).toHaveBeenCalledTimes(1) + expect(poolManager.created['tenant-c'].destroy).toHaveBeenCalledTimes(1) + expect(poolManager.created['tenant-d'].destroy).toHaveBeenCalledTimes(1) const recreated = poolManager.getPool(createPoolSettings('tenant-c')) @@ -235,11 +230,11 @@ describe('PoolManager cache lifecycle', () => { class TestPoolManager extends poolModule.PoolManager { created: Record = {} - protected newPool(settings: any): any { - const pool = createTestPool() + protected newPool(settings: TenantConnectionOptions) { + const pool = createTestPool(super.newPool(settings)) pool.destroy.mockRejectedValue(new Error(`destroy failed for ${settings.tenantId}`)) this.created[settings.tenantId] = pool - return pool + return pool.pool } } @@ -258,15 +253,15 @@ describe('PoolManager cache lifecycle', () => { class TestPoolManager extends poolModule.PoolManager { created: Record = {} - protected newPool(settings: any): any { - const pool = createTestPool() + protected newPool(settings: TenantConnectionOptions) { + const pool = createTestPool(super.newPool(settings)) if (settings.tenantId === 'tenant-destroyall-error') { pool.destroy.mockRejectedValue(new Error('destroyAll failed')) } this.created[settings.tenantId] = pool - return pool + return pool.pool } } @@ -296,18 +291,17 @@ describe('PoolManager cache lifecycle', () => { ) let batchObserver: ((observer: { observe: (...args: unknown[]) => void }) => void) | undefined - addBatchObservableCallbackSpy.mockImplementation((callback) => { + addBatchObservableCallbackSpy.mockImplementation((callback): void => { batchObserver = callback as typeof batchObserver - return undefined as never }) class TestPoolManager extends poolModule.PoolManager { created: Record = {} - protected newPool(settings: any): any { - const pool = createTestPool({ used: 1, total: 2 }) + protected newPool(settings: TenantConnectionOptions) { + const pool = createTestPool(super.newPool(settings), { used: 1, total: 2 }) this.created[settings.tenantId] = pool - return pool + return pool.pool } } diff --git a/src/test/postgres-locker.test.ts b/src/test/postgres-locker.test.ts new file mode 100644 index 000000000..cb642bed1 --- /dev/null +++ b/src/test/postgres-locker.test.ts @@ -0,0 +1,70 @@ +import { PubSubAdapter } from '@internal/pubsub' +import { LockNotifier } from '../storage/protocols/tus/postgres-locker' + +class FakePubSub implements PubSubAdapter { + readonly startSpy = jest.fn, []>().mockResolvedValue(undefined) + readonly publishSpy = jest.fn, [string, unknown]>().mockResolvedValue(undefined) + readonly subscribeSpy = jest.fn, [string, (message: unknown) => void]>() + readonly unsubscribeSpy = jest.fn, [string, (message: unknown) => void]>() + readonly closeSpy = jest.fn, []>().mockResolvedValue(undefined) + + start(): Promise { + return this.startSpy() + } + + publish(channel: string, message: unknown): Promise { + return this.publishSpy(channel, message) + } + + subscribe(channel: string, cb: (message: unknown) => void): Promise { + return this.subscribeSpy(channel, cb) + } + + unsubscribe(channel: string, cb: (message: unknown) => void): Promise { + return this.unsubscribeSpy(channel, cb) + } + + close(): Promise { + return this.closeSpy() + } + + on(): this { + return this + } +} + +describe('LockNotifier', () => { + it('ignores malformed pubsub payloads', () => { + const pubSub = new FakePubSub() + const notifier = new LockNotifier(pubSub) + const onRelease = jest.fn() + + notifier.onRelease('upload-id', onRelease) + + expect(() => notifier.handler('upload-id')).not.toThrow() + expect(() => notifier.handler({ id: 123 })).not.toThrow() + expect(onRelease).not.toHaveBeenCalled() + }) + + it('emits release events for valid payloads', () => { + const pubSub = new FakePubSub() + const notifier = new LockNotifier(pubSub) + const onRelease = jest.fn() + + notifier.onRelease('upload-id', onRelease) + notifier.handler({ id: 'upload-id' }) + + expect(onRelease).toHaveBeenCalledTimes(1) + }) + + it('subscribes and unsubscribes the shared handler', async () => { + const pubSub = new FakePubSub() + const notifier = new LockNotifier(pubSub) + + await notifier.start() + await notifier.stop() + + expect(pubSub.subscribeSpy).toHaveBeenCalledWith('REQUEST_LOCK_RELEASE', notifier.handler) + expect(pubSub.unsubscribeSpy).toHaveBeenCalledWith('REQUEST_LOCK_RELEASE', notifier.handler) + }) +}) diff --git a/src/test/query-abort-signal.test.ts b/src/test/query-abort-signal.test.ts index 3287cc99a..e79ebb0ff 100644 --- a/src/test/query-abort-signal.test.ts +++ b/src/test/query-abort-signal.test.ts @@ -107,11 +107,13 @@ describe('Query Abort Signal', () => { await queryPromise fail('Expected query to be aborted') } catch (error: unknown) { - expect(error).toMatchObject({ - name: 'AbortError', - code: 'ABORT_ERR', - message: 'Query was aborted', - }) + expect(error).toBeInstanceOf(Error) + if (!(error instanceof Error)) { + throw error + } + expect(error.name).toBe('AbortError') + expect('code' in error ? error.code : undefined).toBe('ABORT_ERR') + expect(error.message).toBe('Query was aborted') } finally { clearTimeout(abortTimeout) } diff --git a/src/test/s3-adapter.test.ts b/src/test/s3-adapter.test.ts index c237ecb1f..d43a238a4 100644 --- a/src/test/s3-adapter.test.ts +++ b/src/test/s3-adapter.test.ts @@ -26,9 +26,9 @@ jest.mock('@aws-sdk/lib-storage', () => { }) type MockUploadInstance = { - options: any + options: ConstructorParameters[0] abort: jest.Mock - done: jest.Mock, []> + done: jest.Mock, []> on: jest.Mock off: jest.Mock emit: (event: string, payload: unknown) => void @@ -36,7 +36,7 @@ type MockUploadInstance = { describe('S3Backend', () => { let mockSend: jest.Mock - let mockUploadDone: jest.Mock, [MockUploadInstance]> + let mockUploadDone: jest.Mock, [MockUploadInstance]> let uploadInstances: MockUploadInstance[] beforeEach(() => { diff --git a/src/test/s3-locker.test.ts b/src/test/s3-locker.test.ts index 8954d045e..5305f3b3a 100644 --- a/src/test/s3-locker.test.ts +++ b/src/test/s3-locker.test.ts @@ -11,6 +11,7 @@ import { PutObjectCommand, S3Client, } from '@aws-sdk/client-s3' +import { Lock } from '@tus/server' import { getConfig } from '../config' import { backends } from '../storage' import { LockNotifier } from '../storage/protocols/tus/postgres-locker' @@ -26,7 +27,12 @@ describe('S3Locker', () => { let locker: S3Locker let testBucket: string let mockNotifier: LockNotifier - let allLocks: Array<{ lock: any; locker: S3Locker }> = [] + let allLocks: Array<{ lock: Lock; locker: S3Locker }> = [] + + const getErrorMessage = (error: unknown) => + error instanceof Error ? error.message : String(error) + + const getErrorName = (error: unknown) => (error instanceof Error ? error.name : undefined) beforeAll(async () => { // Use the configured S3 client from the backend @@ -54,7 +60,7 @@ describe('S3Locker', () => { onRelease: jest.fn(), unsubscribe: jest.fn(), subscribe: jest.fn(), - } as any + } as unknown as LockNotifier // Create fresh locker instance locker = new S3Locker({ @@ -112,7 +118,7 @@ describe('S3Locker', () => { } }) - function trackLock(lock: any, lockLocker: S3Locker = locker) { + function trackLock(lock: Lock, lockLocker: S3Locker = locker) { allLocks.push({ lock, locker: lockLocker }) return lock } @@ -368,8 +374,8 @@ describe('S3Locker', () => { }) ) fail('Lock should have been deleted') - } catch (error: any) { - expect(error.name).toBe('NoSuchKey') + } catch (error: unknown) { + expect(getErrorName(error)).toBe('NoSuchKey') } }) }) @@ -440,10 +446,10 @@ describe('S3Locker', () => { const start = Date.now() try { await lock1.lock(abortController1.signal, cancelReq) - } catch (error: any) { + } catch (error: unknown) { const lockDuration = Date.now() - start throw new Error( - `Lock acquisition failed on iteration ${i} after ${lockDuration}ms with error: ${error.message}. This likely means a zombie lock exists from a previous iteration.` + `Lock acquisition failed on iteration ${i} after ${lockDuration}ms with error: ${getErrorMessage(error)}. This likely means a zombie lock exists from a previous iteration.` ) } const lockDuration = Date.now() - start @@ -483,8 +489,8 @@ describe('S3Locker', () => { throw new Error( `Zombie lock detected on iteration ${i}! A lock exists after deletion, indicating the IfMatch fix is missing.` ) - } catch (error: any) { - if (error.name === 'NoSuchKey') { + } catch (error: unknown) { + if (getErrorName(error) === 'NoSuchKey') { // Good - no zombie lock exists continue } diff --git a/src/test/s3-router.test.ts b/src/test/s3-router.test.ts index 5f4bce3d9..0ad196cf5 100644 --- a/src/test/s3-router.test.ts +++ b/src/test/s3-router.test.ts @@ -7,6 +7,12 @@ afterEach(() => { jest.restoreAllMocks() }) +type S3HandlerStorage = ConstructorParameters[0] + +function createHandler() { + return new S3ProtocolHandler({} as unknown as S3HandlerStorage, 'tenant-id') +} + describe('S3 router query matching', () => { it('parses key-only query params with an undefined value', () => { const router = new Router() @@ -112,7 +118,7 @@ describe('S3 router query matching', () => { describe('S3ProtocolHandler.parseMetadataHeaders', () => { it('returns only x-amz-meta headers without the prefix', () => { - const handler = new S3ProtocolHandler({} as any, 'tenant-id') + const handler = createHandler() expect( handler.parseMetadataHeaders({ @@ -127,7 +133,7 @@ describe('S3ProtocolHandler.parseMetadataHeaders', () => { }) it('returns undefined when there are no metadata headers', () => { - const handler = new S3ProtocolHandler({} as any, 'tenant-id') + const handler = createHandler() expect( handler.parseMetadataHeaders({ @@ -138,7 +144,7 @@ describe('S3ProtocolHandler.parseMetadataHeaders', () => { }) it('keeps only string metadata values', () => { - const handler = new S3ProtocolHandler({} as any, 'tenant-id') + const handler = createHandler() expect( handler.parseMetadataHeaders({ @@ -154,7 +160,7 @@ describe('S3ProtocolHandler.parseMetadataHeaders', () => { }) it('returns undefined when metadata headers are present but none are strings', () => { - const handler = new S3ProtocolHandler({} as any, 'tenant-id') + const handler = createHandler() expect( handler.parseMetadataHeaders({ diff --git a/src/test/signature-v4-stream.test.ts b/src/test/signature-v4-stream.test.ts index 6d3b948d8..94905566e 100644 --- a/src/test/signature-v4-stream.test.ts +++ b/src/test/signature-v4-stream.test.ts @@ -5,6 +5,10 @@ import { import { Buffer } from 'buffer' import crypto from 'crypto' +type ParserWithHeaderParser = { + parseHeaderLine(line: string): { size: number; signature?: string } +} + describe('ChunkSignatureV4Parser', () => { const makeParser = (opts: Partial = {}) => { const defaultOpts: ChunkSignatureParserOptions = { @@ -17,13 +21,18 @@ describe('ChunkSignatureV4Parser', () => { test('constructor throws on invalid algorithm', () => { expect( - () => new ChunkSignatureV4Parser({ streamingAlgorithm: 'INVALID' as any, maxChunkSize: 1 }) + () => + new ChunkSignatureV4Parser({ + streamingAlgorithm: + 'INVALID' as unknown as ChunkSignatureParserOptions['streamingAlgorithm'], + maxChunkSize: 1, + }) ).toThrow(/Invalid streaming algorithm/) }) test('parseHeaderLine accepts signed header and rejects malformed signature', () => { const parser = makeParser() - const parse = (parser as any).parseHeaderLine.bind(parser) + const parse = (parser as unknown as ParserWithHeaderParser).parseHeaderLine.bind(parser) const validSig = 'a'.repeat(64) const { size, signature } = parse(`5;chunk-signature=${validSig}`) expect(size).toBe(5) @@ -33,7 +42,7 @@ describe('ChunkSignatureV4Parser', () => { test('parseHeaderLine rejects invalid chunk size', () => { const parser = makeParser() - const parse = (parser as any).parseHeaderLine.bind(parser) + const parse = (parser as unknown as ParserWithHeaderParser).parseHeaderLine.bind(parser) expect(() => parse('zz;chunk-signature=' + 'a'.repeat(64))).toThrow(/Invalid header/) expect(() => parse('zz')).toThrow(/Invalid chunk size/) }) @@ -175,7 +184,7 @@ describe('ChunkSignatureV4Parser', () => { maxChunkSize: 10, signaturePattern: /^[0-9]+$/, }) - const parse = (parser as any).parseHeaderLine.bind(parser) + const parse = (parser as unknown as ParserWithHeaderParser).parseHeaderLine.bind(parser) expect(parse('3;chunk-signature=123').signature).toBe('123') expect(() => parse('3;chunk-signature=abc')).toThrow() }) diff --git a/src/test/signature-v4.test.ts b/src/test/signature-v4.test.ts new file mode 100644 index 000000000..be47ae1fa --- /dev/null +++ b/src/test/signature-v4.test.ts @@ -0,0 +1,24 @@ +import { SignatureV4 } from '../storage/protocols/s3/signature-v4' + +describe('SignatureV4.parseAuthorizationHeader', () => { + const authorization = + 'AWS4-HMAC-SHA256 Credential=test-access/20260407/us-east-1/s3/aws4_request,SignedHeaders=host;x-amz-date,Signature=abc123' + + it('rejects duplicate authorization headers', () => { + expect(() => + SignatureV4.parseAuthorizationHeader({ + authorization: [authorization, authorization], + 'x-amz-date': '20260407T120000Z', + }) + ).toThrow('Multiple authorization headers are not supported') + }) + + it('rejects duplicate x-amz-date headers', () => { + expect(() => + SignatureV4.parseAuthorizationHeader({ + authorization, + 'x-amz-date': ['20260407T120000Z', '20260407T120001Z'], + }) + ).toThrow('Multiple x-amz-date headers are not supported') + }) +}) diff --git a/src/test/single-shard.test.ts b/src/test/single-shard.test.ts new file mode 100644 index 000000000..dd5b15507 --- /dev/null +++ b/src/test/single-shard.test.ts @@ -0,0 +1,22 @@ +'use strict' + +import { SingleShard } from '../internal/sharding/strategy/single-shard' + +describe('SingleShard', () => { + it('returns shard stats in the canonical array shape', async () => { + const sharder = new SingleShard({ + shardKey: 'single-shard-key', + capacity: 25, + }) + + await expect(sharder.shardStats()).resolves.toEqual([ + { + shardId: '1', + shardKey: 'single-shard-key', + capacity: 25, + used: -1, + free: -1, + }, + ]) + }) +}) diff --git a/src/test/storage-knex.test.ts b/src/test/storage-knex.test.ts new file mode 100644 index 000000000..a71c8b61a --- /dev/null +++ b/src/test/storage-knex.test.ts @@ -0,0 +1,56 @@ +import { StorageKnexDB } from '@storage/database' +import { TenantConnection } from '../internal/database/connection' + +function createStorageKnexTestHarness() { + const transaction = { + once: jest.fn(), + commit: jest.fn().mockResolvedValue(undefined), + rollback: jest.fn().mockResolvedValue(undefined), + } + + const connection = { + role: 'anon', + transactionProvider: jest.fn().mockReturnValue(async () => transaction), + setScope: jest.fn().mockResolvedValue(undefined), + getAbortSignal: jest.fn().mockReturnValue(undefined), + } as unknown as TenantConnection + + const db = new StorageKnexDB(connection, { + tenantId: 'test-tenant', + host: 'localhost', + }) + + return { db, connection, transaction } +} + +describe('StorageKnexDB.testPermission', () => { + it('returns the callback result after rolling back the transaction', async () => { + const { db, connection, transaction } = createStorageKnexTestHarness() + + const result = await db.testPermission(async (txDb) => { + expect(txDb).toBeInstanceOf(StorageKnexDB) + expect(txDb).not.toBe(db) + return 'allowed' + }) + + expect(result).toBe('allowed') + expect(connection.transactionProvider).toHaveBeenCalledWith(undefined, undefined) + expect(connection.setScope).toHaveBeenCalledWith(transaction) + expect(transaction.rollback).toHaveBeenCalledTimes(1) + expect(transaction.commit).not.toHaveBeenCalled() + }) + + it('rethrows callback failures without wrapping them', async () => { + const { db, transaction } = createStorageKnexTestHarness() + const error = new Error('permission denied') + + await expect( + db.testPermission(async () => { + throw error + }) + ).rejects.toBe(error) + + expect(transaction.rollback).toHaveBeenCalledTimes(1) + expect(transaction.commit).not.toHaveBeenCalled() + }) +}) diff --git a/src/test/tenant.test.ts b/src/test/tenant.test.ts index ebb791eff..ddf7333e6 100644 --- a/src/test/tenant.test.ts +++ b/src/test/tenant.test.ts @@ -743,7 +743,9 @@ describe('Tenant configs', () => { } try { - knexTableSpy.mockReturnValue(queryBuilder as any) + knexTableSpy.mockReturnValue( + queryBuilder as unknown as ReturnType + ) for (const tenantId of tenantIds) { await tenantModule.getTenantConfig(tenantId) @@ -803,7 +805,9 @@ describe('Tenant configs', () => { } try { - knexTableSpy.mockReturnValue(queryBuilder as any) + knexTableSpy.mockReturnValue( + queryBuilder as unknown as ReturnType + ) await assertLogicalLookupMetrics({ addSpy, backendCallSpy: queryBuilder.abortOnSignal, diff --git a/src/test/uploader.test.ts b/src/test/uploader.test.ts index 0f0c0f6e5..03eeaed28 100644 --- a/src/test/uploader.test.ts +++ b/src/test/uploader.test.ts @@ -6,6 +6,10 @@ import { ObjectAdminDelete } from '../storage/events' import { TenantLocation } from '../storage/locator' import { fileUploadFromRequest, Uploader } from '../storage/uploader' +type UploaderBackend = ConstructorParameters[0] +type UploaderDatabase = ConstructorParameters[1] +type CompleteUploadResult = Awaited> + describe('fileUploadFromRequest', () => { test('keeps multipart/form-data file size undefined even when the request content-length exceeds 5GB', async () => { const file = Readable.from(['payload']) as Readable & { truncated: boolean } @@ -279,19 +283,21 @@ describe('fileUploadFromRequest', () => { .spyOn(ObjectAdminDelete, 'send') .mockResolvedValue(undefined) + const backend = { + uploadObject: jest.fn(async (_bucket, _key, _version, body: Readable) => { + body.destroy(new Error('stream pipeline failed')) + throw StorageBackendError.fromError(new Error('socket hang up')) + }), + } + const db = { + tenantId: 'stub-tenant', + reqId: 'req-1', + tenant: () => ({ ref: 'stub-tenant' }), + testPermission: jest.fn().mockResolvedValue(undefined), + } const uploader = new Uploader( - { - uploadObject: jest.fn(async (_bucket, _key, _version, body: Readable) => { - body.destroy(new Error('stream pipeline failed')) - throw StorageBackendError.fromError(new Error('socket hang up')) - }), - } as any, - { - tenantId: 'stub-tenant', - reqId: 'req-1', - tenant: () => ({ ref: 'stub-tenant' }), - testPermission: jest.fn().mockResolvedValue(undefined), - } as any, + backend as unknown as UploaderBackend, + db as unknown as UploaderDatabase, new TenantLocation('test-bucket') ) @@ -326,29 +332,30 @@ describe('fileUploadFromRequest', () => { contentRange: undefined, }), } + const db = { + tenantId: 'stub-tenant', + reqId: 'req-1', + tenant: () => ({ ref: 'stub-tenant' }), + testPermission: jest.fn(async (fn) => + fn({ + createObject: jest.fn(async (payload: { metadata?: { contentLength?: number } }) => { + capturedWrites.push(payload) + }), + upsertObject: jest.fn(async (payload: { metadata?: { contentLength?: number } }) => { + capturedWrites.push(payload) + }), + }) + ), + } const uploader = new Uploader( - backend as any, - { - tenantId: 'stub-tenant', - reqId: 'req-1', - tenant: () => ({ ref: 'stub-tenant' }), - testPermission: jest.fn(async (fn) => - fn({ - createObject: jest.fn(async (payload: { metadata?: { contentLength?: number } }) => { - capturedWrites.push(payload) - }), - upsertObject: jest.fn(async (payload: { metadata?: { contentLength?: number } }) => { - capturedWrites.push(payload) - }), - }) - ), - } as any, + backend as unknown as UploaderBackend, + db as unknown as UploaderDatabase, new TenantLocation('test-bucket') ) const completeUploadSpy = jest.spyOn(uploader, 'completeUpload').mockResolvedValue({ metadata: { eTag: '"etag"' }, obj: { id: 'obj-id' }, - } as any) + } as unknown as CompleteUploadResult) await uploader.upload({ bucketId: 'bucket', diff --git a/src/test/vector-store-manager.test.ts b/src/test/vector-store-manager.test.ts index e911e3c18..4278ef2e0 100644 --- a/src/test/vector-store-manager.test.ts +++ b/src/test/vector-store-manager.test.ts @@ -18,7 +18,24 @@ import { VectorStore, VectorStoreManager, } from '@storage/protocols/vector' +import { VectorBucket } from '@storage/schemas' +function deferred() { + let resolve!: () => void + const promise = new Promise((res) => { + resolve = res + }) + + return { promise, resolve } +} + +function createVectorBucket(bucketName: string): VectorBucket { + return { + id: bucketName, + created_at: new Date(), + updated_at: new Date().toISOString(), + } as unknown as VectorBucket +} function createMockVectorStore(): jest.Mocked { return { createVectorIndex: jest.fn().mockResolvedValue({} as CreateIndexCommandOutput), @@ -119,11 +136,7 @@ function createDeterministicVectorDb(options: { }, findVectorBucket: async (bucketName: string) => { if (state.existingBuckets.has(bucketName)) { - return { - id: bucketName, - created_at: new Date(), - updated_at: new Date().toISOString(), - } as any + return createVectorBucket(bucketName) } throw ERRORS.S3VectorNotFoundException('vector bucket', bucketName) @@ -185,8 +198,8 @@ function createDeterministicVectorDb(options: { describe('VectorStoreManager bucket lifecycle', () => { it('serializes concurrent creates for the final bucket slot', async () => { - const releaseFirstCreate = Promise.withResolvers() - const firstCreateStarted = Promise.withResolvers() + const releaseFirstCreate = deferred() + const firstCreateStarted = deferred() const db = createDeterministicVectorDb({ bucketCount: 1, @@ -241,8 +254,8 @@ describe('VectorStoreManager bucket lifecycle', () => { }) it('shares the bucket-count lock between delete and create so capacity is observed after delete commits', async () => { - const releaseDelete = Promise.withResolvers() - const deleteReachedRemoval = Promise.withResolvers() + const releaseDelete = deferred() + const deleteReachedRemoval = deferred() const db = createDeterministicVectorDb({ bucketCount: 1, @@ -272,8 +285,8 @@ describe('VectorStoreManager bucket lifecycle', () => { }) it('does not block unrelated creates while delete waits on the target bucket lock', async () => { - const releaseBucketLock = Promise.withResolvers() - const deleteWaitingOnBucketLock = Promise.withResolvers() + const releaseBucketLock = deferred() + const deleteWaitingOnBucketLock = deferred() const db = createDeterministicVectorDb({ bucketCount: 1, @@ -331,11 +344,7 @@ describe('VectorStoreManager bucket lifecycle', () => { const vectorStore = createMockVectorStore() db.findVectorBucket - .mockResolvedValueOnce({ - id: 'bucket-a', - created_at: new Date(), - updated_at: new Date().toISOString(), - } as any) + .mockResolvedValueOnce(createVectorBucket('bucket-a')) .mockRejectedValueOnce(ERRORS.S3VectorNotFoundException('vector bucket', 'bucket-a')) db.withTransaction.mockImplementation(async (fn) => fn(db as unknown as KnexVectorMetadataDB)) diff --git a/src/test/vectors.test.ts b/src/test/vectors.test.ts index 517495bc1..653c46bb3 100644 --- a/src/test/vectors.test.ts +++ b/src/test/vectors.test.ts @@ -24,6 +24,35 @@ const vectorBucketS3 = vectorS3Buckets[0] let appInstance: FastifyInstance let serviceToken: string +type VectorBucketListItem = { + vectorBucketName: string + creationTime: number +} + +type ListVectorBucketsResponse = { + vectorBuckets: VectorBucketListItem[] + nextToken?: string +} + +type GetVectorBucketResponse = { + vectorBucket: VectorBucketListItem +} + +type ErrorResponse = { + error: string +} + +type VectorIndexListItem = { + indexName: string + vectorBucketName: string + creationTime: number +} + +type ListIndexesResponse = { + indexes: VectorIndexListItem[] + nextToken?: string +} + // Use the common mock helpers useMockObject() useMockQueue() @@ -537,7 +566,7 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(400) - const body = JSON.parse(response.body) + const body = JSON.parse(response.body) as ErrorResponse expect(body.error).toBe('VectorBucketNotEmpty') }) @@ -601,13 +630,13 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(200) - const body = JSON.parse(response.body) + const body = JSON.parse(response.body) as ListVectorBucketsResponse expect(body.vectorBuckets).toBeDefined() expect(Array.isArray(body.vectorBuckets)).toBe(true) expect(body.vectorBuckets.length).toBeGreaterThan(0) // Verify structure of bucket objects - body.vectorBuckets.forEach((bucket: any) => { + body.vectorBuckets.forEach((bucket) => { expect(bucket.vectorBucketName).toBeDefined() expect(bucket.creationTime).toBeDefined() expect(typeof bucket.creationTime).toBe('number') @@ -627,7 +656,7 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(200) - const body = JSON.parse(response.body) + const body = JSON.parse(response.body) as ListVectorBucketsResponse expect(body.vectorBuckets.length).toBeLessThanOrEqual(2) if (body.vectorBuckets.length === 2) { expect(body.nextToken).toBeDefined() @@ -688,8 +717,8 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(200) - const body = JSON.parse(response.body) - body.vectorBuckets.forEach((bucket: any) => { + const body = JSON.parse(response.body) as ListVectorBucketsResponse + body.vectorBuckets.forEach((bucket) => { expect(bucket.vectorBucketName).toMatch(new RegExp(`^${prefix}`)) }) }) @@ -807,8 +836,8 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(200) - const body = JSON.parse(response.body) - const names = body.vectorBuckets.map((bucket: any) => bucket.vectorBucketName) + const body = JSON.parse(response.body) as ListVectorBucketsResponse + const names = body.vectorBuckets.map((bucket) => bucket.vectorBucketName) expect(names).toContain(matchingBucket) expect(names).not.toContain(nonMatchingBucket) }) @@ -834,8 +863,8 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(200) - const body = JSON.parse(response.body) - const names = body.vectorBuckets.map((bucket: any) => bucket.vectorBucketName) + const body = JSON.parse(response.body) as ListVectorBucketsResponse + const names = body.vectorBuckets.map((bucket) => bucket.vectorBucketName) expect(names).toContain(matchingBucket) expect(names).not.toContain(nonMatchingBucket) }) @@ -865,7 +894,7 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(200) - const body = JSON.parse(response.body) + const body = JSON.parse(response.body) as GetVectorBucketResponse expect(body.vectorBucket).toBeDefined() expect(body.vectorBucket.vectorBucketName).toBe(vectorBucketName) expect(body.vectorBucket.creationTime).toBeDefined() @@ -1074,13 +1103,13 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(200) - const body = JSON.parse(response.body) + const body = JSON.parse(response.body) as ListIndexesResponse expect(body.indexes).toBeDefined() expect(Array.isArray(body.indexes)).toBe(true) expect(body.indexes.length).toBeGreaterThanOrEqual(2) // Verify structure of index objects - body.indexes.forEach((index: any) => { + body.indexes.forEach((index) => { expect(index.indexName).toBeDefined() expect(index.vectorBucketName).toBe(vectorBucketName) expect(index.creationTime).toBeDefined() @@ -1102,7 +1131,7 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(200) - const body = JSON.parse(response.body) + const body = JSON.parse(response.body) as ListIndexesResponse expect(body.indexes.length).toBeLessThanOrEqual(1) }) @@ -1158,8 +1187,8 @@ describe('Vectors API', () => { }) expect(response.statusCode).toBe(200) - const body = JSON.parse(response.body) - body.indexes.forEach((index: any) => { + const body = JSON.parse(response.body) as ListIndexesResponse + body.indexes.forEach((index) => { expect(index.indexName).toMatch(new RegExp(`^${prefix}`)) }) }) diff --git a/src/test/webhooks.test.ts b/src/test/webhooks.test.ts index 7795aff43..eeaf9186c 100644 --- a/src/test/webhooks.test.ts +++ b/src/test/webhooks.test.ts @@ -87,6 +87,7 @@ describe('Webhooks', () => { size: 3746, }), name: `public/${fileName}.png`, + uploadType: 'standard', tenant: expect.objectContaining({ ref: 'bjhaohmqunupljrqypxz', }), @@ -130,6 +131,15 @@ describe('Webhooks', () => { applyTime: expect.any(Number), payload: expect.objectContaining({ bucketId: 'bucket6', + metadata: expect.objectContaining({ + cacheControl: 'no-cache', + contentLength: 3746, + eTag: 'abc', + lastModified: expect.any(String), + httpStatusCode: 200, + mimetype: 'image/png', + size: 3746, + }), name: obj.name, tenant: { host: undefined, @@ -183,6 +193,15 @@ describe('Webhooks', () => { applyTime: expect.any(Number), payload: expect.objectContaining({ bucketId: 'bucket6', + metadata: expect.objectContaining({ + cacheControl: 'no-cache', + contentLength: 3746, + eTag: 'abc', + lastModified: expect.any(String), + httpStatusCode: 200, + mimetype: 'image/png', + size: 3746, + }), name: obj.name, version: expect.any(String), tenant: { diff --git a/src/typecheck/event-input.ts b/src/typecheck/event-input.ts new file mode 100644 index 000000000..981c17c97 --- /dev/null +++ b/src/typecheck/event-input.ts @@ -0,0 +1,111 @@ +import { Event } from '../internal/queue/event' +import { ObjectMetadata } from '../storage/backend' +import { ObjectCreatedPutEvent } from '../storage/events/lifecycle/object-created' +import { ObjectRemoved } from '../storage/events/lifecycle/object-removed' +import { ObjectUpdatedMetadata } from '../storage/events/lifecycle/object-updated' +import type { Obj } from '../storage/schemas' + +type TypeCheckedPayload = { + tenant: { + ref: string + host: string + } + name: string + bucketId: string +} + +class TypeCheckedEvent extends Event { + protected static queueName = 'type-checked-event' +} + +const tenant = { + ref: 'tenant-ref', + host: 'tenant-host', +} + +const metadata: ObjectMetadata = { + cacheControl: 'no-cache', + contentLength: 1, + size: 1, + mimetype: 'text/plain', + lastModified: new Date('2026-04-07T00:00:00.000Z'), + eTag: 'etag', +} + +const persistedMetadata: NonNullable = { + cacheControl: 'no-cache', + contentLength: 1, + size: 1, + mimetype: 'text/plain', + lastModified: '2026-04-07T00:00:00.000Z', + eTag: 'etag', +} + +void TypeCheckedEvent.send({ + tenant, + name: 'object-name', + bucketId: 'bucket-id', +}) + +function _typecheckRequiredFields() { + // @ts-expect-error required event payload fields must stay required + void TypeCheckedEvent.send({ + tenant, + }) +} + +void ObjectCreatedPutEvent.sendWebhook({ + tenant, + name: 'object-name', + version: 'object-version', + bucketId: 'bucket-id', + metadata, + uploadType: 'standard', +}) + +function _typecheckUploadTypeRequired() { + // @ts-expect-error upload-created events must keep uploadType required + void ObjectCreatedPutEvent.sendWebhook({ + tenant, + name: 'object-name', + version: 'object-version', + bucketId: 'bucket-id', + metadata, + }) +} + +void ObjectRemoved.sendWebhook({ + tenant, + name: 'object-name', + version: null, + bucketId: 'bucket-id', + metadata: persistedMetadata, +}) + +function _typecheckRemovedVersionRequired() { + // @ts-expect-error removed events must keep a version field, even when the value is null + void ObjectRemoved.sendWebhook({ + tenant, + name: 'object-name', + bucketId: 'bucket-id', + metadata: persistedMetadata, + }) +} + +void ObjectUpdatedMetadata.sendWebhook({ + tenant, + name: 'object-name', + version: null, + bucketId: 'bucket-id', + metadata, +}) + +function _typecheckUpdatedVersionRequired() { + // @ts-expect-error metadata-updated events must keep a version field, even for legacy rows + void ObjectUpdatedMetadata.sendWebhook({ + tenant, + name: 'object-name', + bucketId: 'bucket-id', + metadata, + }) +} diff --git a/src/typecheck/pubsub.ts b/src/typecheck/pubsub.ts new file mode 100644 index 000000000..93689b185 --- /dev/null +++ b/src/typecheck/pubsub.ts @@ -0,0 +1,17 @@ +import { PubSubAdapter } from '../internal/pubsub' + +declare const pubSub: PubSubAdapter + +void pubSub.subscribe('tenant-update', (message) => { + if (typeof message === 'string') { + void message.toUpperCase() + } +}) + +function _typecheckSubscriberMustNarrowPayload() { + void pubSub.subscribe( + 'tenant-update', + // @ts-expect-error pubsub payloads are unknown at the transport boundary + (message) => message.toUpperCase() + ) +}