Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions materialize-postgres/.snapshots/TestSQLGeneration
Original file line number Diff line number Diff line change
Expand Up @@ -344,4 +344,20 @@ BEGIN
END $$;
--- End Fence Update ---

--- Begin createTargetTable [jsonb contentMediaType] ---

CREATE TABLE IF NOT EXISTS "public".jsonb_round_trip (
id BIGINT NOT NULL,
plain_json JSON,
jsonb_col JSONB,

PRIMARY KEY (id)
);

COMMENT ON TABLE "public".jsonb_round_trip IS 'Generated for materialize-postgres jsonb round-trip test';
COMMENT ON COLUMN "public".jsonb_round_trip.id IS '';
COMMENT ON COLUMN "public".jsonb_round_trip.plain_json IS '';
COMMENT ON COLUMN "public".jsonb_round_trip.jsonb_col IS '';
--- End createTargetTable [jsonb contentMediaType] ---


37 changes: 30 additions & 7 deletions materialize-postgres/sqlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,36 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect {
binaryMapping = sql.MapStatic("BYTEA", sql.UsingConverter(sql.Base64Decoder))
}

// Fields originating from a PostgreSQL JSONB column carry this
// (non-standard, vendor-tree) contentMediaType so they can be recreated
// as JSONB at the destination instead of collapsing onto JSON. Anything
// without the annotation defaults to JSON, preserving the historical
// behavior for existing materializations and non-Postgres sources.
jsonbContentMediaType := "application/vnd.postgresql.jsonb+json"
jsonOrJsonb := func(jsonMapper, jsonbMapper sql.MapProjectionFn) sql.MapProjectionFn {
return func(p *sql.Projection) (sql.DDLer, sql.CompatibleColumnTypes, sql.ElementConverter) {
if p.Inference.String_ != nil && p.Inference.String_.ContentType == jsonbContentMediaType {
return jsonbMapper(p)
}
return jsonMapper(p)
}
}

mapper := sql.NewDDLMapper(
sql.FlatTypeMappings{
sql.INTEGER: sql.MapSignedInt64(
sql.MapStatic("BIGINT", sql.AlsoCompatibleWith("integer")),
sql.MapStatic("NUMERIC"),
),
sql.NUMBER: sql.MapStatic("DOUBLE PRECISION"),
sql.BOOLEAN: sql.MapStatic("BOOLEAN"),
sql.OBJECT: sql.MapStatic("JSON"),
sql.ARRAY: sql.MapStatic("JSON"),
sql.BINARY: binaryMapping,
sql.MULTIPLE: sql.MapStatic("JSON", sql.UsingConverter(sql.ToJsonBytes)),
sql.NUMBER: sql.MapStatic("DOUBLE PRECISION"),
sql.BOOLEAN: sql.MapStatic("BOOLEAN"),
sql.OBJECT: jsonOrJsonb(sql.MapStatic("JSON"), sql.MapStatic("JSONB")),
sql.ARRAY: jsonOrJsonb(sql.MapStatic("JSON"), sql.MapStatic("JSONB")),
sql.BINARY: binaryMapping,
sql.MULTIPLE: jsonOrJsonb(
sql.MapStatic("JSON", sql.UsingConverter(sql.ToJsonBytes)),
sql.MapStatic("JSONB", sql.UsingConverter(sql.ToJsonBytes)),
),
sql.STRING_INTEGER: sql.MapStatic("NUMERIC"),
sql.STRING_NUMBER: sql.MapStatic("DECIMAL", sql.AlsoCompatibleWith("numeric")),
sql.STRING: sql.MapString(sql.StringMappings{
Expand Down Expand Up @@ -80,7 +98,12 @@ func createPgDialect(featureFlags map[string]bool) sql.Dialect {
"text": {sql.NewMigrationSpec([]string{"bytea"}, sql.WithCastSQL(stringToByteaCast))},
"character varying": {sql.NewMigrationSpec([]string{"bytea"}, sql.WithCastSQL(stringToByteaCast))},
"bytea": {sql.NewMigrationSpec([]string{"text"}, sql.WithCastSQL(byteaToStringCast))},
"*": {sql.NewMigrationSpec([]string{"json"}, sql.WithCastSQL(toJsonCast))},
// PostgreSQL accepts implicit casts in both directions between
// json and jsonb, so columns can move freely as the upstream
// source type annotation flips.
"json": {sql.NewMigrationSpec([]string{"jsonb"})},
"jsonb": {sql.NewMigrationSpec([]string{"json"})},
"*": {sql.NewMigrationSpec([]string{"json"}, sql.WithCastSQL(toJsonCast))},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should * to jsonb also be permitted?

},
TableLocatorer: sql.TableLocatorFn(func(path []string) sql.InfoTableLocation {
if len(path) == 1 {
Expand Down
111 changes: 111 additions & 0 deletions materialize-postgres/sqlgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,84 @@ func TestSQLGeneration(t *testing.T) {
},
)

// Exercise the source-postgres -> materialize-postgres JSONB round-trip:
// a value field carrying the application/vnd.postgresql.jsonb+json
// contentMediaType must render as JSONB, while a sibling field without
// the annotation stays on JSON.
jsonbTable := buildJSONBTestTable(t)
jsonbName := "createTargetTable [jsonb contentMediaType]"
snap.WriteString("--- Begin " + jsonbName + " ---\n")
require.NoError(t, testTemplates.createTargetTable.Execute(snap, &jsonbTable))
snap.WriteString("--- End " + jsonbName + " ---\n\n")

cupaloy.SnapshotT(t, snap.String())
}

// buildJSONBTestTable returns a synthetic "public".jsonb_round_trip table used
// by the SQL generation snapshot. It has a single integer primary key ("id")
// and two multi-typed value columns: "plain_json", which carries no
// contentMediaType, and "jsonb_col", which carries the jsonb contentMediaType.
// Every column's MappedType is resolved through testDialect.
func buildJSONBTestTable(t *testing.T) sql.Table {
	t.Helper()

	const (
		tableName      = "jsonb_round_trip"
		jsonbMediaType = "application/vnd.postgresql.jsonb+json"
	)
	anyJSONTypes := []string{"object", "string", "array", "number", "boolean", "null"}

	// newColumn maps proj through the test dialect and wraps the result in a
	// Column whose identifier is derived from the projection's field name.
	newColumn := func(proj sql.Projection, mustExist bool) sql.Column {
		return sql.Column{
			Projection: proj,
			MappedType: testDialect.MapType(&proj, sql.FieldConfig{}),
			Identifier: testDialect.Identifier(proj.Field),
			MustExist:  mustExist,
		}
	}

	// valueColumn builds an optional, multi-typed value column. String
	// inference is attached only when a content type is supplied, so an empty
	// contentType leaves the projection without any annotation at all.
	valueColumn := func(field, contentType string) sql.Column {
		inference := pf.Inference{
			Types:  anyJSONTypes,
			Exists: pf.Inference_MAY,
		}
		if contentType != "" {
			inference.String_ = &pf.Inference_String{ContentType: contentType}
		}
		return newColumn(sql.Projection{
			Projection: pf.Projection{
				Field:     field,
				Ptr:       "/" + field,
				Inference: inference,
			},
		}, false)
	}

	// The key column is built first so the dialect is consulted in the same
	// order as the columns appear in the generated DDL.
	idColumn := newColumn(sql.Projection{
		Projection: pf.Projection{
			Field:        "id",
			Ptr:          "/id",
			IsPrimaryKey: true,
			Inference: pf.Inference{
				Types:  []string{"integer"},
				Exists: pf.Inference_MUST,
			},
		},
	}, true)

	return sql.Table{
		TableShape: sql.TableShape{
			Path:    []string{"public", tableName},
			Comment: "Generated for materialize-postgres jsonb round-trip test",
		},
		Identifier: testDialect.Identifier("public", tableName),
		Keys:       []sql.Column{idColumn},
		Values: []sql.Column{
			valueColumn("plain_json", ""),
			valueColumn("jsonb_col", jsonbMediaType),
		},
	}
}

func TestDateTimeColumn(t *testing.T) {

var mapped = testDialect.MapType(&sql.Projection{
Expand All @@ -62,6 +137,42 @@ func TestDateTimeColumn(t *testing.T) {
require.NoError(t, err)
}

// TestJSONBContentMediaType checks how testDialect maps a required,
// multi-typed field depending on its string contentMediaType: the jsonb media
// type must yield a JSONB column, while "application/json" and an empty
// content type must both yield JSON.
func TestJSONBContentMediaType(t *testing.T) {
	const jsonbMediaType = "application/vnd.postgresql.jsonb+json"

	// ddlFor maps a required projection with the given JSON types and returns
	// the resulting DDL. String inference (and with it the content type) is
	// attached only when "string" is one of the types.
	ddlFor := func(types []string, contentType string) string {
		inference := pf.Inference{
			Types:  types,
			Exists: pf.Inference_MUST,
		}
		for _, ty := range types {
			if ty == "string" {
				inference.String_ = &pf.Inference_String{ContentType: contentType}
				break
			}
		}
		projection := sql.Projection{
			Projection: pf.Projection{Inference: inference},
		}
		return testDialect.MapType(&projection, sql.FieldConfig{}).DDL
	}

	cases := []struct {
		contentType string
		wantDDL     string
		failureMsg  string
	}{
		{
			contentType: jsonbMediaType,
			wantDDL:     "JSONB NOT NULL",
			failureMsg:  "MULTIPLE-typed field with jsonb contentMediaType should map to JSONB",
		},
		{
			contentType: "application/json",
			wantDDL:     "JSON NOT NULL",
			failureMsg:  "MULTIPLE-typed field with application/json contentMediaType should map to JSON",
		},
		{
			contentType: "",
			wantDDL:     "JSON NOT NULL",
			failureMsg:  "MULTIPLE-typed field without contentMediaType should default to JSON",
		},
	}

	for _, tc := range cases {
		// A fresh slice per case keeps each mapping independent of the others.
		multiTyped := []string{"object", "string", "array", "number", "boolean"}
		require.Equal(t, tc.wantDDL, ddlFor(multiTyped, tc.contentType), tc.failureMsg)
	}
}

func TestTruncatedIdentifier(t *testing.T) {
tests := []struct {
name string
Expand Down
6 changes: 4 additions & 2 deletions source-postgres/.snapshots/TestDiscoveryComplex
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ sql> COMMENT ON COLUMN test.discoverycomplex_934635.k1 IS 'I think this is a key
]
},
"doc": {
"description": "(source type: json)"
"description": "(source type: json)",
"contentMediaType": "application/json"
},
"doc/bin": {
"description": "(source type: non-nullable jsonb)"
"description": "(source type: non-nullable jsonb)",
"contentMediaType": "application/vnd.postgresql.jsonb+json"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that PostgREST uses a similar media type format, but they define it on their own vendor: application/vnd.pgrst.object+json. Should we instead use something specific to us as a vendor such as application/vnd.estuary.jsonb+json?

I know this is specifically meant to handle same-database round-tripping, but I wonder whether we will really never want things like mysql -> postgres customizations based on the source type. If we do, will we have a combinatorial issue?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielnelson I think you are right, I was also a bit worried about overlapping with another vendor (postgres itself), so I think we can do vnd.estuary.postgres.jsonb+json or something like this

},
"foo": {
"description": "This is a text field! (source type: text)",
Expand Down
21 changes: 15 additions & 6 deletions source-postgres/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,11 @@ func (db *postgresDatabase) TranslateDBToJSONType(column sqlcapture.ColumnInfo,
}

type columnSchema struct {
contentEncoding string
format string
nullable bool
jsonTypes []string
contentEncoding string
contentMediaType string
format string
nullable bool
jsonTypes []string
}

func (s columnSchema) toType() *jsonschema.Schema {
Expand All @@ -283,6 +284,10 @@ func (s columnSchema) toType() *jsonschema.Schema {
out.Extras["contentEncoding"] = s.contentEncoding // New in 2019-09.
}

if s.contentMediaType != "" {
out.Extras["contentMediaType"] = s.contentMediaType // New in 2019-09.
}

if s.jsonTypes != nil {
var types = append([]string(nil), s.jsonTypes...)
if s.nullable {
Expand Down Expand Up @@ -338,8 +343,12 @@ var postgresTypeToJSON = map[string]columnSchema{
"bit": {jsonTypes: []string{"string"}},
"varbit": {jsonTypes: []string{"string"}},

"json": {},
"jsonb": {},
// json and jsonb columns capture arbitrary JSON values, so we don't constrain
// the JSON Schema type. The contentMediaType annotation distinguishes the two
// on the wire so downstream connectors (e.g. materialize-postgres) can
// recreate the original column type instead of collapsing both onto json.
"json": {contentMediaType: "application/json"},
"jsonb": {contentMediaType: "application/vnd.postgresql.jsonb+json"},
"jsonpath": {jsonTypes: []string{"string"}},

// Domain-Specific Types
Expand Down
Loading