New in v2.1: The CREATE CHANGEFEED statement creates a new changefeed, which provides row-level change subscriptions. Changefeeds target an allowlist of tables, called the "watched rows." Every change to a watched row is emitted as a record in a configurable format (JSON) to a configurable sink (Kafka). For more information, see Change Data Capture.
This feature is under active development and only works for a targeted use case. Please file a GitHub issue if you have feedback on the interface.
CREATE CHANGEFEED is an enterprise-only feature. A core version will be available in a future release.
Required privileges
Changefeeds can only be created by superusers, i.e., members of the admin role. The admin role exists by default with root as its member.
Synopsis
Parameters
Parameter | Description |
---|---|
table_name | The name of the table (or tables in a comma-separated list) to create a changefeed for. |
sink | The location of the configurable sink. The scheme of the URI indicates the type; currently, only kafka is supported. Query parameters vary per type. Currently, the kafka scheme supports only topic_prefix, which adds a prefix to all of the topic names.<br><br>Sink URI scheme: '[scheme]://[host]:[port][?topic_prefix=[foo]]'<br><br>For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo. |
option / value | For a list of available options and their values, see Options below. |
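To make the topic_prefix behavior concrete, here is a sketch using the foo / bar_ names from the example above (the broker address localhost:9092 is an illustrative placeholder):

```sql
-- The 'bar_' prefix is applied to every topic name, so rows from
-- table foo are emitted to the Kafka topic bar_foo instead of foo.
-- The broker address is a placeholder; substitute your own.
CREATE CHANGEFEED FOR TABLE foo
  INTO 'kafka://localhost:9092?topic_prefix=bar_';
```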
Options
Option | Value | Description |
---|---|---|
updated | N/A | Include updated timestamps with each row. |
resolved | INTERVAL | Periodically emit resolved timestamps to the changefeed. Optionally, set a minimum duration between emitting resolved timestamps. If unspecified, all resolved timestamps are emitted. Example: resolved='10s' |
envelope | key_only / row | Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes. Default: envelope=row |
cursor | Timestamp | Emits any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing a consistent scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan. cursor can be used to start a new changefeed where a previous changefeed ended. Example: cursor='1536242855577149065.0000000000' |
format | json / 'experimental-avro' | Format of the emitted record. Currently, support for Avro is limited and experimental. Default: format=json |
confluent_schema_registry | Schema Registry address | The Schema Registry address is required to use 'experimental-avro'. |
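As a sketch of how these options combine (the table name office_dogs and the broker address are illustrative placeholders):

```sql
-- Emit only keys (no values), and emit resolved timestamps at most
-- once every 10 seconds. Table name and broker address are placeholders.
CREATE CHANGEFEED FOR TABLE office_dogs
  INTO 'kafka://localhost:9092'
  WITH envelope = key_only, resolved = '10s';
```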
Examples
Create a changefeed
> CREATE CHANGEFEED FOR TABLE name
INTO 'kafka://host:port'
WITH updated, resolved;
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed connected to Kafka, see Change Data Capture.
Create a changefeed with Avro
> CREATE CHANGEFEED FOR TABLE name
INTO 'kafka://host:port'
WITH format = 'experimental-avro', confluent_schema_registry = '<schema_registry_address>';
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed that emits an Avro record, see Change Data Capture.
Manage a changefeed
Use the following SQL statements to pause, resume, and cancel a changefeed.
Changefeed-specific SQL statements (e.g., CANCEL CHANGEFEED) will be added in the future.
Pause a changefeed
> PAUSE JOB job_id;
For more information, see PAUSE JOB.
Resume a paused changefeed
> RESUME JOB job_id;
For more information, see RESUME JOB.
Cancel a changefeed
> CANCEL JOB job_id;
For more information, see CANCEL JOB.
Start a new changefeed where another ended
Find the high-water timestamp for the ended changefeed:
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
job_id | job_type | ... | high_water_timestamp | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 | | 1
(1 row)
Use the high_water_timestamp to start the new changefeed:
> CREATE CHANGEFEED FOR TABLE name
INTO 'kafka://host:port'
WITH cursor = '<high_water_timestamp>';