Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions plugins/debezium-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ What it provides:
- Process compose job
- Debezium Server instance pre-configured with the outbox event router
- readme detailing environment variables and basic usage
- init_outbox service that assumes an existing Postgresql setup and configures the required outbox table, publication and heartbeat table for Debezium to use.
- init_topic service that attempts to auto-create the heartbeat topic, which is required by Debezium Server. Normally auto topic creation would suffice, but debezium server will error if the heartbeat topic does not exist.'
- `debezium-server-init-outbox` service that assumes an existing Postgresql setup and configures the required outbox table, publication and heartbeat table for Debezium to use.
- `debezium-server-init-topic` service that attempts to auto-create the heartbeat topic, which is required by Debezium Server. Normally auto topic creation would suffice, but debezium server will error if the heartbeat topic does not exist.'
- Various CLI tools
- Kafka CLI tools are included for creating the heartbeat topic
- psql is provided and used for setting up Postgresql
Expand All @@ -26,7 +26,11 @@ Include the plugin in your `devbox.json`:
"$schema": "https://raw.githubusercontent.com/jetify-com/devbox/main/.schema/devbox.schema.json",
"include": [
"github:cultureamp/devbox-extras?dir=plugins/debezium-server"
]
],
env: {
"_DEBEZIUM_SERVER_DB_PORT": "12345" // Adjust to your Postgresql port
"_DEBEZIUM_SERVER_DB_NAME": "my_app" // Adjust to your Postgresql database name
}
}

You will need to add the `ca-kafka-local` plugin and follow the plugin [README.md](../ca-kafka-local/README.md).
Expand All @@ -44,4 +48,4 @@ You will also need to add `postgresql` package to your project, and correctly de
command: pg_isready -U postgres
availability:
restart: always
```
```
34 changes: 17 additions & 17 deletions plugins/debezium-server/bin/debezium-server-readme
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@ echo
echo "You will need to first ensure both postgresql and kafka-local (https://github.com/cultureamp/kafka-local) are running"
echo
echo "== Environment variables =="
echo "DB_HOSTNAME: ${DB_HOSTNAME}"
echo "DB_PORT: ${DB_PORT}"
echo "DB_AUTH_USERNAME: ${DB_AUTH_USERNAME}"
echo "DB_AUTH_PASSWORD: ${DB_AUTH_PASSWORD}"
echo "DB_USERNAME: ${DB_USERNAME}"
echo "DB_PASSWORD: ${DB_PASSWORD}"
echo "DB_NAME: ${DB_NAME}"
echo "DB_SCHEMA: ${DB_SCHEMA}"
echo "BOOTSTRAP_SERVERS: ${BOOTSTRAP_SERVERS}"
echo "DB_HOSTNAME: ${_DEBEZIUM_SERVER_DB_HOSTNAME}"
echo "DB_PORT: ${_DEBEZIUM_SERVER_DB_PORT}"
echo "DB_AUTH_USERNAME: ${_DEBEZIUM_SERVER_DB_AUTH_USERNAME}"
echo "DB_AUTH_PASSWORD: ${_DEBEZIUM_SERVER_DB_AUTH_PASSWORD}"
echo "DB_USERNAME: ${_DEBEZIUM_SERVER_DB_USERNAME}"
echo "DB_PASSWORD: ${_DEBEZIUM_SERVER_DB_PASSWORD}"
echo "DB_NAME: ${_DEBEZIUM_SERVER_DB_NAME}"
echo "DB_SCHEMA: ${_DEBEZIUM_SERVER_DB_SCHEMA}"
echo "BOOTSTRAP_SERVERS: ${KAFKA_BROKERS}"
echo "SCHEMA_REGISTRY_URL: ${SCHEMA_REGISTRY_URL}"
echo "FARM: ${FARM}"
echo "INTERNAL_TOPIC_PREFIX: ${INTERNAL_TOPIC_PREFIX}"
echo "SLOT_NAME: ${SLOT_NAME}"
echo "OFFSET_TOPIC: ${OFFSET_TOPIC}"
echo "PUBLICATION_NAME: ${PUBLICATION_NAME}"
echo "OUTBOX_TABLE: ${OUTBOX_TABLE}"
echo "HEARTBEAT_TABLE: ${HEARTBEAT_TABLE}"
echo "TARGET_TOPIC: ${TARGET_TOPIC}"
echo "FARM: ${_DEBEZIUM_SERVER_FARM}"
echo "INTERNAL_TOPIC_PREFIX: ${_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX}"
echo "SLOT_NAME: ${_DEBEZIUM_SERVER_SLOT_NAME}"
echo "OFFSET_TOPIC: ${_DEBEZIUM_SERVER_OFFSET_TOPIC}"
echo "PUBLICATION_NAME: ${_DEBEZIUM_SERVER_PUBLICATION_NAME}"
echo "OUTBOX_TABLE: ${_DEBEZIUM_SERVER_OUTBOX_TABLE}"
echo "HEARTBEAT_TABLE: ${_DEBEZIUM_SERVER_HEARTBEAT_TABLE}"
echo "TARGET_TOPIC: ${_DEBEZIUM_SERVER_TARGET_TOPIC}"
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,40 @@ debezium.format.delegate.converter.type.schemas.enable=false
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.tasks.max=1
debezium.source.plugin.name=pgoutput
debezium.source.topic.prefix=${FARM}.${INTERNAL_TOPIC_PREFIX}
debezium.source.topic.prefix=${_DEBEZIUM_SERVER_FARM}.${_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX}
debezium.source.publication.autocreate.mode=filtered
debezium.source.slot.name=${SLOT_NAME}
debezium.source.slot.name=${_DEBEZIUM_SERVER_SLOT_NAME}
debezium.source.slot.drop.on.stop=false
debezium.source.table.include.list=${DB_SCHEMA}.${OUTBOX_TABLE}, ${DB_SCHEMA}.${HEARTBEAT_TABLE}
debezium.source.publication.name=${PUBLICATION_NAME}
debezium.source.table.include.list=${_DEBEZIUM_SERVER_DB_SCHEMA}.${_DEBEZIUM_SERVER_OUTBOX_TABLE}, ${_DEBEZIUM_SERVER_DB_SCHEMA}.${_DEBEZIUM_SERVER_HEARTBEAT_TABLE}
debezium.source.publication.name=${_DEBEZIUM_SERVER_PUBLICATION_NAME}
debezium.source.offset.storage=org.apache.kafka.connect.storage.KafkaOffsetBackingStore
debezium.source.offset.storage.topic=${OFFSET_TOPIC}
debezium.source.offset.storage.topic=${_DEBEZIUM_SERVER_OFFSET_TOPIC}
debezium.source.offset.storage.partitions=1
debezium.source.offset.storage.replication.factor=1
debezium.transforms=router, heartbeat1, heartbeat2
debezium.transforms.router.type=io.debezium.transforms.outbox.EventRouter
debezium.transforms.router.route.topic.replacement=${FARM}.$1
debezium.transforms.router.route.topic.replacement=${_DEBEZIUM_SERVER_FARM}.$1
debezium.transforms.router.route.by.field=topic
debezium.transforms.router.table.field.event.payload=payload
debezium.transforms.router.table.field.event.key=partition_key
debezium.transforms.router.table.field.event.id=id
debezium.transforms.router.predicate=outbox
debezium.predicates=outbox
debezium.predicates.outbox.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
debezium.predicates.outbox.pattern=${FARM}.${INTERNAL_TOPIC_PREFIX}.${DB_SCHEMA}.${OUTBOX_TABLE}
debezium.predicates.outbox.pattern=${_DEBEZIUM_SERVER_FARM}.${_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX}.${_DEBEZIUM_SERVER_DB_SCHEMA}.${_DEBEZIUM_SERVER_OUTBOX_TABLE}
debezium.transforms.heartbeat1.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.transforms.heartbeat1.regex=__debezium-heartbeat.${FARM}.${INTERNAL_TOPIC_PREFIX}
debezium.transforms.heartbeat1.replacement=${FARM}.${INTERNAL_TOPIC_PREFIX}.debezium-heartbeat-interval
debezium.transforms.heartbeat1.regex=__debezium-heartbeat.${_DEBEZIUM_SERVER_FARM}.${_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX}
debezium.transforms.heartbeat1.replacement=${_DEBEZIUM_SERVER_FARM}.${_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX}.debezium-heartbeat-interval
debezium.transforms.heartbeat2.type=org.apache.kafka.connect.transforms.RegexRouter
debezium.transforms.heartbeat2.regex=${FARM}.${INTERNAL_TOPIC_PREFIX}.${DB_SCHEMA}.${HEARTBEAT_TABLE}(.*?)
debezium.transforms.heartbeat2.replacement=_${FARM}.${INTERNAL_TOPIC_PREFIX}.debezium-heartbeat-table
debezium.source.database.initial.statements=INSERT INTO ${DB_SCHEMA}.${HEARTBEAT_TABLE} (id, heartbeat_recorded) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_recorded=EXCLUDED.heartbeat_recorded;
debezium.source.heartbeat.action.query=INSERT INTO ${DB_SCHEMA}.${HEARTBEAT_TABLE} (id, heartbeat_recorded) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_recorded=EXCLUDED.heartbeat_recorded;
debezium.transforms.heartbeat2.regex=${_DEBEZIUM_SERVER_FARM}.${_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX}.${_DEBEZIUM_SERVER_DB_SCHEMA}.${_DEBEZIUM_SERVER_HEARTBEAT_TABLE}(.*?)
debezium.transforms.heartbeat2.replacement=_${_DEBEZIUM_SERVER_FARM}.${_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX}.debezium-heartbeat-table
debezium.source.database.initial.statements=INSERT INTO ${_DEBEZIUM_SERVER_DB_SCHEMA}.${_DEBEZIUM_SERVER_HEARTBEAT_TABLE} (id, heartbeat_recorded) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_recorded=EXCLUDED.heartbeat_recorded;
debezium.source.heartbeat.action.query=INSERT INTO ${_DEBEZIUM_SERVER_DB_SCHEMA}.${_DEBEZIUM_SERVER_HEARTBEAT_TABLE} (id, heartbeat_recorded) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET heartbeat_recorded=EXCLUDED.heartbeat_recorded;
debezium.source.heartbeat.interval.ms=10000
debezium.source.database.hostname=${DB_HOSTNAME}
debezium.source.database.port=${DB_PORT}
debezium.source.database.user=${DB_USERNAME}
debezium.source.database.password=${DB_PASSWORD}
debezium.source.database.dbname=${DB_NAME}
debezium.source.bootstrap.servers=${BOOTSTRAP_SERVERS}
debezium.sink.kafka.producer.bootstrap.servers=${BOOTSTRAP_SERVERS}
debezium.source.database.hostname=${_DEBEZIUM_SERVER_DB_HOSTNAME}
debezium.source.database.port=${_DEBEZIUM_SERVER_DB_PORT}
debezium.source.database.user=${_DEBEZIUM_SERVER_DB_USERNAME}
debezium.source.database.password=${_DEBEZIUM_SERVER_DB_PASSWORD}
debezium.source.database.dbname=${_DEBEZIUM_SERVER_DB_NAME}
debezium.source.bootstrap.servers=${KAFKA_BROKERS}
debezium.sink.kafka.producer.bootstrap.servers=${KAFKA_BROKERS}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ BEGIN
END IF;

-- Set publication owner if not already set
ALTER PUBLICATION dbz_publication OWNER TO dbz_replication_role;
ALTER PUBLICATION dbz_publication OWNER TO dbz_replication_role;
END $$;

-- Allow UUID extension for inserting into the outbox table
Expand Down
19 changes: 9 additions & 10 deletions plugins/debezium-server/config/populate/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
{
"compilerOptions": {
"declaration": true,
"noImplicitReturns": true,
"inlineSourceMap": true,
"inlineSources": true,
"experimentalDecorators": true,
"noEmit": true,
"esModuleInterop": true
}
"compilerOptions": {
"declaration": true,
"noImplicitReturns": true,
"inlineSourceMap": true,
"inlineSources": true,
"experimentalDecorators": true,
"noEmit": true,
"esModuleInterop": true
}
}
33 changes: 19 additions & 14 deletions plugins/debezium-server/config/process-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,43 @@ processes:
availability:
restart: always

init_topic:
command: devbox run create_heartbeat_topic
debezium-server-install-deps:
command: devbox run debezium-server-install-deps

debezium-server-init-topic:
command: devbox run debezium-server-create-heartbeat-topic
depends_on:
kafka_local:
condition: process_healthy

init_outbox:
command: devbox run init_outbox
debezium-server-init-outbox:
command: devbox run debezium-server-init-outbox
depends_on:
postgres_invariant_checks:
debezium-server-postgres-invariant-checks:
condition: process_completed_successfully
postgres_wait:
debezium-server-postgres-wait:
condition: process_healthy

postgres_wait:
command: "echo 'Waiting on postgresql...'; tail -f /dev/null"
debezium-server-postgres-wait:
command: "echo 'Waiting on postgresql at ${_DEBEZIUM_SERVER_DB_HOSTNAME}:${_DEBEZIUM_SERVER_DB_PORT}...'; tail -f /dev/null"
readiness_probe:
period_seconds: 5
failure_threshold: 1000
exec:
command: pg_isready -h ${DB_HOSTNAME} -U postgres -p ${DB_PORT}
command: pg_isready -h ${_DEBEZIUM_SERVER_DB_HOSTNAME} -U postgres -p ${_DEBEZIUM_SERVER_DB_PORT}

postgres_invariant_checks:
command: devbox run postgres-version-check
debezium-server-postgres-invariant-checks:
command: devbox run debezium-server-postgres-version-check

debezium_server:
debezium-server:
working_dir: {{.Virtenv}}
command: run_debezium
depends_on:
init_outbox:
debezium-server-install-deps:
condition: process_completed_successfully
debezium-server-init-outbox:
condition: process_completed_successfully
init_topic:
debezium-server-init-topic:
condition: process_completed_successfully
availability:
restart: on_failure
92 changes: 44 additions & 48 deletions plugins/debezium-server/plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,68 +11,64 @@
"DEVBOX_COREPACK_ENABLED": "true",
"CONNECTOR_CONF_PATH": "{{.Virtenv}}",
"DEBEZIUM_OPTS": "",
"JAVA_OPTS": ""
"JAVA_OPTS": "",
"_DEBEZIUM_SERVER_DB_HOSTNAME": "localhost",
"_DEBEZIUM_SERVER_DB_PORT": "5432",
"_DEBEZIUM_SERVER_DB_AUTH_USERNAME": "postgres",
"_DEBEZIUM_SERVER_DB_AUTH_PASSWORD": "postgres",
"_DEBEZIUM_SERVER_DB_USERNAME": "dbz_user",
"_DEBEZIUM_SERVER_DB_PASSWORD": "mydbzpassword",
"_DEBEZIUM_SERVER_DB_NAME": "postgres",
"_DEBEZIUM_SERVER_DB_SCHEMA": "public",
"_DEBEZIUM_SERVER_FARM": "local",
"_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX": "my_test_prefix",
"_DEBEZIUM_SERVER_SLOT_NAME": "my_test_slot",
"_DEBEZIUM_SERVER_OFFSET_TOPIC": "my_test_offset_topic",
"_DEBEZIUM_SERVER_PUBLICATION_NAME": "dbz_publication",
"_DEBEZIUM_SERVER_OUTBOX_TABLE": "outbox_table",
"_DEBEZIUM_SERVER_HEARTBEAT_TABLE": "debezium_heartbeat",
"_DEBEZIUM_SERVER_TARGET_TOPIC": "my_target_topic",
"_DEBEZIUM_SERVER_SCHEMA_PATH": "com.cultureamp.test.user.v1-value.avsc",
"_DEBEZIUM_SERVER_SAMPLE_DATA_PATH": "sample-data.json"
},
"shell": {
"init_hook": [
"export PATH=$VENV_DIR/bin:$PATH",
"export DEVBOX_DIR=$VENV_DIR",
"export DB_HOSTNAME=localhost",
"export DB_PORT=${PGPORT:-5432}",
"export DB_AUTH_USERNAME=postgres",
"export DB_AUTH_PASSWORD=postgres",
"export DB_USERNAME=dbz_user",
"export DB_PASSWORD=mydbzpassword",
"export DB_NAME=postgres",
"export DB_SCHEMA=public",
"export BOOTSTRAP_SERVERS=${KAFKA_BROKERS}",
"export SCHEMA_REGISTRY_URL=${SCHEMA_REGISTRY_URL}",
"export FARM=local",
"export INTERNAL_TOPIC_PREFIX=my_test_prefix",
"export SLOT_NAME=my_test_slot",
"export OFFSET_TOPIC=my_test_offset_topic",
"export PUBLICATION_NAME=dbz_publication",
"export OUTBOX_TABLE=outbox_table",
"export HEARTBEAT_TABLE=debezium_heartbeat",
"export TARGET_TOPIC=my_target_topic",
"export SCHEMA_PATH=com.cultureamp.test.user.v1-value.avsc",
"export SAMPLE_DATA_PATH=sample-data.json",
"pnpm install -C {{.Virtenv}}"
],
"scripts": {
"init_outbox": [
"psql -v debezium_user=${DB_USERNAME} \\",
"-v debezium_password=${DB_PASSWORD} \\",
"-v schema_name=${DB_SCHEMA} \\",
"-v outbox_table_name=${OUTBOX_TABLE} \\",
"-v heartbeat_table_name=${HEARTBEAT_TABLE} \\",
"-h ${DB_HOSTNAME} \\",
"-p ${DB_PORT} \\",
"-U ${DB_AUTH_USERNAME} \\",
"${DB_NAME} < {{.Virtenv}}/init_outbox.sql"
"debezium-server-install-deps": [
"pnpm install -C {{.Virtenv}}"
],
"create_db": [
"dropdb -h ${DB_HOSTNAME} -p ${DB_PORT} -U ${DB_AUTH_USERNAME} --if-exists ${DB_NAME}",
"createdb -h ${DB_HOSTNAME} -p ${DB_PORT} -U ${DB_AUTH_USERNAME} ${DB_NAME}"
"debezium-server-init-outbox": [
"psql -v debezium_user=${_DEBEZIUM_SERVER_DB_USERNAME} \\",
"-v debezium_password=${_DEBEZIUM_SERVER_DB_PASSWORD} \\",
"-v schema_name=${_DEBEZIUM_SERVER_DB_SCHEMA} \\",
"-v outbox_table_name=${_DEBEZIUM_SERVER_OUTBOX_TABLE} \\",
"-v heartbeat_table_name=${_DEBEZIUM_SERVER_HEARTBEAT_TABLE} \\",
"-h ${_DEBEZIUM_SERVER_DB_HOSTNAME} \\",
"-p ${_DEBEZIUM_SERVER_DB_PORT} \\",
"-U ${_DEBEZIUM_SERVER_DB_AUTH_USERNAME} \\",
"${_DEBEZIUM_SERVER_DB_NAME} < {{.Virtenv}}/init_outbox.sql"
],
"create_heartbeat_topic": [
"kaf topic create _${FARM}.${INTERNAL_TOPIC_PREFIX}.debezium-heartbeat-table -p 1 -r 1 || true"
"debezium-server-create-db": [
"dropdb -h ${_DEBEZIUM_SERVER_DB_HOSTNAME} -p ${_DEBEZIUM_SERVER_DB_PORT} -U ${_DEBEZIUM_SERVER_DB_AUTH_USERNAME} --if-exists ${_DEBEZIUM_SERVER_DB_NAME}",
"createdb -h ${_DEBEZIUM_SERVER_DB_HOSTNAME} -p ${_DEBEZIUM_SERVER_DB_PORT} -U ${_DEBEZIUM_SERVER_DB_AUTH_USERNAME} ${_DEBEZIUM_SERVER_DB_NAME}"
],
"populate": [
"debezium-server-create-heartbeat-topic": [
"kaf topic create _${_DEBEZIUM_SERVER_FARM}.${_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX}.debezium-heartbeat-table -p 1 -r 1 || true"
],
"debezium-server-populate": [
"pnpm run -C {{.Virtenv}} populate"
],
"reset_offset": [
"devbox services stop debezium_server && \\",
"debezium-server-reset-offset": [
"devbox services stop debezium-server && \\",
"PID=$(ps aux | grep '[i]o.debezium.server.Main' | awk '{print $2}' | head -n 1) && \\",
"[ -n \"$PID\" ] && timeout=0; while ps -p $PID > /dev/null && [ $timeout -lt 10 ]; do echo 'Waiting for debezium server to stop..'; sleep 1; ((timeout++)); done && \\",
"echo \"Tombstoning connector offset\"",
"echo \"[\\\"kafka\\\",{\\\"server\\\":\\\"${FARM}.${INTERNAL_TOPIC_PREFIX}\\\"}]|\" | \\",
"kcat -P -Z -b ${KAFKA_BROKERS_SASL} -X sasl.mechanism=PLAIN -X sasl.username=${KAFKA_SASL_USER} -X sasl.password=${KAFKA_SASL_PASSWORD} -t ${OFFSET_TOPIC} -K \\| -p 0 && \\",
"echo \"[\\\"kafka\\\",{\\\"server\\\":\\\"${_DEBEZIUM_SERVER_FARM}.${_DEBEZIUM_SERVER_INTERNAL_TOPIC_PREFIX}\\\"}]|\" | \\",
"kcat -P -Z -b ${_DEBEZIUM_SERVER_KAFKA_BROKERS_SASL} -X sasl.mechanism=PLAIN -X sasl.username=${_DEBEZIUM_SERVER_KAFKA_SASL_USER} -X sasl.password=${_DEBEZIUM_SERVER_KAFKA_SASL_PASSWORD} -t ${_DEBEZIUM_SERVER_OFFSET_TOPIC} -K \\| -p 0 && \\",
"echo \"Restarting debezium server\" && \\",
"devbox services start debezium_server"
"devbox services start debezium-server"
],
"debezium-server-readme": "{{.Virtenv}}/bin/debezium-server-readme",
"postgres-version-check": "{{.Virtenv}}/bin/postgres-version-check"
"debezium-server-postgres-version-check": "{{.Virtenv}}/bin/postgres-version-check"
}
},
"create_files": {
Expand Down
Loading