EXPERIMENTAL CHANGEFEED FOR
is the core implementation of changefeeds. For the enterprise-only version, see CREATE CHANGEFEED
.
The EXPERIMENTAL CHANGEFEED FOR
statement creates a new core changefeed, which streams row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled. A core changefeed can watch one table or multiple tables in a comma-separated list.
For more information, see Change Data Capture.
This is an experimental feature. The interface and output are subject to change.
Required privileges
Changefeeds can only be created by superusers, i.e., members of the admin
role. The admin role exists by default with root
as the member.
Considerations
Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client; however, result buffering is automatically turned off for core changefeeds. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a
CANCEL QUERY
statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default.This cancellation behavior (i.e., close the underlying connection to cancel the changefeed) also extends to client driver usage; in particular, when a client driver calls
Rows.Close()
after encountering errors for a stream of rows. The pgwire protocol requires that the rows be consumed before the connection is again usable, but in the case of a core changefeed, the rows are never consumed. It is therefore critical that you close the connection, otherwise the application will be blocked forever onRows.Close()
.In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause them to be repeated. This gives our changefeeds an at-least-once delivery guarantee. For more information, see Change Data Capture - Ordering Guarantees.
Synopsis
> EXPERIMENTAL CHANGEFEED FOR table_name [ WITH (option [= value] [, ...]) ];
Parameters
Parameter | Description |
---|---|
table_name |
The name of the table (or tables in a comma separated list) to create a changefeed for. |
option / value |
For a list of available options and their values, see Options below. |
Options
Option | Value | Description |
---|---|---|
updated |
N/A | Include updated timestamps with each row. |
resolved |
INTERVAL |
Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted. Example: resolved='10s' |
envelope |
key_only / row |
Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes.Default: envelope=row |
cursor |
Timestamp | Emits any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan.cursor can be used to start a new changefeed where a previous changefeed ended.Example: CURSOR=1536242855577149065.0000000000 |
format |
json / experimental_avro |
Format of the emitted record. Currently, support for Avro is limited and experimental. Default: format=json . |
confluent_schema_registry |
Schema Registry address | The Schema Registry address is required to use experimental_avro . |
Avro limitations
Currently, support for Avro is limited and experimental. Below is a list of unsupported SQL types and values for Avro changefeeds:
- Decimals must have precision specified.
Decimals with
NaN
or infinite values cannot be written in Avro.Note:To avoid
NaN
or infinite values, add aCHECK
constraint to prevent these values from being inserted into decimal columns.time
,date
,interval
,uuid
,inet
,array
, andjsonb
are not supported in Avro yet.
Examples
Create a changefeed
In this example, you'll set up a core changefeed for a single-node cluster.
Use the
cockroach start-single-node
command to start a single-node cluster:$ cockroach start-single-node \ --insecure \ --listen-addr=localhost \ --background
As the
root
user, open the built-in SQL client:$ cockroach sql \ --format=csv \ --insecure
Note:To determine how wide the columns need to be, the default
table
display format incockroach sql
buffers the results it receives from the server before printing them to the console. When consuming core changefeed data usingcockroach sql
, it's important to use a display format likecsv
that does not buffer its results. To set the display format, use the--format=csv
flag when starting the built-in SQL client, or set the\set display_format=csv
option once the SQL client is open.Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Create table
foo
:> CREATE TABLE foo (a INT PRIMARY KEY);
Insert a row into the table:
> INSERT INTO foo VALUES (0);
Start the core changefeed:
> EXPERIMENTAL CHANGEFEED FOR foo WITH resolved = '10s';
table,key,value foo,[0],"{""after"": {""a"": 0}}" NULL,NULL,"{""resolved"":""1590611959605806000.0000000000""}"
This changefeed will emit
resolved
timestamps every 10 seconds. Depending on how quickly you insert into your watched table, the output could look different than what is shown here.In a new terminal, add another row:
$ cockroach sql --insecure -e "INSERT INTO foo VALUES (1)"
Back in the terminal where the core changefeed is streaming, the following output has appeared:
table,key,value foo,[0],"{""after"": {""a"": 0}}" NULL,NULL,"{""resolved"":""1590611959605806000.0000000000""}" foo,[1],"{""after"": {""a"": 1}}" NULL,NULL,"{""resolved"":""1590611970141415000.0000000000""}"
Note that records may take a couple of seconds to display in the core changefeed.
To stop streaming the changefeed, enter CTRL+C into the terminal where the changefeed is running.
To stop
cockroach
, run:$ cockroach quit --insecure
Create a changefeed with Avro
In this example, you'll set up a core changefeed for a single-node cluster that emits Avro records. CockroachDB's Avro binary encoding convention uses the Confluent Schema Registry to store Avro schemas.
Use the
cockroach start-single-node
command to start a single-node cluster:$ cockroach start-single-node \ --insecure \ --listen-addr=localhost \ --background
Download and extract the Confluent Open Source platform.
Move into the extracted
confluent-<version>
directory and start Confluent:$ ./bin/confluent start
Only
zookeeper
,kafka
, andschema-registry
are needed. To troubleshoot Confluent, see their docs.As the
root
user, open the built-in SQL client:$ cockroach sql \ --format=csv \ --insecure
Note:To determine how wide the columns need to be, the default
table
display format incockroach sql
buffers the results it receives from the server before printing them to the console. When consuming core changefeed data usingcockroach sql
, it's important to use a display format likecsv
that does not buffer its results. To set the display format, use the--format=csv
flag when starting the built-in SQL client, or set the\set display_format=csv
option once the SQL client is open.Enable the
kv.rangefeed.enabled
cluster setting:> SET CLUSTER SETTING kv.rangefeed.enabled = true;
Create table
bar
:> CREATE TABLE bar (a INT PRIMARY KEY);
Insert a row into the table:
> INSERT INTO bar VALUES (0);
Start the core changefeed:
> EXPERIMENTAL CHANGEFEED FOR bar \ WITH format = experimental_avro, confluent_schema_registry = 'http://localhost:8081', resolved = '10s';
table,key,value bar,\000\000\000\000\001\002\000,\000\000\000\000\002\002\002\000 NULL,NULL,\000\000\000\000\003\002<1590612821682559000.0000000000
This changefeed will emit
resolved
timestamps every 10 seconds. Depending on how quickly you insert into your watched table, the output could look different than what is shown here.In a new terminal, add another row:
$ cockroach sql --insecure -e "INSERT INTO bar VALUES (1)"
Back in the terminal where the core changefeed is streaming, the output will appear:
bar,\000\000\000\000\001\002\002,\000\000\000\000\002\002\002\002 NULL,NULL,\000\000\000\000\003\002<1590612831891317000.0000000000
Note that records may take a couple of seconds to display in the core changefeed.
To stop streaming the changefeed, enter CTRL+C into the terminal where the changefeed is running.
To stop
cockroach
, run:$ cockroach quit --insecure
To stop Confluent, move into the extracted
confluent-<version>
directory and stop Confluent:$ ./bin/confluent stop
To terminate all Confluent processes, use:
$ ./bin/confluent destroy