From c215b83664c2d160207d5aca22740c5b0f2258c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Tiburcio=20Ribeiro=20Netto?= Date: Tue, 16 Jun 2026 21:58:51 -0300 Subject: [PATCH] feat(aws/lambda): add event invoke config --- .../src/AWS/Lambda/EventInvokeConfig.ts | 326 ++++++++++++++++++ packages/alchemy/src/AWS/Lambda/index.ts | 1 + packages/alchemy/src/AWS/Providers.ts | 2 + .../test/AWS/Lambda/EventInvokeConfig.test.ts | 167 +++++++++ 4 files changed, 496 insertions(+) create mode 100644 packages/alchemy/src/AWS/Lambda/EventInvokeConfig.ts create mode 100644 packages/alchemy/test/AWS/Lambda/EventInvokeConfig.test.ts diff --git a/packages/alchemy/src/AWS/Lambda/EventInvokeConfig.ts b/packages/alchemy/src/AWS/Lambda/EventInvokeConfig.ts new file mode 100644 index 000000000..0ad60c0ce --- /dev/null +++ b/packages/alchemy/src/AWS/Lambda/EventInvokeConfig.ts @@ -0,0 +1,326 @@ +import * as Lambda from "@distilled.cloud/aws/lambda"; +import * as Effect from "effect/Effect"; +import * as Schedule from "effect/Schedule"; +import * as Stream from "effect/Stream"; +import { deepEqual, isResolved } from "../../Diff.ts"; +import * as Provider from "../../Provider.ts"; +import { Resource } from "../../Resource.ts"; +import type { Providers } from "../Providers.ts"; + +const DEFAULT_MAXIMUM_RETRY_ATTEMPTS = 2; +const DEFAULT_MAXIMUM_EVENT_AGE_IN_SECONDS = 21_600; + +export interface EventInvokeConfigProps { + /** + * The name or ARN of the Lambda function. + */ + functionName: string; + /** + * Lambda function version or alias name. Omit to configure the unqualified + * function. + */ + qualifier?: string; + /** + * Maximum number of times Lambda retries an asynchronous invocation. + * @default 2 + */ + maximumRetryAttempts?: number; + /** + * Maximum age in seconds that Lambda retains an asynchronous event. + * @default 21600 + */ + maximumEventAgeInSeconds?: number; + /** + * Destinations for successful or failed asynchronous invocation records. + */ + destinationConfig?: Lambda.DestinationConfig; +} + +export interface EventInvokeConfig extends Resource< + "AWS.Lambda.EventInvokeConfig", + EventInvokeConfigProps, + { + /** + * The name or ARN of the Lambda function. + */ + functionName: string; + /** + * Lambda function version or alias name. + */ + qualifier?: string; + /** + * ARN of the function, version, or alias this config applies to. + */ + functionArn: string; + /** + * Maximum number of times Lambda retries an asynchronous invocation. + */ + maximumRetryAttempts?: number; + /** + * Maximum age in seconds that Lambda retains an asynchronous event. + */ + maximumEventAgeInSeconds?: number; + /** + * Destinations for successful or failed asynchronous invocation records. + */ + destinationConfig?: Lambda.DestinationConfig; + /** + * Last modification time reported by Lambda. + */ + lastModified?: Date; + }, + never, + Providers +> {} + +/** + * Lambda asynchronous invocation settings for a function, version, or alias. + * + * @section Async Invocation + * @example Configure Retry Behavior + * ```typescript + * const config = yield* EventInvokeConfig("AsyncInvoke", { + * functionName: fn.functionName, + * maximumRetryAttempts: 0, + * maximumEventAgeInSeconds: 60, + * }); + * ``` + * + * @example Configure Failure Destination + * ```typescript + * const config = yield* EventInvokeConfig("AsyncInvoke", { + * functionName: fn.functionName, + * destinationConfig: { + * OnFailure: { + * Destination: queue.queueArn, + * }, + * }, + * }); + * ``` + */ +export const EventInvokeConfig = Resource( + "AWS.Lambda.EventInvokeConfig", +); + +const normalizeQualifier = (qualifier: string | undefined) => + qualifier === "$LATEST" ? undefined : qualifier; + +const normalizeDestinationConfig = ( + config: Lambda.DestinationConfig | undefined, +): Lambda.DestinationConfig | undefined => { + const onSuccess = config?.OnSuccess?.Destination + ? { Destination: config.OnSuccess.Destination } + : undefined; + const onFailure = config?.OnFailure?.Destination + ? { Destination: config.OnFailure.Destination } + : undefined; + return onSuccess || onFailure + ? { + OnSuccess: onSuccess, + OnFailure: onFailure, + } + : undefined; +}; + +const desiredConfig = (props: EventInvokeConfigProps) => ({ + maximumRetryAttempts: + props.maximumRetryAttempts ?? DEFAULT_MAXIMUM_RETRY_ATTEMPTS, + maximumEventAgeInSeconds: + props.maximumEventAgeInSeconds ?? DEFAULT_MAXIMUM_EVENT_AGE_IN_SECONDS, + destinationConfig: normalizeDestinationConfig(props.destinationConfig), +}); + +const parseQualifierFromArn = ( + functionName: string, + functionArn: string | undefined, +) => { + if (!functionArn) return undefined; + const marker = `:function:${functionName}:`; + const markerIndex = functionArn.indexOf(marker); + if (markerIndex === -1) return undefined; + return normalizeQualifier(functionArn.slice(markerIndex + marker.length)); +}; + +const snapshotConfig = ( + functionName: string, + qualifier: string | undefined, + config: Lambda.FunctionEventInvokeConfig, +): EventInvokeConfig["Attributes"] | undefined => { + if (!config.FunctionArn) return undefined; + return { + functionName, + qualifier: normalizeQualifier(qualifier), + functionArn: config.FunctionArn, + maximumRetryAttempts: + config.MaximumRetryAttempts ?? DEFAULT_MAXIMUM_RETRY_ATTEMPTS, + maximumEventAgeInSeconds: + config.MaximumEventAgeInSeconds ?? DEFAULT_MAXIMUM_EVENT_AGE_IN_SECONDS, + destinationConfig: normalizeDestinationConfig(config.DestinationConfig), + lastModified: config.LastModified, + }; +}; + +export const EventInvokeConfigProvider = () => + Provider.effect( + EventInvokeConfig, + Effect.gen(function* () { + const retryOnConflict = ( + effect: Effect.Effect, + ) => + effect.pipe( + Effect.retry({ + while: (e) => e._tag === "ResourceConflictException", + schedule: Schedule.exponential(500).pipe( + Schedule.both(Schedule.recurs(10)), + ), + }), + ); + + const retryPermissionsPropagation = Effect.retry({ + while: (e: any) => + e._tag === "InvalidParameterValueException" && + e.message?.includes( + "The function execution role does not have permissions to call", + ), + schedule: Schedule.exponential(100).pipe( + Schedule.both(Schedule.recurs(30)), + ), + }) as ( + self: Effect.Effect, + ) => Effect.Effect; + + const getConfig = (functionName: string, qualifier: string | undefined) => + Lambda.getFunctionEventInvokeConfig({ + FunctionName: functionName, + Qualifier: normalizeQualifier(qualifier), + }).pipe( + Effect.catchTag("ResourceNotFoundException", () => + Effect.succeed(undefined), + ), + ); + + const putConfig = (props: EventInvokeConfigProps) => + retryOnConflict( + Lambda.putFunctionEventInvokeConfig({ + FunctionName: props.functionName, + Qualifier: normalizeQualifier(props.qualifier), + MaximumRetryAttempts: props.maximumRetryAttempts, + MaximumEventAgeInSeconds: props.maximumEventAgeInSeconds, + DestinationConfig: normalizeDestinationConfig( + props.destinationConfig, + ), + }).pipe(retryPermissionsPropagation), + ); + + return { + stables: ["functionName", "qualifier", "functionArn"], + diff: Effect.fn(function* ({ news, olds }) { + if (!isResolved(news)) return; + if ( + news.functionName !== olds.functionName || + normalizeQualifier(news.qualifier) !== + normalizeQualifier(olds.qualifier) + ) { + return { action: "replace" } as const; + } + }), + read: Effect.fn(function* ({ olds, output }) { + const functionName = output?.functionName ?? olds?.functionName; + if (!functionName) return undefined; + const qualifier = output?.qualifier ?? olds?.qualifier; + const config = yield* getConfig(functionName, qualifier); + return config + ? snapshotConfig(functionName, qualifier, config) + : undefined; + }), + list: () => + Effect.gen(function* () { + const functionNames = yield* Lambda.listFunctions.pages({}).pipe( + Stream.runCollect, + Effect.map((chunk) => + Array.from(chunk).flatMap((page) => + (page.Functions ?? []) + .map((fn) => fn.FunctionName) + .filter((name): name is string => name !== undefined), + ), + ), + ); + + const configs = yield* Effect.forEach( + functionNames, + (functionName) => + Lambda.listFunctionEventInvokeConfigs + .items({ FunctionName: functionName }) + .pipe( + Stream.runCollect, + Effect.map((chunk) => + Array.from(chunk).flatMap((config) => { + const attrs = snapshotConfig( + functionName, + parseQualifierFromArn( + functionName, + config.FunctionArn, + ), + config, + ); + return attrs ? [attrs] : []; + }), + ), + Effect.catchTag("ResourceNotFoundException", () => + Effect.succeed([] as EventInvokeConfig["Attributes"][]), + ), + ), + { concurrency: 10 }, + ); + return configs.flat(); + }), + reconcile: Effect.fn(function* ({ news, session }) { + const qualifier = normalizeQualifier(news.qualifier); + const desired = desiredConfig(news); + let config = yield* getConfig(news.functionName, qualifier); + const current = config + ? snapshotConfig(news.functionName, qualifier, config) + : undefined; + + if ( + !current || + current.maximumRetryAttempts !== desired.maximumRetryAttempts || + current.maximumEventAgeInSeconds !== + desired.maximumEventAgeInSeconds || + !deepEqual(current.destinationConfig, desired.destinationConfig) + ) { + config = yield* putConfig(news); + } + + if (!config) { + return yield* Effect.die( + `Lambda event invoke config for ${news.functionName} could not be reconciled.`, + ); + } + + const attrs = snapshotConfig(news.functionName, qualifier, config); + if (!attrs) { + return yield* Effect.die( + `Lambda event invoke config for ${news.functionName} did not return complete attributes.`, + ); + } + + yield* session.note( + `Event invoke config on ${attrs.functionName}${attrs.qualifier ? `:${attrs.qualifier}` : ""}`, + ); + + return attrs; + }), + delete: Effect.fn(function* ({ output }) { + yield* retryOnConflict( + Lambda.deleteFunctionEventInvokeConfig({ + FunctionName: output.functionName, + Qualifier: normalizeQualifier(output.qualifier), + }), + ).pipe( + Effect.catchTag("ResourceNotFoundException", () => Effect.void), + ); + }), + }; + }), + ); diff --git a/packages/alchemy/src/AWS/Lambda/index.ts b/packages/alchemy/src/AWS/Lambda/index.ts index 3db36e083..ca40f6a4c 100644 --- a/packages/alchemy/src/AWS/Lambda/index.ts +++ b/packages/alchemy/src/AWS/Lambda/index.ts @@ -1,5 +1,6 @@ export * from "./BucketEventSource.ts"; export * from "./EventBridgeEventSource.ts"; +export * from "./EventInvokeConfig.ts"; export * from "./EventSourceMapping.ts"; export * from "./Function.ts"; export * from "./HttpServer.ts"; diff --git a/packages/alchemy/src/AWS/Providers.ts b/packages/alchemy/src/AWS/Providers.ts index 50631bd91..be6552665 100644 --- a/packages/alchemy/src/AWS/Providers.ts +++ b/packages/alchemy/src/AWS/Providers.ts @@ -231,6 +231,7 @@ export const providers = () => Kinesis.StreamSinkPolicy, Kinesis.SubscribeToShardPolicy, Lambda.BucketEventSourcePolicy, + Lambda.EventInvokeConfig, Lambda.EventSourcePolicy, Lambda.EventSourceMapping, Lambda.Function, @@ -484,6 +485,7 @@ export const providers = () => Kinesis.StreamSinkPolicyLive, Kinesis.SubscribeToShardPolicyLive, Lambda.BucketEventSourcePolicyLive, + Lambda.EventInvokeConfigProvider(), Lambda.EventSourceMappingProvider(), Lambda.EventSourcePolicyLive, Lambda.FunctionProvider(), diff --git a/packages/alchemy/test/AWS/Lambda/EventInvokeConfig.test.ts b/packages/alchemy/test/AWS/Lambda/EventInvokeConfig.test.ts new file mode 100644 index 000000000..e66e523f7 --- /dev/null +++ b/packages/alchemy/test/AWS/Lambda/EventInvokeConfig.test.ts @@ -0,0 +1,167 @@ +import * as AWS from "@/AWS"; +import * as Provider from "@/Provider"; +import * as Test from "@/Test/Vitest"; +import * as Lambda from "@distilled.cloud/aws/lambda"; +import { expect } from "@effect/vitest"; +import * as Effect from "effect/Effect"; +import * as Schedule from "effect/Schedule"; + +const timeoutHandlerPath = new URL("./timeout-handler.ts", import.meta.url) + .pathname; + +const { test } = Test.make({ providers: AWS.providers() }); + +test.provider( + "create, update, list, delete event invoke config", + (stack) => + Effect.gen(function* () { + yield* stack.destroy(); + + const program = ({ + maximumRetryAttempts, + maximumEventAgeInSeconds, + }: { + maximumRetryAttempts?: number; + maximumEventAgeInSeconds?: number; + }) => + Effect.gen(function* () { + const queue = yield* AWS.SQS.Queue("FailureQueue", { + visibilityTimeout: 30, + }); + + const fn = yield* AWS.Lambda.Function<{}>()("AsyncFn", { + main: timeoutHandlerPath, + handler: "handler", + isExternal: true, + url: false, + }); + + yield* fn.bind("AllowEventInvokeDestination", { + policyStatements: [ + { + Effect: "Allow", + Action: ["sqs:SendMessage"], + Resource: [queue.queueArn], + }, + ], + }); + + const config = yield* AWS.Lambda.EventInvokeConfig("AsyncConfig", { + functionName: fn.functionName, + maximumRetryAttempts, + maximumEventAgeInSeconds, + destinationConfig: { + OnFailure: { + Destination: queue.queueArn, + }, + }, + }); + + return { fn, queue, config }; + }); + + const created = yield* stack.deploy( + program({ + maximumRetryAttempts: 0, + maximumEventAgeInSeconds: 60, + }), + ); + + expect(created.config.maximumRetryAttempts).toBe(0); + expect(created.config.maximumEventAgeInSeconds).toBe(60); + expect(created.config.destinationConfig?.OnFailure?.Destination).toBe( + created.queue.queueArn, + ); + + const liveCreated = yield* getEventInvokeConfig(created.fn.functionName, { + maximumRetryAttempts: 0, + maximumEventAgeInSeconds: 60, + onFailureDestination: created.queue.queueArn, + }); + expect(liveCreated.MaximumRetryAttempts).toBe(0); + expect(liveCreated.MaximumEventAgeInSeconds).toBe(60); + expect(liveCreated.DestinationConfig?.OnFailure?.Destination).toBe( + created.queue.queueArn, + ); + + const updated = yield* stack.deploy( + program({ + maximumRetryAttempts: 1, + maximumEventAgeInSeconds: 120, + }), + ); + + expect(updated.config.functionArn).toBe(created.config.functionArn); + expect(updated.config.maximumRetryAttempts).toBe(1); + expect(updated.config.maximumEventAgeInSeconds).toBe(120); + expect(updated.config.destinationConfig?.OnFailure?.Destination).toBe( + created.queue.queueArn, + ); + + const liveUpdated = yield* getEventInvokeConfig(updated.fn.functionName, { + maximumRetryAttempts: 1, + maximumEventAgeInSeconds: 120, + onFailureDestination: created.queue.queueArn, + }); + expect(liveUpdated.MaximumRetryAttempts).toBe(1); + expect(liveUpdated.MaximumEventAgeInSeconds).toBe(120); + expect(liveUpdated.DestinationConfig?.OnFailure?.Destination).toBe( + created.queue.queueArn, + ); + + const reset = yield* stack.deploy(program({})); + + expect(reset.config.functionArn).toBe(created.config.functionArn); + expect(reset.config.maximumRetryAttempts).toBe(2); + expect(reset.config.maximumEventAgeInSeconds).toBe(21_600); + expect(reset.config.destinationConfig?.OnFailure?.Destination).toBe( + created.queue.queueArn, + ); + + const provider = yield* Provider.findProvider( + AWS.Lambda.EventInvokeConfig, + ); + const configs = yield* provider.list(); + expect( + configs.some( + (config) => + config.functionName === reset.fn.functionName && + config.maximumRetryAttempts === 2 && + config.maximumEventAgeInSeconds === 21_600 && + config.destinationConfig?.OnFailure?.Destination === + reset.queue.queueArn, + ), + ).toBe(true); + }).pipe( + Effect.tap(() => stack.destroy()), + Effect.onError(() => stack.destroy().pipe(Effect.ignore)), + ), + { timeout: 360_000 }, +); + +const getEventInvokeConfig = Effect.fn(function* ( + functionName: string, + expected: { + maximumRetryAttempts: number; + maximumEventAgeInSeconds: number; + onFailureDestination: string; + }, +) { + return yield* Lambda.getFunctionEventInvokeConfig({ + FunctionName: functionName, + }).pipe( + Effect.filterOrFail( + (config) => + config.MaximumRetryAttempts === expected.maximumRetryAttempts && + config.MaximumEventAgeInSeconds === expected.maximumEventAgeInSeconds && + config.DestinationConfig?.OnFailure?.Destination === + expected.onFailureDestination, + () => new Error("Event invoke config update has not propagated yet"), + ), + Effect.retry({ + schedule: Schedule.exponential(500).pipe( + Schedule.both(Schedule.recurs(10)), + ), + }), + ); +});