CREATE CHANGEFEED is an Enterprise-only feature. For the core version, see EXPERIMENTAL CHANGEFEED FOR.
The CREATE CHANGEFEED statement creates a new Enterprise changefeed, which targets an allowlist of tables called "watched rows". Every change to a watched row is emitted as a record in a configurable format (JSON or Avro) to a configurable sink (Kafka, a cloud storage sink, or a webhook sink). You can create, pause, resume, or cancel an Enterprise changefeed.
For more information, see Use Changefeeds.
Required privileges
To create a changefeed, the user must be a member of the admin role or have the CREATECHANGEFEED parameter set.
Synopsis
Parameters
Parameter | Description |
---|---|
table_name | The name of the table (or tables in a comma separated list) to create a changefeed for. Note: Before creating a changefeed, consider the number of changefeeds versus the number of tables to include in a single changefeed. Each scenario can have an impact on total memory usage or changefeed performance. See Create and Configure Changefeeds for more detail. |
sink | The location of the configurable sink. The scheme of the URI indicates the type. For more information, see Sink URI below. |
option / value | For a list of available options and their values, see Options below. |
Sink URI
The sink URI follows the basic format of:
'{scheme}://{host}:{port}?{query_parameters}'
URI Component | Description |
---|---|
scheme | The type of sink: kafka, any cloud storage sink, or webhook sink. |
host | The sink's hostname or IP address. |
port | The sink's port. |
query_parameters | The sink's query parameters. |
See Changefeed Sinks for considerations when using each sink and detail on configuration.
Kafka
Example of a Kafka sink URI:
'kafka://broker.address.com:9092?topic_prefix=bar_&tls_enabled=true&ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ&sasl_enabled=true&sasl_user={sasl user}&sasl_password={url-encoded password}&sasl_mechanism=SASL-SCRAM-SHA-256'
Cloud Storage
The following are example file URLs for each of the cloud storage schemes:
Location | Example |
---|---|
Amazon S3 | 's3://{BUCKET NAME}/{PATH}?AWS_ACCESS_KEY_ID={KEY ID}&AWS_SECRET_ACCESS_KEY={SECRET ACCESS KEY}' |
Azure Blob Storage | 'azure://{CONTAINER NAME}/{PATH}?AZURE_ACCOUNT_NAME={ACCOUNT NAME}&AZURE_ACCOUNT_KEY={URL-ENCODED KEY}' |
Google Cloud | 'gs://{BUCKET NAME}/{PATH}?AUTH=specified&CREDENTIALS={ENCODED KEY}' |
HTTP | 'http://localhost:8080/{PATH}' |
Use Cloud Storage for Bulk Operations explains the requirements for authentication and encryption for each supported cloud storage sink. See Changefeed Sinks for considerations when using cloud storage.
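For example, a changefeed pointed at a Google Cloud Storage bucket might look like the following. This is a minimal sketch: the table name, bucket, path, and credentials are placeholders to replace with your own values.
-- Hypothetical table and bucket; substitute your own values.
CREATE CHANGEFEED FOR TABLE name
  INTO 'gs://{BUCKET NAME}/{PATH}?AUTH=specified&CREDENTIALS={ENCODED KEY}'
  WITH resolved;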
Webhook
Example of a webhook URI:
'webhook-https://{your-webhook-endpoint}?insecure_tls_skip_verify=true'
Query parameters
Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
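For example, a SASL password that contains special characters must be URI-encoded before it is placed in the sink URI. In this hypothetical sketch, the raw password p@ss/word is encoded as p%40ss%2Fword, and the table name, user, and broker address are placeholders.
-- Hypothetical credentials; the raw password p@ss/word is URI-encoded as p%40ss%2Fword.
CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://broker.address.com:9092?tls_enabled=true&sasl_enabled=true&sasl_user=app_user&sasl_password=p%40ss%2Fword'
  WITH updated;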
Query parameters include:
Parameter | Sink Type | Type | Description |
---|---|---|---|
New in v21.2: topic_name | Kafka | STRING | Allows arbitrary topic naming for Kafka topics. See the Kafka topic naming limitations for detail on supported characters, etc. For example, CREATE CHANGEFEED FOR foo,bar INTO 'kafka://sink?topic_name=all' will emit all records to a topic named all. Note that schemas will still be registered separately. Can be combined with the topic_prefix option. Default: table name. |
topic_prefix | Kafka, cloud | STRING | Adds a prefix to all topic names. For example, CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://...?topic_prefix=bar_' would emit rows under the topic bar_foo instead of foo. |
tls_enabled | Kafka | BOOL | If true, enable Transport Layer Security (TLS) on the connection to Kafka. This can be used with a ca_cert (see below). Default: false |
ca_cert | Kafka, webhook, (Confluent schema registry) | STRING | The base64-encoded ca_cert file. Specify ca_cert for a Kafka sink, webhook sink, and/or a Confluent schema registry. For usage with a Kafka sink, see Kafka Sink URI. It's necessary to state https in the schema registry's address when passing ca_cert: confluent_schema_registry='https://schema_registry:8081?ca_cert=LS0tLS1CRUdJTiBDRVJUSUZ'. See confluent_schema_registry for more detail on using this option. Note: To encode your ca.cert, run base64 -w 0 ca.cert. |
client_cert | Kafka | STRING | The base64-encoded Privacy Enhanced Mail (PEM) certificate. This is used with client_key. |
client_key | Kafka | STRING | The base64-encoded private key for the PEM certificate. This is used with client_cert. Note: Client keys are often encrypted. You will receive an error if you pass an encrypted client key in your changefeed statement. To decrypt the client key, run: openssl rsa -in key.pem -out key.decrypt.pem -passin pass:{PASSWORD}. Once decrypted, be sure to update your changefeed statement to use the new key.decrypt.pem file instead. |
sasl_enabled | Kafka | BOOL | If true, the authentication protocol can be set to SCRAM or PLAIN using the sasl_mechanism parameter. You must have tls_enabled set to true to use SASL. Default: false |
sasl_mechanism | Kafka | STRING | Can be set to SASL-SCRAM-SHA-256, SASL-SCRAM-SHA-512, or SASL-PLAIN. A sasl_user and sasl_password are required. Default: SASL-PLAIN |
sasl_user | Kafka | STRING | Your SASL username. |
sasl_password | Kafka | STRING | Your SASL password. Note: Passwords should be URL encoded since the value can contain characters that would cause authentication to fail. |
file_size | cloud | STRING | The file will be flushed (i.e., written to the sink) when it exceeds the specified file size. This can be used with the WITH resolved option, which flushes on a specified cadence. Default: 16MB |
insecure_tls_skip_verify | Kafka, webhook | BOOL | If true, disable client-side validation of responses. Note that a CA certificate is still required; this parameter means that the client will not verify the certificate. Warning: Use this query parameter with caution, as it creates MITM vulnerabilities unless combined with another method of authentication. Default: false |
S3_storage_class | Amazon S3 cloud storage sink | STRING | Specify the Amazon S3 storage class for files created by the changefeed. See Create a changefeed with an S3 storage class for the available classes and an example. Default: STANDARD |
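For instance, to control how often files are flushed to a cloud storage sink, the file_size parameter can be combined with the resolved option. This is a hedged sketch: the table, bucket, and credentials are placeholders.
-- Hypothetical table and bucket; file_size raises the flush threshold from the 16MB default.
CREATE CHANGEFEED FOR TABLE name
  INTO 's3://{BUCKET NAME}/{PATH}?AWS_ACCESS_KEY_ID={KEY ID}&AWS_SECRET_ACCESS_KEY={SECRET ACCESS KEY}&file_size=32MB'
  WITH resolved;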
Options
Option | Value | Description |
---|---|---|
updated | N/A | Include updated timestamps with each row. If a cursor is provided, the "updated" timestamps will match the MVCC timestamps of the emitted rows, and there is no initial scan. If a cursor is not provided, the changefeed will perform an initial scan (as of the time the changefeed was created), and the "updated" timestamp for each change record emitted in the initial scan will be the timestamp of the initial scan. Similarly, when a backfill is performed for a schema change, the "updated" timestamp is set to the first timestamp for when the new schema is valid. |
resolved | Duration string | Emits resolved timestamp events per changefeed in a format dependent on the connected sink. Resolved timestamp events do not emit until all ranges in the changefeed have progressed to a specific point in time. Set an optional minimal duration between emitting resolved timestamps. Example: resolved='10s'. This option will only emit a resolved timestamp event if the timestamp has advanced and at least the optional duration has elapsed. If unspecified, all resolved timestamps are emitted as the high-water mark advances. |
envelope | key_only / wrapped | Use key_only to emit only the key and no value, which is faster if you only want to know when the key changes. Default: envelope=wrapped |
cursor | Timestamp | Emit any changes after the given timestamp, but does not output the current state of the table first. If cursor is not specified, the changefeed starts by doing an initial scan of all the watched rows and emits the current value, then moves to emitting any changes that happen after the scan. When starting a changefeed at a specific cursor, the cursor cannot be before the configured garbage collection window (see gc.ttlseconds) for the table you're trying to follow; otherwise, the changefeed will error. With default garbage collection settings, this means you cannot create a changefeed that starts more than 25 hours in the past. cursor can be used to start a new changefeed where a previous changefeed ended. Example: CURSOR='1536242855577149065.0000000000' |
format | json / avro | Format of the emitted record. For mappings of CockroachDB types to Avro types, see the table and detail on Avro limitations below. Default: format=json. |
mvcc_timestamp | N/A | New in v21.2: Include the MVCC timestamp for each emitted row in a changefeed. With the mvcc_timestamp option, each emitted row will always contain its MVCC timestamp, even during the changefeed's initial backfill. |
confluent_schema_registry | Schema Registry address | The Schema Registry address is required to use avro. To connect to Confluent Cloud, use the following URL structure: 'https://{API_KEY_ID}:{API_SECRET_URL_ENCODED}@{CONFLUENT_REGISTRY_URL}:443'. See the Confluent Cloud Schema Registry Tutorial for further detail. |
key_in_value | N/A | Make the primary key of a deleted row recoverable in sinks where each message has a value but not a key (most have a key and value in each message). key_in_value is automatically used for cloud storage sinks and webhook sinks. |
diff | N/A | Publish a before field with each message, which includes the value of the row before the update was applied. |
compression | gzip | Compress changefeed data files written to a cloud storage sink. Currently, only Gzip is supported for compression. |
on_error | pause / fail | New in v21.2: Use on_error=pause to pause the changefeed when encountering non-retryable errors. on_error=pause will pause the changefeed instead of sending it into a terminal failure state. Note: Retryable errors will continue to be retried with this option specified. Use with protect_data_from_gc_on_pause to protect changes from garbage collection. Default: on_error=fail |
protect_data_from_gc_on_pause | N/A | When a changefeed is paused, ensure that the data needed to resume the changefeed is not garbage collected. Note: If you use this option, changefeeds left paused can prevent garbage collection for long periods of time. |
schema_change_events | default / column_changes | The type of schema change event that triggers the behavior specified by the schema_change_policy option. Default: schema_change_events=default |
schema_change_policy | backfill / nobackfill / stop | The behavior to take when an event specified by the schema_change_events option occurs. Default: schema_change_policy=backfill |
initial_scan / no_initial_scan | N/A | Control whether or not an initial scan will occur at the start time of a changefeed. initial_scan and no_initial_scan cannot be used simultaneously. If neither initial_scan nor no_initial_scan is specified, an initial scan will occur if there is no cursor, and will not occur if there is one. This preserves the behavior from previous releases. If used in conjunction with cursor, an initial scan will be performed at the cursor timestamp. If no cursor is specified, the initial scan is performed at now(). Default: initial_scan |
kafka_sink_config | STRING | New in v21.2: Set fields to configure the required level of message acknowledgement from the Kafka server, the version of the server, and batching parameters for Kafka sinks. See Kafka sink configuration for more detail on configuring all the available fields for this option. Example: CREATE CHANGEFEED FOR table INTO 'kafka://localhost:9092' WITH kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "RequiredAcks": "ONE"}' |
full_table_name | N/A | Use fully-qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases. Example: CREATE CHANGEFEED FOR foo... WITH full_table_name will create the topic name defaultdb.public.foo instead of foo. |
avro_schema_prefix | Schema prefix name | Provide a namespace for the schema of a table in addition to the default, the table name. This allows multiple databases or clusters to share the same schema registry when the same table name is present in multiple databases. Example: CREATE CHANGEFEED FOR foo WITH format=avro, confluent_schema_registry='registry_url', avro_schema_prefix='super' will register subjects as superfoo-key and superfoo-value with the namespace super. |
webhook_client_timeout | INTERVAL | New in v21.2: If a response is not recorded from the sink within this timeframe, it will error and retry to connect. Note this must be a positive value. Default: "3s" |
webhook_auth_header | STRING | New in v21.2: Pass a value (password, token, etc.) to the HTTP Authorization header with a webhook request for a "Basic" HTTP authentication scheme. Example: With a username of "user" and password of "pwd", add a colon between "user:pwd" and then base64 encode, which results in "dXNlcjpwd2Q=". WITH webhook_auth_header='Basic dXNlcjpwd2Q='. |
topic_in_value | BOOL | New in v21.2: Set to include the topic in each emitted row update. Note this is automatically set for webhook sinks. |
webhook_sink_config | STRING | New in v21.2: Set fields to configure sink batching and retries. The schema is as follows: { "Flush": { "Messages": ..., "Bytes": ..., "Frequency": ..., }, "Retry": {"Max": ..., "Backoff": ..., } }. Note that if either Messages or Bytes are nonzero, then a non-zero value for Frequency must be provided. See Webhook sink configuration for more details on using this option. |
metrics_label | STRING | This is an experimental feature. Define a metrics label to which the metrics for one or multiple changefeeds increment. All changefeeds also have their metrics aggregated. The maximum length of a label is 128 bytes. There is a limit of 1024 unique labels. Example: WITH metrics_label=label_name. For more detail on usage and considerations, see Using changefeed metrics labels. |
Using the format=avro, envelope=key_only, and updated options together is rejected. envelope=key_only prevents any rows with updated fields from being emitted, which makes the updated option meaningless.
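The options above can be combined in a single statement. The following is a hedged sketch (hypothetical table and Kafka address) that emits updated timestamps and before values, pauses the changefeed on non-retryable errors, and protects data from garbage collection while the changefeed is paused.
-- Hypothetical table and Kafka address; combines several options from the table above.
CREATE CHANGEFEED FOR TABLE name
  INTO 'kafka://host:port'
  WITH updated, resolved = '10s', diff, on_error = 'pause', protect_data_from_gc_on_pause;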
Files
The files emitted to a sink use the following naming conventions:
The timestamp format is YYYYMMDDHHMMSSNNNNNNNNNLLLLLLLLLL.
General file format
/[date]/[timestamp]-[uniquer]-[topic]-[schema-id]
For example:
/2020-04-02/202004022058072107140000000000000-56087568dba1e6b8-1-72-00000000-test_table-1.ndjson
Resolved file format
/[date]/[timestamp].RESOLVED
For example:
/2020-04-04/202004042351304139680000000000000.RESOLVED
Examples
Create a changefeed connected to Kafka
> CREATE CHANGEFEED FOR TABLE name, name2, name3
INTO 'kafka://host:port'
WITH updated, resolved;
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For step-by-step guidance on creating a changefeed connected to Kafka, see Changefeed Examples. The parameters table on the Changefeed Sinks page provides a list of all kafka-specific query parameters.
Create a changefeed connected to Kafka using Avro
> CREATE CHANGEFEED FOR TABLE name, name2, name3
INTO 'kafka://host:port'
WITH format = avro, confluent_schema_registry = <schema_registry_address>;
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For more information on how to create a changefeed that emits an Avro record, see this step-by-step example. The parameters table on the Changefeed Sinks page provides a list of all kafka-specific query parameters.
Create a changefeed connected to a cloud storage sink
> CREATE CHANGEFEED FOR TABLE name, name2, name3
INTO 'scheme://host?parameters'
WITH updated, resolved;
+--------------------+
| job_id |
+--------------------+
| 360645287206223873 |
+--------------------+
(1 row)
For step-by-step guidance on creating a changefeed connected to a cloud storage sink, see Changefeed Examples. The parameters table on the Changefeed Sinks page provides a list of the available cloud storage parameters.
Create a changefeed with an S3 storage class
New in v21.2.6: To associate the changefeed message files with a specific storage class in your Amazon S3 bucket, use the S3_STORAGE_CLASS parameter with the class. For example, the following S3 connection URI specifies the INTELLIGENT_TIERING storage class:
CREATE CHANGEFEED FOR TABLE name INTO 's3://{BUCKET NAME}?AWS_ACCESS_KEY_ID={KEY ID}&AWS_SECRET_ACCESS_KEY={SECRET ACCESS KEY}&S3_STORAGE_CLASS=INTELLIGENT_TIERING' WITH resolved;
Use the parameter to set one of the storage classes listed in Amazon's documentation. For more general usage information, see Amazon's Using Amazon S3 storage classes documentation.
Create a changefeed connected to a webhook sink
The webhook sink is currently in beta — see usage considerations, available parameters, and options for more information.
CREATE CHANGEFEED FOR TABLE name, name2, name3
INTO 'webhook-https://{your-webhook-endpoint}?insecure_tls_skip_verify=true'
WITH updated;
+---------------------+
| job_id |
+---------------------+
| 687842491801632769 |
+---------------------+
(1 row)
For step-by-step guidance on creating a changefeed connected to a webhook sink, see Changefeed Examples. The parameters table on the Changefeed Sinks page provides a list of the available webhook parameters.
Manage a changefeed
New in v21.2: For enterprise changefeeds, use SHOW CHANGEFEED JOBS to check the status of your changefeed jobs:
> SHOW CHANGEFEED JOBS;
Use the following SQL statements to pause, resume, or cancel a changefeed.
Pause a changefeed
> PAUSE JOB job_id;
For more information, see PAUSE JOB.
Resume a paused changefeed
> RESUME JOB job_id;
For more information, see RESUME JOB.
Cancel a changefeed
> CANCEL JOB job_id;
For more information, see CANCEL JOB.
Configuring all changefeeds
It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.
To pause all running changefeeds:
PAUSE JOBS (SELECT job_id FROM [SHOW CHANGEFEED JOBS] WHERE status = ('running'));
This will change the status for each of the running changefeeds to paused, which can be verified with SHOW CHANGEFEED JOBS.
To resume all paused changefeeds:
RESUME JOBS (SELECT job_id FROM [SHOW CHANGEFEED JOBS] WHERE status = ('paused'));
This will resume the changefeeds and update the status for each of the changefeeds to running.
Start a new changefeed where another ended
Find the high-water timestamp for the ended changefeed:
> SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
job_id | job_type | ... | high_water_timestamp | error | coordinator_id
+--------------------+------------+ ... +--------------------------------+-------+----------------+
383870400694353921 | CHANGEFEED | ... | 1537279405671006870.0000000000 | | 1
(1 row)
Use the high_water_timestamp to start the new changefeed:
> CREATE CHANGEFEED FOR TABLE name, name2, name3
INTO 'kafka://host:port'
WITH cursor = '<high_water_timestamp>';
Note that because the cursor is provided, the initial scan is not performed.
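If you also need the current state of the watched rows as of that timestamp, the initial_scan option can be added; per the Options table, the initial scan is then performed at the cursor timestamp. A minimal sketch:
-- The cursor value is the high_water_timestamp from the previous changefeed.
CREATE CHANGEFEED FOR TABLE name, name2, name3
  INTO 'kafka://host:port'
  WITH cursor = '<high_water_timestamp>', initial_scan;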