JDBC Multitable Consumer

The JDBC Multitable Consumer origin reads database data from multiple tables through a JDBC connection. The origin returns data as a map with column names and field values.

When you configure the origin, you define groups of database tables to read. The origin generates SQL queries based on the table configurations that you define. The origin uses multiple threads to enable parallel processing of data. Use the JDBC Multitable Consumer origin to read multiple tables in the same database. For example, you might want to use the origin for database replication.

Note: To configure your own SQL query to read database data from a single table or from a join of tables, use the JDBC Query Consumer origin.

When you configure JDBC Multitable Consumer, you specify connection information, a query interval, and custom JDBC configuration properties to determine how the origin connects to the database.

You define a table configuration for each group of tables that you want to read. You define the number of threads that the origin uses to read from the tables and the strategy that the origin uses to create each batch of data. You also define the initial order that the origin uses to read the tables.

When the pipeline stops, JDBC Multitable Consumer notes where it stops reading. When the pipeline starts again, JDBC Multitable Consumer continues processing from where it stopped by default. You can reset the origin to process all available data.

To use a JDBC version older than 4.0, you can specify the driver class name and define a health check query.

The origin can also generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Installing the JDBC Driver

Before you use the JDBC Multitable Consumer, install the JDBC driver for the database. You cannot access the database until you install the required driver.

For more information about installing drivers, see Install External Libraries.

Working with a MySQL JDBC Driver

Note the following issues that can occur when using a MySQL JDBC driver with the JDBC Multitable Consumer origin:
The driver returns time values to the second.

Due to a MySQL JDBC driver issue, the driver cannot return time values to the millisecond. Instead, the driver returns the values to the second. For example, if a column has a value of 20:12:50.581, the driver reads the value as 20:12:50.000.

The origin might not read new rows created in MySQL while the pipeline is running.
When using the default transaction isolation level, the origin might not read new rows that are created in MySQL as the pipeline is running. To resolve this issue, configure the origin to use the read committed transaction isolation level in the Advanced tab.
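For reference, the read committed level is roughly equivalent to running the following statement on the MySQL connection; this is shown only as an illustration of what the JDBC driver configures, not something you run yourself:
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;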

Multithreaded Processing

The JDBC Multitable Consumer origin performs parallel processing and enables the creation of a multithreaded pipeline.

When you start the pipeline, the JDBC Multitable Consumer origin retrieves the list of tables defined in the table configuration. The origin then uses multiple concurrent threads based on the Number of Threads property. Each thread reads data from a single table.
Note: The Maximum Pool Size property on the Advanced tab defines the maximum number of connections the origin can make to the database. It must be equal to or greater than the value defined for the Number of Threads property.

As the pipeline runs, each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner. A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors and destinations in the pipeline and performs all pipeline processing after the origin.

Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But because batches are processed by different pipeline runners, the order in which batches are written to destinations is not guaranteed.

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.

Example

Say you are reading from ten tables. You set the Number of Threads property to 5 and the Maximum Pool Size property to 6. When you start the pipeline, the origin retrieves the list of tables. The origin then creates five threads to read from the first five tables, and by default Data Collector creates a matching number of pipeline runners. Upon receiving data, each thread passes its batch to an available pipeline runner for processing.

At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.

Batch Strategy

Each origin thread creates a batch of data from a single table. You can define one of the following strategies that the threads use to create each batch:

Process All Available Rows from the Table
Each thread creates multiple batches of data from one table, until all available rows are read from that table. The thread runs one SQL query for all batches created from the table. Then, the thread switches to the next available table, running another SQL query to read all available rows from that table.
For example, let's say that the batch size for the origin is set to 100. The origin is configured to use two concurrent threads and to read from four tables, each of which contains 1,000 rows. The first thread runs one SQL query to create 10 batches of 100 rows each from table1, while the second thread uses the same strategy to read data from table2. When table1 and table2 are fully read, the threads switch to table3 and table4 and complete the same process. When the first thread finishes reading from table3, the thread switches back to the next available table to read all available data from the last saved offset.
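As a sketch of this strategy, assuming table1 uses a primary key named id (a hypothetical name) and the last saved offset is 700, thread1 might run queries like the following:
SELECT * FROM table1 ORDER BY id                    -- initial query; all batches come from this result
SELECT * FROM table1 WHERE id > 700 ORDER BY id     -- later query, resuming from the last saved offset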
Switch Tables
Each thread creates a batch of data from one table, and then switches to the next available table to create the next batch. The thread runs an initial SQL query to create the first batch from the table. The database caches the remaining rows in a result set for the same thread to access again, and then the thread switches to the next available table. A table is available in the following situations:
  • The table does not have an open result set cache. In this case, the thread runs an initial SQL query to create the first batch, caching the remaining rows in a result set in the database.
  • The table has an open result set cache created by that same thread. In this case, the thread creates the batch from the result set cache in the database rather than running another SQL query.
A table is not available when the table has an open result set cache created by another thread. No other thread can read from that table until the result set is closed.
When you configure a switch table strategy, define the result set cache size and the number of batches that a thread can create from the result set. After a thread creates the configured number of batches, the database closes the result set and then a different thread can read from the table.
Note: By default, the origin instructs the database to cache an unlimited number of result sets, and a thread can create an unlimited number of batches from each result set.
For example, let's say that the batch size for the origin is set to 100. The origin is configured to use two concurrent threads and to read from four tables, each of which contains 10,000 rows. You set the result set cache size to 500 and set the number of batches read from the result set to 5.
Thread1 runs an SQL query on table1, which returns all 10,000 rows. The thread creates a batch when it reads the first 100 rows. The next 400 rows are cached as a result set in the database. Since thread2 is similarly processing table2, thread1 switches to the next available table, table3, and repeats the same process. After creating a batch from table3, thread1 switches back to table1 and retrieves the next batch of rows from the result set that it previously cached in the database.
After thread1 creates five batches using the result set cache for table1, the database closes the result set cache. Thread1 switches to the next available table. A different thread runs an SQL query to read additional rows from table1, beginning from the last saved offset.
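For example, if table1 uses a primary key named id (a hypothetical name) and the closed result set delivered rows with id values up to 500, the new query might look like this:
SELECT * FROM table1 WHERE id > 500 ORDER BY id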

Table Configuration

When you configure JDBC Multitable Consumer, you define a table configuration for each group of tables that you want to read. A table configuration defines a group of tables from the same schema that match the same table name pattern and that either have proper primary keys or use the same defined offset columns.

You can define one or more table configurations.

For example, you can define one table configuration to replicate a database that has a proper primary key for each table. You simply define the schema name and use the default table name pattern, %, which matches all tables in the schema.

Let's look at an example where you need to define more than one table configuration. Let's say that you want to copy tables in a relational database to an HBase cluster. The SALES schema contains ten tables, but you want to copy only the following four tables:
  • store_a
  • store_b
  • store_c
  • customers

The three store tables use orderID as the primary key. You want to override the primary key for the customers table, so you define customerID as the offset column for that table. You want to read all available data in the tables, so you do not need to define an initial offset value.

You define the following two table configurations for the origin so that the origin can read all four tables:
  • Store tables - Schema name SALES with the table name pattern store%. The tables have a proper primary key, so you do not override the offset column.
  • Customers table - Schema name SALES with the table name pattern customers. You select Override Offset Columns and define customerID as the offset column.

Let's take a closer look at the table name pattern and offset properties that you define in a table configuration.

Table Name Pattern

You define the group of tables that the JDBC Multitable Consumer origin reads by defining a table name pattern for the table configuration. The origin reads all tables whose names match the pattern.

The table name pattern uses the SQL LIKE syntax. The LIKE syntax uses the percent wildcard (%) to represent any string of zero or more characters. For example, the table name pattern st% matches tables whose names start with "st". The default table name pattern, %, matches all tables in the schema.
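As an illustration of LIKE matching, the following standard SQL query, assuming a database that exposes an information_schema catalog, lists the tables in the SALES schema that the pattern st% would match:
SELECT table_name FROM information_schema.tables WHERE table_schema = 'SALES' AND table_name LIKE 'st%'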

For more information about valid patterns for the SQL LIKE syntax, see https://msdn.microsoft.com/en-us/library/ms179859.aspx.

You can optionally define a table exclusion pattern to exclude some tables from being read. The table exclusion pattern uses a Java-based regular expression, or regex. For more information about using regular expressions with Data Collector, see Regular Expressions Overview.

For example, let's say that you want to read all tables in the schema except for tables that start with "dept". You enter the following table name and table exclusion patterns:
  • Table Name Pattern - %
  • Table Exclusion Pattern - dept.*

If you do not need to exclude any tables, simply leave the table exclusion pattern empty.

Offset Column and Value

The JDBC Multitable Consumer origin uses an offset column and initial offset value to determine where to start reading data within the tables.

By default, the origin uses the primary key of the tables as the offset column and uses no initial offset value. When a table has a composite primary key, the origin uses each column of the primary key as an offset column. The origin reads all available data from each table when you start the pipeline.

When you use the default behavior, the origin generates SQL queries using the following syntax when you start the pipeline:
SELECT * FROM <table> ORDER BY <offset column_1>, <offset column_2>, ...

Where each <offset column_n> represents a primary key column of the table, as when the table has a composite primary key. When you restart the pipeline or when the origin switches back to a previously read table, the origin adds a WHERE clause to the SQL query to continue reading from the last saved offset.
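For example, using the store_a table described earlier and assuming a last saved orderID offset of 10500 (a hypothetical value), the query on restart might look like this:
SELECT * FROM store_a WHERE orderID > 10500 ORDER BY orderID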

To use the default behavior, you do not need to configure any of the offset properties.

You can make the following changes to how the origin handles offset columns and initial offset values:
Override the primary key as the offset column
You can override the primary key and define another offset column or columns. Or, if the table doesn't have a primary key, you can define the offset column or columns to use. The offset columns that you define should be incremental and unique columns, such as indexed columns.
Define an initial offset value
The initial offset value is a value within the offset column where you want JDBC Multitable Consumer to start reading. When you define an initial offset value, you must first enter the offset column name and then the value. If you are using the default primary key as the offset column, enter the name of the primary key.
If you define an initial offset value for a single offset column, the origin generates SQL queries using the following syntax:
SELECT * FROM <table> WHERE <offset column> > ${offset} ORDER BY <offset column>
If you define multiple offset columns, you must define an initial offset value for each column, in the same order that the columns are defined. The origin uses the initial offset values of all columns to determine where to start reading data. For example, say you override the primary key with the offset columns p1, p2, and p3, and define an initial offset value for each column. The origin generates SQL queries using the following syntax:
SELECT * FROM <table> WHERE (p1 > ${offset1}) OR (p1 = ${offset1} AND p2 > ${offset2}) OR (p1 = ${offset1} AND p2 = ${offset2} AND p3 > ${offset3}) ORDER BY p1, p2, p3
Note: Data Collector stores offsets for Datetime columns as Long values. For offset columns with a Datetime data type, enter the initial value as a Long value. You can use the time functions to transform a Datetime value to a Long value. For example, the following expression converts a date entered as a String to a Date object, and then to a Long value:
${time:dateTimeToMilliseconds(time:extractDateFromString('2017-05-01 20:15:30.915','yyyy-MM-dd HH:mm:ss.SSS'))} 
Define additional offset column conditions
You can use the expression language to define additional conditions that the origin uses to determine where to start reading data. The origin adds the defined condition to the WHERE clause of the SQL query.
You can use the offset:column function in the condition to access an offset column by position. For example, if you have a table with offset columns p1 and p2, then offset:column(0) returns the value of p1 while offset:column(1) returns the value of p2.
Let's say that you defined a transaction_time column as the offset column. While the origin reads the table, multiple active transactions are writing rows to the table with the current timestamp in the transaction_time column. When the origin finishes reading a record with the current timestamp, it saves that timestamp as the offset and continues reading from there, skipping rows with the same timestamp that are committed afterwards. You can enter the following offset column condition to ensure that the origin reads only rows with a timestamp earlier than the current time:
${offset:column(0)} < ${time:now()}
Note: As a best practice, an offset column should be an incremental and unique column, such as an indexed column.
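With that condition in place, the generated query takes roughly the following form; the origin appends the condition to the WHERE clause alongside the offset comparison:
SELECT * FROM <table> WHERE transaction_time > ${offset} AND transaction_time < <current time> ORDER BY transaction_time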

Initial Table Order Strategy

You can define the initial order that the origin uses to read the tables.

Define one of the following initial table order strategies:

None
Reads the tables in the order that they are listed in the database.
Alphabetical
Reads the tables in alphabetical order.
Referential Constraints
Reads the tables based on the dependencies between the tables. The origin reads the parent table first, and then reads the child tables that refer to the parent table with a foreign key.
You cannot use the referential constraints order when the tables to be read have a cyclic dependency. When the origin detects a cyclic dependency, the pipeline fails to validate with the following error:
JDBC_68 Tables referring to each other in a cyclic fashion.
Note that the referential constraints order can cause pipeline validation or initialization to slow down because the origin has to sort the tables before reading them.

The origin uses this table order only for the initial reading of the tables. When threads switch back to previously read tables, they read from the next available table, regardless of the defined order.

JDBC Header Attributes

The JDBC Multitable Consumer origin generates JDBC record header attributes that provide additional information about each record, such as the original data type of a field or the source tables for the record. The origin receives these details from the JDBC driver.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes.

JDBC record header attributes include a "jdbc" prefix to differentiate the JDBC attributes from other record header attributes.

The origin can provide the following JDBC header attributes:
  • jdbc.tables - Provides a comma-separated list of source tables for the fields in the record.
    Note: Not all JDBC drivers provide this information.
  • jdbc.<column name>.jdbcType - Provides the original SQL data type for each field in the record.
  • jdbc.<column name>.precision - Provides the original precision for all numeric and decimal fields.
  • jdbc.<column name>.scale - Provides the original scale for all numeric and decimal fields.
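For example, the following expressions read the header attributes for a record; the column name total is hypothetical, and the second expression returns a default value when the driver does not provide the attribute:
${record:attribute('jdbc.tables')}
${record:attributeOrDefault('jdbc.total.precision', '0')}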

Event Generation

The JDBC Multitable Consumer origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified queries for all tables.

JDBC Multitable Consumer events can be used in any logical way. For example:
  • With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.

    When you restart a pipeline stopped by the Pipeline Finisher executor, the origin continues processing from the last-saved offset unless you reset the origin.

    For an example, see Case Study: Stop the Pipeline.

  • With the Email executor to send a custom email after receiving an event.

    For an example, see Case Study: Sending Email.

For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Record

Event records generated by the JDBC Multitable Consumer origin have the following event-related record header attributes:
  • sdc.event.type - Event type. Uses the following type:
    • no-more-data - Generated when the origin completes processing all data returned by the queries for all tables.
  • sdc.event.version - An integer that indicates the version of the event record type.
  • sdc.event.creation_timestamp - Epoch timestamp when the stage created the event.

The no-more-data event record includes no record fields.
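For example, to act on the no-more-data event only, you can use a condition like the following, such as in the precondition of a Pipeline Finisher executor:
${record:attribute('sdc.event.type') == 'no-more-data'}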

Configuring a JDBC Multitable Consumer

Configure a JDBC Multitable Consumer origin to use a JDBC connection to read database data from multiple tables.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the JDBC tab, configure the following properties:
    JDBC Property Description
    JDBC Connection String Connection string to use to connect to the database.

    Some databases, such as Postgres, require the schema in the connection string. Use the connection string format required by the database.

    Use Credentials Enables entering credentials on the Credentials tab. Use when you do not include credentials in the JDBC connection string.
    Query Interval Amount of time to wait between queries. Enter an expression based on a unit of time. You can use SECONDS, MINUTES, or HOURS.

    Default is 10 seconds: ${10 * SECONDS}.

    Number of Threads Number of threads the origin generates and uses for multithreaded processing.

    Configure the Maximum Pool Size property on the Advanced tab to be equal to or greater than this value.

    Per Batch Strategy Strategy to create each batch of data:
    • Switch Tables - Each thread creates a batch of data from one table, and then switches to the next available table to create the next batch. Define the Result Set Cache Size and the Batches from Result Set properties when you configure a switch tables strategy.
    • Process All Available Rows from the Table - Each thread creates multiple batches of data from one table, until all available rows are read from that table.
    Max Batch Size (records) Maximum number of records to include in a batch.
    Batches from Result Set Number of batches to create from the result set when using the switch tables batch strategy. After a thread creates this number of batches, the database closes the result set and then another thread can read from the same table.

    Use a positive integer to set a limit on the number of batches created from the result set. Use -1 to opt out of this property.

    By default, the origin creates an unlimited number of batches from the result set, keeping the result set open as long as possible.

    Result Set Cache Size Number of result sets to cache in the database when using the switch tables batch strategy. Use a positive integer to set a limit on the number of cached result sets. Use -1 to opt out of this property.

    By default, the origin caches an unlimited number of result sets.

    Max Clob Size (characters) Maximum number of characters to be read in a Clob field. Larger data is truncated.
    Max Blob Size (bytes) Maximum number of bytes to be read in a Blob field.
    Number of Retries on SQL Error Number of times a thread tries to read a batch of data after receiving an SQL error. After a thread retries this number of times, the thread handles the error based on the error handling configured for the origin.

    Use to handle transient network or connection issues that prevent a thread from reading a batch of data.

    Default is 0.

    Data Time Zone Time zone to use to evaluate datetime-based offset column conditions.
    Quote Character Quote character to use around schema, table, and column names in the query. Select the character used by the database to allow for lower case, mixed-case, or special characters in schema, table, or column names:
    • None - Uses no character around names in the query. For example: select * from mySchema.myTable order by myOffsetColumn.
    • Backtick - Uses a backtick around names in the query. For example: select * from `mySchema`.`myTable` order by `myOffsetColumn`.
    • Double Quotes - Uses double quotes around names in the query. For example: select * from "mySchema"."myTable" order by "myOffsetColumn".
    Fetch Size Maximum number of rows to fetch for the JDBC statement and store in memory on the Data Collector machine. The size cannot be zero.

    Default is 1,000.

    Note: By default, MySQL fetches and stores the complete result set in memory on the Data Collector machine. If the result sets have a large number of rows or large values that exceed available memory, specify a fetch size of Integer.MIN_VALUE so that MySQL streams the results to the Data Collector machine one row at a time.

    For more information about configuring a fetch size, see your database documentation.

    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

  3. On the Tables tab, define one or more table configurations. Click the Add icon to define another table configuration.
    Configure the following properties for each table configuration:
    Tables Property Description
    Schema Name Name of the schema to use for this table configuration.
    Table Name Pattern Pattern of the table names to read for this table configuration. Use the SQL LIKE syntax to define the pattern.

    Default is the percentage wildcard (%) which matches all tables in the schema.

    Table Exclusion Pattern Pattern of the table names to exclude from being read for this table configuration. Use a Java-based regular expression, or regex, to define the pattern.

    Leave empty if you do not need to exclude any tables.

    Override Offset Columns Determines whether to use the primary key or other columns as the offset columns for this table configuration.

    Select to override the primary key and define other offset columns. Clear to use the primary key as the offset column.

    Initial Offset Offset value to use for this table configuration when the pipeline starts. Enter the primary key name or offset column name and value. For Datetime columns, enter a Long value.

    When you define multiple offset columns, you must define an initial offset value for each column, in the same order that the columns are defined.

    Offset Column Conditions Additional conditions that the origin uses to determine where to start reading data for this table configuration. The origin adds the defined condition to the WHERE clause of the SQL query.

    Use the expression language to define the conditions. For example, you can use the offset:column function to compare the value of an offset column.

  4. To enter JDBC credentials separately from the JDBC connection string, on the Credentials tab, configure the following properties:
    Credentials Property Description
    Username User name for the JDBC connection.
    Password Password for the JDBC account.
    Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or Hashicorp Vault secrets. For more information, see Using Runtime Resources or Accessing Hashicorp Vault Secrets.
  5. When using JDBC versions older than 4.0, on the Legacy Drivers tab, optionally configure the following properties:
    Legacy Driver Property Description
    JDBC Class Driver Name Class name for the JDBC driver. Required for JDBC versions older than version 4.0.
    Connection Health Test Query Optional query to test the health of a connection. Recommended only when the JDBC version is older than 4.0.
  6. On the Advanced tab, optionally configure advanced properties.
    The defaults for these properties should work in most cases:
    Advanced Property Description
    Maximum Pool Size The maximum number of connections to create. Must be equal to or greater than the value of the Number of Threads property.

    Default is 1.

    Minimum Idle Connections The minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Connection Timeout Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Max Connection Lifetime Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime.

    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Auto Commit Determines if auto-commit mode is enabled. In auto-commit mode, the database commits the data for each record.

    Default is disabled.

    Enforce Read-only Connection Creates read-only connections to avoid any type of write.

    Default is enabled. Disabling this property is not recommended.

    Transaction Isolation Transaction isolation level used to connect to the database.

    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:

    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Initial Table Order Strategy Initial order used to read the tables:
    • None - Reads the tables in the order that they are listed in the database.
    • Alphabetical - Reads the tables in alphabetical order.
    • Referential Constraints - Reads the tables based on the dependencies between the tables.