Debezium for CDC: Postgres to Postgres
Ever wondered how to duplicate your database in real time without pain? Change Data Capture (CDC) makes that possible: we will combine it with Debezium to stream data changes into a Kafka topic and make use of them on the other side. This post uses a Postgres-to-Postgres setup as the example.
10/22/2024, 12:59:12 AM
sh
         +--------------+
         |              |
         |  PostgreSQL  |
         |              |
         +------+-------+
                |
                |
                |
+---------------v------------------+
|                                  |
|          Kafka Connect           |
|    (Debezium, JDBC connectors)   |
|                                  |
+---------------+------------------+
                |
                |
                |
                |
        +-------v--------+
        |                |
        |   PostgreSQL   |
        |                |
        +----------------+
I'm using a Docker environment for this; if you want to try it on the host machine instead, you can use the Confluent Platform installed on your OS.
yaml
version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    container_name: pg1
    expose:
      - 5433
    ports:
      - 5433:5433
    environment:
      - POSTGRES_USER=mine
      - POSTGRES_PASSWORD=qwepoi123
      - POSTGRES_DB=sharingcdc
      - PGPORT=5433

  postgres-copy:
    image: debezium/postgres:13
    container_name: pg2
    expose:
      - 5434
    ports:
      - 5434:5434
    environment:
      - POSTGRES_USER=mine
      - POSTGRES_PASSWORD=qwepoi123
      - POSTGRES_DB=sharingcdc-copy
      - PGPORT=5434

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    container_name: kafka
    links:
      - zookeeper
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092

  debezium:
    image: dbz-conn-jdbc-sink
    container_name: dbz
    links:
      - kafka
      - postgres
      - postgres-copy
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect-status
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    container_name: schema
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]

  kafkacat:
    image: edenhill/kafkacat:1.5.0
    container_name: kafkacat
    entrypoint:
      - /bin/sh
      - -c
      - |
        while [ 1 -eq 1 ];do sleep 60;done
First, we need some data in the first database. Open a shell in pg1 (as we named it) and log in:
sh
psql -U mine -d sharingcdc -W
The -W option prompts us for the password to connect. Once we are at the psql prompt, create a table:
sql
CREATE TABLE public.todo (
	id uuid NOT NULL,
	created_date timestamp NULL DEFAULT now(),
	is_done bool NULL,
	modified_date timestamp NULL,
	title varchar(255) NULL,
	CONSTRAINT todo_pkey PRIMARY KEY (id)
);
To enable CDC with Postgres, we also need to set the table's replica identity to FULL, so that change events carry the complete row state:
sql
ALTER TABLE public.todo REPLICA IDENTITY FULL;
Let's insert some data into it:
sql
INSERT INTO public.todo
(id, created_date, is_done, modified_date, title)
VALUES('79fe5ffa-94ed-4871-a5cd-300471586914'::uuid, '2022-05-31 14:47:12.198', false, '2022-06-20 22:52:10.648', 'do laundry');
INSERT INTO public.todo
(id, created_date, is_done, modified_date, title)
VALUES('129eef91-8f55-4edd-9c63-804c6f1a3f5b'::uuid, '2022-05-31 14:59:58.150', false, '2022-06-20 22:52:11.481', 'feed the dog');
Next, we need to create a connector using Debezium to read from our first database and stream its changes into a Kafka topic. Fortunately, that's quite easy: the connectors are managed through a REST API.
There are two connectors we need to create: a source connector, which streams change events into a topic, and a sink connector, which reads data back out of the topic.
To create the source connector, we use the PostgresConnector class. Below is an example config, saved as debezium-source-config.json (for the publisher / source connector).
json
{
  "name": "sharingcdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5433",
    "database.user": "mine",
    "database.password": "qwepoi123",
    "database.dbname": "sharingcdc",
    "database.server.name": "postgres",
    "database.include.list": "sharingcdc",
    "tasks.max": 1,
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
Finally, we can use curl to create the connector.
sh
curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-source-config.json"
The response we get will be:
json
HTTP/1.1 201 Created
Date: Tue, 12 Jul 2022 09:09:28 GMT
Location: http://localhost:8083/connectors/sharingcdc-connector
Content-Type: application/json
Content-Length: 608
Server: Jetty(9.4.33.v20201020)

{
  "name":"sharingcdc-connector",
  "config":{
    "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name":"pgoutput",
    "database.hostname":"postgres",
    "database.port":"5433",
    "database.user":"mine",
    "database.password":"qwepoi123",
    "database.dbname":"sharingcdc",
    "database.server.name":"postgres",
    "database.include.list":"sharingcdc",
    "tasks.max":"1",
    "transforms":"route",
    "transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement":"$3",
    "name":"sharingcdc-connector"
  },
  "tasks":[],
  "type":"source"
}
Notice the HTTP status 201 Created, which tells us the connector was successfully created. We can also check its status:
sh
curl localhost:8083/connectors/sharingcdc-connector/status
This gives:
json
{
  "name":"sharingcdc-connector",
  "connector":{
    "state":"RUNNING",
    "worker_id":"172.21.0.6:8083"
  },
  "tasks":[
    {
      "id":0,
      "state":"RUNNING",
      "worker_id":"172.21.0.6:8083"
    }
  ],
  "type":"source"
}
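If you script your deployments, you may want to evaluate this status payload programmatically. A small sketch in plain Python over the parsed JSON (the field names are the ones the Connect REST API returned above):

```python
def is_healthy(status: dict) -> bool:
    """Return True when the connector and all of its tasks are RUNNING."""
    if status["connector"]["state"] != "RUNNING":
        return False
    return all(task["state"] == "RUNNING" for task in status["tasks"])

# The status payload shown above, as a Python dict
status = {
    "name": "sharingcdc-connector",
    "connector": {"state": "RUNNING", "worker_id": "172.21.0.6:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "172.21.0.6:8083"}],
    "type": "source",
}
print(is_healthy(status))  # True
```

Checking the tasks matters: as we will see in the troubleshooting section at the end, a connector can report RUNNING while one of its tasks has FAILED.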
If you want to stop the connector, it is important to delete it before starting a new connector with the same name. In the example above we named the connector sharingcdc-connector, so to delete it we simply send a DELETE request to the API.
sh
curl -X DELETE localhost:8083/connectors/sharingcdc-connector
To confirm that the connector has been deleted, query the connectors that we have:
sh
curl -X GET localhost:8083/connectors
The /connectors endpoint is the root context for Kafka Connect connectors, and a GET request without any parameters returns the list of registered connectors.
According to the official documentation of the Debezium connector for Postgres (Postgres Topic Names), the topic name is generated automatically as server name + schema name + table name.
So in our case, since we defined database.server.name in the debezium-source-config.json file as postgres, our schema name is public, and the table is todo, the constructed topic name would be postgres.public.todo.
That should be it, right? Nope! If you look closely at our debezium-source-config.json, we defined a route transform that rewrites the default generated topic name to just the table name via the $3 replacement.
json
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
So in our case, the name of our topic will be `todo` instead of postgres.public.todo.
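If you want to convince yourself of what the RegexRouter does, its behavior can be mimicked with an ordinary regex substitution. A quick Python sketch (not the connector's actual code):

```python
import re

# Debezium builds the default topic name as <server>.<schema>.<table>.
default_topic = ".".join(["postgres", "public", "todo"])

# The route transform captures the three dot-separated parts and keeps
# only the third one ($3 in Connect's syntax, \3 in Python's).
routed_topic = re.sub(r"([^.]+)\.([^.]+)\.([^.]+)", r"\3", default_topic)

print(default_topic)  # postgres.public.todo
print(routed_topic)   # todo
```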
To check your topic list, Kafka also provides a CLI: the kafka-topics.sh script located in the /bin directory. Open a terminal in the Kafka container and run:
sh
./bin/kafka-topics.sh --bootstrap-server=kafka:9092 --list
ℹ The --bootstrap-server param can also be replaced with --zookeeper if you prefer.
Another tool we can use is kafkacat, which I also included in the docker-compose.yml file. To use it, we simply run:
sh
docker exec kafkacat kafkacat -b kafka:9092 -L # list existing topics in kafka:9092
Now we need to check whether the source connector is actually able to stream the data changes. There are several ways to do this.
We can use kafka-avro-console-consumer, by running:
sh
docker-compose exec schema-registry /usr/bin/kafka-avro-console-consumer --bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema.registry.url=http://schema-registry:8081 --topic todo
or, if you prefer, via bash:
- Open a shell in the schema-registry container:
sh
docker exec -it <container-name> sh
- then run:
sh
./usr/bin/kafka-avro-console-consumer --bootstrap-server kafka:9092 --from-beginning --property print.key=true --property schema.registry.url=http://schema-registry:8081 --topic todo
Or we can also use kafkacat:
sh
docker exec kafkacat kafkacat -b kafka:9092 -t todo -C
You can replace todo with whichever topic you want to tail. Now that we are tailing the topic, we can confirm replication by modifying, inserting, or deleting data.
Try to insert new data:
sql
INSERT INTO public.todo
(id, created_date, is_done, modified_date, title)
VALUES('6fc341f0-04a8-4c91-92bb-3dbb1ef0b557'::uuid, '2022-06-20 22:11:09.613', false, '2022-06-20 22:52:26.648', 'test');
This is what we get in the topic monitoring console:
json
{
  "id":"6fc341f0-04a8-4c91-92bb-3dbb1ef0b557"
}
{
  "before":null,
  "after":{
    "postgres.public.todo.Value":{
      "id":"6fc341f0-04a8-4c91-92bb-3dbb1ef0b557",
      "created_date":{
        "long":1655763069613000
      },
      "is_done":{
        "boolean":false
      },
      "modified_date":{
        "long":1655765546648000
      },
      "title":{
        "string":"test"
      }
    }
  },
  "source":{
    "version":"1.4.2.Final",
    "connector":"postgresql",
    "name":"postgres",
    "ts_ms":1657617691850,
    "snapshot":{
      "string":"false"
    },
    "db":"sharingcdc",
    "schema":"public",
    "table":"todo",
    "txId":{
      "long":493
    },
    "lsn":{
      "long":23876504
    },
    "xmin":null
  },
  "op":"c",
  "ts_ms":{
    "long":1657617692088
  },
  "transaction":null
}
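If you consume these events in your own application, the envelope is straightforward to unpack. A hedged sketch in Python, with plain dicts standing in for the Avro-decoded record (trimmed to a few fields for brevity):

```python
def unwrap(value):
    """Avro union values decode as single-key {"type": value} dicts; unwrap them."""
    if isinstance(value, dict) and len(value) == 1:
        return next(iter(value.values()))
    return value

def after_state(envelope):
    """Return the row state after the change, or None for a delete."""
    after = envelope["after"]
    if after is None:
        return None
    # The payload is nested under its Avro record name, e.g. "postgres.public.todo.Value".
    (record,) = after.values()
    return {field: unwrap(v) for field, v in record.items()}

# A trimmed version of the event shown above
envelope = {
    "before": None,
    "after": {
        "postgres.public.todo.Value": {
            "id": "6fc341f0-04a8-4c91-92bb-3dbb1ef0b557",
            "is_done": {"boolean": False},
            "title": {"string": "test"},
        }
    },
    "op": "c",  # c = create, u = update, d = delete
}
row = after_state(envelope)
print(envelope["op"], row["title"])  # c test
```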
We can also query the Schema Registry REST API to inspect the Avro schema registered for the topic:
sh
curl localhost:8081/subjects/todo-value/versions/1 | jq ".schema | fromjson"
It shows us the schema below.
json
{
  "type": "record",
  "name": "Envelope",
  "namespace": "postgres.public.todo",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "string",
                "connect.version": 1,
                "connect.name": "io.debezium.data.Uuid"
              }
            },
            {
              "name": "created_date",
              "type": [
                "null",
                {
                  "type": "long",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.MicroTimestamp"
                }
              ],
              "default": null
            },
            {
              "name": "is_done",
              "type": [
                "null",
                "boolean"
              ],
              "default": null
            },
            {
              "name": "modified_date",
              "type": [
                "null",
                {
                  "type": "long",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.MicroTimestamp"
                }
              ],
              "default": null
            },
            {
              "name": "title",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ],
          "connect.name": "postgres.public.todo.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "namespace": "io.debezium.connector.postgresql",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "connector",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": "long"
          },
          {
            "name": "snapshot",
            "type": [
              {
                "type": "string",
                "connect.version": 1,
                "connect.parameters": {
                  "allowed": "true,last,false"
                },
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum"
              },
              "null"
            ],
            "default": "false"
          },
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "schema",
            "type": "string"
          },
          {
            "name": "table",
            "type": "string"
          },
          {
            "name": "txId",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "lsn",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "xmin",
            "type": [
              "null",
              "long"
            ],
            "default": null
          }
        ],
        "connect.name": "io.debezium.connector.postgresql.Source"
      }
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "transaction",
      "type": [
        "null",
        {
          "type": "record",
          "name": "ConnectDefault",
          "namespace": "io.confluent.connect.avro",
          "fields": [
            {
              "name": "id",
              "type": "string"
            },
            {
              "name": "total_order",
              "type": "long"
            },
            {
              "name": "data_collection_order",
              "type": "long"
            }
          ]
        }
      ],
      "default": null
    }
  ],
  "connect.name": "postgres.public.todo.Envelope"
}
Isn't that great? With the provider side set up, it's time to consume the messages and replicate the data into the other Postgres database.
As with the source connector, we send a configuration to the connector endpoint to create the sink connector. Here we use the JdbcSinkConnector class, which we installed manually while building the Docker image (more on that at the end of this post).
debezium-sink-config.json (for the consumer / sink connector)
json
{
  "name": "sharingcdc-sink-connector",
  "config": {
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "delete.enabled": "true",
    "tasks.max": "1",
    "topics": "todo",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
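A note on insert.mode=upsert with pk.mode=record_key: conceptually, each record becomes an idempotent Postgres upsert keyed on pk.fields, so replaying the same event twice does no harm. The sketch below only illustrates the idea; it is not the connector's actual SQL generation:

```python
def upsert_sql(table, pk_fields, columns):
    """Build the kind of Postgres upsert the JDBC sink conceptually performs."""
    cols = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    # Non-key columns take the incoming record's value on conflict.
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c not in pk_fields)
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(pk_fields)}) DO UPDATE SET {updates}"
    )

print(upsert_sql("todo", ["id"], ["id", "is_done", "title"]))
```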
Send the payload through curl
:
sh
curl -i -X POST -H "Content-Type:application/json" -H "Accept:application/json" localhost:8083/connectors -d "@debezium-sink-config.json"
It gives us:
json
HTTP/1.1 201 Created
Date: Tue, 12 Jul 2022 06:43:15 GMT
Location: http://localhost:8083/connectors/sharingcdc-sink-connector
Content-Type: application/json
Content-Length: 539
Server: Jetty(9.4.43.v20210629)

{
  "name":"sharingcdc-sink-connector",
  "config":{
    "auto.create":"true",
    "connection.url":"jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123",
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "insert.mode":"upsert",
    "pk.fields":"id",
    "pk.mode":"record_key",
    "delete.enabled":"true",
    "tasks.max":"1",
    "topics":"todo",
    "transforms":"unwrap",
    "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones":"false",
    "name":"sharingcdc-sink-connector"
  },
  "tasks":[],
  "type":"sink"
}
Now everything should be running normally. If so, let's connect to the second Postgres database and check whether it works 🤞
sh
docker exec -it pg2 sh # open a shell in pg2

psql -U mine -d sharingcdc-copy -W # log in to the sharingcdc-copy database
After submitting the password, you can run:
sh
\d  -- check all relations
We should see that the table todo was automatically created for us, even though we never created it in this database.
sh
sharingcdc-copy=# \d
        List of relations
 Schema | Name | Type  | Owner
--------+------+-------+-------
 public | todo | table | mine
(1 row)


sharingcdc-copy=# select * from todo;
 is_done |                  id                  |   created_date   |  modified_date   |    title
---------+--------------------------------------+------------------+------------------+--------------
 f       | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
 f       | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
 f       | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
(3 rows)
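Notice that the timestamp columns arrived as raw numbers: Debezium exported them as io.debezium.time.MicroTimestamp values, i.e. microseconds since the Unix epoch (you can see the type in the Avro schema earlier). A quick Python sketch to decode one:

```python
from datetime import datetime, timedelta

def from_micro_timestamp(micros):
    """Convert a Debezium MicroTimestamp (microseconds since epoch) to a datetime."""
    return datetime(1970, 1, 1) + timedelta(microseconds=micros)

# created_date of the 'test' row in the replicated table above
print(from_micro_timestamp(1655763069613000))  # 2022-06-20 22:11:09.613000
```

which matches the '2022-06-20 22:11:09.613' we inserted into the source table.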
Let's try to insert new data into our first database:
sql
sharingcdc=# INSERT INTO public.todo
(id, created_date, is_done, modified_date, title)
VALUES('850c5925-bb49-4795-9c19-4b7a6b4514b5'::uuid, '2022-06-20 22:12:02.335', false, NULL, 'Test12');
INSERT 0 1
Then confirm it in the second database:
sql
sharingcdc-copy=# select * from todo;
 is_done |                  id                  |   created_date   |  modified_date   |    title
---------+--------------------------------------+------------------+------------------+--------------
 f       | 129eef91-8f55-4edd-9c63-804c6f1a3f5b | 1654009198150000 | 1655765531481000 | feed the dog
 f       | 79fe5ffa-94ed-4871-a5cd-300471586914 | 1654008432198000 | 1655765530648000 | do laundry
 f       | 6fc341f0-04a8-4c91-92bb-3dbb1ef0b557 | 1655763069613000 | 1655765546648000 | test
 f       | 850c5925-bb49-4795-9c19-4b7a6b4514b5 | 1655763122335000 |                  | Test12
(4 rows)
Voila! 🎉🎉🎉
You can also see the corresponding change event in the consumer console we tailed earlier:
json
{
  "id":"850c5925-bb49-4795-9c19-4b7a6b4514b5"
}
{
  "before":null,
  "after":{
    "postgres.public.todo.Value":{
      "id":"850c5925-bb49-4795-9c19-4b7a6b4514b5",
      "created_date":{
        "long":1655763122335000
      },
      "is_done":{
        "boolean":false
      },
      "modified_date":null,
      "title":{
        "string":"Test12"
      }
    }
  },
  "source":{
    "version":"1.4.2.Final",
    "connector":"postgresql",
    "name":"postgres",
    "ts_ms":1657618002239,
    "snapshot":{
      "string":"false"
    },
    "db":"sharingcdc",
    "schema":"public",
    "table":"todo",
    "txId":{
      "long":495
    },
    "lsn":{
      "long":23877952
    },
    "xmin":null
  },
  "op":"c",
  "ts_ms":{
    "long":1657618002694
  },
  "transaction":null
}
The source code of this project is also available in my GitHub repo here.
A troubleshooting note: the latest Kafka Connect image does not include the JDBC Sink Connector we use. This shows up as the following error when we try to create the sink connector with the config above.
json
HTTP/1.1 500 Internal Server Error
Date: Mon, 11 Jul 2022 06:44:12 GMT
Content-Type: application/json
Content-Length: 4581
Server: Jetty(9.4.33.v20201020)

{
  "error_code":500,
  "message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector, available connectors are: PluginDesc{klass=class <snip> }"
}
Since the connector does not come out of the box, we need to install it manually, which takes a small workaround.
Solution
After some digging, I found that Debezium developer Jiri Pechanec has shown how to build a custom Debezium Connect image with the JDBC Sink Connector included.
In that `Dockerfile` example, the base image used is quay.io/debezium/
, but I decided to keep using the image from the first tutorial I followed.
Here is my custom Dockerfile
:
dockerfile
FROM debezium/connect:1.4
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=5.3.2

# Deploy PostgreSQL JDBC Driver
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar

# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
Let's build it:
sh
docker build -t dbz-conn-jdbc-sink .
Note the dbz-conn-jdbc-sink tag above: it is the image used in my docker-compose.yml file for the debezium service. If you happen to follow this tutorial, build the image yourself before running the docker-compose.yml file.
When I first created the sink connector, the status endpoint reported it as running for a couple of seconds. However, when I checked again after 10 seconds, wondering why the data was not being inserted into the new database, I saw:
json
{
  "name":"sharingcdc-sink-connector",
  "connector":{
    "state":"RUNNING",
    "worker_id":"172.29.0.8:8083"
  },
  "tasks":[{
    "id":0,
    "state":"FAILED",
    "worker_id":"172.29.0.8:8083",
    "trace":"org.apache.kafka.common.config.ConfigException: Primary key mode must be 'record_key' when delete support is enabled <snip>"}],
  "type":"sink"
}
It turned out I was using "pk.mode":"record_value" instead of record_key. Changing the value to record_key fixed that error, but then another one appeared:
json
{
  "name":"sharingcdc-sink-connector",
  "connector":{
    "state":"RUNNING",
    "worker_id":"172.29.0.8:8083"
  },
  "tasks":[{
    "id":0,
    "state":"FAILED",
    "worker_id":"172.29.0.8:8083",
    "trace":"org.apache.kafka.connect.errors.ConnectException: <snip> Caused by: org.apache.kafka.connect.errors.ConnectException: org.postgresql.util.PSQLException: Connection to postgres:5434 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections. <snip>"
  }],
  "type":"sink"
}
Be careful when using localhost in Docker: localhost inside a container is not the same as localhost on the host machine. In my case, I needed to connect to the host machine's localhost. Defining a static IP would also solve the problem, but my IP changes periodically, so I used the host.docker.internal hostname to point to the host.
This hostname is also used in my debezium-sink-config.json file:
json
{
  ...
  "connection.url": "jdbc:postgresql://host.docker.internal:5434/sharingcdc-copy?user=mine&password=qwepoi123"
  ...
}
Now we have learned how to stream our database changes into a Kafka topic and take advantage of them to replicate into another database. More examples on CDC are coming.
Hope this helps! 👋