Debezium is a self-hosted distributed platform that can read data from a variety of sources and import it into Kafka. You can use Debezium to migrate data to CockroachDB from another database that is accessible over the public internet.
As of this writing, Debezium supports the following database sources:
- MongoDB
- MySQL
- PostgreSQL
- SQL Server
- Oracle
- Db2
- Cassandra
- Vitess (incubating)
- Spanner (incubating)
- JDBC (incubating)
Migrating with Debezium requires familiarity with Kafka. Refer to the Debezium documentation for information on how Debezium is deployed with Kafka Connect.
Before you begin
Complete the following items before using Debezium:
- Configure a secure, publicly accessible CockroachDB cluster running the latest v24.1 production release with at least one SQL user. Make a note of the credentials for the SQL user.
- Install and configure Debezium, Kafka Connect, and Kafka. A minimal local sketch follows this list.
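As a starting point, the following commands sketch one way to stand up a minimal local deployment using Debezium's container images. The container names, topic names, `--link` networking, and `latest` tags are illustrative assumptions for brevity; refer to the Debezium documentation for a production deployment.

```shell
# Start ZooKeeper, Kafka, and Kafka Connect from Debezium's images.
# Names, ports, and topic names here are illustrative, not required values.
docker run -d --name zookeeper -p 2181:2181 quay.io/debezium/zookeeper:latest
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:latest
docker run -d --name connect -p 8083:8083 --link kafka:kafka \
  -e BOOTSTRAP_SERVERS=kafka:9092 \
  -e GROUP_ID=1 \
  -e CONFIG_STORAGE_TOPIC=connect_configs \
  -e OFFSET_STORAGE_TOPIC=connect_offsets \
  -e STATUS_STORAGE_TOPIC=connect_statuses \
  quay.io/debezium/connect:latest
```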
Migrate data to CockroachDB
After you complete the prerequisites, you can use Debezium to migrate data to CockroachDB.
To write data from Kafka to CockroachDB, use the Confluent JDBC Sink Connector. First, use the following Dockerfile to create a custom Kafka Connect image that includes the JDBC driver:

```dockerfile
FROM quay.io/debezium/connect:latest
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

ARG POSTGRES_VERSION=latest
ARG KAFKA_JDBC_VERSION=latest

# Deploy PostgreSQL JDBC Driver
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar

# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR && \
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
```
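With the Dockerfile in place, a build might look like the following sketch. The image tag and version values are examples only; pin them to real releases of the PostgreSQL JDBC driver and kafka-connect-jdbc before building.

```shell
# Build the custom Connect image. The version values below are examples;
# replace them with the driver and connector releases you have verified.
docker build \
  --build-arg POSTGRES_VERSION=42.7.3 \
  --build-arg KAFKA_JDBC_VERSION=10.7.4 \
  -t connect-jdbc:custom .
```

Then start the Kafka Connect container from `connect-jdbc:custom` in place of the stock `quay.io/debezium/connect` image.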
Create the JSON configuration file that you will use to add data from your source database to a Kafka topic. For example:
{ "name": "pg-source", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.dbname": "{database}", "database.hostname": "{hostname}", "database.password": "", "database.port": "5432", "database.user": "postgres", "plugin.name": "pgoutput", "table.include.list": "public.test_table_small", "tasks.max": "1", "topic.creation.default.cleanup.policy": "delete", "topic.creation.default.partitions": "10", "topic.creation.default.replication.factor": "1", "topic.creation.default.retention.ms": "604800000", "topic.creation.enable": "true", "topic.prefix": "{username}", "slot.name" : "debezium" } }
Create the JSON configuration file that you will use to create the sink. For example:
{ "name": "pg-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "10", "topics" : "{topic.example.table}", "connection.url": "jdbc:postgresql://{host}:{port}/{username}?sslmode=require", "connection.user": "{username}", "connection.password": "{password}", "insert.mode": "upsert", "pk.mode": "record_value", "pk.fields": "id", "database.time_zone": "UTC", "auto.create": true, "auto.evolve": false, "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState" } }
Specify `connection.url` in JDBC format. For information about where to find the CockroachDB connection parameters, see Connect to a CockroachDB Cluster.

The preceding snippet is an example configuration. For details on the configurable fields, see the Confluent JDBC Sink Connector documentation.
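For illustration, a CockroachDB `connection.url` often takes a form like `jdbc:postgresql://{host}:26257/{database}?sslmode=verify-full&sslrootcert=certs/ca.crt`: CockroachDB speaks the PostgreSQL wire protocol and listens on port 26257 by default, and the exact SSL parameters depend on how your cluster's certificates are set up.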
To create the sink, `POST` the JSON configuration file to the Kafka Connect `/connectors` endpoint. Refer to the Kafka Connect API documentation for more information.
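For example, a minimal sketch assuming the sink configuration above is saved as `pg-sink.json` and the Kafka Connect REST API listens on `localhost:8083`:

```shell
# Create the sink connector.
curl -i -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
  --data @pg-sink.json http://localhost:8083/connectors

# Check the connector's status once it has been created.
curl -s http://localhost:8083/connectors/pg-sink/status
```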