This page provides step-by-step examples for using Core and Enterprise changefeeds. Creating Enterprise changefeeds is available on CockroachDB Dedicated, on CockroachDB Serverless, and with an Enterprise license on CockroachDB Self-Hosted clusters. Core changefeeds are available in all products.
For a summary of Core and Enterprise changefeed features, refer to the Change Data Capture Overview page.
Enterprise changefeeds can connect to the following sinks:
- Kafka
- Google Cloud Pub/Sub
- Cloud Storage (Amazon S3, Google Cloud Storage, Azure Storage)
- Webhook
- New in v24.1: Azure Event Hubs
- New in v24.1: Apache Pulsar (Preview)
Refer to the Changefeed Sinks page for more detail on forming sink URIs, available sink query parameters, and specifics on configuration.
Cockroach Labs recommends monitoring your changefeeds to track retryable errors and protected timestamp usage. Refer to the Monitor and Debug Changefeeds page for more information.
Use the following filters to show usage examples for either Enterprise or Core changefeeds:
Before you run the examples, verify that you have the CHANGEFEED
privilege in order to create and manage changefeed jobs. Refer to Required privileges for more details.
You can create an external connection to represent a changefeed sink URI. This allows you to specify the external connection's name in statements rather than the provider-specific URI. For detail on using external connections, see the CREATE EXTERNAL CONNECTION
page.
Create a changefeed connected to Kafka
CREATE CHANGEFEED
is an Enterprise-only feature. For the Core version, see the CHANGEFEED FOR
example.
In this example, you'll set up a changefeed for a single-node cluster that is connected to a Kafka sink. The changefeed will watch two tables.
If you do not already have one, request a trial Enterprise license.
Use the
cockroach start-single-node
command to start a single-node cluster:cockroach start-single-node --insecure --listen-addr=localhost
Download and extract the Confluent Open Source platform (which includes Kafka).
In a new terminal window, go to the extracted
confluent-<version>
directory and start Confluent:./bin/confluent local services start
Only
zookeeper
andkafka
are needed. To troubleshoot Confluent, see their docs and the Quick Start Guide.Create two Kafka topics:
./bin/kafka-topics \ --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic office_dogs
./bin/kafka-topics \ --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic employees
Note:You are expected to create any Kafka topics with the necessary number of replications and partitions. Topics can be created manually or Kafka brokers can be configured to automatically create topics with a default partition count and replication factor.
As the
root
user, open the built-in SQL client:$ cockroach sql --insecure
Set your organization name and Enterprise license key that you received via email:
> SET CLUSTER SETTING cluster.organization = '<organization name>';
> SET CLUSTER SETTING enterprise.license = '<secret>';
Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Note:If you are working on a CockroachDB Serverless cluster, the
kv.rangefeed.enabled
cluster setting is enabled by default.Create a database called
cdc_demo
:> CREATE DATABASE cdc_demo;
Set the database as the default:
> SET DATABASE = cdc_demo;
Create a table and add data:
> CREATE TABLE office_dogs ( id INT PRIMARY KEY, name STRING);
> INSERT INTO office_dogs VALUES (1, 'Petee'), (2, 'Carl');
> UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
Create another table and add data:
> CREATE TABLE employees ( dog_id INT REFERENCES office_dogs (id), employee_name STRING);
> INSERT INTO employees VALUES (1, 'Lauren'), (2, 'Spencer');
Start the changefeed:
CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://localhost:9092';
job_id +--------------------+ 360645287206223873 (1 row) NOTICE: changefeed will emit to topic office_dogs NOTICE: changefeed will emit to topic employees
This will start up the changefeed in the background and return the
job_id
. The changefeed writes to Kafka.In a new terminal, move into the extracted
confluent-<version>
directory and start watching the Kafka topics:./bin/kafka-console-consumer \ --bootstrap-server=localhost:9092 \ --from-beginning \ --include 'office_dogs|employees'
{"after": {"id": 1, "name": "Petee H"}} {"after": {"id": 2, "name": "Carl"}} {"after": {"id": 1, "name": "Lauren", "rowid": 528514320239329281}} {"after": {"id": 2, "name": "Spencer", "rowid": 528514320239362049}}
The initial scan displays the state of the tables as of when the changefeed started (therefore, the initial value of
"Petee"
is omitted).Note:This example only prints the value. To print both the key and value of each message in the changefeed (e.g., to observe what happens with
DELETE
s), use the--property print.key=true
flag.Back in the SQL client, insert more data:
INSERT INTO office_dogs VALUES (3, 'Ernie');
Back in the terminal where you're watching the Kafka topics, the following output has appeared:
{"after": {"id": 3, "name": "Ernie"}}
When you are done, exit the SQL shell (
\q
).To stop
cockroach
:Get the process ID of the node:
ps -ef | grep cockroach | grep -v grep
501 21766 1 0 6:21PM ttys001 0:00.89 cockroach start-single-node --insecure --listen-addr=localhost
Gracefully shut down the node, specifying its process ID:
kill -TERM 21766
initiating graceful shutdown of server server drained and shutdown completed
To stop Kafka, move into the extracted
confluent-<version>
directory and stop Confluent:./bin/confluent local services stop
Create a changefeed connected to Kafka using Avro
CREATE CHANGEFEED
is an Enterprise-only feature. For the Core version, see the CHANGEFEED FOR
example.
In this example, you'll set up a changefeed for a single-node cluster that is connected to a Kafka sink and emits Avro records. The changefeed will watch two tables.
If you do not already have one, request a trial Enterprise license.
Use the
cockroach start-single-node
command to start a single-node cluster:cockroach start-single-node --insecure --listen-addr=localhost
Download and extract the Confluent Open Source platform (which includes Kafka).
Move into the extracted
confluent-<version>
directory and start Confluent:./bin/confluent local services start
Only
zookeeper
,kafka
, andschema-registry
are needed. To troubleshoot Confluent, see their docs and the Quick Start Guide.Create two Kafka topics:
./bin/kafka-topics \ --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic office_dogs
./bin/kafka-topics \ --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic employees
Note:You are expected to create any Kafka topics with the necessary number of replications and partitions. Topics can be created manually or Kafka brokers can be configured to automatically create topics with a default partition count and replication factor.
As the
root
user, open the built-in SQL client:$ cockroach sql --insecure
Set your organization name and Enterprise license key that you received via email:
> SET CLUSTER SETTING cluster.organization = '<organization name>';
> SET CLUSTER SETTING enterprise.license = '<secret>';
Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Note:If you are working on a CockroachDB Serverless cluster, the
kv.rangefeed.enabled
cluster setting is enabled by default.Create a database called
cdc_demo
:> CREATE DATABASE cdc_demo;
Set the database as the default:
> SET DATABASE = cdc_demo;
Create a table and add data:
> CREATE TABLE office_dogs ( id INT PRIMARY KEY, name STRING);
> INSERT INTO office_dogs VALUES (1, 'Petee'), (2, 'Carl');
> UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
Create another table and add data:
> CREATE TABLE employees ( dog_id INT REFERENCES office_dogs (id), employee_name STRING);
> INSERT INTO employees VALUES (1, 'Lauren'), (2, 'Spencer');
Start the changefeed:
CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://localhost:9092' WITH format = avro, confluent_schema_registry = 'http://localhost:8081';
To connect to Confluent Cloud, use the following URL structure:
'https://{API_KEY_ID}:{API_SECRET_URL_ENCODED}@{CONFLUENT_REGISTRY_URL}:443'
. See the Stream a Changefeed to a Confluent Cloud Kafka Cluster tutorial for further detail.job_id +--------------------+ 360645287206223873 (1 row) NOTICE: changefeed will emit to topic office_dogs NOTICE: changefeed will emit to topic employees
This will start up the changefeed in the background and return the
job_id
. The changefeed writes to Kafka.In a new terminal, move into the extracted
confluent-<version>
directory and start watching the Kafka topics:./bin/kafka-avro-console-consumer \ --bootstrap-server=localhost:9092 \ --from-beginning \ --include 'office_dogs|employees'
{"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee H"}}}} {"after":{"office_dogs":{"id":{"long":2},"name":{"string":"Carl"}}}} {"after":{"employees":{"dog_id":{"long":1},"employee_name":{"string":"Lauren"},"rowid":{"long":528537452042682369}}}} {"after":{"employees":{"dog_id":{"long":2},"employee_name":{"string":"Spencer"},"rowid":{"long":528537452042747905}}}}
The initial scan displays the state of the table as of when the changefeed started (therefore, the initial value of
"Petee"
is omitted).Note:This example only prints the value. To print both the key and value of each message in the changefeed (e.g., to observe what happens with
DELETE
s), use the--property print.key=true
flag.Back in the SQL client, insert more data:
INSERT INTO office_dogs VALUES (3, 'Ernie');
Back in the terminal where you're watching the Kafka topics, the following output has appeared:
{"after":{"office_dogs":{"id":{"long":3},"name":{"string":"Ernie"}}}}
When you are done, exit the SQL shell (
\q
).To stop
cockroach
:Get the process ID of the node:
ps -ef | grep cockroach | grep -v grep
501 21766 1 0 6:21PM ttys001 0:00.89 cockroach start-single-node --insecure --listen-addr=localhost
Gracefully shut down the node, specifying its process ID:
kill -TERM 21766
initiating graceful shutdown of server server drained and shutdown completed
To stop Kafka, move into the extracted
confluent-<version>
directory and stop Confluent:./bin/confluent local services stop
Create a changefeed connected to a Confluent Cloud sink
In this example, you'll set up a changefeed for a single-node cluster that is connected to a Confluent Cloud managed Kafka cluster. The changefeed will watch a table and send messages to the sink.
If you do not already have one, request a trial Enterprise license.
Use the
cockroach start-single-node
command to start a single-node cluster:cockroach start-single-node --insecure --listen-addr=localhost
In this example, you'll run CockroachDB's Movr application workload to set up some data for your changefeed.
In a new terminal, first create the schema for the workload:
cockroach workload init movr "postgresql://root@127.0.0.1:26257?sslmode=disable"
Then run the workload:
cockroach workload run movr --duration=1m "postgresql://root@127.0.0.1:26257?sslmode=disable"
As the
root
user, open the built-in SQL client:$ cockroach sql --insecure
Set your organization name and Enterprise license key that you received via email:
> SET CLUSTER SETTING cluster.organization = '<organization name>';
> SET CLUSTER SETTING enterprise.license = '<secret>';
Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Note:If you are working on a CockroachDB Serverless cluster, the
kv.rangefeed.enabled
cluster setting is enabled by default.To prepare the Confluent Cloud Kafka sink, sign up on the Confluent Cloud trial page.
On the Create cluster page, select the necessary cluster type. The Basic cluster type is sufficient to run this example. Select the configuration and regions. After you have confirmed payment and set a cluster name, Launch the cluster.
From the Overview page, click on API Keys in the navigation menu. Click Create key and select the scope for this API key. Global access is sufficient for this example. Granular access is more suitable for production. Copy or download the key and secret.
Create a topic in Confluent Cloud. Under Topics select Add topic. Add a topic name and define the number of partitions and then Create. CockroachDB defaults to using the table name as the topic name. If you would like to send messages to an alternate topic, you can specify the
topic_name
parameter.Note:You can enable auto topic creation for Confluent Cloud Dedicated clusters under Cluster Settings in the console.
Construct the URI to connect your changefeed to the Confluent Cloud cluster. You will need:
- The prefix scheme
confluent-cloud://
. - The address and port of the bootstrap server. Click on Cluster settings in your Confluent Cloud console. Under Endpoints, you will find the Bootstrap server. It will be something like:
pkc-lzvrd.us-west4.gcp.confluent.cloud:9092
. - Ensure that you follow the cluster address with
?
before the remaining parameters, and also connect the following parameters with&
. - Your API key and secret from the previous step passed with the
api_key
andapi_secret
parameters. You must URL encode the secret before adding to the connection string. (Optional) Any further parameters. Refer to Query parameters.
'confluent-cloud://pkc-lzvrd.us-west4.gcp.confluent.cloud:9092?api_key={API key}&api_secret={url-encoded API secret}'
- The prefix scheme
Back in the SQL shell, create a changefeed:
CREATE CHANGEFEED FOR TABLE movr.users, movr.vehicles INTO 'confluent-cloud://pkc-lzvrd.us-west4.gcp.confluent.cloud:9092?api_key={API key}&api_secret={url-encoded API secret}&topic_name=users_and_vehicles' WITH resolved;
job_id ---------------------- 913248812656525313 (1 row) NOTICE: changefeed will emit to topic users_and_vehicles
Create a changefeed connected to a Google Cloud Pub/Sub sink
In this example, you'll set up a changefeed for a single-node cluster that is connected to a Google Cloud Pub/Sub sink. The changefeed will watch a table and send messages to the sink.
You'll need access to a Google Cloud Project to set up a Pub/Sub sink. In this example, the Google Cloud CLI (gcloud
) is used, but you can also complete each of these steps within your Google Cloud Console.
If you do not already have one, request a trial Enterprise license.
Use the
cockroach start-single-node
command to start a single-node cluster:cockroach start-single-node --insecure --listen-addr=localhost
In this example, you'll run CockroachDB's Movr application workload to set up some data for your changefeed.
In a new terminal, first create the schema for the workload:
cockroach workload init movr "postgresql://root@127.0.0.1:26257?sslmode=disable"
Then run the workload:
cockroach workload run movr --duration=1m "postgresql://root@127.0.0.1:26257?sslmode=disable"
As the
root
user, open the built-in SQL client:$ cockroach sql --insecure
Set your organization name and Enterprise license key that you received via email:
> SET CLUSTER SETTING cluster.organization = '<organization name>';
> SET CLUSTER SETTING enterprise.license = '<secret>';
Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Note:If you are working on a CockroachDB Serverless cluster, the
kv.rangefeed.enabled
cluster setting is enabled by default.Next, you'll prepare your Pub/Sub sink.
In a new terminal window, create a Service Account attached to your Google Project:
gcloud iam service-accounts create cdc-demo --project cockroach-project
In this example,
cdc-demo
will represent the name of the service account, andcockroach-project
is the name of the Google Project.To ensure that your Service Account has the correct permissions to publish to the sink, use the following command to give the Service Account the predefined Pub/Sub Editor role:
gcloud projects add-iam-policy-binding cockroach-project --member='serviceAccount:cdc-demo@cockroach-project.iam.gserviceaccount.com' --role='roles/pubsub.editor'
Create the Pub/Sub topic to which your changefeed will emit messages:
gcloud pubsub topics create movr-users --project cockroach-project
Run the following command to create a subscription within the
movr-users
topic:gcloud pubsub subscriptions create movr-users-sub --topic=movr-users --topic-project=cockroach-project
With the topic and subscription set up, you can now download your Service Account's key. Use the following command to specify where to download the json key file (
key.json
):gcloud iam service-accounts keys create key.json --iam-account=cdc-demo@cockroach-project.iam.gserviceaccount.com
Next, base64 encode your credentials key using the command specific to your platform.
If you're working on macOS:
cat key.json | base64
If you're working on Linux, run the following to ensure that lines are not wrapped in the output:
cat key.json | base64 -w 0
Copy the output so that you can add it to your
CREATE CHANGEFEED
statement in the next step. When you create your changefeed, it is necessary that the key is base64 encoded before passing it in the URI.Back in the SQL shell, create a changefeed that will emit messages to your Pub/Sub topic. Ensure that you pass the base64-encoded credentials for your Service Account:
CREATE CHANGEFEED FOR TABLE users INTO 'gcpubsub://cockroach-project?region=us-east1&topic_name=movr-users&AUTH=specified&CREDENTIALS={base64-encoded key}';
You can include the
region
parameter for your topic, or use the WITHunordered
option for multi-region Pub/Sub. See the Changefeed Sinks page for more detail.The output will confirm the topic where the changefeed will emit messages to.
job_id ---------------------- 756641304964792321 (1 row) NOTICE: changefeed will emit to topic movr-users
To view all the messages delivered to your topic, you can use:
- The Google Cloud Console. From the Pub/Sub menu, select Subscriptions in the left-hand navigation and then select the subscription ID from your list of subscriptions. On the subscription's overview, click Messages, and then Pull to view messages.
The
gcloud
CLI. From your terminal, run the following command:gcloud pubsub subscriptions pull movr-users-sub --auto-ack --limit=10
This command will only pull these messages once per subscription. For example, if you ran this command again you would receive 10 different messages in your output. To receive more than one message at a time, pass the
--limit
flag. For more details, refer to the gcloud pubsub subscriptions pull documentation.┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬───────────────────┬──────────────┬────────────┬──────────────────┬────────────┐ │ DATA │ MESSAGE_ID │ ORDERING_KEY │ ATTRIBUTES │ DELIVERY_ATTEMPT │ ACK_STATUS │ ├─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┼───────────────────┼──────────────┼────────────┼──────────────────┼────────────┤ │ {"Key":["amsterdam", "09ee2856-5856-40c4-85d3-7d65bed978f0"],"Value":{"after": {"address": "84579 Peter Divide Apt. 47", "city": "amsterdam", "credit_card": "0100007510", "id": "09ee2856-5856-40c4-85d3-7d65bed978f0", "name": "Timothy Jackson"}},"Topic":"movr-users"} │ 11249015757941393 │ │ │ │ SUCCESS │ │ {"Key":["new york", "8803ab9e-5001-4994-a2e6-68d587f95f1d"],"Value":{"after": {"address": "37546 Andrew Roads Apt. 68", "city": "new york", "credit_card": "4731676650", "id": "8803ab9e-5001-4994-a2e6-68d587f95f1d", "name": "Susan Harrington"}},"Topic":"movr-users"} │ 11249015757941394 │ │ │ │ SUCCESS │ │ {"Key":["seattle", "32e27201-ca0d-4a0c-ada2-fbf47f6a4711"],"Value":{"after": {"address": "86725 Stephen Gardens", "city": "seattle", "credit_card": "3639690115", "id": "32e27201-ca0d-4a0c-ada2-fbf47f6a4711", "name": "Brad Hill"}},"Topic":"movr-users"} │ 11249015757941395 │ │ │ │ SUCCESS │ │ {"Key":["san francisco", "27b03637-ef9f-49a0-9b58-b16d7a9e34f4"],"Value":{"after": {"address": "85467 Tiffany Field", "city": "san francisco", "credit_card": "0016125921", "id": "27b03637-ef9f-49a0-9b58-b16d7a9e34f4", "name": "Mark Garcia"}},"Topic":"movr-users"} │ 11249015757941396 │ │ │ │ SUCCESS │ │ {"Key":["rome", "982e1863-88d4-49cb-adee-0a35baae7e0b"],"Value":{"after": {"address": "54918 Sutton Isle Suite 74", "city": "rome", "credit_card": "6015706174", "id": "982e1863-88d4-49cb-adee-0a35baae7e0b", "name": "Kimberly Nichols"}},"Topic":"movr-users"} │ 11249015757941397 │ │ │ │ SUCCESS │ │ {"Key":["washington dc", "7b298994-7b12-414c-90ef-353c7105f012"],"Value":{"after": {"address": "45205 Romero Ford Apt. 86", "city": "washington dc", "credit_card": "3519400314", "id": "7b298994-7b12-414c-90ef-353c7105f012", "name": "Taylor Bullock"}},"Topic":"movr-users"} │ 11249015757941398 │ │ │ │ SUCCESS │ │ {"Key":["boston", "4f012f57-577b-4853-b5ab-0d79d0df1369"],"Value":{"after": {"address": "15765 Vang Ramp", "city": "boston", "credit_card": "6747715133", "id": "4f012f57-577b-4853-b5ab-0d79d0df1369", "name": "Ryan Garcia"}},"Topic":"movr-users"} │ 11249015757941399 │ │ │ │ SUCCESS │ │ {"Key":["seattle", "9ba85917-5545-4674-8ab2-497fa47ac00f"],"Value":{"after": {"address": "24354 Whitney Lodge", "city": "seattle", "credit_card": "8642661685", "id": "9ba85917-5545-4674-8ab2-497fa47ac00f", "name": "Donald Walsh"}},"Topic":"movr-users"} │ 11249015757941400 │ │ │ │ SUCCESS │ │ {"Key":["seattle", "98312fb3-230e-412d-9b22-074ec97329ff"],"Value":{"after": {"address": "72777 Carol Shoal", "city": "seattle", "credit_card": "7789799678", "id": "98312fb3-230e-412d-9b22-074ec97329ff", "name": "Christopher Davis"}},"Topic":"movr-users"} │ 11249015757941401 │ │ │ │ SUCCESS │ └─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴───────────────────┴──────────────┴────────────┴──────────────────┴────────────┘
Create a changefeed connected to a cloud storage sink
CREATE CHANGEFEED
is an Enterprise-only feature. For the Core version, see the CHANGEFEED FOR
example.
In this example, you'll set up a changefeed for a single-node cluster that is connected to an AWS S3 sink. The changefeed watches two tables. Note that you can set up changefeeds for any of these cloud storage providers.
If you do not already have one, request a trial Enterprise license.
Use the
cockroach start-single-node
command to start a single-node cluster:$ cockroach start-single-node --insecure --listen-addr=localhost
As the
root
user, open the built-in SQL client:$ cockroach sql --insecure
Set your organization name and Enterprise license key that you received via email:
> SET CLUSTER SETTING cluster.organization = '<organization name>';
> SET CLUSTER SETTING enterprise.license = '<secret>';
Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Note:If you are working on a CockroachDB Serverless cluster, the
kv.rangefeed.enabled
cluster setting is enabled by default.Create a database called
cdc_demo
:> CREATE DATABASE cdc_demo;
Set the database as the default:
> SET DATABASE = cdc_demo;
Create a table and add data:
> CREATE TABLE office_dogs ( id INT PRIMARY KEY, name STRING);
> INSERT INTO office_dogs VALUES (1, 'Petee'), (2, 'Carl');
> UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
Create another table and add data:
> CREATE TABLE employees ( dog_id INT REFERENCES office_dogs (id), employee_name STRING);
> INSERT INTO employees VALUES (1, 'Lauren'), (2, 'Spencer');
Start the changefeed:
> CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 's3://example-bucket-name/test?AWS_ACCESS_KEY_ID=enter_key-here&AWS_SECRET_ACCESS_KEY=enter_key_here' with updated, resolved='10s';
job_id +--------------------+ 360645287206223873 (1 row)
This will start up the changefeed in the background and return the
job_id
. The changefeed writes to AWS.Monitor your changefeed on the DB Console. For more information, see Changefeeds Dashboard.
When you are done, exit the SQL shell (
\q
).To stop
cockroach
:Get the process ID of the node:
ps -ef | grep cockroach | grep -v grep
501 21766 1 0 6:21PM ttys001 0:00.89 cockroach start-single-node --insecure --listen-addr=localhost
Gracefully shut down the node, specifying its process ID:
kill -TERM 21766
initiating graceful shutdown of server server drained and shutdown completed
Create a changefeed connected to an Azure Event Hubs sink
New in v24.1: In this example, you'll set up a changefeed for a single-node cluster that is connected to Azure Event Hubs. The changefeed watches two tables. You'll need access to the Azure Developer Portal, you can sign up for an Azure free account.
If you do not already have one, request a trial Enterprise license.
Use the
cockroach start-single-node
command to start a single-node cluster:cockroach start-single-node --insecure --listen-addr=localhost
In this example, you'll run CockroachDB's Movr application workload to set up some data for your changefeed.
In a new terminal, first create the schema for the workload:
cockroach workload init movr "postgresql://root@127.0.0.1:26257?sslmode=disable"
Then run the workload:
cockroach workload run movr --duration=1m "postgresql://root@127.0.0.1:26257?sslmode=disable"
As the
root
user, open the built-in SQL client:$ cockroach sql --insecure
Set your organization name and Enterprise license key that you received via email:
> SET CLUSTER SETTING cluster.organization = '<organization name>';
> SET CLUSTER SETTING enterprise.license = '<secret>';
Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Note:If you are working on a CockroachDB Serverless cluster, the
kv.rangefeed.enabled
cluster setting is enabled by default.To prepare the Azure Event Hubs sink, in the Portal, create a resource group.
Once you have a resource group, select Event Hubs in the portal navigation menu and then + Create from the Event Hubs page tool bar to create a namespace. Complete the naming details for the namespace and select the resource group you created in the previous step. For Pricing Tier, ensure that you select Standard or Premium (the Basic tier does not support Apache Kafka workloads). Complete your configuration options, and then create the namespace.
Select Go to resource after the deployment is complete. On the namespace overview page, select + Event Hub in the tool bar to create an event hub within this namespace. An Event Hub is equivalent to a Kafka topic. On the creation page, add a name and the configuration for partitions and message retention.
Note:You can manually create an Event Hub, or Event Hubs will create automatically when you start the changefeed.
To send changefeed messages to your Event Hub, you will need a
shared_access_key
. Find Shared access policies in the left-hand navigation. Click + Add and provide a name for the policy as well as confirming the permissions. Once created, click on the policy name and then copy the Primary key — you will need this to create your changefeed.Build the connection string to your Event Hub. You will need the following:
- The
azure-event-hub://
orkafka://
URI scheme. - The Host name from your Event Hubs namespace overview page. It will be something like:
{your-event-hubs-namespace}.servicebus.windows.net
. - The port
:9093
for the Kafka protocol. - The
shared_access_key_name
parameter with the name of your policy. - The
shared_access_key
parameter with your URL-encoded primary key.
'azure-event-hub://{event-hubs-namespace}.servicebus.windows.net:9093?shared_access_key_name={policy-name}&shared_access_key={url-encoded key}'
- The
You can also use a kafka://
scheme in the URI:
'kafka://{event-hubs-namespace}.servicebus.windows.net:9093?shared_access_key_name={policy-name}&shared_access_key={url-encoded key}'
You can include the topic_name
or topic_prefix
parameters to create and name an Event Hub for emitted messages.
Create an external connection to store your connection string:
CREATE EXTERNAL CONNECTION eventhub AS 'azure-event-hub://{your-event-hubs-namespace}.servicebus.windows.net:9093?shared_access_key_name={policy-name}&shared_access_key={url-encoded key}';
Create the changefeed to your Event Hub:
CREATE CHANGEFEED FOR TABLE movr.rides, movr.users INTO 'external://eventhub';
job_id ---------------------- 953466342378274817 (1 row) NOTICE: changefeed will emit to topic rides NOTICE: changefeed will emit to topic users
To confirm the messages emitted to your Event Hub, navigate to the Event Hub's overview page that contains Requests, Messages, and Throughput counters. View the messages by clicking Process Data in the left-hand navigation menu. Then select the Enable real time insights from events feature box. On the Query page, your messages will load under Input Preview.
Create a changefeed connected to a webhook sink
CREATE CHANGEFEED
is an Enterprise-only feature. For the Core version, see the CHANGEFEED FOR
example.
In this example, you'll set up a changefeed for a single-node cluster that is connected to a local HTTP server via a webhook. For this example, you'll use an example HTTP server to test out the webhook sink.
If you do not already have one, request a trial Enterprise license.
Use the
cockroach start-single-node
command to start a single-node cluster:$ cockroach start-single-node --insecure --listen-addr=localhost
In this example, you'll run CockroachDB's Movr application workload to set up some data for your changefeed.
First create the schema for the workload:
cockroach workload init movr "postgresql://root@127.0.0.1:26257?sslmode=disable"
Then run the workload:
cockroach workload run movr --duration=1m "postgresql://root@127.0.0.1:26257?sslmode=disable"
As the
root
user, open the built-in SQL client:$ cockroach sql --insecure
Set your organization name and Enterprise license key that you received via email:
> SET CLUSTER SETTING cluster.organization = '<organization name>';
> SET CLUSTER SETTING enterprise.license = '<secret>';
Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Note:If you are working on a CockroachDB Serverless cluster, the
kv.rangefeed.enabled
cluster setting is enabled by default.In a separate terminal window, set up your HTTP server. Clone the test repository:
git clone https://github.com/cockroachlabs/cdc-webhook-sink-test-server.git
cd cdc-webhook-sink-test-server/go-https-server
Next make the script executable and then run the server (passing a specific port if preferred, otherwise it will default to
:3000
):chmod +x ./server.sh
./server.sh <port>
Back in your SQL shell, run the following statement to create a changefeed that emits to your webhook sink:
CREATE CHANGEFEED FOR TABLE movr.vehicles INTO 'webhook-https://localhost:3000?insecure_tls_skip_verify=true' WITH updated;
You set up a changefeed on the
vehicles
table, which emits changefeed messages to the local HTTP server.See the options table for more information on the options available for creating your changefeed to a webhook sink.
job_id ---------------------- 687842491801632769 (1 row)
In the terminal where your HTTP server is running, you'll receive output similar to:
2021/08/24 14:00:21 {"payload":[{"after":{"city":"rome","creation_time":"2019-01-02T03:04:05","current_location":"39141 Travis Curve Suite 87","ext":{"brand":"Schwinn","color":"red"},"id":"d7b18299-c0c4-4304-9ef7-05ae46fd5ee1","dog_owner_id":"5d0c85b5-8866-47cf-a6bc-d032f198e48f","status":"in_use","type":"bike"},"key":["rome","d7b18299-c0c4-4304-9ef7-05ae46fd5ee1"],"topic":"vehicles","updated":"1629813621680097993.0000000000"}],"length":1} 2021/08/24 14:00:22 {"payload":[{"after":{"city":"san francisco","creation_time":"2019-01-02T03:04:05","current_location":"84888 Wallace Wall","ext":{"color":"black"},"id":"020cf7f4-6324-48a0-9f74-6c9010fb1ab4","dog_owner_id":"b74ea421-fcaf-4d80-9dcc-d222d49bdc17","status":"available","type":"scooter"},"key":["san francisco","020cf7f4-6324-48a0-9f74-6c9010fb1ab4"],"topic":"vehicles","updated":"1629813621680097993.0000000000"}],"length":1} 2021/08/24 14:00:22 {"payload":[{"after":{"city":"san francisco","creation_time":"2019-01-02T03:04:05","current_location":"3893 Dunn Fall Apt. 11","ext":{"color":"black"},"id":"21b2ec54-81ad-4af7-a76d-6087b9c7f0f8","dog_owner_id":"8924c3af-ea6e-4e7e-b2c8-2e318f973393","status":"lost","type":"scooter"},"key":["san francisco","21b2ec54-81ad-4af7-a76d-6087b9c7f0f8"],"topic":"vehicles","updated":"1629813621680097993.0000000000"}],"length":1}
For more detail on emitted changefeed messages, see responses.
Create a changefeed connected to an Apache Pulsar sink
This feature is in preview. This feature is subject to change. To share feedback and/or issues, contact Support.
New in v24.1: In this example, you'll set up a changefeed for a single-node cluster that is connected to an Apache Pulsar sink. The changefeed will watch a table and send messages to the sink.
If you do not already have one, request a trial Enterprise license.
Use the
cockroach start-single-node
command to start a single-node cluster:cockroach start-single-node --insecure --listen-addr=localhost
In this example, you'll run CockroachDB's Movr application workload to set up some data for your changefeed.
In a new terminal, first create the schema for the workload:
cockroach workload init movr "postgresql://root@127.0.0.1:26257?sslmode=disable"
Then run the workload:
cockroach workload run movr --duration=1m "postgresql://root@127.0.0.1:26257?sslmode=disable"
As the
root
user, open the built-in SQL client:$ cockroach sql --insecure
Set your organization name and Enterprise license key that you received via email:
> SET CLUSTER SETTING cluster.organization = '<organization name>';
> SET CLUSTER SETTING enterprise.license = '<secret>';
Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Note:If you are working on a CockroachDB Serverless cluster, the
kv.rangefeed.enabled
cluster setting is enabled by default.To prepare a Pulsar sink, refer to the Apache Pulsar documentation for setup guides to host Pulsar on a local cluster, Docker, or a Kubernetes cluster.
In a terminal window where your Pulsar sink is hosted, start the cluster:
bin/pulsar standalone
If you're running Pulsar in a Docker container, use the
docker run
command to start the cluster:docker run -it -p 6650:6650 -p 8080:8080 --name pulsar-standalone apachepulsar/pulsar:latest bin/pulsar standalone
Note:You can start a changefeed, and Pulsar will automatically use the table as the topic name.
If you want to create a topic name first, use the
pulsar-admin
tool to specify the topic's tenant and namespace. This example uses the default namespace:bin/pulsar-admin topics create persistent://public/default/topic-name
For more detail on persistent topics and working with topic resources, refer to the Manage Topics Apache Pulsar documentation.
Enter the SQL shell:
cockroach sql --insecure
Create your changefeed:
CREATE CHANGEFEED FOR TABLE movr.rides INTO 'pulsar://{host IP}:6650';
By default, Apache Pulsar listens for client connections on port
:6650
. For more detail on configuration, refer to the Apache Pulsar documentation.Changefeeds emitting to a Pulsar sink do not support external connections or a number of changefeed options. For a full list, refer to the Changefeed Sinks page.
In a different terminal window, start a Pulsar consumer to read messages from the changefeed. This example consumes messages from the
rides
topic:bin/pulsar-client consume rides -s sub1 -n 0
If you're running Pulsar in a Docker container, use the
docker run
command to start a consumer:docker run -it --network="host" apachepulsar/pulsar:latest bin/pulsar-client consume rides -s sub1 -n 0
You will receive the changefeed's messages similar to the following:
----- got message ----- key:[null], properties:[], content:{"Key":["seattle", "09265ab7-5f3a-40cb-a543-d37c8c893793"],"Value":{"after": {"city": "seattle", "end_address": null, "end_time": null, "id": "09265ab7-5f3a-40cb-a543-d37c8c893793", "revenue": 53.00, "rider_id": "44576296-d4a7-4e79-add9-f880dd951064", "start_address": "25795 Alyssa Extensions", "start_time": "2024-05-09T12:18:42.022952", "vehicle_city": "seattle", "vehicle_id": "a0c935f6-8872-408e-bc12-4d0b5a85fa71"}},"Topic":"rides"} ----- got message ----- key:[null], properties:[], content:{"Key":["amsterdam", "b3548485-9475-44cf-9769-66617b9cb151"],"Value":{"after": {"city": "amsterdam", "end_address": null, "end_time": null, "id": "b3548485-9475-44cf-9769-66617b9cb151", "revenue": 25.00, "rider_id": "adf4656f-6a0d-4315-b035-eaf7fa6b85eb", "start_address": "49614 Victoria Cliff Apt. 25", "start_time": "2024-05-09T12:18:42.763718", "vehicle_city": "amsterdam", "vehicle_id": "eb1d1d2c-865e-4a40-a7d7-8f396c1c063f"}},"Topic":"rides"} ----- got message ----- key:[null], properties:[], content:{"Key":["amsterdam", "d119f344-318f-41c0-bfc0-b778e6e38f9a"],"Value":{"after": {"city": "amsterdam", "end_address": null, "end_time": null, "id": "d119f344-318f-41c0-bfc0-b778e6e38f9a", "revenue": 24.00, "rider_id": "1a242414-f704-4e1f-9f5e-2b468af0c2d1", "start_address": "54909 Douglas Street Suite 51", "start_time": "2024-05-09T12:18:42.369755", "vehicle_city": "amsterdam", "vehicle_id": "99d98e05-3114-460e-bb02-828bcd745d44"}},"Topic":"rides"} ----- got message ----- key:[null], properties:[], content:{"Key":["rome", "3c7d6676-f713-4985-ba52-4c19fe6c3692"],"Value":{"after": {"city": "rome", "end_address": null, "end_time": null, "id": "3c7d6676-f713-4985-ba52-4c19fe6c3692", "revenue": 27.00, "rider_id": "c15a4926-fbb2-4931-a9a0-6dfabc6c506b", "start_address": "39415 Brandon Avenue Apt. 29", "start_time": "2024-05-09T12:18:42.055498", "vehicle_city": "rome", "vehicle_id": "627dad1a-3531-4214-a173-16bcc6b93036"}},"Topic":"rides"}
For more detail on emitted changefeed messages, refer to Responses.
Core changefeeds stream row-level changes to a client until the underlying SQL connection is closed.
Create a Core changefeed
In this example, you'll set up a core changefeed for a single-node cluster.
In a terminal window, start
cockroach
:$ cockroach start-single-node \ --insecure \ --listen-addr=localhost \ --background
As the
root
user, open the built-in SQL client:$ cockroach sql \ --url="postgresql://root@127.0.0.1:26257?sslmode=disable" \ --format=csv
Note:Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client; however, result buffering is automatically turned off for core changefeeds. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a
CANCEL QUERY
statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default.Note:To determine how wide the columns need to be, the default
table
display format incockroach sql
buffers the results it receives from the server before printing them to the console. When consuming core changefeed data usingcockroach sql
, it's important to use a display format likecsv
that does not buffer its results. To set the display format, use the--format=csv
flag when starting the built-in SQL client, or set the\set display_format=csv
option once the SQL client is open.Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Create table
foo
:> CREATE TABLE foo (a INT PRIMARY KEY);
Insert a row into the table:
> INSERT INTO foo VALUES (0);
Start the core changefeed:
> EXPERIMENTAL CHANGEFEED FOR foo;
table,key,value foo,[0],"{""after"": {""a"": 0}}"
In a new terminal, add another row:
$ cockroach sql --insecure -e "INSERT INTO foo VALUES (1)"
Back in the terminal where the core changefeed is streaming, the following output has appeared:
foo,[1],"{""after"": {""a"": 1}}"
Note that records may take a couple of seconds to display in the core changefeed.
To stop streaming the changefeed, enter CTRL+C into the terminal where the changefeed is running.
To stop
cockroach
:Get the process ID of the node:
ps -ef | grep cockroach | grep -v grep
501 21766 1 0 6:21PM ttys001 0:00.89 cockroach start-single-node --insecure --listen-addr=localhost
Gracefully shut down the node, specifying its process ID:
kill -TERM 21766
initiating graceful shutdown of server server drained and shutdown completed
Create a Core changefeed using Avro
In this example, you'll set up a core changefeed for a single-node cluster that emits Avro records. CockroachDB's Avro binary encoding convention uses the Confluent Schema Registry to store Avro schemas.
Use the
cockroach start-single-node
command to start a single-node cluster:$ cockroach start-single-node \ --insecure \ --listen-addr=localhost \ --background
Download and extract the Confluent Open Source platform.
Move into the extracted
confluent-<version>
directory and start Confluent:$ ./bin/confluent local services start
Only
zookeeper
,kafka
, andschema-registry
are needed. To troubleshoot Confluent, see their docs and the Quick Start Guide.As the
root
user, open the built-in SQL client:$ cockroach sql --url="postgresql://root@127.0.0.1:26257?sslmode=disable" --format=csv
Note:Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client; however, result buffering is automatically turned off for core changefeeds. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a
CANCEL QUERY
statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default.Note:To determine how wide the columns need to be, the default
table
display format incockroach sql
buffers the results it receives from the server before printing them to the console. When consuming core changefeed data usingcockroach sql
, it's important to use a display format likecsv
that does not buffer its results. To set the display format, use the--format=csv
flag when starting the built-in SQL client, or set the\set display_format=csv
option once the SQL client is open.Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Create table
bar
:> CREATE TABLE bar (a INT PRIMARY KEY);
Insert a row into the table:
> INSERT INTO bar VALUES (0);
Start the core changefeed:
> EXPERIMENTAL CHANGEFEED FOR bar WITH format = avro, confluent_schema_registry = 'http://localhost:8081';
table,key,value bar,\000\000\000\000\001\002\000,\000\000\000\000\002\002\002\000
In a new terminal, add another row:
$ cockroach sql --insecure -e "INSERT INTO bar VALUES (1)"
Back in the terminal where the core changefeed is streaming, the output will appear:
bar,\000\000\000\000\001\002\002,\000\000\000\000\002\002\002\002
Note that records may take a couple of seconds to display in the core changefeed.
To stop streaming the changefeed, enter CTRL+C into the terminal where the changefeed is running.
To stop
cockroach
:Get the process ID of the node:
ps -ef | grep cockroach | grep -v grep
501 21766 1 0 6:21PM ttys001 0:00.89 cockroach start-single-node --insecure --listen-addr=localhost
Gracefully shut down the node, specifying its process ID:
kill -TERM 21766
initiating graceful shutdown of server server drained and shutdown completed
To stop Confluent, move into the extracted
confluent-<version>
directory and stop Confluent:$ ./bin/confluent local services stop
To terminate all Confluent processes, use:
$ ./bin/confluent local destroy
For further information on Core changefeeds, see EXPERIMENTAL CHANGEFEED FOR
.