JDBC Query Consumer
Supported pipeline types: Data Collector
The JDBC Query Consumer origin reads database data using a user-defined SQL query through a JDBC connection. The origin returns data as a map with column names and field values.
When you configure the JDBC Query Consumer origin, you define the SQL query that the origin uses to read data from a single table or from a join of tables.
When you configure JDBC Query Consumer, you specify connection information, query interval, and custom JDBC configuration properties to determine how the origin connects to the database. You configure the query mode and SQL query to define the data returned by the database. When in full query mode and reading from certain databases, you can use a stored procedure instead of a SQL query. When the source database has high-precision timestamps, such as IBM Db2 TIMESTAMP(9) fields, you can configure the origin to write strings rather than datetime values to maintain the precision.
You can configure JDBC Query Consumer to perform change data capture for databases that store change information in a table. You can also specify what the origin does when it encounters an unsupported data type: convert the data to string or stop the pipeline.
You can specify custom properties that your driver requires. You can configure advanced connection properties. To use a JDBC version older than 4.0, you specify the driver class name and define a health check query.
By default, the origin generates JDBC record header and field attributes that provide additional information about each record and field.
The origin can generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Database Vendors and Drivers
The JDBC Query Consumer origin can read database data from multiple database vendors.
Database Vendor | Versions and Drivers |
---|---|
MySQL | |
Oracle | Oracle 11g and 19c, and Oracle RAC 12c and 19c with the Oracle 19.3.0 JDBC driver |
PostgreSQL | When connecting to a PostgreSQL database, you do not need to install a JDBC driver. Data Collector includes the JDBC driver required for PostgreSQL. |
Microsoft SQL Server | SQL Server 2017. When connecting to Microsoft SQL Server, you do not need to install a JDBC driver. Data Collector includes the JDBC driver required for SQL Server. |
MySQL Data Types
The JDBC Query Consumer origin converts MySQL data types into Data Collector data types.
MySQL Data Type | Data Collector Data Type |
---|---|
Bigint | Long |
Bigint Unsigned | Decimal |
Binary | Byte Array |
Blob | Byte Array |
Char | String |
Date | Date |
Datetime | Datetime |
Decimal | Decimal |
Double | Double |
Enum | String |
Float | Float |
Int | Integer |
Int Unsigned | Long |
Json | String |
Linestring | Byte Array |
Medium Int | Integer |
Medium Int Unsigned | Long |
Numeric | Decimal |
Point | Byte Array |
Polygon | Byte Array |
Set | String |
Smallint | Short |
Smallint Unsigned | Integer |
Text | String |
Time | Time |
Timestamp | Datetime |
Tinyint, Tinyint Unsigned | Short |
Varbinary | Byte Array |
Varchar | String |
Year | Date |
Oracle Data Types
The JDBC Query Consumer origin converts Oracle data types into Data Collector data types.
Oracle Data Type | Data Collector Data Type |
---|---|
Number | Decimal |
Char | String |
Varchar, Varchar2 | String |
Nchar, Nvarchar2 | String |
Binary_float | Float |
Binary_double | Double |
Date | Datetime |
Timestamp | Datetime |
Timestamp with time zone | Zoned_datetime |
Timestamp with local time zone | Zoned_datetime |
Long | String |
Blob | Byte_array |
Clob | String |
Nclob | String |
XMLType | String |
PostgreSQL Data Types
The JDBC Query Consumer origin converts PostgreSQL data types into Data Collector data types.
PostgreSQL Data Type | Data Collector Data Type |
---|---|
Array | String |
Bigint | Long |
Bit Varying | String |
Boolean | Boolean |
Box | String |
Bytea | Byte Array |
Char | String |
Cidr | String |
Circle | String |
Composite Type | String |
Date | Date |
Daterange | String |
Decimal | Decimal |
Double Precision | Double |
Enum | String |
Inet | String |
Int4range, Int8range | String |
Integer | Integer |
Interval | String |
Json, Jsonb | String |
Line | String |
Lseg | String |
Macaddr | String |
Money | Double |
Numeric | Decimal |
Numrange | String |
Path | String |
Point | String |
Polygon | String |
Real | Float |
Smallint | Short |
Text | String |
Time, Time with Time Zone | Time |
Timestamp, Timestamp with Time Zone | Datetime |
Tsrange | String |
Tstzrange | String |
Uuid | String |
Varchar | String |
SQL Server Data Types
The JDBC Query Consumer origin converts SQL Server data types into Data Collector data types.
SQL Server Data Type | Data Collector Data Type |
---|---|
Bigint | Long |
Binary | Byte_Array |
Bit | Boolean |
Char | String |
Date | Date |
Datetime, Datetime2 | Datetime |
Datetimeoffset | Zoned_datetime |
Decimal | Decimal |
Float | Double |
Image | Byte_Array |
Int | Integer |
Money | Decimal |
Nchar | String |
Ntext | String |
Numeric | Decimal |
Nvarchar | String |
Real | Float |
Smalldatetime | Datetime |
Smallint | Short |
Smallmoney | Decimal |
Text | String |
Time | Time |
Tinyint | Short |
Varbinary | Byte_Array |
Varchar | String |
XML | String |
The origin does not support the following SQL Server data types:
- Geography
- Geometry
Installing the JDBC Driver
Before you use the JDBC Query Consumer origin, install the JDBC driver for your database as an external library. As noted above, Data Collector includes the drivers required for PostgreSQL and Microsoft SQL Server. For information about installing additional drivers, see Install External Libraries.
Offset Column and Offset Value
JDBC Query Consumer uses an offset column and initial offset value to determine where to start reading data within a table. Include both the offset column and the offset value in the WHERE clause of the SQL query.
The offset column must be a column in the table with unique non-null values, such as a primary key or indexed column. The initial offset value is a value within the offset column where you want the origin to start reading.
When the origin performs an incremental query, you must configure the offset column and offset value. For full queries, you can optionally configure them.
Full and Incremental Mode
JDBC Query Consumer can perform queries in two modes:
- Incremental mode
- When the JDBC Query Consumer origin performs an incremental query, it uses the initial offset as the offset value in the first SQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query. In subsequent queries, the origin uses the last-saved offset as the offset value, so it reads only data added since the previous query.
- Full mode
- When the JDBC Query Consumer origin performs a full query, it runs the specified SQL query. If you optionally configure the offset column and initial offset value, the origin uses the initial offset as the offset value in the SQL query each time it requests data.
Recovery
JDBC Query Consumer supports recovery after a deliberate or unexpected stop when it performs incremental queries. Recovery is not supported for full queries.
In incremental mode, the origin uses offset values in the offset column to determine where to continue processing after a deliberate or unexpected stop. To ensure seamless recovery in incremental mode, use a primary key or indexed column as the offset column. As JDBC Query Consumer processes data, it tracks the offset value internally. When the pipeline stops, the origin notes where it stopped processing data. When you restart the pipeline, it continues from the last-saved offset.
When JDBC Query Consumer performs full queries, the origin runs the full query again after you restart the pipeline.
SQL Query
The SQL query defines the data returned from the database. You define the query in the SQL Query property on the JDBC tab.
You can also define the query in a runtime resource, and then use the runtime:loadResource function in the SQL Query property to load the query from the resource file at runtime. For example, you might enter the following expression for the property:
${runtime:loadResource("myquery.sql", false)}
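With this expression, the origin loads the query from a resource file named myquery.sql at runtime. As a minimal sketch, assuming an invoice table that uses an id column as the offset column, the resource file might contain a query such as:
SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY id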
When running the origin in full query mode and reading from certain databases, you can define a stored procedure, then call the stored procedure using the SQL Query property.
SQL Query for Incremental Mode
When you define the SQL query for incremental mode, JDBC Query Consumer requires a WHERE and ORDER BY clause in the query.
Use the following guidelines when you define the WHERE and ORDER BY clauses in the query:
- In the WHERE clause, include the offset column and the offset value
- The origin uses an offset column and value to determine the data that is returned. Include both in the WHERE clause of the query.
- Use the OFFSET constant to represent the offset value
- In the WHERE clause, use ${OFFSET} to represent the offset value.
- In the ORDER BY clause, include the offset column as the first column
- To avoid returning duplicate data, use the offset column as the first column in the ORDER BY clause.
The following query follows these guidelines:
SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY id
SQL Query for Full Mode
You can define any type of SQL query for full mode. For example, the following query returns all data in the invoice table:
SELECT * FROM invoice
When you define the SQL query for full mode, you can optionally include the WHERE and ORDER BY clauses using the same guidelines as for incremental mode. However, using these clauses to read from large tables can cause performance issues.
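For example, assuming the same invoice table, a full-mode query that uses the optional clauses might look like the following. Because the query runs in full mode, ${OFFSET} resolves to the initial offset value each time the query runs:
SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY id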
Stored Procedure in Full Mode
When reading from certain databases, you can call a stored procedure from the JDBC Query Consumer origin. Currently, you can use stored procedures with MySQL, PostgreSQL, and SQL Server databases.
You can call a stored procedure when using JDBC Query Consumer in full mode. Using stored procedures in other modes is not supported.
- In your database, define the stored procedure.
- In the origin, on the JDBC tab, configure the SQL Query property to call the stored procedure. Use the appropriate syntax for your database.
- Also on the JDBC tab, clear the Incremental Mode property, which is selected by default.
- Test the pipeline to ensure that the procedure performs as expected.
Examples
- MySQL database
- To read all data from a MySQL table, you might create a stored procedure
as
follows:
CREATE PROCEDURE <procedure_name>()
BEGIN
  SELECT * FROM <table_name>;
END;
- PostgreSQL database
- To read all data from a PostgreSQL table, you might create a stored
procedure as
follows:
create or replace function <procedure_name>()
returns table (id int)
language plpgsql
as $$
begin
  return query select * from <table_name>;
end;
$$
- SQL Server database
- To read all data from a SQL Server table, you might create a stored
procedure as
follows:
CREATE PROCEDURE <procedure_name>
AS
  SELECT * FROM <table_name>
RETURN
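After you create the procedure, call it in the SQL Query property using the syntax for your database. As a sketch, assuming the procedure names above, the calls might look like the following:
CALL <procedure_name>(); -- MySQL
SELECT * FROM <procedure_name>(); -- PostgreSQL
EXEC <procedure_name>; -- SQL Server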
JDBC Attributes
The JDBC Query Consumer origin generates record header attributes and field attributes that provide additional information about each record and field.
The origin receives these details from the JDBC driver.
JDBC Header Attributes
By default, the JDBC Query Consumer generates JDBC record header attributes that provide additional information about each record, such as the original data type of a field or the source tables for the record. The origin receives these details from the JDBC driver.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
JDBC header attributes include a user-defined prefix to differentiate the JDBC header attributes from other record header attributes. By default, the prefix is jdbc.
You can change the prefix that the origin uses and you can configure the origin not to create JDBC header attributes using the Create JDBC Header Attributes and JDBC Header Prefix properties on the Advanced tab.
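For example, assuming the default jdbc prefix, the following expression returns the comma-separated list of source tables for a record:
${record:attribute('jdbc.tables')}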
JDBC Header Attribute | Description |
---|---|
<JDBC prefix>.tables | Provides a comma-separated list of source tables for the fields in the record. Note: Not all JDBC drivers provide this information. Oracle uses all caps for schema, table, and column names by default. Names can be lower- or mixed-case only if the schema, table, or column was created with quotation marks around the name. |
<JDBC prefix>.<column name>.jdbcType | Provides the numeric value of the original SQL data type for each field in the record. See the Java documentation for a list of the data types that correspond to numeric values. |
<JDBC prefix>.<column name>.precision | Provides the original precision for all numeric and decimal fields. |
<JDBC prefix>.<column name>.scale | Provides the original scale for all numeric and decimal fields. |
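For example, based on the constants defined in java.sql.Types, a VARCHAR column produces a jdbcType value of 12 and an INTEGER column produces a value of 4.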
Header Attributes with the Drift Synchronization Solution
When you use the JDBC Query Consumer with the Drift Synchronization Solution, ensure that the origin creates JDBC header attributes.
JDBC header attributes allow the Hive Metadata processor to use the precision and scale information in the attributes to define decimal fields.
- In the JDBC Query Consumer, on the Advanced tab, make sure that Create JDBC Header Attributes is selected.
- On the same tab, you can optionally configure JDBC Header Prefix.
- In the Hive Metadata processor, if necessary, configure the Decimal Precision Expression and Decimal Scale Expression properties on the Hive tab.
If you changed the default value for JDBC Header Prefix in the JDBC Query Consumer, then update the "jdbc." string in the expressions to use the correct JDBC Header Prefix.
If you did not change the JDBC Header Prefix default value, then use the default expressions for the properties.
JDBC Field Attributes
The JDBC Query Consumer origin generates field attributes for columns converted to the Decimal or Datetime data types in Data Collector. The attributes provide additional information about each field.
- Database Decimal and Numeric data types are converted to the Data Collector Decimal data type, which does not store scale and precision.
- The database Timestamp data type is converted to the Data Collector Datetime data type, which does not store nanoseconds.
Data Collector Data Type | Generated Field Attribute | Description |
---|---|---|
Decimal | precision | Provides the original precision for every decimal or numeric column. |
Decimal | scale | Provides the original scale for every decimal or numeric column. |
Datetime | nanoSeconds | Provides the original nanoseconds for every timestamp column. |
You can use the record:fieldAttribute or record:fieldAttributeOrDefault functions to access the information in the attributes. For more information about working with field attributes, see Field Attributes.
CDC for Microsoft SQL Server
You can use the JDBC Query Consumer to process change capture data from Microsoft SQL Server.
- In the JDBC Query Consumer origin, on the JDBC tab, make sure Incremental Mode is enabled.
- Configure the Offset Column property to use __$start_lsn.
Microsoft SQL Server uses __$start_lsn as the offset column in change data capture tables.
- Configure the Initial Offset property.
This determines where the origin starts the read when you start the pipeline. To read all available data, set it to 0.
- Configure the SQL Query property:
- In the SELECT statement, use the CDC table name.
- In the WHERE clause, use __$start_lsn as the offset column. Because __$start_lsn stores the offset in binary format, convert the integer offset to Binary(10).
- In the ORDER BY clause, use __$start_lsn as the offset column and optionally specify reading in ascending or descending order. By default, the origin reads in ascending order.
The following query summarizes these points; see the concrete sketch after these steps:
SELECT * FROM <CDC table name> WHERE __$start_lsn > CAST(0x${OFFSET} as binary(10)) ORDER BY __$start_lsn <ASC | DESC>
- If you want to group row updates from the same transaction, configure the
properties on the Change Data Capture tab:
- For the Transaction ID Column Name use __$start_lsn. The __$start_lsn column includes transaction information in the offset.
- Set the Max Transaction Size. This property overrides the Data Collector maximum batch size. For more information about both of these properties, see Group Rows by Transaction.
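As a sketch, assuming a hypothetical capture instance named dbo_invoice (SQL Server stores change data in <capture instance>_CT tables in the cdc schema) and reading in ascending order, the query might look like this:
SELECT * FROM cdc.dbo_invoice_CT WHERE __$start_lsn > CAST(0x${OFFSET} AS binary(10)) ORDER BY __$start_lsn ASC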
CRUD Record Header Attribute
When reading change capture data from Microsoft SQL Server, the JDBC Query Consumer origin includes the CRUD operation type in the sdc.operation.type record header attribute.
If you use a CRUD-enabled destination in the pipeline, such as JDBC Producer or Elasticsearch, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator or scripting processors to manipulate the value in the sdc.operation.type header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.
The sdc.operation.type record header attribute uses the following values:
- 1 for INSERT
- 2 for DELETE
- 3 for UPDATE
- 5 for unsupported codes
Group Rows by Transaction
When reading from Microsoft SQL Server, JDBC Query Consumer can group row updates from the same transaction when reading from a change log table. This maintains consistency when performing change data capture.
To enable this feature, specify the transaction ID column and maximum transaction size. When these properties are defined, JDBC Query Consumer processes data as a batch up to the maximum transaction size, overriding the Data Collector maximum batch size.
When the transaction is larger than the maximum transaction size, JDBC Query Consumer uses multiple batches as needed.
To preserve transactional integrity, increase the maximum transaction size as necessary. Note that setting this property too high can cause out of memory errors.
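For example, with a hypothetical maximum transaction size of 10,000 rows, the origin processes a 25,000-row transaction in three batches: two batches of 10,000 rows, then a final batch of 5,000 rows.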
Event Generation
The JDBC Query Consumer origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified query. The origin also generates an event when a query completes successfully and when it fails to complete.
You can use the events in any logical way. For example:
- With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.
When you restart a pipeline stopped by the Pipeline Finisher executor, the origin processes data based on how you configured the origin. For example, if you configure the origin to run in incremental mode, the origin saves the offset when the executor stops the pipeline. When it restarts, the origin continues processing from the last-saved offset. In contrast, if you configure the origin to run in full mode, when you restart the pipeline, the origin uses the initial offset, if specified.
For an example, see Stopping a Pipeline After Processing All Available Data.
- With the Email executor to send a custom email after receiving an event.
For an example, see Sending Email During Pipeline Processing.
- With a destination to store information about completed queries.
For an example, see Preserving an Audit Trail of Events.
For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.
Event Record
Event records generated by the JDBC Query Consumer origin have the following event-related record header attributes:
Record Header Attribute | Description |
---|---|
sdc.event.type | Event type. Uses one of the event types described below. |
sdc.event.version | Integer that indicates the version of the event record type. |
sdc.event.creation_timestamp | Epoch timestamp when the stage created the event. |
The origin can generate the following types of event records:
- No-more-data
- The origin generates a no-more-data event record when it completes processing all data returned by a query.
- Query success
- The origin generates a query success event record when it completes processing the data returned from a query.
- Query failure
- The origin generates a query failure event record when it fails to complete processing the data returned from a query.
Configuring a JDBC Query Consumer
Configure a JDBC Query Consumer origin to use a single configured SQL query to read database data through a JDBC connection.