diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 84f050803a..7c728b9a47 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -112,6 +112,7 @@ jobs:
- materialize-databricks
- materialize-dynamodb
- materialize-elasticsearch
+ - materialize-eventbridge
- materialize-gcs-csv
- materialize-gcs-parquet
- materialize-google-pubsub
diff --git a/go.mod b/go.mod
index e8d1733de4..fd01e95f73 100644
--- a/go.mod
+++ b/go.mod
@@ -27,10 +27,15 @@ require (
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.44.0
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.26.0
github.com/aws/aws-sdk-go-v2/service/emrserverless v1.28.1
+ github.com/aws/aws-sdk-go-v2/service/eventbridge v1.45.25
github.com/aws/aws-sdk-go-v2/service/glue v1.137.0
+ github.com/aws/aws-sdk-go-v2/service/iam v1.53.10
github.com/aws/aws-sdk-go-v2/service/kinesis v1.33.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0
+ github.com/aws/aws-sdk-go-v2/service/sns v1.39.17
+ github.com/aws/aws-sdk-go-v2/service/sqs v1.42.27
github.com/aws/aws-sdk-go-v2/service/ssm v1.60.1
+ github.com/aws/aws-sdk-go-v2/service/sts v1.41.6
github.com/aws/smithy-go v1.25.1
github.com/bradleyjkemp/cupaloy v2.3.0+incompatible
github.com/cespare/xxhash/v2 v2.3.0
@@ -137,18 +142,15 @@ require (
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
- github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17 // indirect
+ github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.17 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 // indirect
- github.com/aws/aws-sdk-go-v2/service/sns v1.39.17 // indirect
- github.com/aws/aws-sdk-go-v2/service/sqs v1.42.27 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.10 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 // indirect
- github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitfield/gotestdox v0.2.2 // indirect
diff --git a/go.sum b/go.sum
index e9a772dcfc..46f951f688 100644
--- a/go.sum
+++ b/go.sum
@@ -188,8 +188,6 @@ github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP
github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY=
github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4=
github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
-github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU=
-github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0=
github.com/aws/aws-sdk-go-v2 v1.41.7 h1:DWpAJt66FmnnaRIOT/8ASTucrvuDPZASqhhLey6tLY8=
github.com/aws/aws-sdk-go-v2 v1.41.7/go.mod h1:4LAfZOPHNVNQEckOACQx60Y8pSRjIkNZQz1w92xpMJc=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 h1:489krEF9xIGkOaaX3CE/Be2uWjiXrkCH6gUX+bZA/BU=
@@ -206,26 +204,26 @@ github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.5.13 h1:bJoSh9iQrFpt/u1A0fiSEwh
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.5.13/go.mod h1:RxLhhGmjEidlLTRZyk1BLMigHONURhQakw2//prq+DA=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.3 h1:4GNV1lhyELGjMz5ILMRxDvxvOaeo3Ux9Z69S1EgVMMQ=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.3/go.mod h1:br7KA6edAAqDGUYJ+zVVPAyMrPhnN+zdt17yTUT6FPw=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 h1:xOLELNKGp2vsiteLsvLPwxC+mYmO6OZ8PYgiuPJzF8U=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17/go.mod h1:5M5CI3D12dNOtH3/mk6minaRwI2/37ifCURZISxA/IQ=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 h1:GpT/TrnBYuE5gan2cZbTtvP+JlHsutdmlV2YfEyNde0=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23/go.mod h1:xYWD6BS9ywC5bS3sz9Xh04whO/hzK2plt2Zkyrp4JuA=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 h1:WWLqlh79iO48yLkj1v3ISRNiv+3KdQoZ6JWyfcsyQik=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17/go.mod h1:EhG22vHRrvF8oXSTYStZhJc1aUgKtnJe+aOiFEV90cM=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 h1:bpd8vxhlQi2r1hiueOw02f/duEPTMK59Q4QMAoTTtTo=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23/go.mod h1:15DfR2nw+CRHIk0tqNyifu3G1YdAOy68RftkhMDDwYk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc=
-github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17 h1:JqcdRG//czea7Ppjb+g/n4o8i/R50aTBHkA7vu0lK+k=
-github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.17/go.mod h1:CO+WeGmIdj/MlPel2KwID9Gt7CNq4M65HUfBW97liM0=
+github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24 h1:OQqn11BtaYv1WLUowvcA30MpzIu8Ti4pcLPIIyoKZrA=
+github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24/go.mod h1:X5ZJyfwVrWA96GzPmUCWFQaEARPR7gCrpq2E92PJwAE=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.44.0 h1:A99gjqZDbdhjtjJVZrmVzVKO2+p3MSg35bDWtbMQVxw=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.44.0/go.mod h1:mWB0GE1bqcVSvpW7OtFA0sKuHk52+IqtnsYU2jUfYAs=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.26.0 h1:0wOCTKrmwkyC8Bk76hYH/B4IJn5MGt6gMkSXc0A2uyc=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.26.0/go.mod h1:He/RikglWUczbkV+fkdpcV/3GdL/rTRNVy7VaUiezMo=
github.com/aws/aws-sdk-go-v2/service/emrserverless v1.28.1 h1:m9P0E91x7dWJTH0EzvDhHSariQD4dIhpPbY2VjYvEIg=
github.com/aws/aws-sdk-go-v2/service/emrserverless v1.28.1/go.mod h1:8cCnS5JHTXwdz5BulKy02qwZl613YhSZsxQXXoG71Ns=
+github.com/aws/aws-sdk-go-v2/service/eventbridge v1.45.25 h1:9PZbyFSCN/E0TnqXqnvYJRhu7yQJv31vHG/vyuirbCY=
+github.com/aws/aws-sdk-go-v2/service/eventbridge v1.45.25/go.mod h1:ZJ1LBykgykfLqmsP2pBUesSd24sL6SebSEeXzzJ2hhE=
github.com/aws/aws-sdk-go-v2/service/glue v1.137.0 h1:gPQ3FlHOFQELCiNSc76CS9B1G7wBT73PktKG64Q3tRc=
github.com/aws/aws-sdk-go-v2/service/glue v1.137.0/go.mod h1:B6g7dsUUg4QUcH6zou32L1LDXjgtk/YjVFcu09jXv10=
+github.com/aws/aws-sdk-go-v2/service/iam v1.53.10 h1:kcN3I3llO7VwIY5w3Pc5FmEonpsr23Ou7Cwk4qf7dik=
+github.com/aws/aws-sdk-go-v2/service/iam v1.53.10/go.mod h1:1vkJzjCYC3byO0kIrBqLPzvZpuvYhPXkuyARs6E7tM4=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEdJbzUCEWkVXEXoqlXV72J5keC1GvILMOuD00E=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 h1:Z5EiPIzXKewUQK0QTMkutjiaPVeVYXX7KIqhXu/0fXs=
@@ -254,8 +252,6 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 h1:0jbJeuEHlwKJ9PfXtpSFc4M
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14/go.mod h1:sTGThjphYE4Ohw8vJiRStAcu3rbjtXRsdNB0TvZ5wwo=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 h1:5fFjR/ToSOzB2OQ/XqWpZBmNvmP/pJ1jOWYlFDJTjRQ=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6/go.mod h1:qgFDZQSD/Kys7nJnVqYlWKnh0SSdMjAi0uSwON4wgYQ=
-github.com/aws/smithy-go v1.24.1 h1:VbyeNfmYkWoxMVpGUAbQumkODcYmfMRfZ8yQiH30SK0=
-github.com/aws/smithy-go v1.24.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
github.com/aws/smithy-go v1.25.1 h1:J8ERsGSU7d+aCmdQur5Txg6bVoYelvQJgtZehD12GkI=
github.com/aws/smithy-go v1.25.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
diff --git a/materialize-eventbridge/.snapshots/TestIntegration-apply b/materialize-eventbridge/.snapshots/TestIntegration-apply
new file mode 100644
index 0000000000..df2ea067f7
--- /dev/null
+++ b/materialize-eventbridge/.snapshots/TestIntegration-apply
@@ -0,0 +1,174 @@
+Task: acmeCo/tests/materialize-eventbridge/local
+
+Big Schema Initial Constraints:
+{"Field":"_meta/flow_truncated","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"boolField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"}
+{"Field":"flow_published_at","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"intField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"key","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"EventBridge ignores keys; including them in the payload is optional"}
+{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"numField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"objField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDateField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDateTimeField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDurationField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringEmailField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringHostnameField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIdnEmailField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIdnHostnameField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIntegerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIpv4Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIpv6Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIriField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIriReferenceField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringJsonPointerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringMacAddr8Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringMacAddrField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringNumberField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringRegexField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringRelativeJsonPointerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringTimeField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUint32Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUint64Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriReferenceField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriTemplateField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUuidField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+
+Big Schema Re-validated Constraints:
+{"Field":"_meta/flow_truncated","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"boolField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"}
+{"Field":"flow_published_at","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"intField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"key","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
+{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"numField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"objField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDateField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDateTimeField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDurationField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringEmailField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringHostnameField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIdnEmailField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIdnHostnameField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIntegerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIpv4Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIpv6Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIriField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIriReferenceField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringJsonPointerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringMacAddr8Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringMacAddrField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringNumberField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringRegexField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringRelativeJsonPointerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringTimeField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUint32Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUint64Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriReferenceField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriTemplateField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUuidField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+
+Big Schema Changed Types Constraints:
+{"Field":"_meta/flow_truncated","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"boolField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"}
+{"Field":"flow_published_at","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"intField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"key","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
+{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"numField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"objField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDateField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDateTimeField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDurationField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringEmailField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringHostnameField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIdnEmailField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIdnHostnameField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIntegerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIpv4Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIpv6Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIriField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIriReferenceField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringJsonPointerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringMacAddr8Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringMacAddrField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringNumberField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringRegexField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringRelativeJsonPointerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringTimeField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUint32Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUint64Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriReferenceField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriTemplateField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUuidField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+
+Big Schema Materialized Resource Schema With All Fields Required:
+
+Big Schema Materialized Resource Schema With No Fields Required:
+
+Big Schema Changed Types With Backfill Constraints:
+{"Field":"_meta/flow_truncated","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"arrayField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"boolField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"flow_document","Type":2,"TypeString":"LOCATION_REQUIRED","Reason":"The root document must be materialized"}
+{"Field":"flow_published_at","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"intField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"key","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"EventBridge ignores keys; including them in the payload is optional"}
+{"Field":"multipleField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"nullField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"numField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"objField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDateField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDateTimeField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringDurationField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringEmailField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringHostnameField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIdnEmailField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIdnHostnameField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIntegerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIpv4Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIpv6Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIriField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringIriReferenceField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringJsonPointerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringMacAddr8Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringMacAddrField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringNumberField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringRegexField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringRelativeJsonPointerField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringTimeField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUint32Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUint64Field","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriReferenceField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUriTemplateField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+{"Field":"stringUuidField","Type":5,"TypeString":"FIELD_FORBIDDEN","Reason":"EventBridge only materializes the full document"}
+
+Big Schema Materialized Resource Schema Changed Types With Backfill:
+
+add a single field:
+
+remove a single optional field:
+
+remove a single required field:
+
+add and remove many fields:
+
+Challenging Field Names Materialized Columns:
+
diff --git a/materialize-eventbridge/.snapshots/TestIntegration-materialize b/materialize-eventbridge/.snapshots/TestIntegration-materialize
new file mode 100644
index 0000000000..2a5a8804ca
--- /dev/null
+++ b/materialize-eventbridge/.snapshots/TestIntegration-materialize
@@ -0,0 +1,47 @@
+Task: acmeCo/tests/materialize-eventbridge/local
+
+Resource: flow-test-bus.flow.test.simple_standard
+["applied.actionDescription", ""]
+["connectorState",{}]
+["connectorState",{}]
+["connectorState",{}]
+["connectorState",{}]
+["connectorState",{"updated":{}}]
+
+
+Table Data:
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"38ce7800-1deb-11b2-8000-071353030311\"},\"canary\":\"pieced\",\"id\":7,\"val\":7}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"39670e80-1deb-11b2-8000-071353030311\"},\"canary\":\"roaches\",\"id\":8,\"val\":8}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"39ffa500-1deb-11b2-8000-071353030311\"},\"canary\":\"devilish\",\"id\":9,\"val\":9}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"3a983b80-1deb-11b2-8000-071353030311\"},\"canary\":\"glucose's\",\"id\":10,\"val\":10}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"3bc96880-1deb-11b2-8000-071353030311\"},\"canary\":\"penguin\",\"id\":12,\"val\":12}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"7545a800-1dda-11b2-8000-071353030311\"},\"canary\":\"amputation's\",\"id\":1,\"val\":1}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"75de3e80-1dda-11b2-8000-071353030311\"},\"canary\":\"armament's\",\"id\":2,\"val\":2}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"7676d500-1dda-11b2-8000-071353030311\"},\"canary\":\"splatters\",\"id\":3,\"val\":3}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"770f6b80-1dda-11b2-8000-071353030311\"},\"canary\":\"strengthen\",\"id\":4,\"val\":4}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"77a80200-1dda-11b2-8000-071353030311\"},\"canary\":\"Kringle's\",\"id\":5,\"val\":5}"}
+{"detail":"{\"_meta\":{\"op\":\"c\",\"uuid\":\"78409880-1dda-11b2-8000-071353030311\"},\"canary\":\"grosbeak's\",\"id\":6,\"val\":6}"}
+{"detail":"{\"_meta\":{\"op\":\"d\",\"uuid\":\"3b30d200-1deb-11b2-8000-071353030311\"},\"canary\":\"asteroid\",\"id\":11,\"val\":11}"}
+{"detail":"{\"_meta\":{\"op\":\"d\",\"uuid\":\"3c61ff00-1deb-11b2-8000-071353030311\"},\"canary\":\"penguin\",\"id\":12,\"val\":12}"}
+{"detail":"{\"_meta\":{\"op\":\"d\",\"uuid\":\"3cfa9580-1deb-11b2-8000-071353030311\"},\"canary\":\"amputation's\",\"id\":1,\"val\":1}"}
+{"detail":"{\"_meta\":{\"op\":\"d\",\"uuid\":\"3d932c00-1deb-11b2-8000-071353030311\"},\"canary\":\"armament's\",\"id\":2,\"val\":2}"}
+{"detail":"{\"_meta\":{\"op\":\"u\",\"uuid\":\"3e2bc280-1deb-11b2-8000-071353030311\"},\"canary\":\"splatters\",\"id\":3,\"val\":3}"}
+
+Resource: flow-test-bus.flow.test.data_types
+["applied.actionDescription", ""]
+["connectorState",{}]
+["connectorState",{}]
+["connectorState",{}]
+["connectorState",{}]
+["connectorState",{"updated":{}}]
+
+
+Table Data:
+{"detail":"{\"_meta\":{\"uuid\":\"3f5cef80-1deb-11b2-8000-071353030311\"},\"arrayField\":[\"updated\",\"array\",\"values\"],\"boolField\":false,\"id\":4,\"intField\":2147483647,\"multipleField\":999,\"nullField\":null,\"numField\":12345678900.0,\"objField\":{\"timestamp\":\"2024-01-01\",\"updated\":true},\"stringAndIntegerField\":999,\"stringAndNumberField\":999.999,\"stringDateField\":\"2024-12-25\",\"stringDateTimeField\":\"2024-12-25T00:00:00Z\",\"stringDurationField\":\"P1D\",\"stringEmailField\":\"updated@test.org\",\"stringField\":\"updated value\",\"stringHostnameField\":\"api.example.org\",\"stringIntegerField\":\"999\",\"stringIpv4Field\":\"8.8.8.8\",\"stringIpv6Field\":\"2001:4860:4860::8888\",\"stringIriField\":\"https://\\u30c6\\u30b9\\u30c8.\\u4f8b\",\"stringIriReferenceField\":\"/\\u30c6\\u30b9\\u30c8/\\u4f8b\",\"stringJsonPointerField\":\"/data/items/0/name\",\"stringMacAddr8Field\":\"aa:bb:cc:dd:ee:ff:00:11\",\"stringMacAddrField\":\"aa:bb:cc:dd:ee:ff\",\"stringNumberField\":\"999.999\",\"stringRegexField\":\"^\\\\d{4}-\\\\d{2}-\\\\d{2}$\",\"stringRelativeJsonPointerField\":\"3/updated\",\"stringTimeField\":\"12:00:00z\",\"stringUint32Field\":\"1000000000\",\"stringUint64Field\":\"1000000000000000000\",\"stringUriField\":\"https://api.example.org/v1/resource\",\"stringUriReferenceField\":\"../parent/resource\",\"stringUriTemplateField\":\"/api/v{version}/users/{id}\",\"stringUuidField\":\"123e4567-e89b-12d3-a456-426614174000\"}"}
+{"detail":"{\"_meta\":{\"uuid\":\"3ff58600-1deb-11b2-8000-071353030311\"},\"arrayField\":[1,\"two\",3.0,true,null],\"boolField\":true,\"id\":1,\"intField\":42.0,\"multipleField\":\"string value\",\"nullField\":null,\"numField\":3.14159,\"objField\":{\"count\":123,\"nested\":\"value\"},\"stringAndIntegerField\":\"1.0\",\"stringAndNumberField\":\"123.456\",\"stringDateField\":\"2023-01-15\",\"stringDateTimeField\":\"2023-01-15T10:30:45z\",\"stringDurationField\":\"P1Y2M3DT4H5M6S\",\"stringEmailField\":\"test@example.com\",\"stringField\":\"hello world\",\"stringHostnameField\":\"example.com\",\"stringIntegerField\":\"1.0\",\"stringIpv4Field\":\"192.168.1.1\",\"stringIpv6Field\":\"2001:db8::1\",\"stringIriField\":\"https://\\u4f8b\\u3048.\\u30c6\\u30b9\\u30c8\",\"stringIriReferenceField\":\"/\\u4f8b\\u3048/\\u30c6\\u30b9\\u30c8\",\"stringJsonPointerField\":\"/path/to/field\",\"stringMacAddr8Field\":\"00:14:22:ff:fe:01:23:45\",\"stringMacAddrField\":\"00:14:22:01:23:45\",\"stringNumberField\":\"123.456\",\"stringRegexField\":\"^[a-zA-Z0-9]+$\",\"stringRelativeJsonPointerField\":\"1/name\",\"stringTimeField\":\"14:30:00+02:30\",\"stringUint32Field\":\"4294967295\",\"stringUint64Field\":\"18446744073709551615\",\"stringUriField\":\"https://example.com/path\",\"stringUriReferenceField\":\"/relative/path\",\"stringUriTemplateField\":\"https://api.example.com/{id}\",\"stringUuidField\":\"550e8400-e29b-41d4-a716-446655440000\"}"}
+{"detail":"{\"_meta\":{\"uuid\":\"408e1c80-1deb-11b2-8000-071353030311\"},\"arrayField\":[],\"boolField\":false,\"id\":2,\"intField\":-999,\"multipleField\":42,\"nullField\":null,\"numField\":-2.718281828,\"objField\":{},\"stringAndIntegerField\":-123,\"stringAndNumberField\":-456.789,\"stringDateField\":\"1999-12-31\",\"stringDateTimeField\":\"1999-12-31T23:59:59.123456789Z\",\"stringDurationField\":\"PT30M\",\"stringEmailField\":\"admin@localhost\",\"stringField\":\"\",\"stringHostnameField\":\"localhost\",\"stringIntegerField\":\"-123\",\"stringIpv4Field\":\"127.0.0.1\",\"stringIpv6Field\":\"::1\",\"stringIriField\":\"http://localhost\",\"stringIriReferenceField\":\"/\",\"stringJsonPointerField\":\"/0\",\"stringMacAddr8Field\":\"ff:ff:ff:ff:ff:ff:ff:ff\",\"stringMacAddrField\":\"ff:ff:ff:ff:ff:ff\",\"stringNumberField\":\"-456.789\",\"stringRegexField\":\".*\",\"stringRelativeJsonPointerField\":\"0\",\"stringTimeField\":\"03:55:23.123456789Z\",\"stringUint32Field\":\"0\",\"stringUint64Field\":\"0\",\"stringUriField\":\"http://localhost:8080\",\"stringUriReferenceField\":\"#fragment\",\"stringUriTemplateField\":\"http://example.com/~{username}/\",\"stringUuidField\":\"00000000-0000-0000-0000-000000000000\"}"}
+{"detail":"{\"_meta\":{\"uuid\":\"7aa2f280-1dda-11b2-8000-071353030311\"},\"arrayField\":[1,\"two\",3.0,true,null],\"boolField\":true,\"id\":1,\"intField\":42,\"multipleField\":\"string value\",\"nullField\":null,\"numField\":3.14159,\"objField\":{\"count\":123,\"nested\":\"value\"},\"stringAndIntegerField\":\"789\",\"stringAndNumberField\":\"123.456\",\"stringDateField\":\"2023-01-15\",\"stringDateTimeField\":\"2023-01-15T10:30:45Z\",\"stringDurationField\":\"P1Y2M3DT4H5M6S\",\"stringEmailField\":\"test@example.com\",\"stringField\":\"hello world\",\"stringHostnameField\":\"example.com\",\"stringIntegerField\":\"789\",\"stringIpv4Field\":\"192.168.1.1\",\"stringIpv6Field\":\"2001:db8::1\",\"stringIriField\":\"https://\\u4f8b\\u3048.\\u30c6\\u30b9\\u30c8\",\"stringIriReferenceField\":\"/\\u4f8b\\u3048/\\u30c6\\u30b9\\u30c8\",\"stringJsonPointerField\":\"/path/to/field\",\"stringMacAddr8Field\":\"00:14:22:ff:fe:01:23:45\",\"stringMacAddrField\":\"00:14:22:01:23:45\",\"stringNumberField\":\"123.456\",\"stringRegexField\":\"^[a-zA-Z0-9]+$\",\"stringRelativeJsonPointerField\":\"1/name\",\"stringTimeField\":\"14:30:00z\",\"stringUint32Field\":\"4294967295\",\"stringUint64Field\":\"18446744073709551615\",\"stringUriField\":\"https://example.com/path\",\"stringUriReferenceField\":\"/relative/path\",\"stringUriTemplateField\":\"https://api.example.com/{id}\",\"stringUuidField\":\"550e8400-e29b-41d4-a716-446655440000\"}"}
+{"detail":"{\"_meta\":{\"uuid\":\"7b3b8900-1dda-11b2-8000-071353030311\"},\"arrayField\":[],\"boolField\":false,\"id\":2,\"intField\":-999,\"multipleField\":42,\"nullField\":null,\"numField\":7,\"objField\":{},\"stringAndIntegerField\":1.0,\"stringAndNumberField\":-456.789,\"stringDateField\":\"1999-12-31\",\"stringDateTimeField\":\"1999-12-31T23:59:59.999Z\",\"stringDurationField\":\"PT30M\",\"stringEmailField\":\"admin@localhost\",\"stringField\":\"\",\"stringHostnameField\":\"localhost\",\"stringIntegerField\":\"1.0\",\"stringIpv4Field\":\"127.0.0.1\",\"stringIpv6Field\":\"::1\",\"stringIriField\":\"http://localhost\",\"stringIriReferenceField\":\"/\",\"stringJsonPointerField\":\"/0\",\"stringMacAddr8Field\":\"ff:ff:ff:ff:ff:ff:ff:ff\",\"stringMacAddrField\":\"ff:ff:ff:ff:ff:ff\",\"stringNumberField\":\"-456.789\",\"stringRegexField\":\".*\",\"stringRelativeJsonPointerField\":\"0\",\"stringTimeField\":\"00:00:00+00:00\",\"stringUint32Field\":\"0\",\"stringUint64Field\":\"0\",\"stringUriField\":\"http://localhost:8080\",\"stringUriReferenceField\":\"#fragment\",\"stringUriTemplateField\":\"http://example.com/~{username}/\",\"stringUuidField\":\"00000000-0000-0000-0000-000000000000\"}"}
+{"detail":"{\"_meta\":{\"uuid\":\"7bd41f80-1dda-11b2-8000-071353030311\"},\"arrayField\":[{\"obj\":\"in array\"},[1,[2,[3]]],\"mixed types\"],\"boolField\":true,\"id\":3,\"intField\":0,\"multipleField\":{\"object\":\"value\"},\"nullField\":null,\"numField\":0.0,\"objField\":{\"array\":[1,2,3],\"nested\":{\"deep\":\"value\"}},\"stringAndIntegerField\":\"2147483647\",\"stringAndNumberField\":\"0\",\"stringDateField\":\"2024-02-29\",\"stringDateTimeField\":\"2024-02-29T12:00:00.000000000z\",\"stringDurationField\":\"P0D\",\"stringEmailField\":\"user+tag@sub.domain.com\",\"stringField\":\"special chars: !@#$%^\u0026*()[]{}|\\\\:;\\\"'\u003c\u003e?/.,\",\"stringHostnameField\":\"sub.domain.example.org\",\"stringIntegerField\":\"2147483647\",\"stringIpv4Field\":\"10.0.0.1\",\"stringIpv6Field\":\"2001:0db8:0000:0000:0000:ff00:0042:8329\",\"stringIriField\":\"https://\\u043c\\u043e\\u0441\\u043a\\u0432\\u0430.\\u0440\\u0444/path\",\"stringIriReferenceField\":\"/\\u043c\\u043e\\u0441\\u043a\\u0432\\u0430/\\u043f\\u0443\\u0442\\u044c\",\"stringJsonPointerField\":\"/nested/array/0/field\",\"stringMacAddr8Field\":\"01:23:45:67:89:ab:cd:ef\",\"stringMacAddrField\":\"01:23:45:67:89:ab\",\"stringNumberField\":\"0\",\"stringRegexField\":\"[0-9]{3}-[0-9]{2}-[0-9]{4}\",\"stringRelativeJsonPointerField\":\"2/nested/field\",\"stringTimeField\":\"23:59:59.999-01:15\",\"stringUint32Field\":\"2147483648\",\"stringUint64Field\":\"9223372036854775808\",\"stringUriField\":\"ftp://user:pass@host.com:21/path?query=value\",\"stringUriReferenceField\":\"?query=value\u0026other=param\",\"stringUriTemplateField\":\"https://{host}{/path*}{?query*}\",\"stringUuidField\":\"f47ac10b-58cc-4372-a567-0e02b2c3d479\"}"}
+
+
diff --git a/materialize-eventbridge/.snapshots/TestSpec b/materialize-eventbridge/.snapshots/TestSpec
new file mode 100644
index 0000000000..1a709a7e76
--- /dev/null
+++ b/materialize-eventbridge/.snapshots/TestSpec
@@ -0,0 +1,147 @@
+{
+ "config_schema_json": {
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$id": "https://github.com/estuary/connectors/materialize-eventbridge/config",
+ "properties": {
+ "region": {
+ "type": "string",
+ "title": "AWS Region",
+ "description": "Region of the EventBridge event bus.",
+ "order": 1
+ },
+ "event_bus_name": {
+ "type": "string",
+ "title": "Event Bus Name",
+ "description": "Name or ARN of the EventBridge event bus to publish to; verified via DescribeEventBus on Apply. Use \"default\" for the account's default bus.",
+ "default": "default",
+ "order": 2
+ },
+ "credentials": {
+ "oneOf": [
+ {
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$id": "https://github.com/estuary/connectors/materialize-eventbridge/access-key-credentials",
+ "properties": {
+ "auth_type": {
+ "type": "string",
+ "const": "AWSAccessKey",
+ "default": "AWSAccessKey",
+ "order": 0
+ },
+ "aws_access_key_id": {
+ "type": "string",
+ "title": "AWS Access Key ID",
+ "description": "AWS Access Key ID for publishing to EventBridge.",
+ "order": 1
+ },
+ "aws_secret_access_key": {
+ "type": "string",
+ "title": "AWS Secret Access Key",
+ "description": "AWS Secret Access Key for publishing to EventBridge.",
+ "order": 2,
+ "secret": true
+ }
+ },
+ "type": "object",
+ "required": [
+ "aws_access_key_id",
+ "aws_secret_access_key"
+ ],
+ "title": "Access Key"
+ },
+ {
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$id": "https://github.com/estuary/connectors/go/auth/iam/aws-config",
+ "properties": {
+ "auth_type": {
+ "type": "string",
+ "const": "AWSIAM",
+ "default": "AWSIAM",
+ "order": 0
+ },
+ "aws_region": {
+ "type": "string",
+ "title": "AWS Region",
+ "description": "AWS Region of your resource"
+ },
+ "aws_role_arn": {
+ "type": "string",
+ "title": "AWS Role ARN",
+ "description": "AWS Role which has access to the resource which will be assumed by Flow"
+ }
+ },
+ "type": "object",
+ "required": [
+ "aws_region",
+ "aws_role_arn"
+ ],
+ "title": "AWS IAM"
+ }
+ ],
+ "type": "object",
+ "title": "Authentication",
+ "default": {
+ "auth_type": "AWSAccessKey"
+ },
+ "discriminator": {
+ "propertyName": "auth_type"
+ },
+ "order": 3,
+ "x-iam-auth": true
+ },
+ "advanced": {
+ "properties": {
+ "endpoint": {
+ "type": "string",
+ "title": "AWS Endpoint",
+ "description": "Override the AWS endpoint URL. Used to direct requests at a compatible API such as LocalStack."
+ },
+ "feature_flags": {
+ "type": "string",
+ "title": "Feature Flags",
+ "description": "This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."
+ }
+ },
+ "additionalProperties": false,
+ "type": "object",
+ "title": "Advanced Options",
+ "description": "Options for advanced users. You should not typically need to modify these.",
+ "advanced": true
+ }
+ },
+ "type": "object",
+ "required": [
+ "region",
+ "event_bus_name",
+ "credentials"
+ ],
+ "title": "Materialize EventBridge Spec"
+ },
+ "resource_config_schema_json": {
+ "$schema": "https://json-schema.org/draft/2020-12/schema",
+ "$id": "https://github.com/estuary/connectors/materialize-eventbridge/resource",
+ "properties": {
+ "source": {
+ "type": "string",
+ "title": "Event Source",
+ "description": "Source field set on every event published from this binding (e.g. \"my.app\").",
+ "order": 1,
+ "x-collection-name": true
+ },
+ "detail_type": {
+ "type": "string",
+ "title": "Detail Type",
+ "description": "DetailType field set on every event published from this binding (e.g. \"OrderPlaced\").",
+ "order": 2,
+ "x-collection-name": true
+ }
+ },
+ "type": "object",
+ "required": [
+ "source",
+ "detail_type"
+ ],
+ "title": "EventBridge Binding"
+ },
+ "documentation_url": "https://go.estuary.dev/materialize-eventbridge"
+}
diff --git a/materialize-eventbridge/Dockerfile b/materialize-eventbridge/Dockerfile
new file mode 100644
index 0000000000..b17d56f60b
--- /dev/null
+++ b/materialize-eventbridge/Dockerfile
@@ -0,0 +1,35 @@
+ARG BASE_IMAGE=ghcr.io/estuary/base-image:v1
+
+# Build Stage
+################################################################################
+FROM --platform=linux/amd64 golang:1.25-bookworm AS builder
+
+WORKDIR /builder
+
+# Download & compile dependencies early. Doing this separately allows for layer
+# caching opportunities when no dependencies are updated.
+COPY go.* ./
+RUN go mod download
+
+COPY go ./go
+COPY materialize-boilerplate ./materialize-boilerplate
+COPY materialize-eventbridge ./materialize-eventbridge
+
+RUN go test -tags nozstd -short -v ./materialize-eventbridge/...
+
+RUN go build -tags nozstd -v -o ./connector ./materialize-eventbridge
+
+# Runtime Stage
+################################################################################
+FROM ${BASE_IMAGE}
+
+WORKDIR /connector
+ENV PATH="/connector:$PATH"
+
+COPY --from=builder /builder/connector /connector/materialize-eventbridge
+
+USER nonroot:nonroot
+
+LABEL FLOW_RUNTIME_PROTOCOL=materialize
+
+ENTRYPOINT ["/connector/materialize-eventbridge"]
diff --git a/materialize-eventbridge/VERSION b/materialize-eventbridge/VERSION
new file mode 100644
index 0000000000..626799f0f8
--- /dev/null
+++ b/materialize-eventbridge/VERSION
@@ -0,0 +1 @@
+v1
diff --git a/materialize-eventbridge/docker-compose.yaml b/materialize-eventbridge/docker-compose.yaml
new file mode 100644
index 0000000000..9a3f5c9457
--- /dev/null
+++ b/materialize-eventbridge/docker-compose.yaml
@@ -0,0 +1,32 @@
+version: '3.7'
+
+# LocalStack provides a local stand-in for AWS services. We need both
+# `events` (EventBridge itself, which receives `PutEvents` calls from the
+# connector) and `sqs` (the integration test rig configures an EventBridge
+# rule that forwards events to an SQS queue, then drains the queue to
+# verify delivery — EventBridge has no read API of its own).
+#
+# Pin to v3 because v4+ requires a LocalStack auth token. Same rationale as
+# source-kinesis/docker-compose.yaml.
+
+services:
+ eventbridge:
+ image: localstack/localstack:3
+ environment:
+ - SERVICES=events,sqs
+ - DEFAULT_REGION=us-east-1
+ - DEBUG=0
+ ports:
+ - "4566:4566"
+ healthcheck:
+ test: ["CMD-SHELL", "curl -fs http://localhost:4566/_localstack/health | grep -q '\"events\": \"\\(running\\|available\\)\"' && curl -fs http://localhost:4566/_localstack/health | grep -q '\"sqs\": \"\\(running\\|available\\)\"'"]
+ interval: 5s
+ timeout: 5s
+ retries: 20
+ networks:
+ - flow-test
+
+networks:
+ flow-test:
+ name: flow-test
+ external: true
diff --git a/materialize-eventbridge/driver.go b/materialize-eventbridge/driver.go
new file mode 100644
index 0000000000..aa506be4d6
--- /dev/null
+++ b/materialize-eventbridge/driver.go
@@ -0,0 +1,461 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ awsConfig "github.com/aws/aws-sdk-go-v2/config"
+ "github.com/aws/aws-sdk-go-v2/credentials"
+ "github.com/aws/aws-sdk-go-v2/service/eventbridge"
+ awsiam "github.com/aws/aws-sdk-go-v2/service/iam"
+ iamtypes "github.com/aws/aws-sdk-go-v2/service/iam/types"
+ "github.com/aws/aws-sdk-go-v2/service/sts"
+ "github.com/estuary/connectors/go/auth/iam"
+ cerrors "github.com/estuary/connectors/go/connector-errors"
+ m "github.com/estuary/connectors/go/materialize"
+ schemagen "github.com/estuary/connectors/go/schema-gen"
+ boilerplate "github.com/estuary/connectors/materialize-boilerplate"
+ pf "github.com/estuary/flow/go/protocols/flow"
+ pm "github.com/estuary/flow/go/protocols/materialize"
+ "github.com/invopop/jsonschema"
+ log "github.com/sirupsen/logrus"
+)
+
+type AuthType string
+
+const (
+ AWSAccessKey AuthType = "AWSAccessKey"
+ AWSIAM AuthType = "AWSIAM"
+)
+
+type CredentialsConfig struct {
+ AuthType AuthType `json:"auth_type"`
+
+ AccessKeyCredentials
+ iam.IAMConfig
+}
+
+func (CredentialsConfig) JSONSchema() *jsonschema.Schema {
+ return schemagen.OneOfSchema("Authentication", "", "auth_type", string(AWSAccessKey),
+ schemagen.OneOfSubSchema("Access Key", AccessKeyCredentials{}, string(AWSAccessKey)),
+ schemagen.OneOfSubSchema("AWS IAM", iam.AWSConfig{}, string(AWSIAM)),
+ )
+}
+
+func (c *CredentialsConfig) Validate() error {
+ switch c.AuthType {
+ case AWSAccessKey:
+ if c.AccessKeyCredentials.AWSAccessKeyID == "" {
+ return fmt.Errorf("missing %q", "aws_access_key_id")
+ }
+ if c.AccessKeyCredentials.AWSSecretAccessKey == "" {
+ return fmt.Errorf("missing %q", "aws_secret_access_key")
+ }
+ return nil
+ case AWSIAM:
+ return c.ValidateIAM()
+ }
+ return fmt.Errorf("unknown %q: %s", "auth_type", c.AuthType)
+}
+
+// Since the AccessKeyCredentials and IAMConfig have conflicting JSON field names, only parse
+// the embedded struct of interest.
+func (c *CredentialsConfig) UnmarshalJSON(data []byte) error {
+ var discriminator struct {
+ AuthType AuthType `json:"auth_type"`
+ }
+ if err := json.Unmarshal(data, &discriminator); err != nil {
+ return err
+ }
+ c.AuthType = discriminator.AuthType
+
+ switch c.AuthType {
+ case AWSAccessKey:
+ return json.Unmarshal(data, &c.AccessKeyCredentials)
+ case AWSIAM:
+ return json.Unmarshal(data, &c.IAMConfig)
+ }
+ return fmt.Errorf("unknown %q: %s", "auth_type", c.AuthType)
+}
+
+type AccessKeyCredentials struct {
+ AWSAccessKeyID string `json:"aws_access_key_id" jsonschema:"title=AWS Access Key ID,description=AWS Access Key ID for publishing to EventBridge." jsonschema_extras:"order=1"`
+ AWSSecretAccessKey string `json:"aws_secret_access_key" jsonschema:"title=AWS Secret Access Key,description=AWS Secret Access Key for publishing to EventBridge." jsonschema_extras:"secret=true,order=2"`
+}
+
+type config struct {
+ Region string `json:"region" jsonschema:"title=AWS Region,description=Region of the EventBridge event bus." jsonschema_extras:"order=1"`
+ EventBusName string `json:"event_bus_name" jsonschema:"title=Event Bus Name,description=Name or ARN of the EventBridge event bus to publish to; verified via DescribeEventBus on Apply. Use \"default\" for the account's default bus.,default=default" jsonschema_extras:"order=2"`
+ Credentials *CredentialsConfig `json:"credentials" jsonschema:"title=Authentication" jsonschema_extras:"x-iam-auth=true,order=3"`
+ Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extras:"advanced=true"`
+}
+
+type advancedConfig struct {
+ Endpoint string `json:"endpoint,omitempty" jsonschema:"title=AWS Endpoint,description=Override the AWS endpoint URL. Used to direct requests at a compatible API such as LocalStack."`
+ FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."`
+}
+
+func (c config) Validate() error {
+ if c.Region == "" {
+ return fmt.Errorf("missing %q", "region")
+ }
+ if c.EventBusName == "" {
+ return fmt.Errorf("missing %q", "event_bus_name")
+ }
+ if c.Credentials == nil {
+ return fmt.Errorf("missing %q", "credentials")
+ }
+ return c.Credentials.Validate()
+}
+
+// EventBridge has no namespace concept; the bus name is endpoint-level.
+func (c config) DefaultNamespace() string { return "" }
+
+func (c config) FeatureFlags() (string, map[string]bool) {
+ return c.Advanced.FeatureFlags, nil
+}
+
+func (c config) credentialsProvider() (aws.CredentialsProvider, error) {
+ if c.Credentials == nil {
+ return nil, errors.New("missing credentials")
+ }
+ switch c.Credentials.AuthType {
+ case AWSAccessKey:
+ return credentials.NewStaticCredentialsProvider(
+ c.Credentials.AccessKeyCredentials.AWSAccessKeyID,
+ c.Credentials.AccessKeyCredentials.AWSSecretAccessKey,
+ "",
+ ), nil
+ case AWSIAM:
+ return c.Credentials.IAMTokens.AWSCredentialsProvider()
+ }
+ return nil, fmt.Errorf("unknown %q: %s", "auth_type", c.Credentials.AuthType)
+}
+
+type resource struct {
+ Source string `json:"source" jsonschema:"title=Event Source,description=Source field set on every event published from this binding,default=estuary.flow" jsonschema_extras:"x-collection-name=true,order=1"`
+ DetailType string `json:"detail_type" jsonschema:"title=Detail Type,description=DetailType field set on every event published from this binding,default=Document Published" jsonschema_extras:"order=2"`
+
+ // busName is populated from the endpoint config via WithDefaults so that
+ // Parameters() can produce the full [bus, source, detail_type] resource
+ // path. It is not serialized.
+ busName string `json:"-"`
+}
+
+func (r resource) Validate() error {
+ if r.Source == "" {
+ return fmt.Errorf("missing %q", "source")
+ }
+ if r.DetailType == "" {
+ return fmt.Errorf("missing %q", "detail_type")
+ }
+ return nil
+}
+
+func (r resource) WithDefaults(cfg config) resource {
+ r.busName = cfg.EventBusName
+ return r
+}
+
+func (r resource) Parameters() ([]string, bool, error) {
+ return []string{r.busName, r.Source, r.DetailType}, true, nil
+}
+
+// fieldConfig and mappedType are required by the Materializer generic
+// signature but carry no destination-specific information for EventBridge —
+// every document is published as an opaque JSON payload, so there is no
+// per-field schema to track.
+
+type fieldConfig struct{}
+
+func (fieldConfig) Validate() error { return nil }
+func (fieldConfig) CastToString() bool { return false }
+
+type mappedType struct{}
+
+func (mappedType) String() string { return "json" }
+func (mappedType) Compatible(boilerplate.ExistingField) bool { return true }
+func (mappedType) CanMigrate(boilerplate.ExistingField) bool { return false }
+
+type driver struct{}
+
+var _ boilerplate.Connector = &driver{}
+
+func (driver) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Response_Spec, error) {
+ endpointSchema, err := schemagen.GenerateSchema("Materialize EventBridge Spec", &config{}).MarshalJSON()
+ if err != nil {
+ return nil, fmt.Errorf("generating endpoint schema: %w", err)
+ }
+
+ resourceSchema, err := schemagen.GenerateSchema("EventBridge Binding", &resource{}).MarshalJSON()
+ if err != nil {
+ return nil, fmt.Errorf("generating resource schema: %w", err)
+ }
+
+ return boilerplate.RunSpec(ctx, req, "https://go.estuary.dev/materialize-eventbridge", endpointSchema, resourceSchema)
+}
+
+func (driver) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Response_Validated, error) {
+ return boilerplate.RunValidate(ctx, req, newMaterialization)
+}
+
+func (driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response_Applied, error) {
+ return boilerplate.RunApply(ctx, req, newMaterialization)
+}
+
+func (driver) NewTransactor(ctx context.Context, req pm.Request_Open, be *m.BindingEvents) (m.Transactor, *pm.Response_Opened, *m.MaterializeOptions, error) {
+ return boilerplate.RunNewTransactor(ctx, req, be, newMaterialization)
+}
+
+// awsConfigFor builds an aws.Config from the connector's authentication
+// config (static keys or assumed-role STS tokens) and region, applying the
+// optional endpoint override (used by tests against LocalStack).
+func awsConfigFor(ctx context.Context, cfg config) (aws.Config, error) {
+ credProvider, err := cfg.credentialsProvider()
+ if err != nil {
+ return aws.Config{}, err
+ }
+
+ opts := []func(*awsConfig.LoadOptions) error{
+ awsConfig.WithCredentialsProvider(credProvider),
+ awsConfig.WithRegion(cfg.Region),
+ }
+
+ if cfg.Advanced.Endpoint != "" {
+ resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
+ return aws.Endpoint{URL: cfg.Advanced.Endpoint, SigningRegion: region}, nil
+ })
+ opts = append(opts, awsConfig.WithEndpointResolverWithOptions(resolver))
+ }
+
+ awsCfg, err := awsConfig.LoadDefaultConfig(ctx, opts...)
+ if err != nil {
+ return aws.Config{}, fmt.Errorf("creating aws config: %w", err)
+ }
+ return awsCfg, nil
+}
+
+type materialization struct {
+ cfg config
+ client *eventbridge.Client
+}
+
+var _ boilerplate.Materializer[config, fieldConfig, resource, mappedType] = &materialization{}
+
+func newMaterialization(ctx context.Context, materializationName string, cfg config, featureFlags map[string]bool) (boilerplate.Materializer[config, fieldConfig, resource, mappedType], error) {
+ awsCfg, err := awsConfigFor(ctx, cfg)
+ if err != nil {
+ return nil, err
+ }
+ return &materialization{
+ cfg: cfg,
+ client: eventbridge.NewFromConfig(awsCfg),
+ }, nil
+}
+
+func (d *materialization) Config() boilerplate.MaterializeCfg {
+ return boilerplate.MaterializeCfg{
+ ConcurrentApply: true,
+ NoCreateNamespaces: true,
+ NoTruncateResources: true,
+ }
+}
+
+func (d *materialization) PopulateInfoSchema(ctx context.Context, is *boilerplate.InfoSchema, resourcePaths [][]string) error {
+ for _, p := range resourcePaths {
+ is.PushResource(p...)
+ }
+ return nil
+}
+
+func (d *materialization) CheckPrerequisites(ctx context.Context) *cerrors.PrereqErr {
+ prereqs := &cerrors.PrereqErr{}
+ busARN, err := describeBus(ctx, d.client, d.cfg.EventBusName)
+ if err != nil {
+ prereqs.Err(err)
+ return prereqs
+ }
+ checkPutEventsPermission(ctx, d.cfg, busARN, prereqs)
+ return prereqs
+}
+
+func (d *materialization) NewConstraint(p pf.Projection, deltaUpdates bool, fc fieldConfig) pm.Response_Validated_Constraint {
+ if !deltaUpdates {
+ // EventBridge is delta-only and Parameters() coerces deltaUpdates=true
+ // upstream, so the false branch should be unreachable. Panic if it ever
+ // fires so a regression in Parameters() (or in the boilerplate plumbing)
+ // surfaces immediately rather than producing silently-wrong constraints.
+ panic("NewConstraint called with deltaUpdates=false; Parameters() should have coerced to true")
+ }
+
+ var constraint pm.Response_Validated_Constraint
+ switch {
+ case p.IsRootDocumentProjection():
+ constraint.Type = pm.Response_Validated_Constraint_LOCATION_REQUIRED
+ constraint.Reason = "The root document must be materialized"
+ case p.IsPrimaryKey:
+ // EventBridge has no partition/order/dedup concept and the
+ // connector never reads it.PackedKey or per-key projections, so
+ // keys carry no meaning on the destination side. FIELD_FORBIDDEN
+ // would be the most accurate signal, but the Flow runtime rejects
+ // FORBIDDEN on collection-key fields (they back the materialization
+ // group-by). OPTIONAL is the closest we can go.
+ constraint.Type = pm.Response_Validated_Constraint_FIELD_OPTIONAL
+ constraint.Reason = "EventBridge ignores keys; including them in the payload is optional"
+ default:
+ constraint.Type = pm.Response_Validated_Constraint_FIELD_FORBIDDEN
+ constraint.Reason = "EventBridge only materializes the full document"
+ }
+ return constraint
+}
+
+func (d *materialization) MapType(p boilerplate.Projection, fc fieldConfig) (mappedType, boilerplate.ElementConverter) {
+ return mappedType{}, nil
+}
+
+func (d *materialization) Setup(ctx context.Context, is *boilerplate.InfoSchema) (string, error) {
+ return "", nil
+}
+
+func (d *materialization) CreateNamespace(ctx context.Context, ns string) (string, error) {
+ return "", nil
+}
+
+func (d *materialization) CreateResource(ctx context.Context, b boilerplate.MappedBinding[config, resource, mappedType]) (string, boilerplate.ActionApplyFn, error) {
+ return "", nil, nil
+}
+
+func (d *materialization) DeleteResource(ctx context.Context, path []string) (string, boilerplate.ActionApplyFn, error) {
+ // Test cleanup paths invoke the returned fn directly without nil-checking,
+ // so we return a no-op fn rather than nil.
+ return "", func(context.Context) error { return nil }, nil
+}
+
+func (d *materialization) UpdateResource(
+ ctx context.Context,
+ path []string,
+ existing boilerplate.ExistingResource,
+ update boilerplate.BindingUpdate[config, resource, mappedType],
+) (string, boilerplate.ActionApplyFn, error) {
+ return "", nil, nil
+}
+
+func (d *materialization) TruncateResource(ctx context.Context, path []string) (string, boilerplate.ActionApplyFn, error) {
+ // Unreachable: Config() sets NoTruncateResources=true, so boilerplate
+ // never invokes this. Kept to satisfy the Materializer interface.
+ panic("TruncateResource called despite NoTruncateResources=true")
+}
+
+func (d *materialization) MustRecreateResource(req *pm.Request_Apply, lastBinding, newBinding *pf.MaterializationSpec_Binding) (bool, error) {
+ return false, nil
+}
+
+func (d *materialization) NewTransactor(
+ ctx context.Context,
+ req pm.Request_Open,
+ is boilerplate.InfoSchema,
+ mappedBindings []boilerplate.MappedBinding[config, resource, mappedType],
+ be *m.BindingEvents,
+) (m.Transactor, error) {
+ bindings := make([]bindingState, 0, len(mappedBindings))
+ for _, b := range mappedBindings {
+ bindings = append(bindings, bindingState{
+ source: b.Config.Source,
+ detailType: b.Config.DetailType,
+ })
+ }
+
+ return &transactor{
+ client: d.client,
+ eventBusName: d.cfg.EventBusName,
+ bindings: bindings,
+ }, nil
+}
+
+func (d *materialization) ListTestTasks(ctx context.Context) ([]string, error) {
+ return nil, nil
+}
+
+func (d *materialization) CleanupTestTask(ctx context.Context, name string) error {
+ return nil
+}
+
+// SnapshotTestResource is required by boilerplate.Materializer but is only
+// invoked by the test rig (bptest.RunMaterializationTestParallel). The
+// production runtime never calls it. The panic is a safety net: if a test ever
+// forgets to wrap the materialization in materializationUnderTest, we want a
+// loud failure rather than a silent empty snapshot that would mask the bug.
+func (d *materialization) SnapshotTestResource(ctx context.Context, path []string) ([]string, [][]any, error) {
+ panic("SnapshotTestResource is unreachable in production; tests must wrap the materialization in materializationUnderTest")
+}
+
+func (d *materialization) Close(ctx context.Context) {}
+
+func describeBus(ctx context.Context, client *eventbridge.Client, name string) (string, error) {
+ out, err := client.DescribeEventBus(ctx, &eventbridge.DescribeEventBusInput{Name: &name})
+ if err != nil {
+ return "", fmt.Errorf("describing event bus %q: %w", name, err)
+ }
+ return aws.ToString(out.Arn), nil
+}
+
+func checkPutEventsPermission(ctx context.Context, cfg config, busARN string, prereqs *cerrors.PrereqErr) {
+ awsCfg, err := awsConfigFor(ctx, cfg)
+ if err != nil {
+ log.WithField("error", err).Warn("could not build AWS config for permission check; skipping")
+ return
+ }
+
+ callerOut, err := sts.NewFromConfig(awsCfg).GetCallerIdentity(ctx, &sts.GetCallerIdentityInput{})
+ if err != nil {
+ log.WithField("error", err).Warn("GetCallerIdentity failed; skipping write-permission check")
+ return
+ }
+
+ principalARN := iamPrincipalARN(aws.ToString(callerOut.Arn))
+ simOut, err := awsiam.NewFromConfig(awsCfg).SimulatePrincipalPolicy(ctx, &awsiam.SimulatePrincipalPolicyInput{
+ PolicySourceArn: &principalARN,
+ ActionNames: []string{"events:PutEvents"},
+ ResourceArns: []string{busARN},
+ })
+ if err != nil {
+ log.WithField("error", err).Warn("SimulatePrincipalPolicy failed; skipping write-permission check")
+ return
+ }
+
+ for _, result := range simOut.EvaluationResults {
+ if result.EvalDecision != iamtypes.PolicyEvaluationDecisionTypeAllowed {
+ prereqs.Err(fmt.Errorf("principal %s does not have events:PutEvents permission on bus %s", principalARN, busARN))
+ return
+ }
+ }
+}
+
+// iamPrincipalARN converts an STS assumed-role ARN to the IAM role ARN that
+// SimulatePrincipalPolicy requires. Other ARN forms are returned unchanged.
+//
+// Example:
+//
+// arn:aws:sts::123456789012:assumed-role/MyRole/session
+// → arn:aws:iam::123456789012:role/MyRole
+func iamPrincipalARN(arnStr string) string {
+ // ARN structure: arn:partition:service:region:account:resource
+ parts := strings.SplitN(arnStr, ":", 6)
+ if len(parts) != 6 {
+ return arnStr
+ }
+ resource := parts[5]
+ if !strings.HasPrefix(resource, "assumed-role/") {
+ return arnStr
+ }
+ // resource is "assumed-role/RoleName/SessionName"
+ roleParts := strings.SplitN(resource, "/", 3)
+ if len(roleParts) < 2 {
+ return arnStr
+ }
+ return fmt.Sprintf("arn:%s:iam::%s:role/%s", parts[1], parts[4], roleParts[1])
+}
diff --git a/materialize-eventbridge/driver_test.go b/materialize-eventbridge/driver_test.go
new file mode 100644
index 0000000000..33be36ad4f
--- /dev/null
+++ b/materialize-eventbridge/driver_test.go
@@ -0,0 +1,470 @@
+package main
+
+// Integration tests run against LocalStack EventBridge in docker compose.
+//
+// EventBridge has no read API. To verify what was published, this test rig
+// stands up an SQS queue and an EventBridge rule that forwards every event
+// on the test bus to that queue. Per-resource snapshots are then produced
+// by draining the queue and filtering by the binding's source/detail_type.
+//
+// The SQS infrastructure lives entirely in this file (the production
+// connector has no SQS dependency). The wrapper type
+// `materializationUnderTest` embeds the production `*materialization` and
+// overrides `SnapshotTestResource` to do the SQS drain. The bus + queue +
+// rule + target are all provisioned up front by `provisionLocalStackInfra` since
+// `Setup()` is never invoked on the test-process materializer instance —
+// the boilerplate test rig drives the actual transactions through a
+// `flowctl preview` subprocess.
+//
+// Per project convention this file starts and stops docker compose itself;
+// callers should NOT run docker compose externally before invoking
+// `go test`.
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "os/exec"
+ "sort"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/eventbridge"
+ ebtypes "github.com/aws/aws-sdk-go-v2/service/eventbridge/types"
+ "github.com/aws/aws-sdk-go-v2/service/sqs"
+ sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "github.com/bradleyjkemp/cupaloy"
+ cerrors "github.com/estuary/connectors/go/connector-errors"
+ boilerplate "github.com/estuary/connectors/materialize-boilerplate"
+ bptest "github.com/estuary/connectors/materialize-boilerplate/testutil"
+ pm "github.com/estuary/flow/go/protocols/materialize"
+ "github.com/stretchr/testify/require"
+)
+
+// testAll controls whether integration tests run against real AWS in
+// addition to the LocalStack docker-compose stack. Running the full
+// matrix requires a pre-provisioned EventBridge bus on real AWS plus
+// credentials in `testdata/.local/config.aws.yaml` (gitignored), so by
+// default we only exercise the local stack. Opt in to the full matrix
+// by setting the `EVENTBRIDGE_TEST_ALL` environment variable or passing
+// `-eventbridge.test-all` to `go test`.
+var testAll = flag.Bool("eventbridge.test-all", false, "run integration tests against real AWS in addition to LocalStack")
+
+const (
+ // LocalStack-side resource names — provisioned by provisionLocalStackInfra.
+ testBusName = "flow-test-bus"
+ testQueueName = "flow-test-queue"
+ testRuleName = "flow-test-rule"
+ testTargetID = "flow-test-target"
+
+ // Real-AWS-side queue name — provisioned out-of-band on the AWS account
+ // hosting the integration-tests bus. The SnapshotTestResource path
+ // drains this queue when the materialization runs against real AWS
+ // (i.e. when cfg.Advanced.Endpoint is empty).
+ awsTestQueueName = "integration-tests"
+)
+
+func TestSpec(t *testing.T) {
+ t.Parallel()
+
+ resp, err := driver{}.Spec(context.Background(), &pm.Request_Spec{})
+ require.NoError(t, err)
+
+ formatted, err := json.MarshalIndent(resp, "", " ")
+ require.NoError(t, err)
+ cupaloy.SnapshotT(t, string(formatted))
+}
+
+func TestIntegration(t *testing.T) {
+ if testing.Short() {
+ t.Skip("integration test requires docker")
+ }
+
+ require.NoError(t,
+ exec.Command("docker", "compose", "-f", "docker-compose.yaml", "up", "--wait").Run(),
+ "docker compose up failed",
+ )
+ t.Cleanup(func() {
+ _ = exec.Command("docker", "compose", "-f", "docker-compose.yaml", "down", "-v").Run()
+ })
+
+ provisionLocalStackInfra(t)
+
+ makeResourceFn := func(s string, delta bool) resource {
+ return resource{Source: "flow.test", DetailType: s}
+ }
+
+ all := *testAll || os.Getenv("EVENTBRIDGE_TEST_ALL") != ""
+
+ materializeSpec := "testdata/materialize-local.flow.yaml"
+ applySpec := "testdata/apply-local.flow.yaml"
+ if all {
+ materializeSpec = "testdata/materialize.flow.yaml"
+ applySpec = "testdata/apply.flow.yaml"
+ }
+
+ t.Run("materialize", func(t *testing.T) {
+ bptest.RunMaterializationTestParallel(t, newMaterializationUnderTest, materializeSpec, makeResourceFn, nil)
+ })
+
+ t.Run("apply", func(t *testing.T) {
+ bptest.RunApplyTestParallel(t, &driver{}, newMaterializationUnderTest, applySpec, makeResourceFn)
+ })
+}
+
+// materializationUnderTest wraps the production materialization with an SQS
+// sidecar so that SnapshotTestResource can return what was published. The
+// production type has no awareness of SQS.
+//
+// The wrapper drains the LocalStack queue when the materialization is
+// configured against an endpoint override, and the real-AWS
+// `integration-tests` queue otherwise. Both queues are pre-wired (LocalStack
+// via provisionLocalStackInfra, AWS via out-of-band setup) to receive every event
+// published to their respective bus.
+type materializationUnderTest struct {
+ *materialization
+ sqsClient *sqs.Client
+ queueName string
+
+ // Populated lazily on first SnapshotTestResource call. The drain is done
+ // once and cached because each binding shares the same queue and we
+ // filter in-memory by source/detail-type.
+ queueURL string
+ drainOnce sync.Once
+ drainErr error
+ drainedMsgs []receivedEvent
+}
+
+var _ boilerplate.Materializer[config, fieldConfig, resource, mappedType] = &materializationUnderTest{}
+
+func newMaterializationUnderTest(ctx context.Context, name string, cfg config, flags map[string]bool) (boilerplate.Materializer[config, fieldConfig, resource, mappedType], error) {
+ base, err := newMaterialization(ctx, name, cfg, flags)
+ if err != nil {
+ return nil, err
+ }
+
+ awsCfg, err := awsConfigFor(ctx, cfg)
+ if err != nil {
+ return nil, err
+ }
+
+ queueName := awsTestQueueName
+ if cfg.Advanced.Endpoint != "" {
+ queueName = testQueueName
+ }
+
+ return &materializationUnderTest{
+ materialization: base.(*materialization),
+ sqsClient: sqs.NewFromConfig(awsCfg),
+ queueName: queueName,
+ }, nil
+}
+
+// SnapshotTestResource drains the SQS sidecar queue (LocalStack or real AWS,
+// chosen by which queue the wrapper was constructed against) and returns
+// events whose source + detail-type match the requested resource path.
+func (m *materializationUnderTest) SnapshotTestResource(ctx context.Context, path []string) ([]string, [][]any, error) {
+ if m.queueURL == "" {
+ out, err := m.sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
+ QueueName: aws.String(m.queueName),
+ })
+ if err != nil {
+ return nil, nil, fmt.Errorf("locating test SQS queue %q: %w", m.queueName, err)
+ }
+ m.queueURL = aws.ToString(out.QueueUrl)
+ }
+
+ m.drainOnce.Do(func() {
+ m.drainedMsgs, m.drainErr = drainAllMessages(ctx, m.sqsClient, m.queueURL)
+ })
+ if m.drainErr != nil {
+ return nil, nil, m.drainErr
+ }
+
+ if len(path) != 3 {
+ return nil, nil, fmt.Errorf("unexpected resource path length %d (want 3)", len(path))
+ }
+ wantSource, wantDetailType := path[1], path[2]
+
+ var rows [][]any
+ for _, ev := range m.drainedMsgs {
+ if ev.Source != wantSource || ev.DetailType != wantDetailType {
+ continue
+ }
+ rows = append(rows, []any{string(ev.Detail)})
+ }
+
+ if len(rows) == 0 {
+ return nil, nil, nil
+ }
+
+ sort.Slice(rows, func(i, j int) bool {
+ return fmt.Sprint(rows[i][0]) < fmt.Sprint(rows[j][0])
+ })
+
+ return []string{"detail"}, rows, nil
+}
+
+// provisionLocalStackInfra brings up the bus + SQS queue + EventBridge rule +
+// target so the connector subprocess has somewhere to publish and the test
+// process has a queue to drain. Idempotent — re-runs are safe.
+func provisionLocalStackInfra(t *testing.T) {
+ t.Helper()
+
+ cfg := config{
+ Region: "us-east-1",
+ EventBusName: testBusName,
+ Credentials: &CredentialsConfig{
+ AuthType: AWSAccessKey,
+ AccessKeyCredentials: AccessKeyCredentials{
+ AWSAccessKeyID: "test",
+ AWSSecretAccessKey: "test",
+ },
+ },
+ Advanced: advancedConfig{Endpoint: "http://localhost:4566"},
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ awsCfg, err := awsConfigFor(ctx, cfg)
+ require.NoError(t, err)
+
+ eb := eventbridge.NewFromConfig(awsCfg)
+ sq := sqs.NewFromConfig(awsCfg)
+
+ if _, err := eb.CreateEventBus(ctx, &eventbridge.CreateEventBusInput{
+ Name: aws.String(testBusName),
+ }); err != nil && !alreadyExists(err) {
+ t.Fatalf("CreateEventBus: %v", err)
+ }
+
+ qOut, err := sq.CreateQueue(ctx, &sqs.CreateQueueInput{
+ QueueName: aws.String(testQueueName),
+ })
+ require.NoError(t, err, "CreateQueue")
+ queueURL := aws.ToString(qOut.QueueUrl)
+
+ attrOut, err := sq.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
+ QueueUrl: aws.String(queueURL),
+ AttributeNames: []sqstypes.QueueAttributeName{sqstypes.QueueAttributeNameQueueArn},
+ })
+ require.NoError(t, err, "GetQueueAttributes")
+ queueArn := attrOut.Attributes[string(sqstypes.QueueAttributeNameQueueArn)]
+
+ if _, err := eb.PutRule(ctx, &eventbridge.PutRuleInput{
+ Name: aws.String(testRuleName),
+ EventBusName: aws.String(testBusName),
+ EventPattern: aws.String(`{"source":[{"prefix":""}]}`),
+ State: ebtypes.RuleStateEnabled,
+ }); err != nil {
+ t.Fatalf("PutRule: %v", err)
+ }
+
+ if _, err := eb.PutTargets(ctx, &eventbridge.PutTargetsInput{
+ Rule: aws.String(testRuleName),
+ EventBusName: aws.String(testBusName),
+ Targets: []ebtypes.Target{{
+ Id: aws.String(testTargetID),
+ Arn: aws.String(queueArn),
+ }},
+ }); err != nil {
+ t.Fatalf("PutTargets: %v", err)
+ }
+}
+
+func alreadyExists(err error) bool {
+ if err == nil {
+ return false
+ }
+ var rae *ebtypes.ResourceAlreadyExistsException
+ return errors.As(err, &rae)
+}
+
+// receivedEvent matches the JSON shape EventBridge writes to SQS via a rule
+// target.
+type receivedEvent struct {
+ Source string `json:"source"`
+ DetailType string `json:"detail-type"`
+ Detail json.RawMessage `json:"detail"`
+}
+
+// drainAllMessages polls the queue until two consecutive empty receives,
+// deleting each message it observes so a re-run starts clean.
+func drainAllMessages(ctx context.Context, sq *sqs.Client, queueURL string) ([]receivedEvent, error) {
+ var out []receivedEvent
+ emptyPolls := 0
+ for emptyPolls < 2 {
+ resp, err := sq.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
+ QueueUrl: aws.String(queueURL),
+ MaxNumberOfMessages: 10,
+ WaitTimeSeconds: 1,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("ReceiveMessage: %w", err)
+ }
+ if len(resp.Messages) == 0 {
+ emptyPolls++
+ continue
+ }
+ emptyPolls = 0
+ for _, msg := range resp.Messages {
+ var ev receivedEvent
+ if err := json.Unmarshal([]byte(aws.ToString(msg.Body)), &ev); err != nil {
+ return nil, fmt.Errorf("decoding SQS body: %w", err)
+ }
+ out = append(out, ev)
+ if _, err := sq.DeleteMessage(ctx, &sqs.DeleteMessageInput{
+ QueueUrl: aws.String(queueURL),
+ ReceiptHandle: msg.ReceiptHandle,
+ }); err != nil {
+ return nil, fmt.Errorf("DeleteMessage: %w", err)
+ }
+ }
+ }
+ return out, nil
+}
+
+func TestCheckPutEventsPermission(t *testing.T) {
+ t.Parallel()
+
+ const (
+ callerIdentityXML = `
+
+ arn:aws:iam::123456789012:user/test-user
+ AIDATEST
+ 123456789012
+
+ test-req-id
+`
+
+ simulateAllowedXML = `
+
+ false
+
+
+ events:PutEvents
+ allowed
+ arn:aws:events:us-east-1:123456789012:event-bus/test-bus
+
+
+
+
+ test-req-id
+`
+
+ simulateDeniedXML = `
+
+ false
+
+
+ events:PutEvents
+ implicitDeny
+ arn:aws:events:us-east-1:123456789012:event-bus/test-bus
+
+
+
+
+ test-req-id
+`
+
+ accessDeniedXML = `
+
+ Sender
+ AccessDenied
+ User is not authorized
+
+ test-req-id
+`
+
+ testBusARN = "arn:aws:events:us-east-1:123456789012:event-bus/test-bus"
+ )
+
+ makeServer := func(failGetCallerIdentity, failSimulate bool, decision string) *httptest.Server {
+ return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if err := r.ParseForm(); err != nil {
+ http.Error(w, "bad form", http.StatusBadRequest)
+ return
+ }
+ w.Header().Set("Content-Type", "text/xml")
+ switch r.FormValue("Action") {
+ case "GetCallerIdentity":
+ if failGetCallerIdentity {
+ w.WriteHeader(http.StatusForbidden)
+ fmt.Fprint(w, accessDeniedXML)
+ return
+ }
+ fmt.Fprint(w, callerIdentityXML)
+ case "SimulatePrincipalPolicy":
+ if failSimulate {
+ w.WriteHeader(http.StatusForbidden)
+ fmt.Fprint(w, accessDeniedXML)
+ return
+ }
+ if decision == "allowed" {
+ fmt.Fprint(w, simulateAllowedXML)
+ } else {
+ fmt.Fprint(w, simulateDeniedXML)
+ }
+ }
+ }))
+ }
+
+ makeCfg := func(endpoint string) config {
+ return config{
+ Region: "us-east-1",
+ EventBusName: "test-bus",
+ Credentials: &CredentialsConfig{
+ AuthType: AWSAccessKey,
+ AccessKeyCredentials: AccessKeyCredentials{
+ AWSAccessKeyID: "test",
+ AWSSecretAccessKey: "test",
+ },
+ },
+ Advanced: advancedConfig{Endpoint: endpoint},
+ }
+ }
+
+ t.Run("allowed", func(t *testing.T) {
+ t.Parallel()
+ srv := makeServer(false, false, "allowed")
+ defer srv.Close()
+ prereqs := &cerrors.PrereqErr{}
+ checkPutEventsPermission(context.Background(), makeCfg(srv.URL), testBusARN, prereqs)
+ require.Equal(t, 0, prereqs.Len())
+ })
+
+ t.Run("denied", func(t *testing.T) {
+ t.Parallel()
+ srv := makeServer(false, false, "implicitDeny")
+ defer srv.Close()
+ prereqs := &cerrors.PrereqErr{}
+ checkPutEventsPermission(context.Background(), makeCfg(srv.URL), testBusARN, prereqs)
+ require.Equal(t, 1, prereqs.Len())
+ })
+
+ t.Run("GetCallerIdentity_forbidden", func(t *testing.T) {
+ t.Parallel()
+ srv := makeServer(true, false, "")
+ defer srv.Close()
+ prereqs := &cerrors.PrereqErr{}
+ checkPutEventsPermission(context.Background(), makeCfg(srv.URL), testBusARN, prereqs)
+ require.Equal(t, 0, prereqs.Len())
+ })
+
+ t.Run("SimulatePrincipalPolicy_forbidden", func(t *testing.T) {
+ t.Parallel()
+ srv := makeServer(false, true, "")
+ defer srv.Close()
+ prereqs := &cerrors.PrereqErr{}
+ checkPutEventsPermission(context.Background(), makeCfg(srv.URL), testBusARN, prereqs)
+ require.Equal(t, 0, prereqs.Len())
+ })
+}
diff --git a/materialize-eventbridge/main.go b/materialize-eventbridge/main.go
new file mode 100644
index 0000000000..d950b07208
--- /dev/null
+++ b/materialize-eventbridge/main.go
@@ -0,0 +1,9 @@
+package main
+
+import (
+ boilerplate "github.com/estuary/connectors/materialize-boilerplate"
+)
+
+func main() {
+ boilerplate.RunMain(driver{})
+}
diff --git a/materialize-eventbridge/testdata/README.md b/materialize-eventbridge/testdata/README.md
new file mode 100644
index 0000000000..2095778d6e
--- /dev/null
+++ b/materialize-eventbridge/testdata/README.md
@@ -0,0 +1,85 @@
+# materialize-eventbridge integration test resources
+
+The cloud-backed integration tests in `driver_test.go` use AWS resources
+provisioned out-of-band. This file documents what exists and how to
+recreate it on a fresh account.
+
+## Current resources (Estuary-managed)
+
+- **Account:** `076183946664`
+- **Region:** `us-east-2`
+- **EventBridge bus:** `integration-tests`
+- **EventBridge rule:** `integration-tests-rule` on the bus above,
+ pattern matching every event, target = the SQS queue below
+- **SQS queue:** `integration-tests`
+ - Resource policy allows `events.amazonaws.com` to call
+ `sqs:SendMessage`, scoped via `aws:SourceArn` to the rule ARN
+- **IAM user:** `materialize-eventbridge-tests`
+ - Inline policy grants:
+ - `events:PutEvents`, `events:DescribeEventBus` on the bus ARN
+ - `sqs:GetQueueAttributes`, `sqs:GetQueueUrl`,
+ `sqs:ReceiveMessage`, `sqs:DeleteMessage` on the queue ARN
+ - Static access key for this user is committed (SOPS-encrypted)
+ in `testdata/config.aws.yaml` under `credentials.auth_type:
+ AWSAccessKey` alongside `region` and `event_bus_name`. The
+ connector also supports `credentials.auth_type: AWSIAM` (assume
+ role + STS) for production use; integration tests use static keys
+ because the test rig has no role-trust setup.
+
+The rule + SQS queue are *test plumbing only* — production users do
+not need them. The connector itself only writes to the bus; the queue
+exists so `driver_test.go` can observe what was published.
+
+## Recreate on a fresh account
+
+The commands below assume `AWS_REGION` is set and the caller has admin
+in the target account. Names match the originals; substitute as
+desired.
+
+```bash
+# 1. Bus
+aws events create-event-bus --name integration-tests
+
+# 2. Queue
+QURL=$(aws sqs create-queue --queue-name integration-tests \
+ --query QueueUrl --output text)
+QARN=$(aws sqs get-queue-attributes --queue-url "$QURL" \
+ --attribute-names QueueArn --query Attributes.QueueArn --output text)
+BUS_ARN=$(aws events describe-event-bus --name integration-tests \
+ --query Arn --output text)
+
+# 3. Rule on the bus, forward everything to the queue
+aws events put-rule --name integration-tests-rule \
+ --event-bus-name integration-tests \
+ --event-pattern '{"account":["'"$(aws sts get-caller-identity --query Account --output text)"'"]}'
+RULE_ARN=$(aws events describe-rule --name integration-tests-rule \
+ --event-bus-name integration-tests --query Arn --output text)
+aws events put-targets --rule integration-tests-rule \
+ --event-bus-name integration-tests \
+ --targets "Id=1,Arn=$QARN"
+
+# 4. Allow EventBridge to deliver into the queue, scoped to the rule
+aws sqs set-queue-attributes --queue-url "$QURL" --attributes '{
+ "Policy": "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"events.amazonaws.com\"},\"Action\":\"sqs:SendMessage\",\"Resource\":\"'"$QARN"'\",\"Condition\":{\"ArnEquals\":{\"aws:SourceArn\":\"'"$RULE_ARN"'\"}}}]}"
+}'
+
+# 5. IAM user + inline policy + access key
+aws iam create-user --user-name materialize-eventbridge-tests
+aws iam put-user-policy --user-name materialize-eventbridge-tests \
+ --policy-name materialize-eventbridge-tests \
+ --policy-document '{"Version":"2012-10-17","Statement":[
+ {"Effect":"Allow","Action":["events:PutEvents","events:DescribeEventBus"],"Resource":"'"$BUS_ARN"'"},
+ {"Effect":"Allow","Action":["sqs:GetQueueAttributes","sqs:GetQueueUrl","sqs:ReceiveMessage","sqs:DeleteMessage"],"Resource":"'"$QARN"'"}
+ ]}'
+aws iam create-access-key --user-name materialize-eventbridge-tests
+```
+
+Take the access-key id and secret from step 5 and re-encrypt
+`testdata/config.aws.yaml` with SOPS (the GCP KMS key is referenced
+inside that file's `sops` block).
+
+## Drift warning
+
+Nothing in CI verifies these commands stay in sync with the live
+account. If a test fails with permission errors after an AWS-side
+change, treat the live policy as ground truth and update this file.
diff --git a/materialize-eventbridge/testdata/apply-local.flow.yaml b/materialize-eventbridge/testdata/apply-local.flow.yaml
new file mode 100644
index 0000000000..3fc044f281
--- /dev/null
+++ b/materialize-eventbridge/testdata/apply-local.flow.yaml
@@ -0,0 +1,8 @@
+materializations:
+ acmeCo/tests/materialize-eventbridge/local:
+ endpoint:
+ local:
+ command: ["go", "run", "."]
+ protobuf: true
+ config: config.local.yaml
+ bindings: []
diff --git a/materialize-eventbridge/testdata/apply.flow.yaml b/materialize-eventbridge/testdata/apply.flow.yaml
new file mode 100644
index 0000000000..b03865aba0
--- /dev/null
+++ b/materialize-eventbridge/testdata/apply.flow.yaml
@@ -0,0 +1,16 @@
+materializations:
+ acmeCo/tests/materialize-eventbridge/local:
+ endpoint:
+ local:
+ command: ["go", "run", "."]
+ protobuf: true
+ config: config.local.yaml
+ bindings: []
+
+ acmeCo/tests/materialize-eventbridge/aws:
+ endpoint:
+ local:
+ command: ["go", "run", "."]
+ protobuf: true
+ config: config.aws.yaml
+ bindings: []
diff --git a/materialize-eventbridge/testdata/config.aws.yaml b/materialize-eventbridge/testdata/config.aws.yaml
new file mode 100644
index 0000000000..a3d201bd77
--- /dev/null
+++ b/materialize-eventbridge/testdata/config.aws.yaml
@@ -0,0 +1,17 @@
+region: ENC[AES256_GCM,data:4+HCTq9F1dvi,iv:MLCxFWmfjUMYrP33SfjxhJu7uuNGTywd7+xxbxhfyi0=,tag:5RrAM5pRik3T0X+cea2SQg==,type:str]
+event_bus_name: ENC[AES256_GCM,data:iYJF/qVeq/JJgEu3Drw6O6c=,iv:a64AohBk2IJdhOqXwh7sb9cfF/5jBhpKLfClulE9CuY=,tag:0ay5RtylJmWe2ssXa10CyA==,type:str]
+credentials:
+ auth_type: ENC[AES256_GCM,data:rKeUF5/Rn2XRdui5,iv:SLBaSHTa4mGpu+QtAHZ6/MQQC2aoHevktr2UVo2RYeU=,tag:V5xYpOlSQZhXLe9BUYRr8Q==,type:str]
+ aws_access_key_id: ENC[AES256_GCM,data:VI2XsizJ57Jepc/WFCDAd8uyaZ8=,iv:hiKiHf2oGA8IcXPhoiG7EBM837cB+ZYJhHt4jmYY3ec=,tag:9ZxUVZgpUfixH9BfjhV+Sw==,type:str]
+ aws_secret_access_key: ENC[AES256_GCM,data:F6iPbwxkajxxNvHoDzHnxu6+LxPQo1dX+2fGU61eOnGA0RZUJL3MGA==,iv:86cGHsVSEzMkSYHnLI+vKZeWKCURvvQrAnFjhssk1dY=,tag:ZY1xJjZRbYA9cAIQxuW7eA==,type:str]
+advanced:
+ feature_flags: ENC[AES256_GCM,data:0OVMqoR5KoAkm/hNZMFAqyOc0mSUpvJqd2tNf4WdZJ6c5KiMXKU=,iv:0xKG0tQjvIvF0xVUvvSMCe62CCZIYIm2j9MLP9fDUmc=,tag:66xBi+/yYDA8uwFnTLVwUg==,type:str]
+sops:
+ gcp_kms:
+ - resource_id: projects/estuary-theatre/locations/us-central1/keyRings/connector-keyring/cryptoKeys/connector-repository
+ created_at: "2026-05-04T19:54:50Z"
+ enc: CiUAdmEdwtqjtbRcsM5Lo4VH8aH+oqePjCfGPwIcainLyWJqJKcpEkkAJFAC3bv+VsPta4rF5Mng6IGGPr+UK51uCwIiLkNoLZjzKE/HlPHDQZOThJ86cffrslf8sJOu0ppJCk2qfMFrP9V7fOuLmjdq
+ lastmodified: "2026-05-04T19:54:50Z"
+ mac: ENC[AES256_GCM,data:ee3IVXnG3GaL3uEZ2Yt2TT8+Dl/zdd70VWWHls/g/dQ7JCtN9CaIrNIafiNxNcK0FvxWMKfk2R8sOnsEVgtFXND9Y8okfQ9I98QBJEYrsNOB2SQqTJZv3Si+bxe3wrv5JgrV3Ew71AHhtVxZnOp70+57iEJJBRbyxQJgGyXilY4=,iv:RJPxKdE+EAsrT6q6i1ovG+AIMuR7t1LazDndRzO3PcE=,tag:QcaOvZRSZpbmj3ZKmIz1Rg==,type:str]
+ unencrypted_suffix: _unencrypted
+ version: 3.12.1
diff --git a/materialize-eventbridge/testdata/config.local.yaml b/materialize-eventbridge/testdata/config.local.yaml
new file mode 100644
index 0000000000..3d1471c103
--- /dev/null
+++ b/materialize-eventbridge/testdata/config.local.yaml
@@ -0,0 +1,9 @@
+region: us-east-1
+event_bus_name: flow-test-bus
+credentials:
+ auth_type: AWSAccessKey
+ aws_access_key_id: test
+ aws_secret_access_key: test
+advanced:
+ endpoint: http://localhost:4566
+ feature_flags: allow_existing_tables_for_new_bindings
diff --git a/materialize-eventbridge/testdata/materialize-local.flow.yaml b/materialize-eventbridge/testdata/materialize-local.flow.yaml
new file mode 100644
index 0000000000..c6ddc00e92
--- /dev/null
+++ b/materialize-eventbridge/testdata/materialize-local.flow.yaml
@@ -0,0 +1,23 @@
+import:
+ - ../../materialize-boilerplate/testdata/integration/collections.materialize.flow.yaml
+
+materializations:
+ acmeCo/tests/materialize-eventbridge/local:
+ endpoint:
+ local:
+ command: ["go", "run", "."]
+ protobuf: true
+ config: config.local.yaml
+ bindings:
+ - resource:
+ source: flow.test
+ detail_type: simple_standard
+ source: tests/simple
+ fields:
+ recommended: true
+ - resource:
+ source: flow.test
+ detail_type: data_types
+ source: tests/data-types
+ fields:
+ recommended: true
diff --git a/materialize-eventbridge/testdata/materialize.flow.yaml b/materialize-eventbridge/testdata/materialize.flow.yaml
new file mode 100644
index 0000000000..9760749fea
--- /dev/null
+++ b/materialize-eventbridge/testdata/materialize.flow.yaml
@@ -0,0 +1,43 @@
+import:
+ - ../../materialize-boilerplate/testdata/integration/collections.materialize.flow.yaml
+
+materializations:
+ acmeCo/tests/materialize-eventbridge/local:
+ endpoint:
+ local:
+ command: ["go", "run", "."]
+ protobuf: true
+ config: config.local.yaml
+ bindings:
+ - resource:
+ source: flow.test
+ detail_type: simple_standard
+ source: tests/simple
+ fields:
+ recommended: true
+ - resource:
+ source: flow.test
+ detail_type: data_types
+ source: tests/data-types
+ fields:
+ recommended: true
+
+ acmeCo/tests/materialize-eventbridge/aws:
+ endpoint:
+ local:
+ command: ["go", "run", "."]
+ protobuf: true
+ config: config.aws.yaml
+ bindings:
+ - resource:
+ source: flow.test
+ detail_type: simple_standard
+ source: tests/simple
+ fields:
+ recommended: true
+ - resource:
+ source: flow.test
+ detail_type: data_types
+ source: tests/data-types
+ fields:
+ recommended: true
diff --git a/materialize-eventbridge/transactor.go b/materialize-eventbridge/transactor.go
new file mode 100644
index 0000000000..72f2aec08e
--- /dev/null
+++ b/materialize-eventbridge/transactor.go
@@ -0,0 +1,221 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/eventbridge"
+ "github.com/aws/aws-sdk-go-v2/service/eventbridge/types"
+ m "github.com/estuary/connectors/go/materialize"
+ pf "github.com/estuary/flow/go/protocols/flow"
+ "golang.org/x/sync/errgroup"
+)
+
+// Per the EventBridge PutEvents API limits.
+//
+// Retry layering: connection-level errors and 5xx responses bubble out of
+// t.client.PutEvents and are handled by the AWS SDK's built-in retryer
+// (default standard mode: 3 attempts, exponential backoff with jitter, max
+// ~20s). The putEventsRetry* constants below govern a *separate* loop for
+// PutEvents 200 responses with FailedEntryCount > 0 (per-entry partial
+// failures, e.g. ThrottlingException on a subset of entries) — the SDK
+// retryer does not see those because the HTTP call succeeded. 4 attempts
+// with 200ms base gives 200/400/800ms backoff = ~1.4s cumulative wait
+// across 4 RPCs. A sustained per-entry throttle that exhausts this budget
+// surfaces an error and the Flow runtime retries the whole transaction
+// (at-least-once delivery is contractual, so re-publishing duplicates is
+// safe). Do not raise this without a concrete throttling incident — bigger
+// numbers just delay the bounce; they do not improve correctness.
+const (
+ // putEventsMaxBatch is the AWS-side hard cap on entries per PutEvents
+ // request; an 11th entry returns 413. See API reference Entries field:
+ // https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEvents.html
+ putEventsMaxBatch = 10
+ // putEventsMaxEntrySize and putEventsMaxRequestSize encode two
+ // documented limits that the AWS docs do not perfectly agree on:
+ //
+ // - EventBridge User Guide says the 1 MB limit applies to the
+ // request as a whole, with a single entry permitted to use the
+ // full 1 MB if alone:
+ // https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-putevents.html#eb-putevent-size
+ // - The Go SDK's PutEvents doc and older API reference language
+ // state the per-entry limit is 256 KB:
+ // https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/eventbridge#Client.PutEvents
+ //
+ // We honor both, conservatively: each entry must fit in 256 KB and
+ // the sum of entries in a single PutEvents call must fit in 1 MB.
+ //
+ // Per the AWS-defined size calculation, each entry's size is
+ // len(Source) + len(DetailType) + len(Detail) (UTF-8 bytes); plus 14
+ // if Time is set; plus UTF-8 byte length of each Resources entry.
+ // This connector sets neither Time nor Resources, and EventBusName
+ // is not counted.
+ putEventsMaxEntrySize = 256 * 1024
+ putEventsMaxRequestSize = 1024 * 1024
+ putEventsRetryAttempts = 4
+ putEventsRetryBaseWait = 200 * time.Millisecond
+ // storeConcurrency caps in-flight PutEvents calls per Store transaction.
+ // 8 * 10-entry batches stays well under default per-region PutEvents rate
+ // limits while keeping the pipeline filled across typical call latency;
+ // raise for high-throughput buses with raised quotas, lower if you see
+ // ThrottlingException retries dominating.
+ storeConcurrency = 8
+)
+
+type bindingState struct {
+ source string
+ detailType string
+}
+
+type transactor struct {
+ client *eventbridge.Client
+ eventBusName string
+ bindings []bindingState
+}
+
+var _ m.Transactor = (*transactor)(nil)
+
+func (t *transactor) UnmarshalState(state json.RawMessage) error { return nil }
+func (t *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error) { return nil, nil }
+func (t *transactor) Destroy() {}
+
+func (t *transactor) RecoverCheckpoint(ctx context.Context, spec pf.MaterializationSpec, rangeSpec pf.RangeSpec) (m.RuntimeCheckpoint, error) {
+ return nil, nil
+}
+
+// EventBridge is delta-update only.
+func (t *transactor) Load(it *m.LoadIterator, _ func(int, json.RawMessage) error) error {
+ for it.Next() {
+ panic("Load should never be called for materialize-eventbridge")
+ }
+ return nil
+}
+
+func (t *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
+ errGroup, ctx := errgroup.WithContext(it.Context())
+ errGroup.SetLimit(storeConcurrency)
+
+ var (
+ batch []types.PutEventsRequestEntry
+ batchBytes int
+ )
+ flush := func() {
+ if len(batch) == 0 {
+ return
+ }
+ entries := batch
+ batch = nil
+ batchBytes = 0
+ errGroup.Go(func() error {
+ return t.putEvents(ctx, entries)
+ })
+ }
+
+ for it.Next(false) {
+ b := t.bindings[it.Binding]
+
+ entrySize := len(b.source) + len(b.detailType) + len(it.RawJSON)
+ if entrySize > putEventsMaxEntrySize {
+ return nil, fmt.Errorf(
+ "document for binding %d is %d bytes (source+detail-type+detail), exceeding the EventBridge PutEvents per-entry limit of %d bytes",
+ it.Binding, entrySize, putEventsMaxEntrySize,
+ )
+ }
+
+ // Flush before append if either the batch is full or adding this
+ // entry would push the request total over the 1 MB limit.
+ if len(batch) >= putEventsMaxBatch || batchBytes+entrySize > putEventsMaxRequestSize {
+ flush()
+ }
+
+ batch = append(batch, types.PutEventsRequestEntry{
+ EventBusName: aws.String(t.eventBusName),
+ Source: aws.String(b.source),
+ DetailType: aws.String(b.detailType),
+ Detail: aws.String(string(it.RawJSON)),
+ })
+ batchBytes += entrySize
+ }
+ flush()
+
+ if err := errGroup.Wait(); err != nil {
+ return nil, err
+ }
+ if err := it.Err(); err != nil {
+ return nil, err
+ }
+ return nil, nil
+}
+
+// putEvents publishes a batch and retries any individually-failed entries.
+// PutEvents returns 200 OK even when some entries fail (partial-failure
+// semantics), so we must inspect each result.
+func (t *transactor) putEvents(ctx context.Context, entries []types.PutEventsRequestEntry) error {
+ for attempt := range putEventsRetryAttempts {
+ if attempt > 0 {
+ wait := putEventsRetryBaseWait << (attempt - 1)
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(wait):
+ }
+ }
+
+ out, err := t.client.PutEvents(ctx, &eventbridge.PutEventsInput{Entries: entries})
+ if err != nil {
+ return fmt.Errorf("PutEvents: %w", err)
+ }
+ if out.FailedEntryCount == 0 {
+ return nil
+ }
+
+ var retry []types.PutEventsRequestEntry
+ // Track the first non-retryable failure so the returned error
+ // describes a genuinely permanent entry, not whatever happened to
+ // be last in iteration order. At-least-once semantics make
+ // fail-fast on permanent codes preferable to silently abandoning
+ // the still-retryable entries: the runtime retries the whole
+ // transaction, so surfacing the permanent failure loudly is the
+ // safer signal.
+ var permIdx = -1
+ var permCode, permMsg string
+ for i, r := range out.Entries {
+ if r.ErrorCode == nil && r.ErrorMessage == nil {
+ continue
+ }
+ code := aws.ToString(r.ErrorCode)
+ if !retryableCode(code) {
+ if permIdx == -1 {
+ permIdx, permCode, permMsg = i, code, aws.ToString(r.ErrorMessage)
+ }
+ continue
+ }
+ retry = append(retry, entries[i])
+ }
+ if permIdx != -1 {
+ return fmt.Errorf("entry %d: %s: %s", permIdx, permCode, permMsg)
+ }
+ if len(retry) == 0 {
+ // Defensive: FailedEntryCount > 0 but no entries flagged. Treat as success.
+ return nil
+ }
+ entries = retry
+ }
+ return fmt.Errorf("PutEvents: %d entries still failing after %d attempts",
+ len(entries), putEventsRetryAttempts)
+}
+
+// retryableCode returns true for EventBridge per-entry error codes that
+// indicate transient failures. These codes are surfaced on
+// PutEventsResultEntry.ErrorCode and are stable across SDK versions.
+func retryableCode(code string) bool {
+ switch code {
+ case "ThrottlingException", "Throttling",
+ "InternalFailure", "ServiceUnavailable":
+ return true
+ }
+ return false
+}