Processing Changed Data

Certain Data Collector stages enable you to easily process change data capture (CDC) or transactional data in a pipeline.

CDC-enabled origins can read change capture data. Some read change capture data exclusively; others can be configured to read it. When reading changed data, these origins determine the CRUD operation associated with each record, such as insert, update, upsert, or delete, and record it in the sdc.operation.type record header attribute.

CRUD-enabled processors and destinations can use the CRUD operation type in the sdc.operation.type header attribute when writing records, enabling the external system to perform the appropriate operation.

Using a CDC-enabled origin and CRUD-enabled stages in a pipeline allows you to easily write changed data from one system into another. You can also use a CDC-enabled origin to write to non-CRUD destinations, and non-CDC origins to write to CRUD-enabled stages. For information on how that works, see Use Cases.

CRUD Operation Header Attribute

CDC-enabled origins include the sdc.operation.type record header attribute in all records when reading changed data.

CRUD-enabled processors and destinations can use the CRUD operation type in the sdc.operation.type header attribute when writing records, enabling the external system to perform the appropriate operation.

The sdc.operation.type record header attribute uses the following integers to represent CRUD operations:
  • 1 for INSERT records
  • 2 for DELETE records
  • 3 for UPDATE records
  • 4 for UPSERT records
  • 5 for unsupported operations or codes
Note: Some origins use only a subset of the operations, based on the operations supported by the origin system. Similarly, destinations recognize only the subset of the operations that the destination systems support. See the origin and destination documentation for details about supported operations.
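The code-to-operation mapping above can be sketched as a small lookup. The integer codes come from this document; the function name and the "UNSUPPORTED" fallback label are this sketch's own choices, not part of the product API:

```python
# Illustrative lookup for the sdc.operation.type codes listed above.
CRUD_CODES = {
    1: "INSERT",
    2: "DELETE",
    3: "UPDATE",
    4: "UPSERT",
}

def operation_name(code):
    """Map a header code to its CRUD operation; 5 and any
    unrecognized code fall through to the unsupported case."""
    return CRUD_CODES.get(code, "UNSUPPORTED")
```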

Earlier Implementations

Some origins were enabled for CDC using different record header attributes in earlier releases, but they all now include the sdc.operation.type record header attribute. All earlier CRUD header attributes are retained for backward compatibility.

Similarly, CRUD-enabled destinations that previously looked for the CRUD operation type in other header attributes now check the sdc.operation.type record header attribute first, and then fall back to the alternate attribute. The alternate header attribute functionality is retained for backward compatibility.
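The backward-compatible lookup order can be sketched as follows. The helper name is illustrative; MONGO.OPERATION.TYPE is one of the alternate attributes mentioned in this document:

```python
# Sketch of the lookup order described above: check sdc.operation.type
# first, then fall back to the origin's alternate attribute.
def resolve_operation_code(headers, alternate_attr):
    for attr in ("sdc.operation.type", alternate_attr):
        if attr in headers:
            return int(headers[attr])
    return None  # no CRUD information; the stage applies its default

code = resolve_operation_code({"MONGO.OPERATION.TYPE": "4"},
                              "MONGO.OPERATION.TYPE")  # 4 (UPSERT)
```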

CDC-Enabled Origins

CDC-enabled origins provide the CRUD operation type in the sdc.operation.type record header attribute. Some origins provide alternate and additional header attributes.

The following origins provide CRUD record header attributes:
JDBC Query Consumer for Microsoft SQL Server
  Includes the CRUD operation type in the sdc.operation.type record header attribute.
  For more information, see CRUD Record Header Attribute.

MySQL Binary Log
  Includes the CRUD operation type in the sdc.operation.type record header attribute.
  Includes additional CDC information in record fields.
  For more information, see Generated Records.

Oracle CDC Client
  Includes the CRUD operation type in both of the following headers:
    • sdc.operation.type
    • oracle.cdc.operation
  Includes additional CDC information in record header attributes with the oracle.cdc prefix, such as oracle.cdc.table.
  For more information, see CRUD Operation Header Attributes.

MongoDB Oplog
  Includes the CRUD operation type in the sdc.operation.type record header attribute.
  Can include additional CDC information in record header attributes, such as the op and ns attributes.
  For more information, see Generated Records.

Salesforce
  Includes the CRUD operation type in the sdc.operation.type record header attribute.
  For more information, see CRUD Operation Header Attribute.

CRUD-Enabled Stages

The following stages recognize CRUD operations stored in record header attributes and can perform writes based on those values. Some stages also provide CRUD-related properties.
Elasticsearch destination
  Supported operations: CREATE (INSERT), UPDATE, INDEX (UPSERT), DELETE
  Determines the operation to use based on either of the following:
    • sdc.operation.type record header attribute
    • Default Operation and Unsupported Operation Handling properties in the stage
  For more information, see Define the CRUD Operation.

JDBC Tee processor
  Supported operations: INSERT, UPDATE, DELETE
  Determines the operation to use based on either of the following:
    • sdc.operation.type record header attribute
    • Default Operation and Unsupported Operation Handling properties in the stage
  Includes a Change Log property that enables processing records based on the CDC-enabled origin that you use. For more information, see Define the CRUD Operation.

JDBC Producer destination
  Supported operations: INSERT, UPDATE, DELETE
  Determines the operation to use based on either of the following:
    • sdc.operation.type record header attribute
    • Default Operation and Unsupported Operation Handling properties in the stage
  Includes a Change Log property that enables processing records based on the CDC-enabled origin that you use. For more information, see Define the CRUD Operation.

Kudu destination
  Supported operations: INSERT, UPDATE, UPSERT, DELETE
  Determines the operation to use based on either of the following:
    • sdc.operation.type record header attribute
    • Default Operation and Unsupported Operation Handling properties in the stage
  For more information, see Define the CRUD Operation.

MongoDB destination
  Supported operations: INSERT, UPSERT, DELETE
  Determines the operation based on information in either of the following:
    • sdc.operation.type record header attribute
    • MONGO.OPERATION.TYPE record header attribute
  For more information, see Define the CRUD Operation.

Salesforce destination
  Supported operations: INSERT, UPDATE, UPSERT, DELETE, UNDELETE
  Determines the operation to use based on either of the following:
    • sdc.operation.type record header attribute
    • Default Operation and Unsupported Operation Handling properties in the stage
  For more information, see Define the CRUD Operation.

Processing the Record

Change logs can provide record data in different formats. The JDBC Tee processor and JDBC Producer can decode most change log formats to generate record data based on the origin change log. When using other CRUD-enabled destinations, you might need to add additional processing to the pipeline to alter the format of the record.

For example, Microsoft SQL Server CDC records created by the JDBC Query Consumer contain CDC fields in addition to the record data. You might use a Field Remover to drop any unnecessary fields from the record.

In contrast, the MySQL binary logs read by the MySQL Binary Log origin provide new or updated data in a New Data map field and changed or deleted data in a Changed Data map field. You might use a Field Flattener processor to flatten the map field that contains the data you need, and a Field Remover to remove any unnecessary fields.
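The flatten-then-remove processing can be sketched in plain Python. This is an illustrative equivalent of the Field Flattener and Field Remover steps, not the stages' actual implementation, and the field names ("NewData", "Type") are examples rather than a fixed schema:

```python
# Promote the entries of a map field to top-level fields, then drop
# fields that are not needed downstream.
def flatten_map_field(record, map_field):
    flat = {k: v for k, v in record.items() if k != map_field}
    flat.update(record.get(map_field, {}))
    return flat

def remove_fields(record, unwanted):
    return {k: v for k, v in record.items() if k not in unwanted}

raw = {"Type": "INSERT", "NewData": {"id": 7, "name": "Ada"}}
clean = remove_fields(flatten_map_field(raw, "NewData"), {"Type"})
# clean == {"id": 7, "name": "Ada"}
```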

For details on the format of generated records, see the documentation for the CDC-enabled origin.

Use Cases

You can use CDC-enabled origins and CRUD-enabled destinations in pipelines together or individually. Here are some typical use cases:
CDC-enabled origin with CRUD-enabled destinations
You can use a CDC-enabled origin and a CRUD-enabled destination to easily process changed records and write them to a destination system.

For example, say you want to write CDC data from Microsoft SQL Server to Kudu. To do this, you use the CDC-enabled JDBC Query Consumer origin to read data from a Microsoft SQL Server change capture table. The origin places the CRUD operation type in the sdc.operation.type header attribute, in this case: 1 for INSERT, 2 for DELETE, 3 for UPDATE.

You configure the pipeline to write to the CRUD-enabled Kudu destination. In the Kudu destination, you can specify a default operation for any record with no value set in the sdc.operation.type attribute, and you can configure error handling for invalid values. You set the default to INSERT and you configure the destination to use this default for invalid values. In the sdc.operation.type attribute, the Kudu destination supports 1 for INSERT, 2 for DELETE, 3 for UPDATE, and 4 for UPSERT.

When you run the pipeline, the JDBC Query Consumer origin determines the CRUD operation type for each record and writes it to the sdc.operation.type record header attribute. The Kudu destination then uses the operation in the sdc.operation.type attribute to determine how Kudu processes each record. Any record with no value in the sdc.operation.type attribute, such as a record created by the pipeline, is treated as an INSERT record, and any record with an invalid value is handled the same way.
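The default-operation handling described above can be sketched as follows. This models the behavior, not the destination's code: a missing or invalid sdc.operation.type value falls back to the configured default, INSERT (code 1) in this example:

```python
# Codes the Kudu destination supports, per the list above:
# INSERT, DELETE, UPDATE, UPSERT.
VALID_CODES = {1, 2, 3, 4}

def effective_operation(headers, default_code=1):
    raw = headers.get("sdc.operation.type")
    try:
        code = int(raw)
    except (TypeError, ValueError):
        return default_code  # missing or non-numeric value
    return code if code in VALID_CODES else default_code
```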

CDC-enabled origin to non-CRUD destinations

If you need to write changed data to a destination system that does not have a CRUD-enabled destination stage, you can use an Expression Evaluator or a scripting processor to move the CRUD operation information from the sdc.operation.type header attribute to a record field, so the information is retained in the record.

For example, say you want to read from Oracle LogMiner redo logs and write the records to Hive tables with all of the CDC information in record fields. To do this, you'd use the Oracle CDC Client origin to read the redo logs, then add an Expression Evaluator to pull the CRUD information from the sdc.operation.type header attribute into the record. Oracle CDC Client writes additional CDC information such as the table name and scn into oracle.cdc header attributes, so you can use expressions to pull that information into the record as well. Then you can use the Hadoop FS destination to write the enhanced records to Hive.
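The Expression Evaluator step above can be sketched as a header-to-field copy. The source attribute names appear in this document; the target field names and the helper function are arbitrary choices for the example:

```python
# Copy the CRUD code and selected oracle.cdc.* header attributes into
# record fields so the information survives in the written output.
def headers_to_fields(value, headers, mapping):
    enriched = dict(value)
    for attr, field in mapping.items():
        if attr in headers:
            enriched[field] = headers[attr]
    return enriched

headers = {"sdc.operation.type": "1", "oracle.cdc.table": "ORDERS"}
mapping = {"sdc.operation.type": "operation",
           "oracle.cdc.table": "source_table"}
enriched = headers_to_fields({"id": 42}, headers, mapping)
# enriched == {"id": 42, "operation": "1", "source_table": "ORDERS"}
```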

Non-CDC origin to CRUD destinations
When reading data from a non-CDC origin, you can use the Expression Evaluator or scripting processors to define the sdc.operation.type header attribute.
For example, say you want to read from a transactional database table and keep a dimension table in sync with the changes. You'd use the JDBC Query Consumer to read the source table and a JDBC Lookup processor to check the dimension table for the primary key value of each record. The output of the lookup processor tells you whether the dimension table contains a matching record. Using an Expression Evaluator, you set the sdc.operation.type record header attribute: 3 to update records that have a match, and 1 to insert new records.
When you pass the records to the JDBC Producer destination, the destination uses the operation in the sdc.operation.type header attribute to determine how to write the records to the dimension table.
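The lookup-then-set logic above can be sketched as follows. The in-memory set of keys stands in for the JDBC Lookup result, and the helper name is illustrative:

```python
# Records whose primary key already exists in the dimension table get
# UPDATE (3); new keys get INSERT (1).
def set_operation_header(record, existing_keys, key_field="id"):
    code = 3 if record.get(key_field) in existing_keys else 1
    return {"sdc.operation.type": str(code)}

existing = {101, 102}
set_operation_header({"id": 101}, existing)  # {"sdc.operation.type": "3"}
set_operation_header({"id": 999}, existing)  # {"sdc.operation.type": "1"}
```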