Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion biome.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"noAssignInExpressions": "error",
"noAsyncPromiseExecutor": "error",
"noDoubleEquals": "error",
"noExplicitAny": "warn",
"noExplicitAny": "error",
"noFocusedTests": "error",
"noImplicitAnyLet": "error",
"noShadowRestrictedNames": "error",
Expand Down
33 changes: 32 additions & 1 deletion build.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,39 @@
// biome-ignore lint/style/noCommonJs: build script runs as CommonJS
const { build } = require('esbuild')
// biome-ignore lint/style/noCommonJs: build script runs as CommonJS
const { readdirSync, rmSync } = require('node:fs')
// biome-ignore lint/style/noCommonJs: build script runs as CommonJS
const { join, sep } = require('node:path')

function collectEntryPoints(dir) {
const entryPoints = []

for (const entry of readdirSync(dir, { withFileTypes: true })) {
const fullPath = join(dir, entry.name)

if (entry.isDirectory()) {
if (fullPath === join('src', 'typecheck')) {
continue
}

entryPoints.push(...collectEntryPoints(fullPath))
continue
}

if (entry.isFile() && entry.name.endsWith('.ts')) {
entryPoints.push(`./${fullPath.split(sep).join('/')}`)
}
}

return entryPoints
}

// Typecheck sentinels are enforced by `tsc -noEmit`
// keep them out of emitted build output.
rmSync('dist/typecheck', { recursive: true, force: true })

build({
entryPoints: ['./src/**/*.ts'],
entryPoints: collectEntryPoints('src').sort(),
bundle: false,
outdir: 'dist',
platform: 'node',
Expand Down
2 changes: 1 addition & 1 deletion src/http/error-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export const setErrorHandler = (
app: FastifyInstance,
options?: {
respectStatusCode?: boolean
formatter?: (error: StorageError) => Record<string, any>
formatter?: (error: StorageError) => unknown
}
) => {
app.setErrorHandler<Error>(function (error, request, reply) {
Expand Down
64 changes: 40 additions & 24 deletions src/http/plugins/log-request.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { logger, logSchema, redactQueryParamFromRequest } from '@internal/monitoring'
import { RouteGenericInterface } from 'fastify'
import { FastifyReply } from 'fastify/types/reply'
import { FastifyRequest } from 'fastify/types/request'
import fastifyPlugin from 'fastify-plugin'
Expand All @@ -7,6 +8,11 @@ interface RequestLoggerOptions {
excludeUrls?: string[]
}

type RawRequestMetadata = FastifyRequest['raw'] & {
executionError?: Error
resources?: string[]
}

declare module 'fastify' {
interface FastifyRequest {
executionError?: Error
Expand All @@ -18,8 +24,8 @@ declare module 'fastify' {

interface FastifyContextConfig {
operation?: { type: string }
resources?: (req: FastifyRequest<any>) => string[]
logMetadata?: (req: FastifyRequest<any>) => Record<string, unknown>
resources?(req: FastifyRequest<RouteGenericInterface>): string[]
logMetadata?(req: FastifyRequest<RouteGenericInterface>): Record<string, unknown>
}
}

Expand Down Expand Up @@ -64,30 +70,11 @@ export const logRequest = (options: RequestLoggerOptions) =>
}

if (resources === undefined) {
resources = (req.raw as any).resources
resources = getRawRequest(req).resources
}

if (resources === undefined) {
const params = req.params as Record<string, unknown> | undefined
let resourceFromParams = ''

if (params) {
let first = true
for (const key in params) {
if (!Object.prototype.hasOwnProperty.call(params, key)) {
continue
}

if (!first) {
resourceFromParams += '/'
}

const value = params[key]
resourceFromParams += value == null ? '' : String(value)
first = false
}
}

const resourceFromParams = getResourceFromParams(req.params)
resources = resourceFromParams ? [resourceFromParams] : []
}

Expand Down Expand Up @@ -150,7 +137,7 @@ function doRequestLog(req: FastifyRequest, options: LogRequestOptions) {
const rId = req.id
const cIP = req.ip
const statusCode = options.statusCode
const error = (req.raw as any).executionError || req.executionError
const error = getRawRequest(req).executionError || req.executionError
const tenantId = req.tenantId

let reqMetadata: Record<string, unknown> = {}
Expand Down Expand Up @@ -197,3 +184,32 @@ function doRequestLog(req: FastifyRequest, options: LogRequestOptions) {
serverTimes: req.serverTimings,
})
}

function getRawRequest(req: FastifyRequest): RawRequestMetadata {
return req.raw as RawRequestMetadata
}

function getResourceFromParams(params: unknown): string {
if (!params || typeof params !== 'object') {
return ''
}

let resource = ''
let first = true

for (const key in params) {
if (!Object.prototype.hasOwnProperty.call(params, key)) {
continue
}

if (!first) {
resource += '/'
}

const value = (params as Record<string, unknown>)[key]
resource += value == null ? '' : String(value)
first = false
}

return resource
}
6 changes: 5 additions & 1 deletion src/http/routes-helper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
type BucketResponseType = { message: string; statusCode?: string; error?: string }
type SchemaObject = Record<string, unknown>

/**
* Create generic response for all buckets
Expand All @@ -23,7 +24,10 @@ function createResponse(message: string, status?: string, error?: string): Bucke
return response
}

function createDefaultSchema(successResponseSchema: any, properties: any): any {
function createDefaultSchema(
successResponseSchema: SchemaObject,
properties: SchemaObject
): SchemaObject {
return {
headers: { $ref: 'authSchema#' },
response: {
Expand Down
14 changes: 12 additions & 2 deletions src/http/routes/admin/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ import apiKey from '../../plugins/apikey'
const { pgQueueEnable } = getConfig()
const migrationQueueName = RunMigrationsOnTenants.getQueueName()

type ResetFleetBody = {
untilMigration?: unknown
markCompletedTillMigration?: unknown
}

type FailedQuery = {
cursor?: string
}

export default async function routes(fastify: FastifyInstance) {
fastify.register(apiKey)

Expand All @@ -31,7 +40,7 @@ export default async function routes(fastify: FastifyInstance) {
return reply.status(400).send({ message: 'Queue is not enabled' })
}

const { untilMigration, markCompletedTillMigration } = req.body as Record<string, unknown>
const { untilMigration, markCompletedTillMigration } = req.body as ResetFleetBody

if (!isDBMigrationName(untilMigration)) {
return reply.status(400).send({ message: 'Invalid migration' })
Expand Down Expand Up @@ -96,7 +105,8 @@ export default async function routes(fastify: FastifyInstance) {
if (!pgQueueEnable) {
return reply.code(400).send({ message: 'Queue is not enabled' })
}
const offset = (req.query as any).cursor ? Number((req.query as any).cursor) : 0
const { cursor } = req.query as FailedQuery
const offset = cursor ? Number(cursor) : 0

const failed = await multitenantKnex
.table('tenants')
Expand Down
10 changes: 9 additions & 1 deletion src/http/routes/admin/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import { getConfig } from '../../../config'
import apiKey from '../../plugins/apikey'

const { pgQueueEnable } = getConfig()
// Empty ref/host mark system-scoped events and intentionally skip per-tenant shouldSend lookups.
const systemTenant = {
ref: '',
host: '',
} as const

const moveJobsSchema = {
body: {
Expand Down Expand Up @@ -37,7 +42,9 @@ export default async function routes(fastify: FastifyInstance) {
return reply.status(400).send({ message: 'Queue is not enabled' })
}

await UpgradePgBossV10.send({})
await UpgradePgBossV10.send({
tenant: systemTenant,
})

return reply.send({ message: 'Migration scheduled' })
})
Expand All @@ -58,6 +65,7 @@ export default async function routes(fastify: FastifyInstance) {
fromQueue,
toQueue,
deleteJobsFromOriginalQueue,
tenant: systemTenant,
})

return reply.send({ message: 'Move jobs scheduled' })
Expand Down
14 changes: 9 additions & 5 deletions src/http/routes/iceberg/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -518,12 +518,10 @@ export default async function routes(fastify: FastifyInstance) {
try {
if (typeof payload === 'string') return done(null, JSONBigint.parse(payload))
if (Buffer.isBuffer(payload)) return done(null, JSONBigint.parse(payload.toString('utf8')))
if (payload && typeof (payload as any).on === 'function') {
if (isReadablePayload(payload)) {
const chunks: Buffer[] = []
;(payload as NodeJS.ReadableStream).on('data', (c) =>
chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(String(c)))
)
;(payload as NodeJS.ReadableStream).on('end', () => {
payload.on('data', (c) => chunks.push(Buffer.isBuffer(c) ? c : Buffer.from(String(c))))
payload.on('end', () => {
try {
done(null, JSONBigint.parse(Buffer.concat(chunks).toString('utf8')))
} catch (err) {
Expand Down Expand Up @@ -574,3 +572,9 @@ export default async function routes(fastify: FastifyInstance) {
)
})
}

function isReadablePayload(payload: unknown): payload is NodeJS.ReadableStream {
return (
!!payload && typeof payload === 'object' && 'on' in payload && typeof payload.on === 'function'
)
}
2 changes: 1 addition & 1 deletion src/http/routes/render/rate-limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
rateLimiterRenderPathMaxReqSec,
} = getConfig()

export const rateLimiter = fp((fastify: FastifyInstance, ops: any, done: () => void) => {
export const rateLimiter = fp((fastify: FastifyInstance, _ops: unknown, done: () => void) => {
fastify.register(fastifyRateLimit, {
global: true,
max: rateLimiterRenderPathMaxReqSec * 4,
Expand Down
25 changes: 22 additions & 3 deletions src/http/routes/s3/error-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ import { FastifyReply } from 'fastify/types/reply'
import { FastifyRequest } from 'fastify/types/request'
import { DatabaseError } from 'pg'

type ValidationIssue = {
instancePath?: string
message?: string
}

export const s3ErrorHandler = (
error: FastifyError | Error,
request: FastifyRequest,
reply: FastifyReply
) => {
request.executionError = error
const validation = getValidationIssues(error)

const resource = request.url
.split('?')[0]
Expand All @@ -19,12 +25,12 @@ export const s3ErrorHandler = (
.filter((e) => e)
.join('/')

if ('validation' in error) {
if (validation) {
return reply.status(400).send({
Error: {
Resource: resource,
Code: ErrorCode.InvalidRequest,
Message: formatValidationError(error.validation).message,
Message: formatValidationError(validation).message,
},
})
}
Expand Down Expand Up @@ -91,7 +97,20 @@ export const s3ErrorHandler = (
})
}

function formatValidationError(errors: any) {
function isValidationIssueArray(value: unknown): value is ValidationIssue[] {
return Array.isArray(value)
}

function getValidationIssues(error: FastifyError | Error): ValidationIssue[] | undefined {
if (!('validation' in error)) {
return undefined
}

const value = error.validation
return isValidationIssueArray(value) ? value : undefined
}

function formatValidationError(errors: readonly ValidationIssue[]) {
let text = ''
const separator = ', '

Expand Down
4 changes: 2 additions & 2 deletions src/http/routes/s3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ export default async function routes(fastify: FastifyInstance) {
req.opentelemetry()?.span?.setAttribute('http.operation', req.operation.type)
}

const data: RequestInput<any> = {
const data = {
Params: req.params,
Body: req.body,
Headers: req.headers,
Querystring: req.query,
}
} as unknown as RequestInput<typeof route.schema>
const compiler = route.compiledSchema()
const isValid = compiler(data)

Expand Down
Loading