SQL Server CDC Client

The SQL Server CDC Client origin processes data in Microsoft SQL Server change data capture (CDC) tables. The origin uses multiple threads to enable parallel processing of data.

Use the SQL Server CDC Client origin to generate records from CDC tables. To read data from Microsoft SQL Server change tracking tables, use the SQL Server Change Tracking origin. For more information about the differences between CDC and change tracking data, see the Microsoft SQL Server documentation.

The SQL Server CDC Client origin includes the CRUD operation type in a record header attribute so generated records can be easily processed by CRUD-enabled destinations. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

You might use this origin to perform database replication. You can use a separate pipeline with the JDBC Query Consumer or JDBC Multitable Consumer origin to read existing data. Then start a pipeline with the SQL Server CDC Client origin to process subsequent changes.

When you configure the origin, you specify the SQL Server capture instance names - the origin processes the related CDC tables. You can define groups of tables in the same database and any initial offsets to use. When you omit initial offsets, the origin processes all available data in the CDC tables.

You can enable late table processing to allow the origin to process tables that appear after the pipeline starts. You can also configure the origin to check for schema changes in processed tables and to generate an event after discovering a change.

To determine how the origin connects to the database, you specify connection information, a query interval, the number of retries, and any custom JDBC configuration properties that you need.

To use a JDBC version older than 4.0, you can specify the driver class name and define a health check query.

The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Installing the JDBC Driver

Before you use the SQL Server CDC Client origin, install the JDBC driver for the database. You cannot access the database until you install the required driver.

For information about installing additional drivers, see Install External Libraries.
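After installing the driver, you reference the database in the JDBC Connection String property. For example, a connection string for the Microsoft JDBC driver for SQL Server typically takes the following form, where the host, port, and database name are placeholders for your environment:

    jdbc:sqlserver://<host>:1433;databaseName=<database>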

Supported Operations

The SQL Server CDC Client origin supports the SQL Server insert, update, and delete operations. SQL Server CDC captures an update as a pair of rows: the origin treats the row captured after the update as an update, and the row captured before the update as an unsupported operation.
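To see how SQL Server represents these operations, you can query a change table directly. The following sketch is illustrative only: it assumes a capture instance named dbo_customer and a connection URL passed as an argument, and it uses the documented cdc.fn_cdc_get_all_changes function with the 'all update old' option so that both rows of an update pair appear.

    import java.sql.*;

    public class CdcOperationDemo {
        public static void main(String[] args) throws SQLException {
            // args[0]: a JDBC URL such as jdbc:sqlserver://localhost:1433;databaseName=sales
            String sql =
                "SELECT __$operation, __$start_lsn " +
                "FROM cdc.fn_cdc_get_all_changes_dbo_customer(" +
                "sys.fn_cdc_get_min_lsn('dbo_customer'), sys.fn_cdc_get_max_lsn(), N'all update old')";
            try (Connection conn = DriverManager.getConnection(args[0]);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    // 1 = delete, 2 = insert, 3 = row before an update (unsupported),
                    // 4 = row after an update (treated as an update)
                    System.out.println("__$operation = " + rs.getInt(1));
                }
            }
        }
    }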

Multithreaded Processing

The SQL Server CDC Client origin performs parallel processing and enables the creation of a multithreaded pipeline.

When you start the pipeline, the SQL Server CDC Client origin retrieves the list of CDC tables associated with the source tables defined in the table configurations. The origin then uses multiple concurrent threads based on the Number of Threads property. Each thread reads data from a single table.

Note: The Maximum Pool Size property on the Advanced tab defines the maximum number of connections the origin can make to the database. It must be equal to or greater than the value defined for the Number of Threads property.

As the pipeline runs, each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner. A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors and destinations in the pipeline and performs all pipeline processing after the origin.

Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline instances, the order that batches are written to destinations is not ensured.
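As a mental model only, and not the origin's actual implementation, the following sketch shows the threading pattern described above: a fixed pool of worker threads, each claiming one table at a time and producing batches for downstream processing.

    import java.util.List;
    import java.util.concurrent.*;

    public class ThreadPerTableSketch {
        public static void main(String[] args) throws InterruptedException {
            // Hypothetical CDC table names; each thread reads from one table at a time.
            BlockingQueue<String> tables = new LinkedBlockingQueue<>(
                List.of("dbo_orders_CT", "dbo_customer_CT", "dbo_items_CT"));
            int numberOfThreads = 2; // corresponds to the Number of Threads property
            ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
            for (int i = 0; i < numberOfThreads; i++) {
                pool.submit(() -> {
                    String table;
                    while ((table = tables.poll()) != null) {
                        // In the real origin, a JDBC query creates a batch here and
                        // passes it to an available pipeline runner.
                        System.out.println(Thread.currentThread().getName() + " reads " + table);
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
    }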

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.

Example

Say you are reading from 10 tables. You set the Number of Threads property to 5 and the Maximum Pool Size property to 6. When you start the pipeline, the origin retrieves the list of tables. The origin then creates five threads to read from the first five tables, and by default Data Collector creates a matching number of pipeline runners. As each thread receives data, it passes a batch to an available pipeline runner for processing.

At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.

Batch Strategy

Each origin thread creates a batch of data from a single table. You can define one of the following strategies that the threads use to create each batch:

Process All Available Rows from the Table
Each thread creates multiple batches of data from one table, until all available rows are read from that table. The thread runs one SQL query for all batches created from the table. Then, the thread switches to the next available table, running another SQL query to read all available rows from that table.
For example, let's say that the batch size for the origin is set to 100. The origin is configured to use two concurrent threads and to read from four tables, each of which contains 1,000 rows. The first thread runs one SQL query to create 10 batches of 100 rows each from table1, while the second thread uses the same strategy to read data from table2. When table1 and table2 are fully read, the threads switch to table3 and table4 and complete the same process. When the first thread finishes reading from table3, the thread switches back to the next available table to read all available data from the last saved offset.
Switch Tables
Each thread creates a set of batches from one table based on the Batches from Result Set property, and then switches to the next available table to create the next set of batches. The thread runs an initial SQL query to create the first set of batches from the table. The database caches the remaining rows in a result set for the same thread to access again, and then the thread switches to the next available table. A table is available in the following situations:
  • The table does not have an open result set cache. In this case, the thread runs an initial SQL query to create the first batch, caching the remaining rows in a result set in the database.
  • The table has an open result set cache created by that same thread. In this case, the thread creates the batch from the result set cache in the database rather than running another SQL query.
A table is not available when the table has an open result set cache created by another thread. No other thread can read from that table until the result set is closed.
When you configure a switch table strategy, define the result set cache size and the number of batches that a thread can create from the result set. After a thread creates the configured number of batches, the database closes the result set and then a different thread can read from the table.
Note: By default, the origin instructs the database to cache an unlimited number of result sets, and a thread can create an unlimited number of batches from each result set.
For example, let's say that the batch size for the origin is set to 100. The origin is configured to use two concurrent threads and to read from four tables, each of which contains 10,000 rows. You set the result set cache size to 500 and set the number of batches read from the result set to 5.
Thread1 runs an SQL query on table1, which returns all 10,000 rows. The thread creates a batch when it reads the first 100 rows. The next 400 rows are cached as a result set in the database. Since thread2 is similarly processing table2, thread1 switches to the next available table, table3, and repeats the same process. After creating a batch from table3, thread1 switches back to table1 and retrieves the next batch of rows from the result set that it previously cached in the database.
After thread1 creates five batches using the result set cache for table1, the database closes the result set cache. Thread1 switches to the next available table. A different thread runs an SQL query to read additional rows from table1, beginning from the last saved offset.
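The following sketch, also conceptual rather than the origin's actual code, shows the mechanics that the Batches from Result Set property controls: a thread draws a fixed number of batches from one open JDBC result set, then closes the result set so another thread can claim the table.

    import java.sql.*;

    public class SwitchTablesSketch {
        // Reads up to batchesFromResultSet batches of maxBatchSize rows from one
        // table, then returns, closing the result set so the table becomes
        // available to other threads. Table and connection are supplied by the caller.
        static void readSomeBatches(Connection conn, String table,
                                    int maxBatchSize, int batchesFromResultSet) throws SQLException {
            try (Statement stmt = conn.createStatement()) {
                stmt.setFetchSize(500); // a JDBC fetch-size hint, akin to the Fetch Size property
                try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + table)) {
                    for (int batch = 0; batch < batchesFromResultSet; batch++) {
                        int rows = 0;
                        while (rows < maxBatchSize && rs.next()) {
                            rows++; // in the real origin, each row becomes a record in the batch
                        }
                        if (rows < maxBatchSize) {
                            return; // table exhausted
                        }
                    }
                } // result set closed here; another thread may now read from the table
            }
        }
    }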

Table Configuration

When you configure SQL Server CDC Client, you can define multiple CDC tables using a single set of table configuration properties. You can also define multiple table configurations to process multiple groups of CDC tables.

When you define table configurations, you can configure the following properties for each set of tables:
Capture Instance Name
Determines the CDC tables to process. The naming convention for Microsoft SQL Server CDC tables is <capture instance name>_CT. When specifying this property, use the capture instance name, not the name of the CDC table to be processed. For example, for the dbo.customer source table, specify the default capture instance name, dbo_customer, not the associated CDC table name, dbo_customer_CT.
When you configure the Capture Instance Name property, you can specify a pattern that describes a set of capture instance names to use. If capture instance names are not set up in the database, then you specify a pattern for the table names to process.
You can use SQL-like syntax to define the set of CDC tables to process. For example, the pattern east% matches tables whose names start with "east". For more information about SQL LIKE syntax, see https://msdn.microsoft.com/en-us/library/ms179859.aspx.
When configuring the Capture Instance Name property, you can use one of the following formats:
  • To process the CDC tables that match the specified capture instance name pattern, use the following format:
    <capture instance name pattern>

    Use this format when CDC tables are created based on capture instance names. You can use the pattern to process a full set of CDC tables or to exclude some CDC tables from processing.

    For example, say you have a Sales.Accounts table with a CDC table named Sales_Accounts_CT. After adding several columns to the table, you create a new CDC table called Sales_Accounts2_CT.

    To process both CDC tables, you can specify the following capture instance name pattern: Sales_Accounts%. To process only the CDC data that occurred after the schema change, you can specify the following capture instance name: Sales_Accounts2.

  • To process all available CDC tables for the specified data tables, use the following format:
    <schema name>_<data table name pattern>

    Use this format when CDC tables are created based on data tables instead of capture instance names.

    For example, to process all available CDC tables for data tables in a Sales schema, you might use Sales_%. Or, to process the CDC tables associated with a set of data tables with the Transact prefix, you might use Sales_Transact%.

  • To process all CDC tables associated with the schema, use the following format:
    <schema name>_%

    For example, to process all tables in the sales schema, enter sales_%.

Table exclusion pattern
Optionally specify a regex pattern for the table names that you want to exclude from the query.

For example, say you want to process all CDC tables in the schema except for those that start with "dept". You can use a pattern such as the default dbo_% for the capture instance name, and enter dept.* for the table exclusion pattern.

For more information about using regular expressions with Data Collector, see Regular Expressions Overview.
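Because the exclusion pattern is a Java regular expression, you can verify a candidate pattern with standard Java before using it. This small sketch checks the dept.* pattern from the example above against two hypothetical table names:

    import java.util.regex.Pattern;

    public class ExclusionPatternCheck {
        public static void main(String[] args) {
            Pattern exclusion = Pattern.compile("dept.*");
            for (String table : new String[] {"dept_east", "sales_2019"}) {
                // dept_east matches and would be excluded; sales_2019 would be processed
                System.out.println(table + " excluded: " + exclusion.matcher(table).matches());
            }
        }
    }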

Initial offset
You can specify an initial offset to determine where in the existing CDC data the origin begins processing. When you omit the initial offset, the origin processes all available CDC data.
The SQL Server CDC Client origin uses the __$start_lsn column as the offset column. The offset that you specify is used for all tables included in the table configuration.
Important: When you specify an initial offset, the origin starts with the first value greater than the specified offset.
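Conceptually, the initial offset constrains the change-table query on the __$start_lsn column. The following sketch is an illustration under assumptions, not the origin's actual query: it assumes a dbo_customer_CT change table, and the hex literal stands in for a real LSN value.

    import java.sql.*;

    public class InitialOffsetSketch {
        public static void main(String[] args) throws SQLException {
            // args[0]: a JDBC URL such as jdbc:sqlserver://localhost:1433;databaseName=sales
            String sql = "SELECT __$start_lsn, __$operation FROM cdc.dbo_customer_CT "
                       + "WHERE __$start_lsn > 0x0000003400000078000A "
                       + "ORDER BY __$start_lsn, __$seqval";
            try (Connection conn = DriverManager.getConnection(args[0]);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    // Only rows with an LSN greater than the initial offset are returned.
                    System.out.println("__$operation = " + rs.getInt("__$operation"));
                }
            }
        }
    }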

Initial Table Order Strategy

You can define the initial order that the origin uses to read the tables.

Define one of the following initial table order strategies:
None
Reads the tables in the order that they are listed in the database.
Alphabetical
Reads the tables in alphabetical order.

The origin uses the table order strategy only for the initial reading of the tables. When threads switch back to previously read tables, they read from the next available table, regardless of the defined order.

Allow Late Table Processing

You can configure the SQL Server CDC Client to process data in CDC tables that appear after the pipeline starts.

When you allow late table processing, the SQL Server CDC Client origin uses a background thread to check for late CDC tables. The origin checks at regular user-defined intervals.

To enable late table processing, configure the following properties:
  • On the JDBC tab, select the Allow Late Tables property.
  • To define the time to wait before checking for new tables, configure the New Table Discovery Interval property.
  • On the Advanced tab, set the Maximum Pool Size and Minimum Idle Connections properties to one more than the value of the Number of Threads property.
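The following sketch illustrates the background check described above. It is conceptual, not the origin's implementation: it polls the SQL Server cdc.change_tables system table for capture instances that were not seen before, at an interval corresponding to the New Table Discovery Interval property.

    import java.sql.*;
    import java.util.Set;
    import java.util.concurrent.*;

    public class LateTableCheckSketch {
        public static void main(String[] args) {
            // args[0]: a JDBC URL such as jdbc:sqlserver://localhost:1433;databaseName=sales
            Set<String> known = ConcurrentHashMap.newKeySet();
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(() -> {
                try (Connection conn = DriverManager.getConnection(args[0]);
                     Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(
                         "SELECT capture_instance FROM cdc.change_tables")) {
                    while (rs.next()) {
                        String captureInstance = rs.getString(1);
                        if (known.add(captureInstance)) {
                            System.out.println("new capture instance: " + captureInstance);
                        }
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }, 0, 60, TimeUnit.SECONDS); // the 60-second interval is an arbitrary example
        }
    }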

Checking for Schema Changes

You can configure the SQL Server CDC Client origin to check for schema changes in the tables being processed. When checking for schema changes, the origin includes a schema check statement in the SQL query.

When the origin checks for schema changes, it performs the following tasks:
  1. Compares current table schemas with the original table schemas at regular intervals, based on the Query Interval property.
  2. If it determines that the schema of a table has changed, it generates a schema-change event that states the table or capture instance name with the changed schema.

    The origin generates a schema-change event each time that it finds a schema change: one for each table with a schema change.

    Note: Since the origin continues to check for schema changes at regular intervals until the pipeline stops, a single schema change can generate a large volume of events.
  3. Optionally writes the exact column name or data type change to the Data Collector log. To enable writing to the log, set the log level to Trace. For information about changing the log level, see Modifying the Log Level.
To enable the SQL Server CDC Client origin to check for schema changes and generate events when discovering them, enable both of the following properties:
  • On the General tab, select the Produce Events property.
  • On the JDBC tab, select the Enable Schema Changes Event property.
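One way to picture the schema check is as a periodic snapshot-and-compare of the captured columns. This sketch, again an illustration rather than the origin's actual logic, reads column names and types from the SQL Server cdc.captured_columns system table; a difference between two snapshots for the same capture instance indicates a schema change.

    import java.sql.*;
    import java.util.*;

    public class SchemaCheckSketch {
        // Returns a snapshot of the captured columns for one capture instance.
        static List<String> capturedColumns(Connection conn, String captureInstance)
                throws SQLException {
            String sql = "SELECT cc.column_name, cc.column_type "
                       + "FROM cdc.captured_columns cc "
                       + "JOIN cdc.change_tables ct ON cc.object_id = ct.object_id "
                       + "WHERE ct.capture_instance = ? ORDER BY cc.column_ordinal";
            List<String> columns = new ArrayList<>();
            try (PreparedStatement stmt = conn.prepareStatement(sql)) {
                stmt.setString(1, captureInstance);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        columns.add(rs.getString(1) + ":" + rs.getString(2));
                    }
                }
            }
            return columns; // compare with an earlier snapshot to detect a change
        }
    }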

Generated Record

When the SQL Server CDC Client origin generates a record, it places the CDC information, such as the CDC operation and start LSN values, in record header attributes, and places the row data provided by the CDC tables into the fields of the generated record.

Record Header Attributes

The SQL Server CDC Client origin generates JDBC record header attributes that provide the SQL Server CDC data for each record, such as the start or end log sequence numbers (LSN).

The origin also includes the sdc.operation.type attribute and information from the SQL Server CDC tables. The SQL Server CDC header attributes are prefixed with "jdbc". The names of the SQL Server CDC columns are included in the header attribute names, as follows: jdbc.<CDC column name>.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
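For example, ${record:attribute('jdbc.__$operation')} returns the SQL Server operation code for a record, and ${record:attributeOrDefault('jdbc.tables', 'unknown')} returns the source table list or a default value.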

The origin provides the following header attributes:
Header Attribute Name Description
sdc.operation.type
The origin uses the following values to represent the operation type:
  • 1 for Insert
  • 2 for Delete
  • 3 for Update, including updates captured after the update operation
  • 5 for unsupported operations, including updates captured before the update operation
jdbc.tables Provides a comma-separated list of source tables for the fields in the record.
Note: Not all JDBC drivers provide this information.

jdbc.<column name>.jdbcType Provides the original SQL data type for each field in the record.

Because the record read from the SQL Server CDC table includes CDC columns, the generated record also includes corresponding jdbc.<column name>.jdbcType header attributes for those columns.

For example, since the original data includes a __$start_lsn column, the resulting record has a jdbc.__$start_lsn.jdbcType header attribute. It also has a jdbc.__$start_lsn attribute that is generated separately by the origin, as described below.

jdbc.<column name>.jdbc.precision Provides the original precision for all numeric and decimal fields.
jdbc.<column name>.jdbc.scale Provides the original scale for all numeric and decimal fields.
jdbc.__$command_id Data from the SQL Server CDC __$command_id column.
jdbc.__$end_lsn Data from the SQL Server CDC __$end_lsn column.
jdbc.__$operation The CRUD operation type using SQL Server codes, as defined in the SQL Server CDC __$operation column.
jdbc.__$seqval Data from the SQL Server CDC __$seqval column.
jdbc.__$start_lsn Data from the SQL Server CDC __$start_lsn column.
jdbc.__$update_mask Data from the SQL Server CDC __$update_mask column.

For details about the CDC attributes, see the SQL Server documentation.

CRUD Operation Header Attributes

When generating records, the SQL Server CDC Client origin specifies the operation type in both of the following record header attributes:
sdc.operation.type
The SQL Server CDC Client origin writes the operation type to the sdc.operation.type record header attribute.
The origin uses the following values in the sdc.operation.type record header attribute to represent the operation type:
  • 1 for Insert
  • 2 for Delete
  • 3 for Update, including updates captured after the update operation
  • 5 for unsupported operations, including updates captured before the update operation

If you use a CRUD-enabled destination in the pipeline such as JDBC Producer or Elasticsearch, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator or scripting processors to manipulate the value in the sdc.operation.type header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

When using CRUD-enabled destinations, the destination looks for the operation type in the sdc.operation.type attribute only.
jdbc.__$operation
The SQL Server CDC Client origin places the values from the SQL Server __$operation column in the jdbc.__$operation record header attribute. As a result, the jdbc.__$operation record header attribute contains the CRUD operation type as defined using SQL Server CDC codes.
Note that CRUD-enabled stages use only the sdc.operation.type header attribute; they do not check the jdbc.__$operation header attribute.
SQL Server CDC uses the following codes to define the operation type:
  • 1 for delete
  • 2 for insert
  • 3 for updates captured before the update operation
  • 4 for updates captured after the update operation
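Putting the two code sets side by side, the translation from the SQL Server __$operation codes to the sdc.operation.type values is a simple lookup. The following sketch only restates the documented mapping:

    public class OperationCodeMapping {
        // Maps SQL Server CDC __$operation codes to sdc.operation.type values.
        static int toSdcOperationType(int sqlServerOperation) {
            switch (sqlServerOperation) {
                case 1: return 2; // SQL Server delete         -> 2 (Delete)
                case 2: return 1; // SQL Server insert         -> 1 (Insert)
                case 3: return 5; // update, before-update row -> 5 (Unsupported)
                case 4: return 3; // update, after-update row  -> 3 (Update)
                default: throw new IllegalArgumentException(
                    "Unknown __$operation code: " + sqlServerOperation);
            }
        }
    }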

Event Generation

The SQL Server CDC Client origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified queries for all tables.

If you enable schema change event generation, the origin also generates an event each time it finds a schema change.

SQL Server CDC Client events can be used in any logical way. For example:
  • With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.

    When you restart a pipeline stopped by the Pipeline Finisher executor, the origin continues processing from the last-saved offset unless you reset the origin.

    For an example, see Case Study: Stop the Pipeline. A related tip on preconditions follows this list.

  • With a destination to store event information.

    For an example, see Case Study: Event Storage.
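When you route events to the Pipeline Finisher executor, a common approach is to set a precondition on the executor, such as ${record:eventType() == 'no-more-data'}, so that other events, such as schema-change events, do not stop the pipeline.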

For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Record

Event records generated by the SQL Server CDC Client origin have the following event-related record header attributes:
Record Header Attribute Description
sdc.event.type Event type. Uses one of the following types:
  • no-more-data - Generated when the origin completes processing the requested CDC data.
  • schema-change - Generated only when checking for schema changes is enabled and when the origin determines that a schema change has occurred for one of the tables being processed.
sdc.event.version An integer that indicates the version of the event record type.
sdc.event.creation_timestamp Epoch timestamp when the stage created the event.
The SQL Server CDC Client origin can generate the following types of event records:
no-more-data

The origin generates a no-more-data event record when it completes processing all data returned by the queries for all tables.

The no-more-data event record generated by the origin has the sdc.event.type set to no-more-data and does not include any additional fields.

schema-change

The origin generates a schema-change event record only when you enable the origin to check for schema changes, and the origin discovers a schema change.

The schema-change event record generated by the origin has the sdc.event.type set to schema-change and includes the following fields:
Event Record Field Description
capture-instance-name The name of the capture instance or CDC table associated with the table with the schema change.
source-table-schema-name The name of the schema that contains the data table.
source-table-name The name of the data table that has a schema change.

Configuring a SQL Server CDC Client Origin

Configure a SQL Server CDC Client origin to process data in Microsoft SQL Server CDC tables.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the JDBC tab, configure the following properties:
    JDBC Property Description
    JDBC Connection String

    Connection string to use to connect to the database.

    Use Credentials Enables entering credentials on the Credentials tab. Use when you do not include credentials in the JDBC connection string.
    Query Interval Amount of time to wait between queries. Enter an expression based on a unit of time. You can use SECONDS, MINUTES, or HOURS.

    Also used as the interval between checking for schema changes when Enable Schema Changes Event is enabled.

    Default is 10 seconds: ${10 * SECONDS}.

    Number of Threads Number of threads the origin generates and uses for multithreaded processing.

    Configure the Maximum Pool Size property on the Advanced tab to be equal to or greater than this value.

    Per Batch Strategy Strategy to create each batch of data:
    • Switch Tables - Each thread creates a set of batches from one table, and then switches to the next available table to create the next set of batches. Define the Result Set Cache Size and the Batches from Result Set properties when you configure a switch tables strategy.
    • Process All Available Rows from the Table - Each thread creates multiple batches of data from one table, until all available rows are read from that table.
    Max Batch Size (records) Maximum number of records to include in a batch.
    Batches from Result Set Number of batches to create from the result set. After a thread creates this number of batches, the database closes the result set and then another thread can read from the same table.

    Use a positive integer to set a limit on the number of batches created from the result set. Use -1 to opt out of this property.

    By default, the origin creates an unlimited number of batches from the result set, keeping the result set open as long as possible.

    Result Set Cache Size Number of result sets to cache in the database. Use a positive integer to set a limit on the number of cached result sets. Use -1 to opt out of this property.

    By default, the origin caches an unlimited number of result sets.

    Max Clob Size (characters) Maximum number of characters to be read in a Clob field. Larger data is truncated.
    Max Blob Size (bytes)

    Maximum number of bytes to be read in a Blob field.

    Number of Retries on SQL Error Number of times a thread tries to read a batch of data after receiving an SQL error. After a thread retries this number of times, the thread handles the error based on the error handling configured for the origin.

    Use to handle transient network or connection issues that prevent a thread from reading a batch of data.

    Default is 0.

    Enable Schema Changes Event Enables regular checks for schema changes.

    When enabled, the origin checks for schema changes for all processed tables at regular intervals based on the Query Interval property. The origin generates a schema change event each time it discovers a schema change.

    Allow Late Tables Allows the origin to process tables that appear after the pipeline starts.

    When enabled, the origin uses a background thread to check for additional tables to process. For information about adjusting related configuration properties, see Allow Late Table Processing.

    New Table Discovery Interval Time to wait before checking for additional tables to process.
    Fetch Size Maximum number of rows to fetch and store in memory on the Data Collector machine. The size cannot be zero.

    Default is 1,000.

    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

  3. On the CDC tab, define one or more table configurations. Using simple or bulk edit mode, click the Add icon to define another table configuration.
    CDC Property Description
    Capture Instance Name Determines the set of CDC tables to process. Use SQL LIKE syntax to define a name pattern. Use one of the following formats:
    • To process the CDC tables that match the specified capture instance name pattern, use the following format:
      <capture instance name pattern>
    • To process all available CDC tables for the specified data tables, use the following format:
      <schema name>_<data table name pattern>
    • To process all CDC tables associated with the schema, use the following format:
      <schema name>_%

    Default is dbo_%, which processes all available CDC tables in the default dbo schema.

    Table Exclusion Pattern Pattern of the table names to exclude from being read for this table configuration. Use a Java-based regular expression, or regex, to define the pattern.

    Leave empty if you do not need to exclude any tables.

    Initial Offset Offset value to use for this table configuration when the pipeline starts. When processing an offset, the origin starts with the first value greater than the specified offset.

    Use -1 to opt out of an initial offset. With the initial offset set to -1, the origin ignores existing data and begins processing with new, incoming changes.

  4. To enter JDBC credentials separately from the JDBC connection string, on the Credentials tab, configure the following properties:
    Credentials Property Description
    Username User name for the JDBC connection.
    Password Password for the JDBC account.
    Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or credential stores.
  5. When using JDBC versions older than 4.0, on the Legacy Drivers tab, optionally configure the following properties:
    Legacy Driver Property Description
    JDBC Class Driver Name Class name for the JDBC driver. Required for JDBC versions older than version 4.0.
    Connection Health Test Query Optional query to test the health of a connection. Recommended only when the JDBC version is older than 4.0.
  6. On the Advanced tab, optionally configure the following properties. The defaults for these properties should work in most cases:
    Advanced Property Description
    Maximum Pool Size The maximum number of connections to create. Must be equal to or greater than the value of the Number of Threads property.

    Default is 1.

    Minimum Idle Connections The minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Connection Timeout Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Max Connection Lifetime Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime for connections.

    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Auto Commit Determines if auto-commit mode is enabled. In auto-commit mode, the database commits the data for each record.

    Default is disabled.

    Enforce Read-only Connection Creates read-only connections to avoid any type of write.

    Default is enabled. Disabling this property is not recommended.

    Transaction Isolation Transaction isolation level used to connect to the database.

    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:

    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Initial Table Order Strategy Initial order used to read the tables:
    • None - Reads the tables in the order that they are listed in the database.
    • Alphabetical - Reads the tables in alphabetical order.