Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions filesink/filesink.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ type StreamWriter interface {
Close() error
}

type resource struct {
type Resource struct {
Path string `json:"path" jsonschema:"title=Path,description=The path that objects will be materialized to." jsonschema_extras:"x-collection-name=true"`
}

func (r resource) Validate() error {
func (r Resource) Validate() error {
if r.Path == "" {
return fmt.Errorf("missing 'path'")
}
Expand All @@ -87,14 +87,14 @@ func (fc fieldConfig) Validate() error {
return nil
}

var _ boilerplate.Connector = &FileDriver[*SinglePhase]{}
var _ boilerplate.Connector = &FileDriver[*SinglePhase, Resource]{}

type Upload interface {
FileKey() string
}

// FileDriver contains the behaviors particular to a destination system and file format.
type FileDriver[T Upload] struct {
type FileDriver[T Upload, R any] struct {
NewConfig func(raw json.RawMessage) (Config, error)
NewStore func(ctx context.Context, config Config, featureFlags map[string]bool) (Store[T], error)
NewWriter func(config Config, featureFlags map[string]bool, b *pf.MaterializationSpec_Binding, w io.WriteCloser) (StreamWriter, error)
Expand All @@ -103,11 +103,11 @@ type FileDriver[T Upload] struct {
ConfigSchema func() ([]byte, error)
}

func (d FileDriver[T]) Apply(context.Context, *pm.Request_Apply) (*pm.Response_Applied, error) {
func (d FileDriver[T, R]) Apply(context.Context, *pm.Request_Apply) (*pm.Response_Applied, error) {
return &pm.Response_Applied{}, nil
}

func (d FileDriver[T]) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Response_Spec, error) {
func (d FileDriver[T, R]) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Response_Spec, error) {
if err := req.Validate(); err != nil {
return nil, fmt.Errorf("validating request: %w", err)
}
Expand All @@ -117,7 +117,7 @@ func (d FileDriver[T]) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Resp
return nil, fmt.Errorf("generating endpoint schema: %w", err)
}

resourceSchema, err := schemagen.GenerateSchema("ResourceConfig", &resource{}).MarshalJSON()
resourceSchema, err := schemagen.GenerateSchema("ResourceConfig", new(R)).MarshalJSON()
if err != nil {
return nil, fmt.Errorf("generating resource schema: %w", err)
}
Expand All @@ -129,7 +129,7 @@ func (d FileDriver[T]) Spec(ctx context.Context, req *pm.Request_Spec) (*pm.Resp
}, nil
}

func (d FileDriver[T]) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Response_Validated, error) {
func (d FileDriver[T, R]) Validate(ctx context.Context, req *pm.Request_Validate) (*pm.Response_Validated, error) {
var out []*pm.Response_Validated_Binding

var config, err = d.NewConfig(req.ConfigJson)
Expand All @@ -138,7 +138,7 @@ func (d FileDriver[T]) Validate(ctx context.Context, req *pm.Request_Validate) (
}

for _, b := range req.Bindings {
var res resource
var res Resource
if err := pf.UnmarshalStrict(b.ResourceConfigJson, &res); err != nil {
return nil, fmt.Errorf("parsing resource config: %w", err)
}
Expand All @@ -159,7 +159,7 @@ func (d FileDriver[T]) Validate(ctx context.Context, req *pm.Request_Validate) (
return &pm.Response_Validated{Bindings: out}, nil
}

func (d FileDriver[T]) NewTransactor(ctx context.Context, open pm.Request_Open, _ *m.BindingEvents) (m.Transactor, *pm.Response_Opened, *m.MaterializeOptions, error) {
func (d FileDriver[T, R]) NewTransactor(ctx context.Context, open pm.Request_Open, _ *m.BindingEvents) (m.Transactor, *pm.Response_Opened, *m.MaterializeOptions, error) {
driverCfg, err := d.NewConfig(open.Materialization.ConfigJson)
if err != nil {
return nil, nil, nil, err
Expand All @@ -179,7 +179,7 @@ func (d FileDriver[T]) NewTransactor(ctx context.Context, open pm.Request_Open,
bindings := make([]binding, 0, len(open.Materialization.Bindings))

for _, b := range open.Materialization.Bindings {
var res resource
var res Resource
if err := pf.UnmarshalStrict(b.ResourceConfigJson, &res); err != nil {
return nil, nil, nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions filesink/store_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (c *CredentialsConfig) Validate() error {
return fmt.Errorf("unknown 'auth_type'")
}

type S3Resource struct {
Path string `json:"path" jsonschema:"title=Path,description=The path that objects will be materialized to. May contain date patterns." jsonschema_extras:"x-collection-name=true"`
}

type S3StoreConfig struct {
Bucket string `json:"bucket" jsonschema:"title=Bucket,description=Bucket to store materialized objects." jsonschema_extras:"order=0"`
AWSAccessKeyID string `json:"awsAccessKeyId,omitempty" jsonschema:"-"`
Expand Down
2 changes: 1 addition & 1 deletion materialize-azure-blob-parquet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c config) CommonConfig() filesink.CommonConfig {
}
}

var driver = filesink.FileDriver[*filesink.SinglePhase]{
var driver = filesink.FileDriver[*filesink.SinglePhase, filesink.Resource]{
NewConfig: func(raw json.RawMessage) (filesink.Config, error) {
var cfg config
if err := pf.UnmarshalStrict(raw, &cfg); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion materialize-gcs-csv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c config) CommonConfig() filesink.CommonConfig {
}
}

var driver = filesink.FileDriver[*filesink.SinglePhase]{
var driver = filesink.FileDriver[*filesink.SinglePhase, filesink.Resource]{
NewConfig: func(raw json.RawMessage) (filesink.Config, error) {
var cfg config
if err := pf.UnmarshalStrict(raw, &cfg); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion materialize-gcs-parquet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c config) CommonConfig() filesink.CommonConfig {
}
}

var driver = filesink.FileDriver[*filesink.SinglePhase]{
var driver = filesink.FileDriver[*filesink.SinglePhase, filesink.Resource]{
NewConfig: func(raw json.RawMessage) (filesink.Config, error) {
var cfg config
if err := pf.UnmarshalStrict(raw, &cfg); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions materialize-s3-csv/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@
},
"resource_config_schema_json": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://github.com/estuary/connectors/filesink/resource",
"$id": "https://github.com/estuary/connectors/filesink/s3-resource",
"properties": {
"path": {
"type": "string",
"title": "Path",
"description": "The path that objects will be materialized to.",
"description": "The path that objects will be materialized to. May contain date patterns.",
"x-collection-name": true
}
},
Expand Down
2 changes: 1 addition & 1 deletion materialize-s3-csv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c config) CommonConfig() filesink.CommonConfig {
}
}

var driver = filesink.FileDriver[*filesink.S3MultipartUpload]{
var driver = filesink.FileDriver[*filesink.S3MultipartUpload, filesink.S3Resource]{
NewConfig: func(raw json.RawMessage) (filesink.Config, error) {
var cfg config
if err := pf.UnmarshalStrict(raw, &cfg); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions materialize-s3-parquet/.snapshots/TestSpec
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@
},
"resource_config_schema_json": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://github.com/estuary/connectors/filesink/resource",
"$id": "https://github.com/estuary/connectors/filesink/s3-resource",
"properties": {
"path": {
"type": "string",
"title": "Path",
"description": "The path that objects will be materialized to.",
"description": "The path that objects will be materialized to. May contain date patterns.",
"x-collection-name": true
}
},
Expand Down
2 changes: 1 addition & 1 deletion materialize-s3-parquet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c config) CommonConfig() filesink.CommonConfig {
}
}

var driver = filesink.FileDriver[*filesink.S3MultipartUpload]{
var driver = filesink.FileDriver[*filesink.S3MultipartUpload, filesink.S3Resource]{
NewConfig: func(raw json.RawMessage) (filesink.Config, error) {
var cfg config
if err := pf.UnmarshalStrict(raw, &cfg); err != nil {
Expand Down
Loading