PostgreSQL CDC Client

The PostgreSQL CDC Client origin processes Write-Ahead Logging (WAL) data to generate change data capture records for a PostgreSQL database. Use the PostgreSQL CDC Client origin to process WAL data from PostgreSQL 9.4 or later. Earlier versions do not support the logical decoding of WAL data that the origin requires.

You might use this origin to perform database replication. You can use a separate pipeline with the JDBC Query Consumer or JDBC Multitable Consumer origin to read existing data. Then start a pipeline with the PostgreSQL CDC Client origin to process subsequent changes.

The PostgreSQL CDC Client treats a single transaction as a single record. It can generate two types of records: one for data changes and one for Data Definition Language (DDL) updates.

Since each data change transaction can include multiple CRUD operations, the PostgreSQL CDC Client origin can also include multiple operations in a record. As a result, the origin does not write the CRUD operations to the sdc.operation.type record header attribute. Depending on your use case, you might use a scripting processor to convert the records as needed. Or, you might use a Field Pivoter and other processors to separate the data to create a record for each operation.

When you configure the PostgreSQL CDC Client, you configure the change capture details, such as the schema and tables to read from, the initial change to use, and the operations to include.

You define the name of the replication slot to use and specify whether to remove the replication slot when the pipeline stops. You can also configure the origin to include fields with unsupported data types in the record, passing the data for those fields as unparsed strings.

To determine how the origin connects to the database, you specify connection information, a query interval, number of retries, and any custom JDBC configuration properties that you need. You can configure advanced connection properties. To use a JDBC version older than 4.0, you specify the driver class name and define a health check query.

PostgreSQL Prerequisite

To enable the PostgreSQL CDC Client origin to read change data capture information from Write-Ahead Logging (WAL) data, you must install the wal2json logical decoder. Install wal2json on every PostgreSQL instance being monitored for changes.

StreamSets provides the wal2json logical decoder on GitHub. To install wal2json, follow the instructions in the "Build and Install" section of the README.md file.

Then, follow the configuration instructions in the "Configuration" section of the README.md file.
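
After installation, you can verify that the decoder is available by creating and then dropping a temporary logical replication slot. Below is a minimal sketch, assuming Python with the psycopg2 package, a user with superuser or replication privileges, and wal_level set to logical; the connection details and the sdc_test slot name are illustrative:
import psycopg2

# Connect to the database that the origin will monitor.
# Autocommit avoids running the slot functions inside a transaction block.
conn = psycopg2.connect("host=localhost port=5432 dbname=sales user=postgres")
conn.autocommit = True
cur = conn.cursor()

# Creating a logical slot with the wal2json plugin fails if the
# decoder is not installed on this instance.
cur.execute("SELECT pg_create_logical_replication_slot('sdc_test', 'wal2json')")
print(cur.fetchone())  # (slot name, starting LSN)

# Remove the test slot so it does not retain WAL data.
cur.execute("SELECT pg_drop_replication_slot('sdc_test')")
conn.close()

If the first call fails because the wal2json plugin cannot be loaded, the decoder is not installed on that instance or the server was not restarted after configuration.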

Installing the JDBC Driver

Before you use the PostgreSQL CDC Client origin, install the JDBC driver for the database. You cannot access the database until you install the required driver.

For information about installing additional drivers, see Install External Libraries.

Schema, Table Name, and Exclusion Patterns

When you configure the PostgreSQL CDC Client origin, you specify the tables with the change capture data that you want to process. To specify the tables, you define the schema, a table name pattern, and an optional exclusion pattern.

When defining the schema and table name pattern, you can use a regular expression to define a set of tables or a set of tables across multiple schemas. When needed, you can also use a regular expression to define an exclusion pattern to exclude a subset of tables from the larger set.

For example, say you want to process change data capture data for all tables in the sales schema with names that start with SALES, excluding those that end with a dash (-) followed by a single-character suffix. You can use the following configuration to specify the tables to process:
  • Schema: sales
  • Table Name Pattern: SALES.*
  • Exclusion Pattern: SALES.*-.
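
To see which table names such a configuration selects, you can test the patterns directly. Below is a minimal sketch, assuming the origin evaluates the patterns as standard regular expressions against the full table name; the table names are made up for illustration:
import re

table_pattern = re.compile(r"SALES.*")
exclusion_pattern = re.compile(r"SALES.*-.")

tables = ["SALES", "SALES_2018", "SALES-A", "SALES-EU-1", "INVENTORY"]
for name in tables:
    # A table is tracked when it matches the table name pattern
    # and does not match the exclusion pattern.
    selected = (table_pattern.fullmatch(name)
                and not exclusion_pattern.fullmatch(name))
    print(name, "->", "tracked" if selected else "skipped")

In this example, only SALES and SALES_2018 are tracked.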

Initial Change

The initial change is the point in the Write-Ahead Logging (WAL) data where you want to start processing. When you start the pipeline, PostgreSQL CDC Client starts processing from the specified initial change and continues until you stop the pipeline.

Note that PostgreSQL CDC Client processes only change capture data. If you need existing data, you might use a JDBC Query Consumer or a JDBC Multitable Consumer in a separate pipeline to read table data before you start a PostgreSQL CDC Client pipeline.

PostgreSQL CDC Client provides several ways to configure the initial change:
  • From the latest change - The origin processes all changes that occur after you start the pipeline.
  • From a specified datetime - The origin processes all changes that occurred at the specified datetime and later. Use the following format: DD-MM-YYYY HH24:MI:SS. For example, 14-07-2018 16:30:00 represents July 14, 2018 at 4:30 p.m.
  • From a specified log sequence number (LSN) - The origin processes all changes that occurred at the specified LSN and later, starting with the timestamp associated with the LSN. If the LSN cannot be found in the WAL data, the origin continues reading from the next higher LSN that is available. Typically, a database administrator can provide the LSN to use.
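
To capture the current position before starting a pipeline, you can ask the database for its current WAL location. Below is a minimal sketch, assuming Python with the psycopg2 package and illustrative connection details; note that PostgreSQL 10 renamed the function, so the call depends on the server version:
import psycopg2

conn = psycopg2.connect("host=localhost port=5432 dbname=sales user=postgres")
cur = conn.cursor()

# PostgreSQL 10 and later:
cur.execute("SELECT pg_current_wal_lsn()")
# PostgreSQL 9.4 - 9.6, use instead:
# cur.execute("SELECT pg_current_xlog_location()")

print(cur.fetchone()[0])  # an LSN string such as 0/16751E8
conn.close()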

Example

You want to process all existing data in the Sales schema and then capture changed data, writing all data to Google Bigtable. To do this, you create two pipelines.

To read the existing data from the schema, you use a pipeline with the JDBC Multitable Consumer origin and the Google Bigtable destination.

Once all existing data is read, you stop the JDBC Multitable Consumer pipeline and start a second pipeline with the PostgreSQL CDC Client origin and the same destination. This pipeline is configured to pick up changes that occur after you start the pipeline, but if you wanted to prevent any chance of data loss, you could configure the initial change to an exact datetime or an earlier LSN.

Generated Record

The PostgreSQL CDC Client treats a single transaction as a single record. It can generate two types of records: one for data changes and one for Data Definition Language (DDL) updates.

Since each data change transaction can include multiple CRUD operations, the PostgreSQL CDC Client origin can also include multiple operations in a record.

As a result, the origin does not write the CRUD operations to the sdc.operation.type record header attribute. Depending on your use case, you might use a scripting processor to convert the records as needed. Or, you might use a Field Pivoter and other processors to separate the data to create a record for each operation.

Every record generated by the PostgreSQL CDC Client origin includes the same basic fields. Records for changes in data include the change data capture details in the change field. Records for DDL changes, such as the insertion of a new column, have no details in the change field.

PostgreSQL CDC Client origin records include the following fields:

  • xid - Transaction ID.
  • nextlsn - Next Log Sequence Number (LSN).
  • timestamp - Timestamp with sub-second granularity, including the time zone offset from UTC.
  • change - A list field that includes the following details about each data change:
    • kind - Operation type: Insert, Update, or Delete.
    • schema - Schema name.
    • table - Table name.
    • columnnames - List of columns associated with the change.
    • columntypes - List of data types for the columns.
    • columnvalues - List of new values for the columns.
    • oldkeys - A map field that includes the previous information for the changed fields. Includes the following fields:
      • keynames - List of names for the columns.
      • keytypes - List of data types for the columns.
      • keyvalues - List of values for the columns.

DDL change records include no change details.

Changed Data Sample Record

Below is a sample record for changed data. Notice how this single xid includes six operations:
{
  "xid": 598,
  "nextlsn": "0/16751E8",
  "timestamp": "2018-07-13 13:24:44.152109-07",
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "table1",
      "columnnames": [
        "id",
        "value"
      ],
      "columntypes": [
        "integer",
        "character(33)"
      ],
      "columnvalues": [
        1,
        "a"
      ],
      "oldkeys": {
        "keynames": [
          "value"
        ],
        "keytypes": [
          "character(33)"
        ],
        "keyvalues": [
          "a"
        ]
      }
    },
    {
      "kind": "update",
      "schema": "public",
      "table": "table1",
      "columnnames": [
        "id",
        "value"
      ],
      "columntypes": [
        "integer",
        "character(33)"
      ],
      "columnvalues": [
        2,
        "b                                "
      ],
      "oldkeys": {
        "keynames": [
          "value"
        ],
        "keytypes": [
          "character(33)"
        ],
        "keyvalues": [
          "b                                "
        ]
      }
    },
    {
      "kind": "update",
      "schema": "public",
      "table": "table1",
      "columnnames": [
        "id",
        "value"
      ],
      "columntypes": [
        "integer",
        "character(33)"
      ],
      "columnvalues": [
        3,
        "c                                "
      ],
      "oldkeys": {
        "keynames": [
          "value"
        ],
        "keytypes": [
          "character(33)"
        ],
        "keyvalues": [
          "c                                "
        ]
      }
    },
    {
      "kind": "update",
      "schema": "public",
      "table": "table2",
      "columnnames": [
        "id",
        "name"
      ],
      "columntypes": [
        "integer",
        "character varying(255)"
      ],
      "columnvalues": [
        1,
        "a"
      ],
      "oldkeys": {
        "keynames": [
          "id"
        ],
        "keytypes": [
          "integer"
        ],
        "keyvalues": [
          1
        ]
      }
    },
    {
      "kind": "update",
      "schema": "public",
      "table": "table2",
      "columnnames": [
        "id",
        "name"
      ],
      "columntypes": [
        "integer",
        "character varying(255)"
      ],
      "columnvalues": [
        2,
        "b"
      ],
      "oldkeys": {
        "keynames": [
          "id"
        ],
        "keytypes": [
          "integer"
        ],
        "keyvalues": [
          2
        ]
      }
    },
    {
      "kind": "update",
      "schema": "public",
      "table": "table2",
      "columnnames": [
        "id",
        "name"
      ],
      "columntypes": [
        "integer",
        "character varying(255)"
      ],
      "columnvalues": [
        3,
        "c"
      ],
      "oldkeys": {
        "keynames": [
          "id"
        ],
        "keytypes": [
          "integer"
        ],
        "keyvalues": [
          3
        ]
      }
    }
  ]
}
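
If downstream stages need one record per operation, the change list must be pivoted into individual records. Below is a minimal sketch of that transformation in plain Python, assuming the record has already been deserialized into a dictionary; in a pipeline, you would implement the equivalent logic in a scripting processor or with a Field Pivoter and other processors:
# Pivot a CDC record into one dictionary per CRUD operation.
def pivot_record(record):
    operations = []
    for change in record.get("change", []):  # empty list for DDL records
        operations.append({
            "xid": record["xid"],
            "nextlsn": record["nextlsn"],
            "timestamp": record["timestamp"],
            "kind": change["kind"],
            "schema": change["schema"],
            "table": change["table"],
            # Zip the parallel name/value lists into a single map. The
            # lists may be absent, for example on some delete operations.
            "values": dict(zip(change.get("columnnames", []),
                               change.get("columnvalues", []))),
        })
    return operations

Applied to the sample record above, the function returns six dictionaries, one per update. Applied to the DDL sample in the next section, it returns an empty list.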

DDL Sample Record

Below is a sample record for a DDL change. Notice that this record includes no details in the change field:
{
  "xid": 600,
  "nextlsn": "0/16794A8",
  "timestamp": "2018-07-17 09:56:08.115141-07",
  "change": []
}

CDC Header Attributes

The PostgreSQL CDC Client includes PostgreSQL change data capture information in the following record header attributes:
  • postgres.cdc.lsn - Log Sequence Number of the record.
  • postgres.cdc.xid - Transaction ID.
  • postgres.cdc.timestamp - Timestamp of the transaction.
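
Downstream stages can read these attributes with the record:attribute function of the Data Collector expression language. For example, to copy the Log Sequence Number into a record field with an Expression Evaluator, you might use the following expression:
${record:attribute('postgres.cdc.lsn')}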

Configuring a PostgreSQL CDC Client

Configure a PostgreSQL CDC Client origin to process WAL change data capture data from a PostgreSQL database.

Before you use the origin, complete the prerequisite tasks.

  1. In the Properties panel, on the General tab, configure the following properties:
    • Name - Stage name.
    • Description - Optional description.
    • On Record Error - Error record handling for the stage:
      • Discard - Discards the record.
      • Send to Error - Sends the record to the pipeline for error handling.
      • Stop Pipeline - Stops the pipeline.
  2. On the PostgreSQL CDC tab, configure the following properties:
    • Tables - Tables to track. Specify related properties as needed. Using simple or bulk edit mode, click the Add icon to define another table configuration.
    • Schema - Schema to use. You can enter a schema name or use a regular expression to specify a set of schemas.
    • Table Name Pattern - A table name pattern that specifies the tables to track. You can enter a table name or use a regular expression to specify a set of tables.
    • Exclusion Pattern - An optional table exclusion pattern that defines a subset of tables to exclude. You can enter a table name or use a regular expression to specify a subset of tables to exclude.
    • Initial Change - The starting point for the read. Use one of the following options:
      • From Latest Change - Processes changes that arrive after you start the pipeline.
      • From Date - Processes changes starting from the specified date.
      • From LSN - Processes changes starting from the specified log sequence number.
    • Start Date - The datetime to read from when you start the pipeline. Use the following format: DD-MM-YYYY HH24:MI:SS. For a date-based initial change only.
    • DB Time Zone - Time zone of the database. For a date-based initial change only.
    • Start LSN - The log sequence number to start reading from when you start the pipeline. If the LSN cannot be found, the origin continues reading from the next higher LSN that is available. For an LSN-based initial change only.
    • Remove Replication Slot on Close - Removes the replication slot when the pipeline stops. Keeping the replication slot results in the continued retention of WAL records for the slot, which can degrade performance. To check for leftover slots, see the query sketch after this procedure.
    • Replication Slot - Name of the replication slot. Default is sdc.
    • Operations - Operations to include.
    • Add Unsupported Fields - Includes fields with unsupported data types in the record. When enabled, the origin adds the field name and passes the data as an unparsed string.
    • Query Timeout - Time to wait before timing out a WAL query and returning the batch. Default is ${ 5 * MINUTES }, which is 300 seconds.
    • Poll Interval - Interval between checks for updates.
  3. On the JDBC tab, configure the following JDBC properties:
    • JDBC Connection String - Connection string to use to connect to the database. Use the following syntax:
      jdbc:postgresql://<host>:<port>/<dbname>
      Note: If you include the JDBC credentials in the connection string, use the user account created for the origin.
    • Max Batch Size (records) - Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size. Default is 1000. The Data Collector default is 1000.
    • Use Credentials - Enables entering credentials on the Credentials tab. Use when you do not include credentials in the JDBC connection string.
    • Additional JDBC Configuration Properties - Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value. Use the property names and values as expected by JDBC.

  4. To enter JDBC credentials separately from the JDBC connection string, on the Credentials tab, configure the following properties:
    • Username - User name for the JDBC connection.
    • Password - Password for the JDBC account.
    Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or credential stores.
  5. When using JDBC versions older than 4.0, on the Legacy Drivers tab, optionally configure the following properties:
    • JDBC Class Driver Name - Class name for the JDBC driver. Required for JDBC versions older than version 4.0.
    • Connection Health Test Query - Optional query to test the health of a connection. Recommended only when the JDBC version is older than 4.0.
  6. On the Advanced tab, optionally configure advanced options. The defaults for these properties should work in most cases:
    • Maximum Pool Size - The maximum number of connections to create. Default is 1, which is also the recommended value.
    • Minimum Idle Connections - The minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size. Default is 1.
    • Connection Timeout - Maximum time to wait for a connection. Use a time constant in an expression to define the time increment. Default is 30 seconds, defined as follows: ${30 * SECONDS}
    • Idle Timeout - Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment. Use 0 to avoid removing any idle connections. Default is 30 minutes, defined as follows: ${30 * MINUTES}
    • Max Connection Lifetime - Maximum lifetime for a connection. Use a time constant in an expression to define the time increment. Use 0 to avoid removing any connections. Default is 30 seconds, defined as follows: ${30 * SECONDS}
    • Enforce Read-only Connection - Creates read-only connections to avoid any type of write. Default is enabled. Disabling this property is not recommended.
    • Transaction Isolation - Transaction isolation level used to connect to the database. Default is the default transaction isolation level set for the database. You can override the database default by setting the level to one of the following:
      • Read committed
      • Read uncommitted
      • Repeatable read
      • Serializable
    • Init Query - SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed.
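
Because an unused replication slot forces the server to retain WAL data, it is worth checking for leftover slots when pipelines do not remove their slot on close. Below is a minimal sketch, assuming Python with the psycopg2 package, illustrative connection details, and the default slot name sdc:
import psycopg2

conn = psycopg2.connect("host=localhost port=5432 dbname=sales user=postgres")
conn.autocommit = True
cur = conn.cursor()

# List logical replication slots; an inactive slot holds back WAL cleanup.
cur.execute("SELECT slot_name, plugin, active FROM pg_replication_slots")
for slot_name, plugin, active in cur.fetchall():
    print(slot_name, plugin, "active" if active else "inactive")

# Drop a leftover slot once no pipeline needs it, for example:
# cur.execute("SELECT pg_drop_replication_slot('sdc')")
conn.close()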