diff --git a/materialize-elasticsearch/driver.go b/materialize-elasticsearch/driver.go index b07ef42c61..3dd1afd8c4 100644 --- a/materialize-elasticsearch/driver.go +++ b/materialize-elasticsearch/driver.go @@ -25,6 +25,11 @@ import ( var featureFlagDefaults = map[string]bool{ "retain_existing_data_on_backfill": false, + + // Enable this flag to avoid a one time backfill due to change in date + // format in the mapping (a581ab1b761816b2b93b525c63153aac3493d49a). This + // flag should not be used on tasks created after this commit. + "ignore_mapping_format_changes": false, } type sshForwarding struct { @@ -425,9 +430,10 @@ func (driver) NewTransactor(ctx context.Context, req pm.Request_Open, be *m.Bind } type materialization struct { - cfg config - metaClient *client - dataClient *client + cfg config + metaClient *client + dataClient *client + featureFlags map[string]bool } var _ boilerplate.Materializer[config, fieldConfig, resource, property] = &materialization{} @@ -444,9 +450,10 @@ func newMaterialization(ctx context.Context, materializationName string, cfg con } return &materialization{ - cfg: cfg, - metaClient: metaClient, - dataClient: dataClient, + cfg: cfg, + metaClient: metaClient, + dataClient: dataClient, + featureFlags: featureFlags, }, nil } @@ -530,7 +537,9 @@ func (d *materialization) NewConstraint(p pf.Projection, deltaUpdates bool, fc f } func (d *materialization) MapType(p boilerplate.Projection, fc fieldConfig) (property, boilerplate.ElementConverter) { - return propForProjection(&p.Projection, p.Inference.Types, fc), nil + prop := propForProjection(&p.Projection, p.Inference.Types, fc) + prop.IgnoreFormat = d.featureFlags["ignore_mapping_format_changes"] + return prop, nil } func (d *materialization) Setup(ctx context.Context, is *boilerplate.InfoSchema) (string, error) { diff --git a/materialize-elasticsearch/type_mapping.go b/materialize-elasticsearch/type_mapping.go index 2992870f1b..d8c4c8a301 100644 --- a/materialize-elasticsearch/type_mapping.go +++ b/materialize-elasticsearch/type_mapping.go @@ -13,24 +13,25 @@ import ( type elasticPropertyType string const ( - elasticTypeKeyword elasticPropertyType = "keyword" - elasticTypeBoolean elasticPropertyType = "boolean" - elasticTypeLong elasticPropertyType = "long" - elasticTypeDouble elasticPropertyType = "double" - elasticTypeText elasticPropertyType = "text" - elasticTypeBinary elasticPropertyType = "binary" - elasticTypeDate elasticPropertyType = "date" - elasticTypeIp elasticPropertyType = "ip" + elasticTypeKeyword elasticPropertyType = "keyword" + elasticTypeBoolean elasticPropertyType = "boolean" + elasticTypeLong elasticPropertyType = "long" + elasticTypeDouble elasticPropertyType = "double" + elasticTypeText elasticPropertyType = "text" + elasticTypeBinary elasticPropertyType = "binary" + elasticTypeDate elasticPropertyType = "date" + elasticTypeIp elasticPropertyType = "ip" elasticTypeUnsignedLong elasticPropertyType = "unsigned_long" elasticTypeFlattened elasticPropertyType = "flattened" ) type property struct { - Type elasticPropertyType `json:"type"` - Coerce bool `json:"coerce,omitempty"` - Index *bool `json:"index,omitempty"` - IgnoreAbove int `json:"ignore_above,omitempty"` - Format string `json:"format,omitempty"` + Type elasticPropertyType `json:"type"` + Coerce bool `json:"coerce,omitempty"` + Index *bool `json:"index,omitempty"` + IgnoreAbove int `json:"ignore_above,omitempty"` + Format string `json:"format,omitempty"` + IgnoreFormat bool `json:"-"` } func (p property) String() string { @@ -38,6 +39,9 @@ func (p property) String() string { } func (p property) Compatible(existing boilerplate.ExistingField) bool { + if p.IgnoreFormat { + return strings.EqualFold(existing.Type, string(p.Type)) + } return strings.EqualFold(existing.Type, string(p.Type)) && strings.EqualFold(existing.Format, string(p.Format)) } diff --git a/materialize-elasticsearch/type_mapping_test.go b/materialize-elasticsearch/type_mapping_test.go index 44f05492a6..4aface8393 100644 --- a/materialize-elasticsearch/type_mapping_test.go +++ b/materialize-elasticsearch/type_mapping_test.go @@ -3,6 +3,7 @@ package main import ( "testing" + boilerplate "github.com/estuary/connectors/materialize-boilerplate" pf "github.com/estuary/flow/go/protocols/flow" "github.com/stretchr/testify/require" ) @@ -91,6 +92,21 @@ func TestPropForProjection(t *testing.T) { }, want: objProp(), }, + { + name: "string date-time", + in: &pf.Projection{ + Inference: pf.Inference{ + Types: []string{"string"}, + String_: &pf.Inference_String{ + Format: "date-time", + }, + }, + }, + want: property{ + Type: elasticTypeDate, + Format: "strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis", + }, + }, } for _, tt := range tests { @@ -99,3 +115,59 @@ func TestPropForProjection(t *testing.T) { }) } } + +func TestPropertyCompatible(t *testing.T) { + tests := []struct { + name string + prop property + existing boilerplate.ExistingField + compatible bool + }{ + { + name: "date compatible", + prop: property{ + Type: elasticTypeDate, + Format: "", + }, + existing: boilerplate.ExistingField{ + Type: "date", + Format: "", + }, + compatible: true, + }, + { + name: "date with incompatible format", + prop: property{ + Type: elasticTypeDate, + Format: "strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis", + }, + existing: boilerplate.ExistingField{ + Type: "date", + Format: "", + }, + compatible: false, + }, + { + name: "date compatible when ignore format", + prop: property{ + Type: elasticTypeDate, + Format: "strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis", + IgnoreFormat: true, + }, + existing: boilerplate.ExistingField{ + Type: "date", + Format: "", + }, + compatible: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.compatible { + require.True(t, tt.prop.Compatible(tt.existing)) + } else { + require.False(t, tt.prop.Compatible(tt.existing)) + } + }) + } +}