JDBC Query Consumer

The JDBC Query Consumer origin reads database data using a user-defined SQL query through a JDBC connection. The origin returns data as a map with column names and field values.

When you configure the origin, you define the SQL query that the origin uses to read data from a single table or from a join of tables.

Note: To read from multiple tables in the same database, use the JDBC Multitable Consumer origin. The origin generates SQL queries based on the table configurations that you define. You might want to use the JDBC Multitable Consumer origin for database replication.

When you configure JDBC Query Consumer, you specify connection information, query interval, and custom JDBC configuration properties to determine how the origin connects to the database. You configure the query mode and SQL query to define the data returned by the database. You can also call stored procedures from the SQL query.

You can configure the JDBC Query Consumer to perform change data capture for databases that store change information in a change log table. You can also configure custom properties that your driver requires.

When using a JDBC version older than 4.0, you can specify the driver class name and define a health check query.

By default, the origin creates JDBC header attributes to provide information about source data in record headers.

When reading changed data from Microsoft SQL Server, the origin includes the CRUD operation type in a record header attribute so generated records can be easily processed by CRUD-enabled destinations. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

The origin can also generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Installing the JDBC Driver

Before you use the JDBC Query Consumer, install the JDBC driver for the database. You cannot access the database until you install the required driver.

For more information about installing drivers, see Install External Libraries.

Offset Column and Offset Value

JDBC Query Consumer uses an offset column and initial offset value to determine where to start reading data within a table. Include both the offset column and the offset value in the WHERE clause of the SQL query.

Ideally, the offset column is a column in the table with unique values, such as a primary key or indexed column. The initial offset value is a value within the offset column where you want JDBC Query Consumer to start reading.

When the origin performs an incremental query, you must configure the offset column and offset value. For full queries, you can optionally configure them.

Full and Incremental Mode

JDBC Query Consumer can perform queries in two modes:

Incremental mode
When the JDBC Query Consumer performs an incremental query, it uses the initial offset as the offset value in the first SQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
When the origin performs a subsequent query, it returns data based on the last-saved offset. You can reset the origin to use the initial offset value.
Use incremental mode for append-only tables or when you do not need to capture changes to older rows. By default, JDBC Query Consumer uses incremental mode.
Full mode
When the JDBC Query Consumer origin performs a full query, it runs the specified SQL query. If you optionally configure the offset column and initial offset value, the origin uses the initial offset as the offset value in the SQL query each time it requests data.
When the origin completes processing the results of the full query, it waits the specified query interval, and then performs the same query again.
Use full mode to capture all row updates, and consider using a Record Deduplicator in the pipeline to minimize repeated rows. Full mode is not ideal for large tables.
Tip: If you want to process the results from a single full query and then stop the pipeline, you can enable the origin to generate events and use the Pipeline Finisher to stop the pipeline automatically. For more information, see Event Generation.

Recovery

JDBC Query Consumer supports recovery after a deliberate or unexpected stop when it performs incremental queries. Recovery is not supported for full queries.

In incremental mode, JDBC Query Consumer uses offset values in the offset column to determine where to continue processing after a deliberate or unexpected stop. To ensure seamless recovery in incremental mode, use a primary key or indexed column as the offset column. As JDBC Query Consumer processes data, it tracks the offset value internally. When the pipeline stops, JDBC Query Consumer notes where it stopped processing data. When you restart the pipeline, it continues from the last-saved offset.

When JDBC Query Consumer performs full queries, the origin runs the full query again after you restart the pipeline.

SQL Query

The SQL query defines the data returned from the database. The SQL query guidelines depend on whether you configure the origin to perform an incremental or full query.

SQL Query for Incremental Mode

When you define the SQL query for incremental mode, JDBC Query Consumer requires a WHERE and ORDER BY clause in the query.

Use the following guidelines when you define the WHERE and ORDER BY clauses in the query:

In the WHERE clause, include the offset column and the offset value
The origin uses an offset column and value to determine the data that is returned. Include both in the WHERE clause of the query.
Use the OFFSET constant to represent the offset value
In the WHERE clause, use ${OFFSET} to represent the offset value.
For example, when you start a pipeline, the following query returns all data from the table where the data in the offset column is greater than the initial offset value:
SELECT * FROM <tablename> WHERE <offset column> > ${OFFSET}
Tip: When the offset values are strings, enclose ${OFFSET} in single quotation marks.
In the ORDER BY clause, include the offset column as the first column
To avoid returning duplicate data, use the offset column as the first column in the ORDER BY clause.
Note: Using a column that is not a primary key or indexed column in the ORDER BY clause can slow performance.
For example, the following query for incremental mode returns data from an Invoice table where the ID column is the offset column. The query returns all data where the ID is greater than the offset and orders the data by the ID:
 SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY id
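When the offset column holds string values, the same query shape applies with ${OFFSET} enclosed in single quotation marks, as noted in the tip above. For example, the following sketch assumes a hypothetical customer table where a unique code string column serves as the offset column:

```sql
SELECT * FROM customer WHERE code > '${OFFSET}' ORDER BY code
```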

SQL Query for Full Mode

You can define any type of SQL query for full mode.

For example, you can run the following query to return all data from an Invoice table:
SELECT * FROM invoice

When you define the SQL query for full mode, you can optionally include the WHERE and ORDER BY clauses using the same guidelines as for incremental mode. However, using these clauses to read from large tables can cause performance issues.
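For example, a full-mode query might filter and order the returned rows; the region column below is hypothetical:

```sql
SELECT * FROM invoice WHERE region = 'EMEA' ORDER BY id
```

Because full mode reruns the entire query after each interval, such clauses are reevaluated against the whole table every time, which is why they can cause performance issues on large tables.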

Stored Procedures

You can use stored procedures with the JDBC Query Consumer origin.

You can call stored procedures from the SQL query when using JDBC Query Consumer in full mode. Do not call stored procedures when using the origin in incremental mode.

JDBC Record Header Attributes

The JDBC Query Consumer generates JDBC record header attributes that provide additional information about each record, such as the original data type of a field or the source tables for the record. The origin receives these details from the JDBC driver.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes.
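For example, assuming the default "jdbc" prefix and a record read from the invoice table, expressions like the following could access the attributes; the total column name is hypothetical:

```
${record:attribute('jdbc.tables')}
${record:attributeOrDefault('jdbc.total.scale', '2')}
```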

JDBC header attributes include a user-defined prefix to differentiate the JDBC header attributes from other record header attributes. By default, the prefix is "jdbc". You can change the prefix that the origin uses and you can configure the origin not to create JDBC header attributes.

The origin can provide the following JDBC header attributes:
  • <JDBC prefix>.tables - Provides a comma-separated list of source tables for the fields in the record. Note: Not all JDBC drivers provide this information.
  • <JDBC prefix>.<column name>.jdbcType - Provides the original SQL data type for each field in the record.
  • <JDBC prefix>.<column name>.precision - Provides the original precision for all numeric and decimal fields.
  • <JDBC prefix>.<column name>.scale - Provides the original scale for all numeric and decimal fields.

Header Attributes with the Drift Synchronization Solution

When you use the JDBC Query Consumer with the Drift Synchronization Solution, ensure that the origin creates JDBC header attributes.

JDBC header attributes allow the Hive Metadata processor to use the precision and scale information in the attributes to define decimal fields.

To enable the Hive Metadata processor to define decimal fields as needed, perform the following steps:
  1. In the JDBC Query Consumer, on the Advanced tab, make sure that Create JDBC Header Attributes is selected.
  2. On the same tab, you can optionally configure JDBC Header Prefix.
  3. In the Hive Metadata processor, if necessary, configure the Decimal Precision Expression and Decimal Scale Expression properties on the Hive tab.

    If you changed the default value for JDBC Header Prefix in the JDBC Query Consumer, then update the "jdbc." string in the expressions to use the correct JDBC Header Prefix.

    If you did not change the JDBC Header Prefix default value, then use the default expressions for the properties.

CDC for Microsoft SQL Server

You can use the JDBC Query Consumer to process change capture data from Microsoft SQL Server.

To process Microsoft SQL Server change capture data, perform the following tasks:
  1. In the JDBC Query Consumer origin, on the JDBC tab, make sure Incremental Mode is enabled.
  2. Configure the Offset Column property to use __$start_lsn.

Microsoft SQL Server uses __$start_lsn as the offset column in change data capture tables.

  3. Configure the Initial Offset property.

    This determines where the origin starts the read when you start the pipeline. To read all available data, set it to 0.

  4. Configure the SQL Query property:
    • In the SELECT statement, use the CDC table name.
    • In the WHERE clause, use __$start_lsn as the offset column, and since __$start_lsn stores the offset in binary format, add a command to convert the integer offset to Binary(10).
    • In the ORDER BY clause, use __$start_lsn as the offset column and optionally specify reading in ascending or descending order. By default, the origin reads in ascending order.
    The following query summarizes these points:
    SELECT * from <CDC table name>
    WHERE __$start_lsn > CAST(0x${OFFSET} as binary(10))
    ORDER BY __$start_lsn <ASC | DESC>
  5. If you want to group row updates from the same transaction, configure the properties on the Change Data Capture tab:
    • For the Transaction ID Column Name use __$start_lsn. The __$start_lsn column includes transaction information in the offset.
    • Set the Max Transaction Size. This property overrides the Data Collector maximum batch size. For more information about both of these properties, see Group Rows by Transaction.
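Putting the steps above together, a query for a hypothetical capture instance named dbo_invoice might look like the following. SQL Server stores the change data for a capture instance in a table named cdc.<capture instance>_CT, so the table name here is illustrative:

```sql
SELECT * FROM cdc.dbo_invoice_CT
WHERE __$start_lsn > CAST(0x${OFFSET} AS binary(10))
ORDER BY __$start_lsn ASC
```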

CRUD Record Header Attribute

When reading change capture data from Microsoft SQL Server, the JDBC Query Consumer origin includes the CRUD operation type in the sdc.operation.type record header attribute.

If you use a CRUD-enabled destination in the pipeline such as JDBC Producer or Kudu, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator or scripting processors to manipulate the value in the sdc.operation.type header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

The origin uses the following values in the sdc.operation.type record header attribute to represent the operation type:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 5 for unsupported codes
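For example, a Stream Selector condition or a processor precondition could use an expression like the following to act only on delete records; this is a sketch of one possible use:

```
${record:attribute('sdc.operation.type') == '2'}
```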

Group Rows by Transaction

When reading from Microsoft SQL Server, JDBC Query Consumer can group row updates from the same transaction when reading from a change log table. This maintains consistency when performing change data capture.

To enable this feature, specify the transaction ID column and maximum transaction size. When these properties are defined, JDBC Query Consumer processes data as a batch up to the maximum transaction size, overriding the Data Collector maximum batch size.

When the transaction is larger than the maximum transaction size, JDBC Query Consumer uses multiple batches as needed.

To preserve transactional integrity, increase the maximum transaction size as necessary. Note that setting this property too high can cause out of memory errors.

Event Generation

The JDBC Query Consumer origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified query. The origin also generates an event when a query completes successfully and when it fails to complete.

JDBC Query Consumer events can be used in any logical way. For example:
  • With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.

    When you restart a pipeline stopped by the Pipeline Finisher executor, the origin processes data based on how you configured the origin. For example, if you configure the origin to run in incremental mode, the origin saves the offset when the executor stops the pipeline. When it restarts, the origin continues processing from the last-saved offset. In contrast, if you configure the origin to run in full mode, when you restart the pipeline, the origin uses the initial offset.

    For an example, see Case Study: Stop the Pipeline.

  • With the Email executor to send a custom email after receiving an event.

    For an example, see Case Study: Sending Email.

For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Record

Event records generated by JDBC Query Consumer origin have the following event-related record header attributes:
  • sdc.event.type - Event type. Uses one of the following types:
    • no-more-data - Generated when the origin completes processing all data returned by a query.
    • jdbc-query-success - Generated when the origin successfully completes a query.
    • jdbc-query-failure - Generated when the origin fails to complete a query.
  • sdc.event.version - An integer that indicates the version of the event record type.
  • sdc.event.creation_timestamp - Epoch timestamp when the stage created the event.
The origin can generate the following types of event records:
No-more-data
The origin generates a no-more-data event record when it completes processing all data returned by a query.
The no-more-data event records have the sdc.event.type record header attribute set to no-more-data and include no additional fields.
Query success
The origin generates a query success event record when it completes processing the data returned from a query.
The query success event records have the sdc.event.type record header attribute set to jdbc-query-success and include the following fields:
  • query - Query that completed successfully.
  • timestamp - Timestamp when the query completed.
  • row-count - Number of processed rows.
  • source-offset - Offset after the query completed.
Query failure
The origin generates a query failure event record when it fails to complete processing the data returned from a query.
The query failure event records have the sdc.event.type record header attribute set to jdbc-query-failure and include the following fields:
  • query - Query that failed to complete.
  • timestamp - Timestamp when the query failed to complete.
  • row-count - Number of records from the query that were processed.
  • source-offset - Origin offset after query failure.
  • error - First error message.
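For example, an Email executor handling query failure events could include event fields in the message body with expressions such as the following; the message wording is illustrative:

```
Query failed: ${record:value('/query')} with error: ${record:value('/error')}
```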

Configuring a JDBC Query Consumer

Configure a JDBC Query Consumer origin to use a single configured SQL query to read database data through a JDBC connection.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the JDBC tab, configure the following properties:
    JDBC Property Description
    JDBC Connection String Connection string to use to connect to the database.

    Some databases, such as Postgres, require the schema in the connection string. Use the connection string format required by the database.

    Incremental Mode Defines how JDBC Query Consumer queries the database. Select to perform incremental queries. Clear to perform full queries.

    To process CDC data from Microsoft SQL Server, select this option. For more information about CDC for Microsoft SQL Server, see CDC for Microsoft SQL Server.

    Default is incremental mode.

    SQL Query SQL query to use when reading data from the database.
    Initial Offset Offset value to use when the pipeline starts.

    Required in incremental mode.

    Offset Column Column to use for the offset value.

    Required in incremental mode.

    Query Interval Amount of time to wait between queries. Enter an expression based on a unit of time. You can use SECONDS, MINUTES, or HOURS.

    Default is 10 seconds: ${10 * SECONDS}.

    Use Credentials Enables entering credentials on the Credentials tab. Use when you do not include credentials in the JDBC connection string.
    Root Field Type Root field type to use for generated records. Use the default List-Map option unless using the origin in a pipeline built with Data Collector version 1.1.0 or earlier.
    Max Batch Size (records) Maximum number of records to include in a batch.
    Max Clob Size (characters) Maximum number of characters to be read in a Clob field. Larger data is truncated.
    Max Blob Size (bytes)

    Maximum number of bytes to be read in a Blob field.

    Number of Retries on SQL Error Number of times the origin tries to execute the query after receiving an SQL error. After retrying this number of times, the origin handles the error based on the error handling configured for the origin.

    Use to handle transient network or connection issues that prevent the origin from submitting a query.

    Default is 0.

    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

  3. To enter JDBC credentials separately from the JDBC connection string, on the Credentials tab, configure the following properties:
    Credentials Property Description
    Username User name for the JDBC connection.
    Password Password for the JDBC account.
    Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or Hashicorp Vault secrets. For more information, see Using Runtime Resources or Accessing Hashicorp Vault Secrets.
  4. To process change capture data from Microsoft SQL Server, on the Change Data Capture tab, optionally configure the following properties to group rows by transaction:
    Change Data Capture Property Description
    Transaction ID Column Name Transaction ID column name, typically __$start_lsn.
    Max Transaction Size (rows) Maximum number of rows to include in a batch.

    Overrides the Data Collector maximum batch size.

  5. When using JDBC versions older than 4.0, on the Legacy Drivers tab, optionally configure the following properties:
    Legacy Driver Property Description
    JDBC Class Driver Name Class name for the JDBC driver. Required for JDBC versions older than version 4.0.
    Connection Health Test Query Optional query to test the health of a connection. Recommended only when the JDBC version is older than 4.0.
  6. On the Advanced tab, optionally configure advanced properties.
    The defaults for these properties should work in most cases:
    Advanced Property Description
    Maximum Pool Size The maximum number of connections to create.

    Default is 1. The recommended value is 1.

    Minimum Idle Connections The minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Connection Timeout Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Max Connection Lifetime Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

Use 0 to avoid removing connections due to a maximum lifetime.

    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Auto Commit Determines if auto-commit mode is enabled. In auto-commit mode, the database commits the data for each record.

    Default is disabled.

    Enforce Read-only Connection Creates read-only connections to avoid any type of write.

    Default is enabled. Disabling this property is not recommended.

    Transaction Isolation Transaction isolation level used to connect to the database.

    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:

    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Create JDBC Header Attributes Adds JDBC header attributes to records. The origin creates JDBC header attributes by default.
    Note: When using the origin with the Drift Synchronization Solution, make sure this property is selected.
    JDBC Header Prefix Prefix for JDBC header attributes.
    Disable Query Validation Disables the query validation that occurs by default. Use to avoid time-consuming query validation, such as when querying Hive, which requires using a MapReduce job to perform the validation.
    Warning: Query validation prevents running a pipeline with invalid queries. Use this option with care.