Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions packages/@aws-cdk/aws-bedrock-agentcore-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2520,3 +2520,65 @@ const memory = new agentcore.Memory(this, "test-memory", {
memory.addMemoryStrategy(agentcore.MemoryStrategy.usingBuiltInSummarization());
memory.addMemoryStrategy(agentcore.MemoryStrategy.usingBuiltInSemantic());
```

### Memory with Stream Delivery

You can configure stream delivery resources to enable real-time push-based streaming of memory record lifecycle events (created, updated, deleted) to Amazon Kinesis Data Streams. This allows you to react to memory changes in real-time, build event-driven architectures, or feed memory events into downstream analytics pipelines.

```typescript fixture=default
// Create a Kinesis Data Stream
const stream = new kinesis.Stream(this, 'MemoryEventStream', {
streamName: 'memory-events',
});

// Create a memory with stream delivery (defaults to MEMORY_RECORDS + FULL_CONTENT)
const memory = new agentcore.Memory(this, 'MemoryWithStreamDelivery', {
memoryName: 'memory_with_stream',
description: 'Memory with Kinesis stream delivery',
expirationDuration: cdk.Duration.days(90),
streamDeliveryResources: [{ stream }],
});
```

To customize the content level, specify `contentConfigurations` explicitly:

```typescript fixture=default
const stream = new kinesis.Stream(this, 'MemoryEventStream');

const memory = new agentcore.Memory(this, 'MemoryWithStreamDelivery', {
memoryName: 'memory_with_stream',
streamDeliveryResources: [
{
stream,
contentConfigurations: [
{
type: agentcore.StreamDeliveryContentType.MEMORY_RECORDS,
level: agentcore.StreamDeliveryContentLevel.METADATA_ONLY,
},
],
},
],
});
```

You can also add stream delivery resources after instantiation using the `addStreamDeliveryResource()` method:

```typescript fixture=default
const memory = new agentcore.Memory(this, 'MyMemory', {
memoryName: 'my_memory',
});

const stream = new kinesis.Stream(this, 'EventStream');

memory.addStreamDeliveryResource({
stream: stream,
contentConfigurations: [
{
type: agentcore.StreamDeliveryContentType.MEMORY_RECORDS,
level: agentcore.StreamDeliveryContentLevel.METADATA_ONLY,
},
],
});
```

The memory execution role is automatically granted write permissions (`kinesis:PutRecord`, `kinesis:PutRecords`, `kinesis:ListShards`, `kinesis:DescribeStream`) to each configured Kinesis stream. If the stream uses a customer-managed KMS key, encryption permissions are also granted automatically.
174 changes: 174 additions & 0 deletions packages/@aws-cdk/aws-bedrock-agentcore-alpha/lib/memory/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import {
Stats,
} from 'aws-cdk-lib/aws-cloudwatch';
import * as iam from 'aws-cdk-lib/aws-iam';
import type * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as kms from 'aws-cdk-lib/aws-kms';
import { ValidationError } from 'aws-cdk-lib/core/lib/errors';
import { lit } from 'aws-cdk-lib/core/lib/helpers-internal';
import { addConstructMetadata, MethodMetadata } from 'aws-cdk-lib/core/lib/metadata-resource';
import { propertyInjectable } from 'aws-cdk-lib/core/lib/prop-injectable';
import type { IConstruct, Construct } from 'constructs';
Expand Down Expand Up @@ -72,6 +75,66 @@ const MEMORY_EXPIRATION_DAYS_MIN = 7;
*/
const MEMORY_EXPIRATION_DAYS_MAX = 365;

/******************************************************************************
* Stream Delivery Types
*****************************************************************************/
/**
* Content type for stream delivery.
* Defines what kind of memory content is delivered to the Kinesis stream.
*/
export enum StreamDeliveryContentType {
/** Deliver memory record lifecycle events (created, updated, deleted) */
MEMORY_RECORDS = 'MEMORY_RECORDS',
}

/**
* Content detail level for stream delivery.
* Controls how much detail is included in each delivered record.
*/
export enum StreamDeliveryContentLevel {
/** Deliver only metadata (record ID, timestamps, event type) */
METADATA_ONLY = 'METADATA_ONLY',
/** Deliver full content including the memory record body */
FULL_CONTENT = 'FULL_CONTENT',
}

/**
* Content configuration for a stream delivery resource.
* Defines what content type and detail level to deliver.
*/
export interface StreamDeliveryContentConfiguration {
/**
* The type of content to deliver.
*/
readonly type: StreamDeliveryContentType;
/**
* The level of content detail to deliver.
*
* @default StreamDeliveryContentLevel.FULL_CONTENT
*/
readonly level?: StreamDeliveryContentLevel;
}

/**
* Configuration for a Kinesis stream delivery resource.
* Defines a Kinesis Data Stream target and what content to deliver to it.
*
* @see https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-record-streaming.html
*/
export interface StreamDeliveryResource {
/**
* The Kinesis Data Stream to deliver memory record events to.
*/
readonly stream: kinesis.IStream;
/**
* Content configurations defining what to deliver.
* Currently only one configuration is supported.
*
* @default - [{ type: MEMORY_RECORDS, level: FULL_CONTENT }]
*/
readonly contentConfigurations?: StreamDeliveryContentConfiguration[];
}

/******************************************************************************
* Interface
*****************************************************************************/
Expand Down Expand Up @@ -519,6 +582,18 @@ export interface MemoryProps {
* @default - no tags
*/
readonly tags?: { [key: string]: string };

/**
* Stream delivery resources for real-time push-based streaming of memory
* record lifecycle events (created, updated, deleted) to Amazon Kinesis Data Streams.
*
* The memory execution role will automatically be granted write permissions to each stream.
* Currently only one stream delivery resource is supported.
*
* @default - No stream delivery (events are not pushed to Kinesis)
* @see https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-record-streaming.html
*/
readonly streamDeliveryResources?: StreamDeliveryResource[];
}

/******************************************************************************
Expand Down Expand Up @@ -675,6 +750,11 @@ export class Memory extends MemoryBase {
* @attribute
*/
public readonly memoryStrategies: IMemoryStrategy[] = [];
/**
* The stream delivery resources configured for this memory.
* @attribute
*/
public readonly streamDeliveryResources: StreamDeliveryResource[] = [];
// ------------------------------------------------------
// Internal Only
// ------------------------------------------------------
Expand Down Expand Up @@ -746,6 +826,10 @@ export class Memory extends MemoryBase {
encryptionKeyArn: this.kmsKey?.keyArn,
memoryExecutionRoleArn: this.executionRole?.roleArn,
memoryStrategies: Lazy.any({ produce: () => this._renderMemoryStrategies() }, { omitEmptyArray: true }),
streamDeliveryResources: Lazy.any(
{ produce: () => this._renderStreamDeliveryResources() },
{ omitEmptyArray: true },
),
tags: this.tags,
};

Expand All @@ -763,6 +847,9 @@ export class Memory extends MemoryBase {

// Add memory strategies to the memory
for (const strategy of props?.memoryStrategies ?? []) {this.addMemoryStrategy(strategy);}

// Add stream delivery resources to the memory
for (const resource of props?.streamDeliveryResources ?? []) {this.addStreamDeliveryResource(resource);}
}

// ------------------------------------------------------
Expand All @@ -782,6 +869,46 @@ export class Memory extends MemoryBase {
grant?.applyBefore(this.__resource);
}

/**
* Add a stream delivery resource to the memory.
* Grants Kinesis write permissions to the execution role automatically.
*
* @param resource - The stream delivery resource configuration
* @see https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-record-streaming.html
*/
@MethodMetadata()
public addStreamDeliveryResource(resource: StreamDeliveryResource): void {
// Validate current limit: at most 1 stream delivery resource
if (this.streamDeliveryResources.length >= 1) {
throw new ValidationError(
lit`TooManyStreamDeliveryResources`,
'Memory currently supports at most one stream delivery resource',
this,
);
}

// Validate content configurations
throwIfInvalid(this._validateStreamDeliveryResource, resource);

// Add to internal array
this.streamDeliveryResources.push(resource);

// Grant Kinesis write permissions to the execution role
// stream.grantWrite() grants: kinesis:ListShards, kinesis:PutRecord, kinesis:PutRecords
// It also handles KMS encryption key permissions automatically if the stream is encrypted
const grant = resource.stream.grantWrite(this.executionRole as iam.IRole);

// AgentCore also requires kinesis:DescribeStream which is not included in grantWrite()
const describeGrant = iam.Grant.addToPrincipal({
grantee: this.executionRole as iam.IRole,
actions: ['kinesis:DescribeStream'],
resourceArns: [resource.stream.streamArn],
});

grant.applyBefore(this.__resource);
describeGrant.applyBefore(this.__resource);
}

/**
* Creates execution role needed for the memory to access AWS services
* @returns The created role
Expand Down Expand Up @@ -880,6 +1007,25 @@ export class Memory extends MemoryBase {
return errors;
};

/**
* Validates a stream delivery resource configuration.
* Currently CloudFormation limits contentConfigurations to exactly 1 entry
* and streamDeliveryResources to at most 1 entry.
*/
private _validateStreamDeliveryResource = (resource: StreamDeliveryResource): string[] => {
const errors: string[] = [];

if (resource.contentConfigurations && resource.contentConfigurations.length === 0) {
errors.push('Stream delivery resource contentConfigurations must not be an empty array. Omit the property to use defaults, or provide at least one configuration');
}

if (resource.contentConfigurations && resource.contentConfigurations.length > 1) {
errors.push('Stream delivery resource currently supports at most one content configuration');
}

return errors;
};

// ------------------------------------------------------
// RENDERERS
// ------------------------------------------------------
Expand All @@ -897,4 +1043,32 @@ export class Memory extends MemoryBase {

return this.memoryStrategies.map(ms => ms.render());
}

/**
* Render the stream delivery resources into CloudFormation format.
*
* @returns StreamDeliveryResourcesProperty or undefined if no resources configured
* @internal This is an internal core function and should not be called directly.
*/
private _renderStreamDeliveryResources(): CfnMemory.StreamDeliveryResourcesProperty | undefined {
if (!this.streamDeliveryResources || this.streamDeliveryResources.length === 0) {
return undefined;
}

const defaultContentConfigurations: StreamDeliveryContentConfiguration[] = [
{ type: StreamDeliveryContentType.MEMORY_RECORDS, level: StreamDeliveryContentLevel.FULL_CONTENT },
];

return {
resources: this.streamDeliveryResources.map(resource => ({
kinesis: {
dataStreamArn: resource.stream.streamArn,
contentConfigurations: (resource.contentConfigurations ?? defaultContentConfigurations).map(config => ({
type: config.type,
level: config.level,
})),
},
})),
};
}
}
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-bedrock-agentcore-alpha/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
"prefer-ref-interface:@aws-cdk/aws-bedrock-agentcore-alpha.ITargetConfiguration.bind.gateway",
"prefer-ref-interface:@aws-cdk/aws-bedrock-agentcore-alpha.MemoryProps.executionRole",
"prefer-ref-interface:@aws-cdk/aws-bedrock-agentcore-alpha.MemoryProps.kmsKey",
"prefer-ref-interface:@aws-cdk/aws-bedrock-agentcore-alpha.StreamDeliveryResource.stream",
"prefer-ref-interface:@aws-cdk/aws-bedrock-agentcore-alpha.AgentRuntimeAttributes.securityGroups",
"prefer-ref-interface:@aws-cdk/aws-bedrock-agentcore-alpha.RuntimeProps.executionRole",
"interface-extends-ref:@aws-cdk/aws-bedrock-agentcore-alpha.IBedrockAgentRuntime",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as iam from 'aws-cdk-lib/aws-iam';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as kms from 'aws-cdk-lib/aws-kms';
import * as kinesis from 'aws-cdk-lib/aws-kinesis';
import * as ecr from 'aws-cdk-lib/aws-ecr';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import { Platform } from 'aws-cdk-lib/aws-ecr-assets';
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading