From 905372f3998dbd54f90377a4878f27308e859381 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Wed, 13 May 2026 15:38:46 -0700 Subject: [PATCH] materialize-elasticsearch: add flag to ignore date format mapping change In #3284 there was a change to the date format mapping. This change makes the field incompatible and so requires a backfill to validate and publish existing tasks but is otherwise unrequired. Since mappings cannot be updated, the only options are to backfill or ignore the change. This ignores the change but only when a feature flag is set. --- materialize-elasticsearch/driver.go | 23 ++++-- materialize-elasticsearch/type_mapping.go | 30 ++++---- .../type_mapping_test.go | 72 +++++++++++++++++++ 3 files changed, 105 insertions(+), 20 deletions(-) diff --git a/materialize-elasticsearch/driver.go b/materialize-elasticsearch/driver.go index b07ef42c61..3dd1afd8c4 100644 --- a/materialize-elasticsearch/driver.go +++ b/materialize-elasticsearch/driver.go @@ -25,6 +25,11 @@ import ( var featureFlagDefaults = map[string]bool{ "retain_existing_data_on_backfill": false, + + // Enable this flag to avoid a one time backfill due to change in date + // format in the mapping (a581ab1b761816b2b93b525c63153aac3493d49a). This + // flag should not be used on tasks created after this commit. + "ignore_mapping_format_changes": false, } type sshForwarding struct { @@ -425,9 +430,10 @@ func (driver) NewTransactor(ctx context.Context, req pm.Request_Open, be *m.Bind } type materialization struct { - cfg config - metaClient *client - dataClient *client + cfg config + metaClient *client + dataClient *client + featureFlags map[string]bool } var _ boilerplate.Materializer[config, fieldConfig, resource, property] = &materialization{} @@ -444,9 +450,10 @@ func newMaterialization(ctx context.Context, materializationName string, cfg con } return &materialization{ - cfg: cfg, - metaClient: metaClient, - dataClient: dataClient, + cfg: cfg, + metaClient: metaClient, + dataClient: dataClient, + featureFlags: featureFlags, }, nil } @@ -530,7 +537,9 @@ func (d *materialization) NewConstraint(p pf.Projection, deltaUpdates bool, fc f } func (d *materialization) MapType(p boilerplate.Projection, fc fieldConfig) (property, boilerplate.ElementConverter) { - return propForProjection(&p.Projection, p.Inference.Types, fc), nil + prop := propForProjection(&p.Projection, p.Inference.Types, fc) + prop.IgnoreFormat = d.featureFlags["ignore_mapping_format_changes"] + return prop, nil } func (d *materialization) Setup(ctx context.Context, is *boilerplate.InfoSchema) (string, error) { diff --git a/materialize-elasticsearch/type_mapping.go b/materialize-elasticsearch/type_mapping.go index 2992870f1b..d8c4c8a301 100644 --- a/materialize-elasticsearch/type_mapping.go +++ b/materialize-elasticsearch/type_mapping.go @@ -13,24 +13,25 @@ import ( type elasticPropertyType string const ( - elasticTypeKeyword elasticPropertyType = "keyword" - elasticTypeBoolean elasticPropertyType = "boolean" - elasticTypeLong elasticPropertyType = "long" - elasticTypeDouble elasticPropertyType = "double" - elasticTypeText elasticPropertyType = "text" - elasticTypeBinary elasticPropertyType = "binary" - elasticTypeDate elasticPropertyType = "date" - elasticTypeIp elasticPropertyType = "ip" + elasticTypeKeyword elasticPropertyType = "keyword" + elasticTypeBoolean elasticPropertyType = "boolean" + elasticTypeLong elasticPropertyType = "long" + elasticTypeDouble elasticPropertyType = "double" + elasticTypeText elasticPropertyType = "text" + elasticTypeBinary elasticPropertyType = "binary" + elasticTypeDate elasticPropertyType = "date" + elasticTypeIp elasticPropertyType = "ip" elasticTypeUnsignedLong elasticPropertyType = "unsigned_long" elasticTypeFlattened elasticPropertyType = "flattened" ) type property struct { - Type elasticPropertyType `json:"type"` - Coerce bool `json:"coerce,omitempty"` - Index *bool `json:"index,omitempty"` - IgnoreAbove int `json:"ignore_above,omitempty"` - Format string `json:"format,omitempty"` + Type elasticPropertyType `json:"type"` + Coerce bool `json:"coerce,omitempty"` + Index *bool `json:"index,omitempty"` + IgnoreAbove int `json:"ignore_above,omitempty"` + Format string `json:"format,omitempty"` + IgnoreFormat bool `json:"-"` } func (p property) String() string { @@ -38,6 +39,9 @@ func (p property) String() string { } func (p property) Compatible(existing boilerplate.ExistingField) bool { + if p.IgnoreFormat { + return strings.EqualFold(existing.Type, string(p.Type)) + } return strings.EqualFold(existing.Type, string(p.Type)) && strings.EqualFold(existing.Format, string(p.Format)) } diff --git a/materialize-elasticsearch/type_mapping_test.go b/materialize-elasticsearch/type_mapping_test.go index 44f05492a6..4aface8393 100644 --- a/materialize-elasticsearch/type_mapping_test.go +++ b/materialize-elasticsearch/type_mapping_test.go @@ -3,6 +3,7 @@ package main import ( "testing" + boilerplate "github.com/estuary/connectors/materialize-boilerplate" pf "github.com/estuary/flow/go/protocols/flow" "github.com/stretchr/testify/require" ) @@ -91,6 +92,21 @@ func TestPropForProjection(t *testing.T) { }, want: objProp(), }, + { + name: "string date-time", + in: &pf.Projection{ + Inference: pf.Inference{ + Types: []string{"string"}, + String_: &pf.Inference_String{ + Format: "date-time", + }, + }, + }, + want: property{ + Type: elasticTypeDate, + Format: "strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis", + }, + }, } for _, tt := range tests { @@ -99,3 +115,59 @@ func TestPropForProjection(t *testing.T) { }) } } + +func TestPropertyCompatible(t *testing.T) { + tests := []struct { + name string + prop property + existing boilerplate.ExistingField + compatible bool + }{ + { + name: "date compatible", + prop: property{ + Type: elasticTypeDate, + Format: "", + }, + existing: boilerplate.ExistingField{ + Type: "date", + Format: "", + }, + compatible: true, + }, + { + name: "date with incompatible format", + prop: property{ + Type: elasticTypeDate, + Format: "strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis", + }, + existing: boilerplate.ExistingField{ + Type: "date", + Format: "", + }, + compatible: false, + }, + { + name: "date compatible when ignore format", + prop: property{ + Type: elasticTypeDate, + Format: "strict_date_optional_time_nanos||strict_date_optional_time||epoch_millis", + IgnoreFormat: true, + }, + existing: boilerplate.ExistingField{ + Type: "date", + Format: "", + }, + compatible: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.compatible { + require.True(t, tt.prop.Compatible(tt.existing)) + } else { + require.False(t, tt.prop.Compatible(tt.existing)) + } + }) + } +}