The configurations and settings explained on this page will significantly impact a changefeed's behavior.
The following sections describe settings, configurations, and details to tune changefeeds for these use cases:
Some options for the kafka_sink_config
and webhook_sink_config
parameters are discussed on this page. However, for more information on specific tuning for Kafka and Webhook sinks, refer to the following pages:
MuxRangefeed
This feature is in preview. This feature is subject to change. To share feedback and/or issues, contact Support.
MuxRangefeed
is a subsystem that improves the performance of rangefeeds with scale. It significantly reduces the overhead of running rangefeeds. Without MuxRangefeed
enabled the number of RPC streams is proportional with the number of ranges in a table. For example, a large table could have tens of thousands of ranges. With MuxRangefeed
enabled, this proportion improves so that the number of RPC streams is relative to the number of nodes in a cluster.
You can enable its functionality with the changefeed.mux_rangefeed.enabled
cluster setting.
Use the following workflow to enable MuxRangefeed
:
Enable the cluster setting:
SET CLUSTER SETTING changefeed.mux_rangefeed.enabled = true;
After enabling the setting, pause the changefeed:
PAUSE JOB {job ID};
You can use
SHOW CHANGEFEED JOBS
to retrieve the job ID.Resume the changefeed for the cluster setting to take effect:
RESUME JOB {job ID};
Tuning for high durability delivery
When designing a system that relies on high durability message delivery—that is, not missing any message acknowledgement at the downstream sink—consider the following settings and configuration in this section:
- Pausing changefeeds and garbage collection
- Defining Kafka message acknowledgment
- Choosing changefeed sinks
- Defining schema change behavior
Before tuning these settings we recommend reading details on our changefeed at-least-once-delivery guarantee.
Pausing changefeeds and garbage collection
By default, protected timestamps will protect changefeed data from garbage collection up to the time of the checkpoint. Protected timestamps will protect changefeed data from garbage collection if the downstream changefeed sink is unavailable until you either cancel the changefeed or the sink becomes available once again.
However, if the changefeed lags too far behind, the protected changes could lead to an accumulation of garbage. This could result in increased disk usage and degraded performance for some workloads.
For more detail on changefeeds and protected timestamps, refer to Garbage collection and changefeeds.
Create changefeeds with the following options so that your changefeed protects data when it is paused:
protect_data_from_gc_on_pause
: to protect changes while the changefeed is paused until you resume the changefeed.on_error=pause
: to pause the changefeed when it encounters an error. By default, changefeeds treat errors as retryable apart from some exceptions.
Defining Kafka message acknowledgment
To determine what a successful write to Kafka is, you can configure the kafka_sink_config
option. The 'RequiredAcks'
field specifies what a successful write to Kafka is. CockroachDB guarantees at least once delivery of messages—the 'RequiredAcks'
value defines the delivery.
For high durability delivery, Cockroach Labs recommends setting:
kafka_sink_config='{'RequiredAcks': 'ALL'}'
ALL
provides the highest consistency level. A quorum of Kafka brokers that have committed the message must be reached before the leader can acknowledge the write.
You must also set acks
to ALL
in your server-side Kafka configuration for this to provide high durability delivery.
Choosing changefeed sinks
Use Kafka or cloud storage sinks when tuning for high durability delivery in changefeeds. Both Kafka and cloud storage sinks offer built-in advanced protocols, whereas the webhook sink, while flexible, requires an understanding of how messages are acknowledged and committed by the particular system used for the webhook in order to ensure the durability of message delivery.
Defining schema change behavior
Ensure that data is ingested downstream in its new format after a schema change by using the schema_change_events
and schema_schange_policy
options. For example, setting schema_change_events=column_changes
and schema_change_policy=stop
will trigger an error to the cockroach.log
file on a schema change and the changefeed to fail.
Tuning for high throughput
When designing a system that needs to emit a lot of changefeed messages, whether it be steady traffic or a burst in traffic, consider the following settings and configuration in this section:
- Setting the
resolved
option - Batching and buffering messages
- Configuring file and message format
- Configuring for tables with many ranges
- Adjusting concurrent changefeed work
Setting the resolved
option
When a changefeed emits a resolved message, it force flushes all outstanding messages that have buffered, which will diminish your changefeed's throughput while the flush completes. Therefore, if you are aiming for higher throughput, we suggest setting the duration higher (e.g., 10 minutes), or not using the resolved
option.
If you are setting the resolved
option when you are aiming for high throughput, you must also consider the min_checkpoint_frequency
option, which defaults to 30s
. This option controls how often nodes flush their progress to the coordinating changefeed node. As a result, resolved
messages will not be emitted more frequently than the configured min_checkpoint_frequency
. Set this option to at least as long as your resolved
option duration.
Batching and buffering messages
- Batch messages to your sink:
- For a Kafka sink, refer to the
Flush
parameter for thekafka_sink_config
option. - For a cloud storage sink, use the
file_size
parameter to flush a file when it exceeds the specified size.
- For a Kafka sink, refer to the
- Set the
changefeed.memory.per_changefeed_limit
cluster setting to a higher limit to give more memory for buffering changefeed data. This setting influences how often the changefeed will flush buffered messages. This is useful during heavy traffic.
Configuring file and message format
- Use
avro
as the emitted message format option with Kafka sinks; JSON encoding can potentially create a slowdown.
Compression
- Use the
compression
option when you create a changefeed emitting data files to a cloud storage sink. For larger files, setcompression
to thezstd
format. - Use the
snappy
compression format to emit messages to a Kafka sink. If you're intending to do large batching for Kafka, use thelz4
compression format.
File size
To configure changefeeds emitting to cloud storage sinks for high throughput, you should consider:
- Increasing the
file_size
parameter to control the size of the files that the changefeed sends to the sink. The default is16MB
. To configure for high throughput, we recommend32MB
–128MB
. Note that this is not a hard limit, and a changefeed will flush the file when it reaches the specified size. - When you compress a file, it will contain many more events.
- File size is also dependent on what kind of data the changefeed job is writing. For example, large JSON blobs will quickly fill up the
file_size
value compared to small rows. - When you change or increase
file_size
, ensure that you adjust thechangefeed.memory.per_changefeed_limit
cluster setting, which has a default of512MiB
. Buffering messages can quickly reach this limit if you have increased the file size.
Configuring for tables with many ranges
If you have a table with 10,000 or more ranges, you should consider increasing the following two cluster settings. We strongly recommend increasing these settings slowly. That is, increase the setting and then monitor its impact before adjusting further:
kv.rangefeed.catchup_scan_concurrency
: The number of catchups a rangefeed can execute concurrently. The default is8
.kv.rangefeed.concurrent_catchup_iterators
: The number of rangefeed catchup iterators a store will allow concurrently before queuing. The default is16
.
Adjusting concurrent changefeed work
- Increase the
changefeed.backfill.concurrent_scan_requests
setting, which controls the number of concurrent scan requests per node issued during a backfill event. The default behavior, when this setting is at0
, is that the number of scan requests will be 3 times the number of nodes in the cluster (to a maximum of 100). While increasing this number will allow for higher throughput, it will increase the cluster load overall, including CPU and IO usage. - The
kv.rangefeed.catchup_scan_iterator_optimization.enabled
setting ison
by default. This causes rangefeeds to use time-bound iterators for catch-up scans when possible. Catch-up scans are run for each rangefeed request. This setting improves the performance of changefeeds during some range-split operations.
Lagging ranges
New in v22.2.15:
Use the changefeed.lagging_ranges
metric to track the number of ranges that are behind in a changefeed. This is calculated based on the cluster settings:
changefeed.lagging_ranges_threshold
sets a duration from the present that determines the length of time a range is considered to be lagging behind, which will then track in thelagging_ranges
metric. Note that ranges undergoing an initial scan for longer than the threshold duration are considered to be lagging. Starting a changefeed with an initial scan on a large table will likely increment the metric for each range in the table. As ranges complete the initial scan, the number of ranges lagging behind will decrease.- Default:
3m
- Default:
changefeed.lagging_ranges_polling_interval
sets the interval rate for when lagging ranges are checked and thelagging_ranges
metric is updated. Polling adds latency to thelagging_ranges
metric being updated. For example, if a range falls behind by 3 minutes, the metric may not update until an additional minute afterward.- Default:
1m
- Default:
You can use the metrics_label
option to track the lagging_ranges
metric per changefeed.