前提条件

架构

获取所有connector-plugins

curl http://localhost:3083/connector-plugins | jq

type: source:源数据插件

type: sink:目标数据插件

默认内置插件:

[kafka@kafka-connect-769747b4bb-vqkld connect]$ ls -l
total 52
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-db2
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-ibmi
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-informix
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-jdbc
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-mongodb
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-mysql
drwxr-xr-x 2 kafka kafka 8192 Jul 26 04:26 debezium-connector-oracle
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-postgres
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-spanner
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-sqlserver
drwxr-xr-x 2 kafka kafka 4096 Jul 26 04:26 debezium-connector-vitess

本例使用的源数据插件:io.debezium.connector.mysql.MySqlConnector

Debezium 镜像默认只内置一个目标(sink)插件,即 JDBC 插件:io.debezium.connector.jdbc.JdbcSinkConnector。可用下面的命令确认:

curl http://localhost:3083/connector-plugins | jq | grep sink -C 1

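创建 source connector(源连接器):将下面的配置保存为 source-mysql-connector.json。注意:其中以 // 开头的注释仅用于说明,保存前必须删除,否则文件不是合法的 JSON,提交给 Kafka Connect REST 接口时会报解析错误。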

{
	"name": "source-mysql-connector",
	"config": {
		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
		"tasks.max": "1",
        // 源数据库配置 
        "database.hostname": "mysql.mysql.svc.cluster.local",  
        "database.port": "3306",
        "database.user": "root",
        "database.password": "password",
        //  此数据库客户机的数字ID,必须是唯一的,连接器使用此ID将MySQL数据库集群加入为另一台服务器。
        "database.server.id": "184054",
        // Kafka 主题名前缀:表的增删改事件写入名为 <前缀>.<库名>.<表名> 的主题,本例即 mysql.testdb.test_tables
        "topic.prefix": "mysql",
        // 只获取testdb库
        "database.include.list": "testdb",
        // 仅同步testdb数据库下的test_tables表
        "table.include.list": "testdb.test_tables",
        // kafka地址
        "schema.history.internal.kafka.bootstrap.servers": "kafka.kafka.svc.cluster.local:9092",
        // 存放数据库 schema(DDL)变更历史的 Kafka 主题,仅供 Debezium 内部使用
        "schema.history.internal.kafka.topic": "schema-changes.testdb",
        // 消息 key 所用的列:全限定表名匹配 testdb(.*) 的表,使用 id 列作为消息 key
        "message.key.columns": "testdb(.*):id",
        // 向 Kafka 发送心跳消息的间隔(毫秒),默认 0 表示不发送心跳
	    "heartbeat.interval.ms": "100",
        // 是否将数据库 schema 变更事件发布到与 topic.prefix 同名的 Kafka 主题(本例为 mysql)
	    "include.schema.changes": "true",
        // 指定连接器是否在元数据对象上分析和发布表和列注释。
	    "include.schema.comments": "true",
        // 快照模式,默认就是initial
	    "snapshot.mode": "initial",
        // 通过route获取数据库名作为kafka主题名
	    // "transforms": "route",
	    // "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
	    // "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
	    // "transforms.route.replacement": "$3",
        // 删除事件之后是否再发送一条 tombstone(value 为 null 的)消息,默认就是 true
	    // "tombstones.on.delete": "true"
    }
}

参数地址:https://debezium.io/documentation/reference/stable/connectors/mysql.html

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:3083/connectors -d @source-mysql-connector.json

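创建 sink connector(目标连接器):将下面的配置保存为 sink-mysql-connector.json。注意:其中以 // 开头的注释仅用于说明,保存前必须删除,否则文件不是合法的 JSON,提交给 Kafka Connect REST 接口时会报解析错误。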

{
 "name": "sink-mysql-connector",
 "config":{
   "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
   // 目标数据库:jdbc:mysql://mysql地址/数据库
   "connection.url": "jdbc:mysql://mysqltest.mysql.svc.cluster.local:3306/test",
   "connection.username": "root",
   "connection.password": "password",
   "tasks.max": "1",
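   // 要消费的 Kafka 主题,即源连接器按 <前缀>.<库名>.<表名> 生成的主题
   "topics": "mysql.testdb.test_tables",
   // 注意(待确认):dialect.name 是 Confluent JDBC Sink 的参数;Debezium JDBC Sink 根据 connection.url 自动确定数据库方言,此项应不起作用
   "dialect.name": "MySqlDatabaseDialect",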
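   // 注意(待确认):该参数属于 Debezium Server 的 Databend sink,并不是 JdbcSinkConnector 的配置项,此处应不起作用;删除同步实际由下面的 delete.enabled 控制
   "debezium.sink.databend.upsert-keep-deletes": "false",
   // 注意(待确认):JdbcSinkConnector 可以直接消费 Debezium 原始变更事件,文档说明无需 ExtractNewRecordState 转换。
   // 而 delete.handling.mode=rewrite 会把删除事件改写为带 __deleted=true 字段的普通记录,drop.tombstones=true 又会丢弃 tombstone,
   // 这样目标库中的行可能不会被真正删除(只会多出 __deleted 列)。如果删除没有同步,可尝试去掉 unwrap 相关的 4 个配置。
   "transforms": "unwrap",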
   "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
   "transforms.unwrap.drop.tombstones": "true",
   "transforms.unwrap.delete.handling.mode": "rewrite",
   "schema.evolution": "basic",
   // 如果主键不存在,则连接器执行INSERT操作,如果键存在,则连接器执行UPDATE操作
   "insert.mode": "upsert",
   // 收到删除事件(DELETE 或 tombstone)时,在目标表中删除对应的行
   "delete.enabled": "true",
   // 主键字段;"delete.enabled": "true" 时必须配置主键,且 primary.key.mode 须为 record_key
   "primary.key.fields": "id",
   "primary.key.mode": "record_key",
   "database.time_zone": "UTC"
   }
}

参数地址:https://debezium.io/documentation/reference/stable/connectors/jdbc.html

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:3083/connectors -d @sink-mysql-connector.json

测试

在源数据库中插入数据,然后删除其中一部分

到目标数据库中查看同步结果

目标库中的表名由 Kafka 主题名生成,其中的 . 替换为 _,例如主题 mysql.testdb.test_tables 对应表 mysql_testdb_test_tables