Logical Replication using CockroachDB Changefeeds and CDC-SINK
While CockroachDB replicates data across nodes, it can also send data out of the cluster to refresh an analytics data store, a data bus, or an operational data store (ODS). CockroachDB provides changefeeds, which watch all changes across one or more tables and emit those changes to a sink such as Kafka or an HTTP endpoint. This is a reliable way to capture table mutations because CockroachDB guarantees ordering. However, for transactions that span multiple rows, the received data has to be applied in a way that preserves transaction integrity. Since changefeeds send rows [with timestamps per key] in batches bounded by resolved timestamps, these batches can be examined for proper order across target entities. The emitted rows can then be applied to the target tables by upserting them in batches while maintaining transactional ordering scoped at the entity level.
By default, products such as Kafka Connect do not handle foreign key constraints on the target and settle for eventual consistency in a nested model: with retries the model becomes consistent over time, but the constraints have to be disabled. A good use case is applying child rows in the proper order relative to their parent rows in a parent-child relationship. That is the case in this blog, which illustrates replicating data from a primary database that is the system of record (SOR) to another primary database that serves as the ODS layer.
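To make the idea of applying batches per resolved timestamp with parents before children more concrete, here is a minimal Python sketch. It is not cdc-sink's implementation; the table names, the tuple layout, and the integer timestamps are assumptions chosen only for illustration.

```python
from bisect import bisect_left
from collections import defaultdict

# Hypothetical table names: the parent table first, then its children.
TABLE_ORDER = ["orders", "order_executions", "order_history"]


def plan_batches(rows, resolved_timestamps):
    """Group rows into one batch per resolved window, parents before children.

    rows: iterable of (table, key, updated_ts) tuples.
    resolved_timestamps: ascending list of resolved timestamps.
    Returns [(resolved_ts, [rows...]), ...] in apply order. Rows newer than
    the last resolved timestamp are left out until a later resolved arrives.
    """
    rank = {table: i for i, table in enumerate(TABLE_ORDER)}
    windows = defaultdict(list)
    for row in rows:
        # Index of the first resolved timestamp that covers this row.
        idx = bisect_left(resolved_timestamps, row[2])
        if idx < len(resolved_timestamps):
            windows[resolved_timestamps[idx]].append(row)

    plan = []
    for resolved in resolved_timestamps:
        # Within a window: parent tables first, then by row timestamp.
        batch = sorted(windows[resolved], key=lambda r: (rank[r[0]], r[2]))
        if batch:
            plan.append((resolved, batch))
    return plan


if __name__ == "__main__":
    emitted = [
        ("order_history", 1, 5),
        ("orders", 1, 3),
        ("order_executions", 1, 4),
        ("orders", 2, 12),
        ("orders", 3, 25),  # not yet covered by a resolved timestamp
    ]
    for resolved, batch in plan_batches(emitted, [10, 20]):
        print(resolved, batch)
```

The sketch only covers upserts. Deletes would need the reverse order (children before parents) so that foreign key constraints on the target still hold.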
As CockroachDB only emits these rows to a sink, another tool, such as Kafka Connect or a streaming product like GoldenGate, Striim, or Debezium, is required to restructure the output and apply it to a target database. At the time of writing, it appears only GoldenGate can handle a nested model, from Oracle to Oracle. This brings us to cdc-sink, developed by a few of our field engineers (Bob and Bram). cdc-sink not only applies the changes from the source, it applies them in accordance with the nested model, so the target database is always consistent. Granted, the process of staging the emitted rows, then sorting and transforming them into writes, introduces a lag, but the relationships across the tables stay consistent with no orphaned rows. Now let's review the setup steps and run through some tests.
The schema has the parent orders table and two child tables. For each parent order, there will be at least one execution and history for the order fulfillment. The target schema is identical.
The environment consists of the following:
- The primary cluster with the above schema with a load balancer to access all nodes
- The second cluster with the same schema with a load balancer to access all nodes
- The source and target should start in sync, or with the target trailing the source in data
- Applications in the same location as the primary cluster with connections to the load balancer
- A cdc-sink host which has access to the load balancer for the secondary cluster
On the primary cluster, create a changefeed that either starts by emitting the full contents of the watched tables or starts emitting rows from a given timestamp using the WITH cursor option. For this walkthrough we will start the changefeed on a table with 0 rows.
The changefeed emits rows to the cdc-sink process hosted on 10.142.1.181, port 30004. cdc-sink is running as a service using the following command. Notice the corresponding target in the CREATE CHANGEFEED command:
into 'http://10.142.1.181:30004/orders/public?insecure_tls_skip_verify=true'
And the --targetConn postgres://root@localhost:26257/?sslmode=disable option, which points to the load balancer connecting to all the nodes in the second cluster with the target schema.
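As a rough illustration of how the changefeed statement ties to the sink address, the Python sketch below assembles a CREATE CHANGEFEED statement as a string. It is not the exact statement used in this setup: the helper name and the table names are hypothetical, and the `updated` and `resolved` options are an assumption based on the per-row and resolved timestamps described in this post. The sink URL is copied from the text as-is.

```python
def build_changefeed_sql(tables, sink_url, cursor=None):
    """Assemble a CREATE CHANGEFEED statement (illustrative helper only).

    tables: table names to watch (hypothetical names in the example below).
    sink_url: the endpoint where cdc-sink is listening.
    cursor: optional timestamp to start emitting from; when omitted, the
            changefeed starts with the current contents of the tables.
    """
    # Assumption: row timestamps and resolved timestamps are both requested.
    options = ["updated", "resolved"]
    if cursor is not None:
        options.append(f"cursor = '{cursor}'")
    return (
        f"CREATE CHANGEFEED FOR TABLE {', '.join(tables)} "
        f"INTO '{sink_url}' WITH {', '.join(options)}"
    )


if __name__ == "__main__":
    sink = "http://10.142.1.181:30004/orders/public?insecure_tls_skip_verify=true"
    # Hypothetical parent and child table names.
    print(build_changefeed_sql(["orders", "order_executions", "order_history"], sink))
```

Passing a `cursor` value would correspond to the second start mode, emitting rows from a chosen timestamp rather than from the full table contents.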
The diagram below shows the end-to-end process, where transactions are sent from clients via the load balancer for the primary cluster.
In the secondary cluster, we have an additional staging schema where the cdc-sink process stores all the emitted rows, which in turn are applied to the target schema. For each resolved timestamp emitted by the source changefeed, all the rows within that window are upserted into the target tables in micro-batches. A detailed explanation of the process is found here. The _cdc_sink staging schema contains the following tables:
Of these, resolved_timestamps tracks each set of changes for the schema by resolved timestamp and applied time; a null applied time indicates the rows have not yet been upserted to the target schema.
Checking the table, we can see all the source rows have been applied.
We can check the apply-time lag relative to arrival in the staging process with the following:
We can then compare the insert time on the target with the source schema using the following SQL, which shows rows arriving 4 to 9 seconds later than on the source.
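The lag calculation itself is simple arithmetic. As a hedged illustration in Python (not the SQL used in this post), the sketch below buckets rows by whole seconds of lag between a source timestamp and a target timestamp. The function name and the sample values are made up for the example.

```python
from collections import Counter
from datetime import datetime, timedelta


def lag_histogram(pairs):
    """Count rows per whole second of lag between source and target.

    pairs: iterable of (source_time, target_time) datetime tuples, where
    target_time is when the row landed on the target.
    """
    lags = [int((target - source).total_seconds()) for source, target in pairs]
    return dict(sorted(Counter(lags).items()))


if __name__ == "__main__":
    base = datetime(2022, 1, 1, 12, 0, 0)
    # Made-up sample: three rows written on the source and seen on the target.
    sample = [
        (base, base + timedelta(seconds=4.2)),
        (base, base + timedelta(seconds=4.9)),
        (base, base + timedelta(seconds=9.0)),
    ]
    print(lag_histogram(sample))
```

With real data, the pairs would come from joining source and target rows on their primary key and comparing their insert timestamps.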
Now let's look at the workload execution and changefeed process.
Processing a few cycles of order executions:
We can see what the emitted rows look like, highlighting the key metrics such as primary key and source timestamp.
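For readers who want to inspect such messages programmatically, here is a small Python sketch that pulls out the primary key and source timestamp. It assumes each message is a JSON object with `after`, `key`, and `updated` fields, that resolved messages carry a single `resolved` field, and that timestamps are strings of the form wall-clock nanoseconds, a dot, then a logical counter. The sample line is made up, not taken from this workload.

```python
import json
from datetime import datetime, timezone


def parse_hlc(ts):
    """Split '<wall nanos>.<logical>' into two integers."""
    wall, _, logical = ts.partition(".")
    return int(wall), int(logical or 0)


def summarize(line):
    """Extract the key metrics from one emitted JSON message."""
    msg = json.loads(line)
    if "resolved" in msg:
        return {"kind": "resolved", "hlc": parse_hlc(msg["resolved"])}
    wall, logical = parse_hlc(msg["updated"])
    return {
        "kind": "row",
        "key": msg["key"],
        "hlc": (wall, logical),
        # Wall-clock part of the source timestamp, truncated to seconds.
        "wall_time": datetime.fromtimestamp(wall // 10**9, tz=timezone.utc),
        # A null "after" value is treated as a delete in this sketch.
        "deleted": msg.get("after") is None,
    }


if __name__ == "__main__":
    # Made-up sample message in the assumed shape.
    sample = '{"after": {"id": 1}, "key": [1], "updated": "1600000000000000000.0000000002"}'
    print(summarize(sample))
```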
Now let’s run a workload for a few minutes and make some observations. On the source system the QPS is roughly 1000,
The changefeed latency is consistently below 10 seconds,
And we see a stream of 4k rows emitted per second.
The target displays a steady apply process for DML operations
And we can see the lag bunches up in the 4 to 8 second range for applying staging data to the target cluster.
And rows arrive in the target tables within the range of 4 to 11 seconds.
Using the changefeeds to cdc-sink, we are able to achieve logical replication from a source database to other databases while maintaining the ordering of transactions with a nested schema.