EXPERIMENTAL CHANGEFEED FOR is the core implementation of changefeeds. For the Enterprise-only version, see CREATE CHANGEFEED.
The EXPERIMENTAL CHANGEFEED FOR statement creates a new core changefeed, which streams row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled. A core changefeed can watch one table or multiple tables in a comma-separated list.
For more information, see Change Data Capture Overview.
This feature is in preview. This feature is subject to change. To share feedback and/or issues, contact Support.
Required privileges
Starting in v22.2, CockroachDB introduces a new system-level privilege model that provides finer control over a user's privilege to work with the database, including creating and managing changefeeds.
The legacy privilege model for changefeeds is still supported in v22.2; however, it will be removed in a future release of CockroachDB. We recommend implementing the new privilege model described in this section for all changefeeds.
New in v22.2: You can grant a user the CHANGEFEED privilege to allow them to create changefeeds on a specific table:
GRANT CHANGEFEED ON TABLE example_table TO user;
This privilege model provides a more granular way to grant users the ability to create a changefeed on a table. A user granted the CHANGEFEED privilege can create changefeeds on the target table even if the user does not have the CONTROLCHANGEFEED role option or the SELECT privilege on the table.
Since you can grant the CHANGEFEED privilege to a user or role without them needing the SELECT privilege on a table, these users will be able to create changefeeds, but they will not be able to run a SELECT query on that data directly. However, these users will be able to read this data indirectly, because EXPERIMENTAL CHANGEFEED FOR emits changefeed messages to the SQL session.
You can add CHANGEFEED to the user or role's default privileges with ALTER DEFAULT PRIVILEGES:
ALTER DEFAULT PRIVILEGES GRANT CHANGEFEED ON TABLES TO user;
Users with the CONTROLCHANGEFEED role option must have SELECT on each table, even if they are also granted the CHANGEFEED privilege.
Legacy privilege model
Changefeeds can only be created by superusers, i.e., members of the admin role. The admin role exists by default with root as the member.
Considerations
Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client; however, result buffering is automatically turned off for core changefeeds. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a CANCEL QUERY statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default.
This cancellation behavior (i.e., close the underlying connection to cancel the changefeed) also extends to client driver usage; in particular, when a client driver calls Rows.Close() after encountering errors for a stream of rows. The pgwire protocol requires that the rows be consumed before the connection is again usable, but in the case of a core changefeed, the rows are never consumed. It is therefore critical that you close the connection, otherwise the application will be blocked forever on Rows.Close().
In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause them to be repeated. This gives our changefeeds an at-least-once delivery guarantee. For more information, see Ordering Guarantees.
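Because delivery is at-least-once, a consumer usually has to tolerate repeated messages. The following is a minimal Python sketch of one way to do that. It assumes the changefeed was started with the updated option and the wrapped envelope, so that each message value is JSON carrying an "updated" timestamp string. The names parse_hlc and Deduplicator are hypothetical helpers for illustration and are not part of CockroachDB.

```python
import json


def parse_hlc(ts: str) -> tuple[int, int]:
    """Split a timestamp such as '1536242855577149065.0000000000'
    into (wall-clock nanoseconds, logical counter) so it can be compared."""
    wall, _, logical = ts.partition(".")
    return int(wall), int(logical or 0)


class Deduplicator:
    """Remembers the newest timestamp seen per (table, key) and rejects repeats."""

    def __init__(self) -> None:
        self._latest: dict[tuple[str, str], tuple[int, int]] = {}

    def is_new(self, table: str, key: str, value: str) -> bool:
        # Assumption: value is wrapped-envelope JSON with an "updated" field.
        ts = parse_hlc(json.loads(value)["updated"])
        seen = self._latest.get((table, key))
        if seen is not None and ts <= seen:
            return False  # a repeat, or an older version of this row
        self._latest[(table, key)] = ts
        return True
```

This sketch keeps its state in memory and grows with the number of distinct keys; a long-running consumer would likely need to bound or persist that state.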
As of v22.1, changefeeds filter out VIRTUAL computed columns from events by default. This is a backward-incompatible change. To maintain the changefeed behavior in previous versions where NULL values are emitted for virtual computed columns, see the virtual_columns option for more detail.
Synopsis
EXPERIMENTAL CHANGEFEED FOR table_name [ WITH (option [= value] [, ...]) ];
Parameters
Parameter | Description |
---|---|
table_name | The name of the table (or tables in a comma-separated list) to create a changefeed for. |
option / value | For a list of available options and their values, see Options below. |
Options
Option | Value | Description |
---|---|---|
confluent_schema_registry | Schema Registry address | The Schema Registry address is required to use avro. |
cursor | Timestamp | Emits any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan. cursor can be used to start a new changefeed where a previous changefeed ended. Example: CURSOR=1536242855577149065.0000000000 |
end_time | Timestamp | Indicates the timestamp up to which the changefeed will emit all events and then complete with a successful status. Provide a future timestamp for end_time as the number of nanoseconds since the Unix epoch. For example, end_time="1655402400000000000". |
envelope | key_only / row / wrapped | key_only emits only the key and no value, which is faster if you only want to know when the key changes. row emits the row without any additional metadata fields in the message. row does not support avro format. wrapped emits the full message including any metadata fields. See Responses for more detail on message format. Default: envelope=wrapped |
format | json / avro | Format of the emitted record. Currently, support for Avro is limited. Default: format=json. |
initial_scan / no_initial_scan / initial_scan_only | N/A | Controls whether an initial scan occurs at the start time of a changefeed. initial_scan_only performs an initial scan, after which the changefeed job completes with a successful status. You cannot use end_time and initial_scan_only simultaneously. If none of these options are specified, an initial scan will occur if there is no cursor, and will not occur if there is one. This preserves the behavior from previous releases. You cannot specify initial_scan and no_initial_scan or no_initial_scan and initial_scan_only simultaneously. Default: initial_scan. If used in conjunction with cursor, an initial scan will be performed at the cursor timestamp. If no cursor is specified, the initial scan is performed at now(). |
min_checkpoint_frequency | Duration string | Controls how often nodes flush their progress to the coordinating changefeed node. Changefeeds will wait for at least the specified duration before flushing. This can help you control the flush frequency to achieve better throughput. If this is set to 0s, a node will flush as long as the high-water mark has increased for the ranges that particular node is processing. If a changefeed is resumed, then min_checkpoint_frequency is the amount of time that changefeed will need to catch up. That is, it could emit duplicate messages during this time. Note: resolved messages will not be emitted more frequently than the configured min_checkpoint_frequency (but may be emitted less frequently). Since min_checkpoint_frequency defaults to 30s, if you require resolved messages more often than every 30s, you must set min_checkpoint_frequency to a duration no longer than the desired resolved interval. Default: 30s |
mvcc_timestamp | N/A | Include the MVCC timestamp for each emitted row in a changefeed. With the mvcc_timestamp option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill. |
resolved | INTERVAL | Emits resolved timestamp events for the changefeed. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time. Set an optional minimal duration between emitting resolved timestamps. Example: resolved='10s'. This option will only emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If unspecified, all resolved timestamps are emitted as the high-water mark advances. Note: If you require resolved messages more often than every 30s, you must set the min_checkpoint_frequency option to a duration no longer than the desired resolved interval. This is because resolved messages will not be emitted more frequently than min_checkpoint_frequency, but may be emitted less frequently. |
split_column_families | N/A | Target a table with multiple column families. Emit messages for each column family in the target table. Each message will include the label: table.family. |
updated | N/A | Include updated timestamps with each row. |
virtual_columns | STRING | Changefeeds omit virtual computed columns from emitted messages by default. To maintain the behavior of previous CockroachDB versions where the changefeed would emit NULL values for virtual computed columns, set virtual_columns = "null" when you start a changefeed. You may also define virtual_columns = "omitted", though this is already the default behavior for v22.1+. If you do not set "omitted" on a table with virtual computed columns when you create a changefeed, you will receive a warning that changefeeds will filter out virtual computed values. Default: "omitted" |
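The end_time option expects nanoseconds since the Unix epoch, which is easy to get wrong by a factor of 1,000. The following Python sketch shows one way to compute that value from a timezone-aware datetime; the helper name to_unix_nanos is hypothetical, and the single-quoted statement text is an assumption modeled on the examples on this page. The date used reproduces the example value from the table above, so substitute a future timestamp in practice.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_unix_nanos(moment: datetime) -> int:
    """Return whole nanoseconds since the Unix epoch for a timezone-aware datetime."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Integer division keeps the result exact; datetime has microsecond precision.
    micros = (moment - EPOCH) // timedelta(microseconds=1)
    return micros * 1000


end_time = to_unix_nanos(datetime(2022, 6, 16, 18, 0, tzinfo=timezone.utc))
# Hypothetical statement text; cdc_test is the example table used on this page.
statement = f"EXPERIMENTAL CHANGEFEED FOR cdc_test WITH end_time = '{end_time}';"
```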
Avro limitations
Below are clarifications for particular SQL types and values for Avro changefeeds:
- Decimals must have precision specified.
- BYTES (or its aliases BYTEA and BLOB) are often used to store machine-readable data. When you stream these types through a changefeed with format=avro, CockroachDB does not encode or change the data. However, Avro clients can often include escape sequences to present the data in a printable format, which can interfere with deserialization. A potential solution is to hex-encode BYTES values when initially inserting them into CockroachDB. This will ensure that Avro clients can consistently decode the hexadecimal. Note that hex-encoding values at insertion will increase record size.
- BIT and VARBIT types are encoded as arrays of 64-bit integers.
For efficiency, CockroachDB encodes BIT and VARBIT bitfield types as arrays of 64-bit integers. That is, base-2 (binary format) BIT and VARBIT data types are converted to base 10 and stored in arrays. Encoding in CockroachDB is big-endian, therefore the last value may have many trailing zeroes. For this reason, the first value of each array is the number of bits that are used in the last value of the array.
For instance, if the bitfield is 129 bits long, there will be 4 integers in the array. The first integer will be 1, representing the number of bits in the last value; the second integer will be the first 64 bits; the third integer will be bits 65–128; and the last integer will be either 0 or 9223372036854775808 (i.e., the integer with only the first bit set, or 1000000000000000000000000000000000000000000000000000000000000000 in base 2).
This example is base-10 encoded into an array as follows:
{"array": [1, <first 64 bits>, <second 64 bits>, 0 or 9223372036854775808]}
For downstream processing, it is necessary to base-2 encode every element in the array (except for the first element). The first number in the array gives you the number of bits to take from the last base-2 number, that is, its most significant bits. So, in the example above this would be 1. Finally, all the base-2 numbers can be appended together, which will result in the original number of bits, 129.
In a different example of this process where the bitfield is 136 bits long, the array would be similar to the following when base-10 encoded:
{"array": [8, 18293058736425533439, 18446744073709551615, 13690942867206307840]}
To then work with this data, you would convert each of the elements in the array to base-2 numbers, besides the first element. For the above array, this would convert to:
[8, 1111110111011101111111111111111111111111111111111111111111111111, 1111111111111111111111111111111111111111111111111111111111111111, 1011111000000000000000000000000000000000000000000000000000000000]
Next, you use the first element in the array to take that number of bits from the last base-2 element, which gives 10111110. Finally, you append each of the base-2 numbers together (in the above array, the second, third, and truncated last element). This results in 136 bits, the original number of bits.
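The decoding steps described above can be sketched in Python as follows. This is an illustration under stated assumptions rather than a reference implementation: the function name decode_bit_array is hypothetical, the input is taken to be the plain list found under the "array" key, and the 64-bit mask is there in case an Avro client hands the words back as signed longs. The source does not say how a bitfield whose length is an exact multiple of 64 is encoded, so that case is not specially handled.

```python
def decode_bit_array(encoded: list[int]) -> str:
    """Rebuild the original bit string from [bits_in_last_word, word1, word2, ...]."""
    if not encoded:
        raise ValueError("expected at least the bit-count element")
    bits_in_last, words = encoded[0], encoded[1:]
    if not words:
        return ""
    # Mask to 64 bits so words delivered as signed longs (an assumption
    # about the client) render the same as unsigned ones.
    as_binary = [format(w & 0xFFFFFFFFFFFFFFFF, "064b") for w in words]
    # Keep only the most significant bits of the final word.
    as_binary[-1] = as_binary[-1][:bits_in_last]
    return "".join(as_binary)


bits = decode_bit_array(
    [8, 18293058736425533439, 18446744073709551615, 13690942867206307840]
)
print(len(bits))  # 136
```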
Examples
Create a changefeed
To start a changefeed:
EXPERIMENTAL CHANGEFEED FOR cdc_test;
In the terminal where the core changefeed is streaming, the output will appear:
table,key,value
cdc_test,[0],"{""after"": {""a"": 0}}"
For step-by-step guidance on creating a Core changefeed, see the Changefeed Examples page.
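If a client captures the CSV-style output shown above, each row can be split back into a table name, a JSON key, and a JSON value. The following Python sketch assumes the output has exactly the header and quoting shown in this example; the function name parse_changefeed_csv is hypothetical.

```python
import csv
import io
import json

# The sample output from this section, copied verbatim.
raw_output = '''table,key,value
cdc_test,[0],"{""after"": {""a"": 0}}"
'''


def parse_changefeed_csv(text):
    """Yield (table, key, value) with the key and value decoded from JSON."""
    reader = csv.DictReader(io.StringIO(text))
    for row in reader:
        yield row["table"], json.loads(row["key"]), json.loads(row["value"])


events = list(parse_changefeed_csv(raw_output))
print(events)  # [('cdc_test', [0], {'after': {'a': 0}})]
```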
Create a changefeed with Avro
To start a changefeed in Avro format:
EXPERIMENTAL CHANGEFEED FOR cdc_test WITH format = avro, confluent_schema_registry = 'http://localhost:8081';
In the terminal where the core changefeed is streaming, the output will appear:
table,key,value
cdc_test,\000\000\000\000\001\002\000,\000\000\000\000\002\002\002\000
For step-by-step guidance on creating a Core changefeed with Avro, see the Changefeed Examples page.
Create a changefeed on a table with column families
To create a changefeed on a table with column families, use the FAMILY keyword for a specific column family:
EXPERIMENTAL CHANGEFEED FOR TABLE cdc_test FAMILY f1;
To create a changefeed on a table and output changes for each column family, use the split_column_families option:
EXPERIMENTAL CHANGEFEED FOR TABLE cdc_test WITH split_column_families;
For step-by-step guidance on creating a Core changefeed on a table with multiple column families, see the Changefeed Examples page.