最近開発しているサービスがだんだん成長してきて、先々を考えるといくつかのサービスに分離したいなーと思いChange Data Capture (CDC)について色々と調べていました。
MySQLでの構築については、この記事DebeziumでCDCを構築してみたがとても丁寧に解説されているのでお薦めです。この記事の解説を参考にしてMySQL+Kafka+Debeziumで動作してお試しできる環境ができたので、色々と挙動を確認できました。
MySQLでの実験環境は簡単に構築できたのですが、今回導入を検討しているサービスではPostgreSQLを使用しています。
ということで、まずは手元でPostgreSQL + Kafka + DebeziumでCDC環境を構築してみます。
Kafkaの構築
こちらは前出のブログの記載とほぼ同じで、Docker hubにある公式イメージから構築します。
version: "3.8"
services:
pg-debezium-zookeeper:
container_name: pg-debezium-zookeeper
image: "bitnami/zookeeper:latest"
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
pg-debezium-kafka:
container_name: pg-debezium-kafka
image: "bitnami/kafka:latest"
ports:
- "9092:9092"
- "29092:29092"
environment:
- KAFKA_CFG_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_ZOOKEEPER_CONNECT=pg-debezium-zookeeper:2181
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://pg-debezium-kafka:9092,PLAINTEXT_HOST://127.0.0.1:29092
depends_on:
- pg-debezium-zookeeper
MySQLでの構築と同様に、PostgreSQLを利用する場合にも設定を変更する必要があります。
デフォルトでは、wal_level = REPLICATION
になっているのですが、これを wal_level = LOGICAL
に変更しより詳細なログを出力する必要があります。LOGICALに設定変更するとログのサイズが増えるようなので注意が必要かもしれません。
今回は設定ファイルを変更せずにコンテナ起動時のコマンドで設定を変更することにします。
docker-compose.ymlはこんな感じ
version: "3.8"
services:
... 省略
pg-debezium-postgres:
container_name: pg-debezium-postgres
image: postgres:14.7-alpine
command: [ "postgres", "-c", "wal_level=logical" ]
volumes:
- pg-debezium-postgres-data:/var/lib/postgresql/data:cached
environment:
POSTGRES_PASSWORD: "Passw0rd"
POSTGRES_INITDB_ARGS: "--encoding=UTF-8 --locale=ja_JP.UTF-8"
POSTGRES_USER: "test"
POSTGRES_DB: test
healthcheck:
test: [ "CMD-SHELL", "pg_isready" ]
テーブルの作成
テストに使用するテーブルを作っておきます
$ docker exec -it pg-debezium-postgres psql -U test test \
-c "CREATE TABLE test (id SERIAL PRIMARY KEY, subject text NOT NULL, created timestamptz NOT NULL);"
Debeziumの構築
Debeziumも、公式のイメージがあるのでそれを利用して、上記のKafkaと接続するように設定します。
version: "3.8"
services:
... 省略
pg-debezium:
container_name: pg-debezium
image: "debezium/connect:2.0"
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=pg-debezium-kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=_kafka_connect_configs
- OFFSET_STORAGE_TOPIC=_kafka_connect_offsets
- STATUS_STORAGE_TOPIC=_kafka_connect_statuses
depends_on:
- pg-debezium-zookeeper
- pg-debezium-kafka
- pg-debezium-postgres
最終的なdocker-compose.yml
これまでの解説分を全て含んだdocker-compose.ymlはこんな感じです
version: "3.8"
services:
pg-debezium-zookeeper:
container_name: pg-debezium-zookeeper
image: "bitnami/zookeeper:latest"
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
pg-debezium-kafka:
container_name: pg-debezium-kafka
image: "bitnami/kafka:latest"
ports:
- "9092:9092"
- "29092:29092"
environment:
- KAFKA_CFG_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_ZOOKEEPER_CONNECT=pg-debezium-zookeeper:2181
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://pg-debezium-kafka:9092,PLAINTEXT_HOST://127.0.0.1:29092
depends_on:
- pg-debezium-zookeeper
pg-debezium-postgres:
container_name: pg-debezium-postgres
image: postgres:14.7-alpine
command: [ "postgres", "-c", "wal_level=logical" ]
volumes:
- pg-debezium-postgres-data:/var/lib/postgresql/data:cached
environment:
POSTGRES_PASSWORD: "Passw0rd"
POSTGRES_INITDB_ARGS: "--encoding=UTF-8 --locale=ja_JP.UTF-8"
POSTGRES_USER: "test"
POSTGRES_DB: test
healthcheck:
test: [ "CMD-SHELL", "pg_isready" ]
pg-debezium:
container_name: pg-debezium
image: "debezium/connect:2.0"
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=pg-debezium-kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=_kafka_connect_configs
- OFFSET_STORAGE_TOPIC=_kafka_connect_offsets
- STATUS_STORAGE_TOPIC=_kafka_connect_statuses
depends_on:
- pg-debezium-zookeeper
- pg-debezium-kafka
- pg-debezium-postgres
networks:
internal:
driver: bridge
internal: true
external:
driver: bridge
internal: false
name: pg_debezium_external_network
volumes:
pg-debezium-postgres-data:
こんな感じで起動してみます
$ docker compose up -d
[+] Running 4/4
⠿ Container pg-debezium-postgres Started 0.4s
⠿ Container pg-debezium-zookeeper Started 0.4s
⠿ Container pg-debezium-kafka Started 0.6s
⠿ Container pg-debezium Started 0.9s
これで、PostgreSQL + Kafka + Debezium でのCDCの基盤が動作している状態になります。
CDCの構築
基盤の構築が終わったので、Debeziumコンテナ内にコネクタを作成します。
以下が今回の調査で作成した設定内容です。 sample.json
という名称で保存しておきます。
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "pg-debezium-postgres",
"database.port": "5432",
"database.user": "test",
"database.password": "Passw0rd",
"database.dbname" : "test",
"database.server.name": "pg-debezium-postgres",
"table.whitelist": "public.test",
"plugin.name": "pgoutput",
"topic.prefix": "test_topic",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"database.connectionTimeZone": "UTC",
"include.schema.changes": "false"
}
}
PostgreSQLコネクタの設定内容は、公式ドキュメントを参照してください。
設定ファイルができたので以下のコマンドでコネクタを作成します。(出力されているJSON文字列は整形しています)
$ curl -i -X POST -H "Accept:application/json" -H \
"Content-Type:application/json" \
http://localhost:8083/connectors/ \
-d @./sample.json
HTTP/1.1 201 Created
Date: Sat, 06 May 2023 05:36:54 GMT
Location: http://localhost:8083/connectors/postgres-connector
Content-Type: application/json
Content-Length: 733
Server: Jetty(9.4.48.v20220622)
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "pg-debezium-postgres",
"database.port": "5432",
"database.user": "test",
"database.password": "Passw0rd",
"database.dbname": "test",
"database.server.name": "pg-debezium-postgres",
"table.whitelist": "public.test",
"plugin.name": "pgoutput",
"topic.prefix": "test_topic",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"database.connectionTimeZone": "UTC",
"include.schema.changes": "false",
"name": "postgres-connector"
},
"tasks": [],
"type": "source"
}
トピックの確認
以下のコマンドで、トピックの一覧を表示することができます。まだレコードが存在しないので、先ほど作成したコネクタで定義したトピックはまだ存在しません。
$ docker-compose exec pg-debezium-kafka kafka-topics.sh --list --bootstrap-server pg-debezium-kafka:9092
__consumer_offsets
_kafka_connect_configs
_kafka_connect_offsets
_kafka_connect_statuses
レコードを追加してみます。レコードを追加するとトピックが作成されていることが確認できます。
$ docker exec -it pg-debezium-postgres psql -U test test \
-c "INSERT INTO test (subject, created) values ( 'Test 1', NOW());"
INSERT 0 1
$ docker-compose exec pg-debezium-kafka kafka-topics.sh --list --bootstrap-server pg-debezium-kafka:9092
__consumer_offsets
_kafka_connect_configs
_kafka_connect_offsets
_kafka_connect_statuses
test_topic.public.test
では、kafkaコンテナに用意されている、kafka-console-consumer.sh
を使用して購読してみます。
$ docker-compose exec pg-debezium-kafka kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--from-beginning --topic test_topic.public.test
{"before":null,"after":{"id":1,"subject":"Test 1","created":"2023-05-06T05:44:15.096521Z"},"source":{"version":"2.0.0.Final","connector":"postgresql","name":"test_topic","ts_ms":1683351855101,"snapshot":"false","db":"test","sequence":"[null,\"24280176\"]","schema":"public","table":"test","txId":738,"lsn":24280176,"xmin":null},"op":"c","ts_ms":1683351855591,"transaction":null}
無事、先ほど追加したレコードの内容は取得できています。では、別のターミナルを開いてデータベースを更新してみます。
$ docker exec -it pg-debezium-postgres psql -U test test \
-c "INSERT INTO test (subject, created) values ( 'Test 2', NOW());"
$ docker exec -it pg-debezium-postgres psql -U test test \
-c "UPDATE test SET subject = 'Test 2 updated' WHERE id = 2;"
コンシューマには以下のように表示されました。
{
"before": null,
"after": {
"id": 2,
"subject": "Test 2",
"created": "2023-05-06T05:46:17.579336Z"
},
"source": {
"version": "2.0.0.Final",
"connector": "postgresql",
"name": "test_topic",
"ts_ms": 1683351977580,
"snapshot": "false",
"db": "test",
"sequence": "[\"24280464\",\"24280856\"]",
"schema": "public",
"table": "test",
"txId": 739,
"lsn": 24280856,
"xmin": null
},
"op": "c",
"ts_ms": 1683351977937,
"transaction": null
}
{
"before": null,
"after": {
"id": 2,
"subject": "Test 2 updated",
"created": "2023-05-06T05:46:17.579336Z"
},
"source": {
"version": "2.0.0.Final",
"connector": "postgresql",
"name": "test_topic",
"ts_ms": 1683352381043,
"snapshot": "false",
"db": "test",
"sequence": "[\"24282144\",\"24282200\"]",
"schema": "public",
"table": "test",
"txId": 741,
"lsn": 24282200,
"xmin": null
},
"op": "u",
"ts_ms": 1683352381077,
"transaction": null
}
データを更新したのになぜか before
が null
になってしまっています。
色々調べた結果、Debeziumのユーザガイド内に記載がありました。 7.3.2. Debezium PostgreSQL 変更イベントの値 こちらの解説を参考に以下のように設定を変更します。
$ docker exec -it pg-debezium-postgres psql -U test test \
-c "ALTER TABLE public.test REPLICA IDENTITY FULL;"
ALTER TABLE
設定の変更ができたので、再度データを更新してみます。
$ docker exec -it pg-debezium-postgres psql -U test test \
-c "UPDATE test SET subject = 'Test 2 updated 2' WHERE id = 2;"
UPDATE 1
無事、before
に変更前のデータが入るようになりました。
{
"before": {
"id": 2,
"subject": "Test 2 updated",
"created": "2023-05-06T05:46:17.579336Z"
},
"after": {
"id": 2,
"subject": "Test 2 updated 2",
"created": "2023-05-06T05:46:17.579336Z"
},
"source": {
"version": "2.0.0.Final",
"connector": "postgresql",
"name": "test_topic",
"ts_ms": 1683352906329,
"snapshot": "false",
"db": "test",
"sequence": "[\"24284944\",\"24285000\"]",
"schema": "public",
"table": "test",
"txId": 743,
"lsn": 24285000,
"xmin": null
},
"op": "u",
"ts_ms": 1683352906769,
"transaction": null
}
では最後に削除を試してみます。
$ docker exec -it pg-debezium-postgres psql -U test test \
-c "DELETE FROM test WHERE id = 2;"
無事、after
が null
になりました
{
"before": {
"id": 2,
"subject": "Test 2 updated 2",
"created": "2023-05-06T05:46:17.579336Z"
},
"after": null,
"source": {
"version": "2.0.0.Final",
"connector": "postgresql",
"name": "test_topic",
"ts_ms": 1683353081106,
"snapshot": "false",
"db": "test",
"sequence": "[\"24285496\",\"24285552\"]",
"schema": "public",
"table": "test",
"txId": 744,
"lsn": 24285552,
"xmin": null
},
"op": "d",
"ts_ms": 1683353081505,
"transaction": null
}
これで、コンシューマを作成すれば、変更データをもとにコピーを作成したりデータを集計したりなど色々と処理ができますね!前出のブログでも紹介されている、KafkaJSを使ってコンシューマを作るのが良さそうです。
Azure Database for PostgreSQLでの設定
ここまで、手元環境のdoker上のPostgreSQLでDebeziumを試してみましたが、Azure Database for PostgreSQLで動作するか検証することにしてみます。
Azureの公式ドキュメントには、変更データ キャプチャ用に Azure Event Hubs の Apache Kafka Connect のサポートを Debezium と統合するという記事があり、Azure Event Hubsを利用した構築方法が解説されています。
このページにも 警告
として記載がありますが、Event Hubを利用する場合イベントの保存期間が制限されるという制約があるようです。これが実運用時に問題になるかはまだ把握できていないのですが、まずは自前でCDCを構築してみようと思います。
Azure Database for PostgreSQLの作成
今回は試験用なので、以下のように小さめ(お安い)で作成しています。
(フレキシブル サーバーでないと14などの新しいバージョンが利用できないので、フレキシブル サーバーを使っています)
設定の変更
まずは、 wal_level
を変更します。サーバーパラメータ編集ページで以下のようにwal_level
を LOGICAL
に変更して、設定を保存します。
次に、今回は手元環境から直接PostgreSQLに接続をする必要があるので、ネットワーク編集ページで 現在のクライアントIPを追加する
を選択して、接続元IPアドレスを追加して、設定を保存します。
今回は実験用なのでこのように外部からの接続を許可する設定をしましたが、実運用時の設定は各環境に合わせて適切に設定してください
これで、手元環境から接続できるようになったので、以下のコマンドで接続確認をします。
$ docker exec -it pg-debezium-postgres psql -h ホスト名.postgres.database.azure.com -U test test
Password for user test:
psql (14.7)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.
test=>
無事接続できましたので、先ほどと同様にテーブルを作成します。また、先ほど追加で設定したレプリケーションの設定なども更新しておきます。
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
-c "CREATE TABLE test (id SERIAL PRIMARY KEY, subject text NOT NULL, created timestamptz NOT NULL");
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
-c "ALTER TABLE public.test REPLICA IDENTITY FULL;"
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
-c "ALTER ROLE test WITH REPLICATION;"
CDCの構築
SSL接続設定の準備
Azure Database for PostgreSQLへの接続は、TLSを使用して接続する必要があるため、Debeziumからの接続に少し準備が必要です。
公式ドキュメントのAzure Database for PostgreSQL - フレキシブル サーバーでのトランスポート層セキュリティを使用した暗号化された接続の解説を参考にして、ルート証明書をダウンロードします。
docker exec -it pg-debezium curl -O https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem
コネクタの追加
準備ができたので、以下のような設定でコネクタを追加します。SSLでの接続が必要なため、ローカル用の設定に database.sslmode
/ database.sslrootcert
の2つを追加しています。
今回は、azure~sample.jsonという名前で保存します。
{
"name": "azure-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "データベースのホスト名",
"database.port": "5432",
"database.user": "test",
"database.password": "データベースのパスワード",
"database.dbname" : "test",
"database.server.name": "データベースのホスト名",
"database.sslmode": "verify-full",
"database.sslrootcert": "/kafka/DigiCertGlobalRootCA.crt.pem",
"table.whitelist": "public.test",
"plugin.name": "pgoutput",
"topic.prefix": "azure_pgsql_topic",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"database.connectionTimeZone": "UTC",
"include.schema.changes": "false"
}
}
先ほどと同様に以下のコマンドでコネクタを追加します。
$ curl -i -X POST -H "Accept:application/json" -H \
"Content-Type:application/json" \
http://localhost:8083/connectors/ \
-d @./azure-sample.json
HTTP/1.1 201 Created
Date: Sat, 06 May 2023 06:49:08 GMT
Location: http://localhost:8083/connectors/azure-postgres-connector
Content-Type: application/json
Content-Length: 924
Server: Jetty(9.4.48.v20220622)
{
"name": "azure-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "ホスト名",
"database.port": "5432",
"database.user": "test",
"database.password": "データベースのパスワード",
"database.dbname": "webapp",
"database.server.name": "ホスト名",
"database.sslmode": "verify-full",
"database.sslrootcert": "/kafka/DigiCertGlobalRootCA.crt.pem",
"table.whitelist": "public.test",
"plugin.name": "pgoutput",
"topic.prefix": "azure_pgsql_topic",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"database.connectionTimeZone": "UTC",
"include.schema.changes": "false",
"name": "azure-postgres-connector"
},
"tasks": [],
"type": "source"
}
以下のコマンドで、コネクタが追加されているか確認します
$ curl -i -X GET -H "Accept:application/json" \
-H "Content-Type:application/json" \
http://localhost:8083/connectors/
HTTP/1.1 200 OK
Date: Sat, 06 May 2023 06:51:04 GMT
Content-Type: application/json
Content-Length: 49
Server: Jetty(9.4.48.v20220622)
["postgres-connector","azure-postgres-connector"]
問題なく追加されているようです。
トピックの確認
準備ができたので、先ほどと同様に kafka-console-consumer.sh
を使用して購読してみます。
$ curl -i -X POST -H "Accept:application/json" -H \
"Content-Type:application/json" \
http://localhost:8083/connectors/ \
-d @./azure-sample.json
HTTP/1.1 201 Created
Date: Sat, 06 May 2023 06:49:08 GMT
Location: http://localhost:8083/connectors/azure-postgres-connector
Content-Type: application/json
Content-Length: 924
Server: Jetty(9.4.48.v20220622)
{
"name": "azure-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "ホスト名",
"database.port": "5432",
"database.user": "test",
"database.password": "パスワード",
"database.dbname": "webapp",
"database.server.name": "ホスト名",
"database.sslmode": "verify-full",
"database.sslrootcert": "/kafka/DigiCertGlobalRootCA.crt.pem",
"table.whitelist": "public.test",
"plugin.name": "pgoutput",
"topic.prefix": "azure_pgsql_topic",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"database.connectionTimeZone": "UTC",
"include.schema.changes": "false",
"name": "azure-postgres-connector"
},
"tasks": [],
"type": "source"
}
準備ができたので、別ターミナルからデータを追加・更新・削除してみます。
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
-c "INSERT INTO test (subject, created) values ( 'Test 1', NOW());"
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
-c "INSERT INTO test (subject, created) values ( 'Test 2', NOW());"
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
-c "UPDATE test SET subject = 'Test 2 updated' WHERE id = 2;"
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
-c "DELETE FROM test WHERE id = 2;"
コンシューマでは、問題なく以下のように変更を取得できました。
{"before":null,"after":{"id":1,"subject":"Test 1","created":"2023-05-06T02:28:46.075454Z"},"source":{"version":"2.0.0.Final","connector":"postgresql","name":"azure_pgsql_topic","ts_ms":1683355748922,"snapshot":"first","db":"webapp","sequence":"[null,\"20568873528\"]","schema":"public","table":"test","txId":48642,"lsn":20568873528,"xmin":null},"op":"r","ts_ms":1683355749255,"transaction":null}
{"before":null,"after":{"id":2,"subject":"Test 2","created":"2023-05-06T02:28:46.084254Z"},"source":{"version":"2.0.0.Final","connector":"postgresql","name":"azure_pgsql_topic","ts_ms":1683355748922,"snapshot":"true","db":"webapp","sequence":"[null,\"20568873528\"]","schema":"public","table":"test","txId":48642,"lsn":20568873528,"xmin":null},"op":"r","ts_ms":1683355749257,"transaction":null}
{"before":{"id":2,"subject":"Test 2","created":"2023-05-06T02:28:46.084254Z"},"after":{"id":2,"subject":"Test 2 updated","created":"2023-05-06T02:28:46.084254Z"},"source":{"version":"2.0.0.Final","connector":"postgresql","name":"azure_pgsql_topic","ts_ms":1683356320172,"snapshot":"false","db":"webapp","sequence":"[\"20602423752\",\"20602427936\"]","schema":"public","table":"test","txId":48718,"lsn":20602427936,"xmin":null},"op":"u","ts_ms":1683356320309,"transaction":null}
{"before":{"id":2,"subject":"Test 2 updated","created":"2023-05-06T02:28:46.084254Z"},"after":null,"source":{"version":"2.0.0.Final","connector":"postgresql","name":"azure_pgsql_topic","ts_ms":1683356329235,"snapshot":"false","db":"webapp","sequence":"[\"20602428392\",\"20602428648\"]","schema":"public","table":"test","txId":48720,"lsn":20602428648,"xmin":null},"op":"d","ts_ms":1683356329507,"transaction":null}
これで、Azure Database for PostgreSQLでもDebeziumを使用したCDCが構築できることが確認できました。
まとめ
今回は、Azure Database for PostgreSQL + DebeziumでCDCを構築するための準備として、ローカル環境 + Azure Database for PostgreSQLでCDCを構築するところまでを解説しました。
実際に利用するためには、まだまだ調査しないといけないことは多いですが、ひとまず以降の実験・調査をする土台までは検証できました。
さらっと書いていますが、概念を把握するのに色々実験したり、PostgreSQL・Azure Database for PostgreSQLで動作させるために何度も構築し直したり、色々と試行錯誤が必要でした。
この後は、以下のような残った課題を順次調査を進めていこうと思います
- Kafka + zookeeperをどこに構築するか検討して構築
- コネクタのパラメータの調整
- ログの管理や監視
- Azure Event Hubsを利用する形での実験と検討
また知見が溜まったら、ブログにまとめようと思います。
関連リンク