From 7cfc77eb388b2815844152bece60f88c2be1905f Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Thu, 7 May 2026 15:48:11 -0700 Subject: [PATCH] filesink: add date patterns doc to path description This was added in #3988 but not documented. Only the S3 Store supports using date patterns because it has the 2-phase store implementation. --- filesink/filesink.go | 22 +++++++++++----------- filesink/store_s3.go | 4 ++++ materialize-azure-blob-parquet/main.go | 2 +- materialize-gcs-csv/main.go | 2 +- materialize-gcs-parquet/main.go | 2 +- materialize-s3-csv/.snapshots/TestSpec | 4 ++-- materialize-s3-csv/main.go | 2 +- materialize-s3-parquet/.snapshots/TestSpec | 4 ++-- materialize-s3-parquet/main.go | 2 +- 9 files changed, 24 insertions(+), 20 deletions(-) diff --git a/filesink/filesink.go b/filesink/filesink.go index 826b493748..ab2cd3e21d 100644 --- a/filesink/filesink.go +++ b/filesink/filesink.go @@ -67,11 +67,11 @@ type StreamWriter interface { Close() error } -type resource struct { +type Resource struct { Path string `json:"path" jsonschema:"title=Path,description=The path that objects will be materialized to." jsonschema_extras:"x-collection-name=true"` } -func (r resource) Validate() error { +func (r Resource) Validate() error { if r.Path == "" { return fmt.Errorf("missing 'path'") } @@ -87,14 +87,14 @@ func (fc fieldConfig) Validate() error { return nil } -var _ boilerplate.Connector = &FileDriver[*SinglePhase]{} +var _ boilerplate.Connector = &FileDriver[*SinglePhase, Resource]{} type Upload interface { FileKey() string } // FileDriver contains the behaviors particular to a destination system and file format. -type FileDriver[T Upload] struct { +type FileDriver[T Upload, R any] struct { NewConfig func(raw json.RawMessage) (Config, error) NewStore func(ctx context.Context, config Config, featureFlags map[string]bool) (Store[T], error) NewWriter func(config Config, featureFlags map[string]bool, b *pf.MaterializationSpec_Binding, w io.WriteCloser) (StreamWriter, error) @@ -103,11 +103,11 @@ type FileDriver[T Upload] struct { ConfigSchema func() ([]byte, error) } -func (d FileDriver[T]) Apply(context.Context, *pm.Request_Apply) (*pm.Response_Applied, error) { +func (d FileDriver[T, R]) Apply(context.Context, *pm.Request_Apply) (*pm.Response_Applied, error) { return &pm.Response_Applied{}, nil } -func (d FileDriver[T]) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Response_Spec, error) { +func (d FileDriver[T, R]) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Response_Spec, error) { if err := req.Validate(); err != nil { return nil, fmt.Errorf("validating request: %w", err) } @@ -117,7 +117,7 @@ func (d FileDriver[T]) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Resp return nil, fmt.Errorf("generating endpoint schema: %w", err) } - resourceSchema, err := schemagen.GenerateSchema("ResourceConfig", &resource{}).MarshalJSON() + resourceSchema, err := schemagen.GenerateSchema("ResourceConfig", new(R)).MarshalJSON() if err != nil { return nil, fmt.Errorf("generating resource schema: %w", err) } @@ -129,7 +129,7 @@ func (d FileDriver[T]) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Resp }, nil } -func (d FileDriver[T]) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Response_Validated, error) { +func (d FileDriver[T, R]) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Response_Validated, error) { var out []*pm.Response_Validated_Binding var config, err = d.NewConfig(req.ConfigJson) @@ -138,7 +138,7 @@ func (d FileDriver[T]) Validate(ctx context.Context, req *pm.Request_Validate) ( } for _, b := range req.Bindings { - var res resource + var res Resource if err := pf.UnmarshalStrict(b.ResourceConfigJson, &res); err != nil { return nil, fmt.Errorf("parsing resource config: %w", err) } @@ -159,7 +159,7 @@ func (d FileDriver[T]) Validate(ctx context.Context, req *pm.Request_Validate) ( return &pm.Response_Validated{Bindings: out}, nil } -func (d FileDriver[T]) NewTransactor(ctx context.Context, open pm.Request_Open, _ *m.BindingEvents) (m.Transactor, *pm.Response_Opened, *m.MaterializeOptions, error) { +func (d FileDriver[T, R]) NewTransactor(ctx context.Context, open pm.Request_Open, _ *m.BindingEvents) (m.Transactor, *pm.Response_Opened, *m.MaterializeOptions, error) { driverCfg, err := d.NewConfig(open.Materialization.ConfigJson) if err != nil { return nil, nil, nil, err @@ -179,7 +179,7 @@ func (d FileDriver[T]) NewTransactor(ctx context.Context, open pm.Request_Open, bindings := make([]binding, 0, len(open.Materialization.Bindings)) for _, b := range open.Materialization.Bindings { - var res resource + var res Resource if err := pf.UnmarshalStrict(b.ResourceConfigJson, &res); err != nil { return nil, nil, nil, err } diff --git a/filesink/store_s3.go b/filesink/store_s3.go index 391470ccde..5a3abc7142 100644 --- a/filesink/store_s3.go +++ b/filesink/store_s3.go @@ -73,6 +73,10 @@ func (c *CredentialsConfig) Validate() error { return fmt.Errorf("unknown 'auth_type'") } +type S3Resource struct { + Path string `json:"path" jsonschema:"title=Path,description=The path that objects will be materialized to. May contain date patterns." jsonschema_extras:"x-collection-name=true"` +} + type S3StoreConfig struct { Bucket string `json:"bucket" jsonschema:"title=Bucket,description=Bucket to store materialized objects." jsonschema_extras:"order=0"` AWSAccessKeyID string `json:"awsAccessKeyId,omitempty" jsonschema:"-"` diff --git a/materialize-azure-blob-parquet/main.go b/materialize-azure-blob-parquet/main.go index 145035a50b..8396fb911f 100644 --- a/materialize-azure-blob-parquet/main.go +++ b/materialize-azure-blob-parquet/main.go @@ -53,7 +53,7 @@ func (c config) CommonConfig() filesink.CommonConfig { } } -var driver = filesink.FileDriver[*filesink.SinglePhase]{ +var driver = filesink.FileDriver[*filesink.SinglePhase, filesink.Resource]{ NewConfig: func(raw json.RawMessage) (filesink.Config, error) { var cfg config if err := pf.UnmarshalStrict(raw, &cfg); err != nil { diff --git a/materialize-gcs-csv/main.go b/materialize-gcs-csv/main.go index f2a91a037d..915c2a7603 100644 --- a/materialize-gcs-csv/main.go +++ b/materialize-gcs-csv/main.go @@ -50,7 +50,7 @@ func (c config) CommonConfig() filesink.CommonConfig { } } -var driver = filesink.FileDriver[*filesink.SinglePhase]{ +var driver = filesink.FileDriver[*filesink.SinglePhase, filesink.Resource]{ NewConfig: func(raw json.RawMessage) (filesink.Config, error) { var cfg config if err := pf.UnmarshalStrict(raw, &cfg); err != nil { diff --git a/materialize-gcs-parquet/main.go b/materialize-gcs-parquet/main.go index 88fb4a26c9..a0a8361d18 100644 --- a/materialize-gcs-parquet/main.go +++ b/materialize-gcs-parquet/main.go @@ -53,7 +53,7 @@ func (c config) CommonConfig() filesink.CommonConfig { } } -var driver = filesink.FileDriver[*filesink.SinglePhase]{ +var driver = filesink.FileDriver[*filesink.SinglePhase, filesink.Resource]{ NewConfig: func(raw json.RawMessage) (filesink.Config, error) { var cfg config if err := pf.UnmarshalStrict(raw, &cfg); err != nil { diff --git a/materialize-s3-csv/.snapshots/TestSpec b/materialize-s3-csv/.snapshots/TestSpec index fee71cf2f7..b1a7d15a2c 100644 --- a/materialize-s3-csv/.snapshots/TestSpec +++ b/materialize-s3-csv/.snapshots/TestSpec @@ -161,12 +161,12 @@ }, "resource_config_schema_json": { "$schema": "https://json-schema.org/draft/2020-12/schema", - "$id": "https://github.com/estuary/connectors/filesink/resource", + "$id": "https://github.com/estuary/connectors/filesink/s3-resource", "properties": { "path": { "type": "string", "title": "Path", - "description": "The path that objects will be materialized to.", + "description": "The path that objects will be materialized to. May contain date patterns.", "x-collection-name": true } }, diff --git a/materialize-s3-csv/main.go b/materialize-s3-csv/main.go index 893664a8b2..7db3ffd2e6 100644 --- a/materialize-s3-csv/main.go +++ b/materialize-s3-csv/main.go @@ -50,7 +50,7 @@ func (c config) CommonConfig() filesink.CommonConfig { } } -var driver = filesink.FileDriver[*filesink.S3MultipartUpload]{ +var driver = filesink.FileDriver[*filesink.S3MultipartUpload, filesink.S3Resource]{ NewConfig: func(raw json.RawMessage) (filesink.Config, error) { var cfg config if err := pf.UnmarshalStrict(raw, &cfg); err != nil { diff --git a/materialize-s3-parquet/.snapshots/TestSpec b/materialize-s3-parquet/.snapshots/TestSpec index 26152c054b..338c8546f0 100644 --- a/materialize-s3-parquet/.snapshots/TestSpec +++ b/materialize-s3-parquet/.snapshots/TestSpec @@ -167,12 +167,12 @@ }, "resource_config_schema_json": { "$schema": "https://json-schema.org/draft/2020-12/schema", - "$id": "https://github.com/estuary/connectors/filesink/resource", + "$id": "https://github.com/estuary/connectors/filesink/s3-resource", "properties": { "path": { "type": "string", "title": "Path", - "description": "The path that objects will be materialized to.", + "description": "The path that objects will be materialized to. May contain date patterns.", "x-collection-name": true } }, diff --git a/materialize-s3-parquet/main.go b/materialize-s3-parquet/main.go index c5e0ec4803..2a954c6f8b 100644 --- a/materialize-s3-parquet/main.go +++ b/materialize-s3-parquet/main.go @@ -53,7 +53,7 @@ func (c config) CommonConfig() filesink.CommonConfig { } } -var driver = filesink.FileDriver[*filesink.S3MultipartUpload]{ +var driver = filesink.FileDriver[*filesink.S3MultipartUpload, filesink.S3Resource]{ NewConfig: func(raw json.RawMessage) (filesink.Config, error) { var cfg config if err := pf.UnmarshalStrict(raw, &cfg); err != nil {