Oracle Query Consumer

The Oracle Query Consumer origin reads data from an Oracle database using a user-defined SQL query. The origin returns data as a map with column names and field values.

Note: You can also use the Oracle Multitable Consumer origin to read from an Oracle database. Use the Oracle Multitable Consumer origin for database replication or to read from multiple tables in the same database. The Oracle Multitable Consumer origin generates SQL queries based on the table configurations that you define.

When you configure the Oracle Query Consumer origin, you specify the connection information that the origin uses to connect to the database and the query interval that determines how long the origin waits between queries. The origin requires a secure connection to the database server using an SSH tunnel or SSL/TLS encryption.

You define the query mode and the SQL query that the origin uses to read data from a single table or from a join of tables.

By default, the origin creates JDBC header attributes to provide information about source data in record headers.

When the pipeline stops, the origin notes where it stopped reading. By default, when the pipeline runs again, the origin continues processing from where it stopped. You can also configure a pipeline run to start from an offset saved by a previous run or to start from the beginning to read all available data.

The origin can generate events for an event stream.

Note: The origin includes advanced options with default values that should work in most cases. By default, the origin hides the advanced options. To configure the advanced options, select Show Advanced Options at the top of the properties pane.

Prerequisites

Before you can read from an Oracle database, you must set up the database server or an intermediary SSH server to accept incoming connections from StreamSets Cloud.

For more information, see Database Connections.

Secure Connections

The Oracle Query Consumer origin requires a secure connection to the Oracle database server.

You must configure the origin to use an SSH tunnel or SSL/TLS encryption to securely connect to the database.

SSH Tunnel

You can secure the connection to the Oracle server by configuring the origin to use an SSH tunnel.

When you configure the stage to connect to the database through an SSH tunnel, the stage uses an encrypted connection to communicate with the SSH server. The SSH server then uses an unencrypted connection to communicate with the database server.

For additional information including instructions to set up the SSH server to accept incoming connections from StreamSets Cloud, see Use an SSH Tunnel.

Tip: If you also configure SSL/TLS encryption, then the stage uses SSL/TLS encryption from the SSH server to the database server.

SSL/TLS Encryption

You can secure the connection to the Oracle server by configuring the origin to use SSL/TLS encryption.

Before configuring the origin to use SSL/TLS encryption, verify that the Oracle server is correctly configured to use SSL/TLS. For more information, see the Oracle documentation.

When you enable encryption for the origin, the origin verifies the following information before it establishes an SSL/TLS connection to the database server:

Server certificate
The stage verifies the certificate of the database server provided in the stage properties. You must paste the full contents of the PEM encoded server certificate into the Server Certificate PEM property, including the header and footer in the file. Use a text editor to open the PEM encoded certificate, and then copy and paste the full contents of the file into the origin property, as follows:
-----BEGIN CERTIFICATE-----
MIIB9TCCAWACAQAwgbgxGTAXBgNVHJoMEFF1b1ZlZGwzIEkpbWl0ZWLxHLAaBgNV
......
98TwDIK/39WEB/V607As+KoYajQL9drorw==
-----END CERTIFICATE-----
You also specify the cipher suites used for encryption and data integrity of the certificate. For example, you might enter TLS_RSA_WITH_AES_256_CBC_SHA as the cipher suite. You can enter multiple cipher suites separated by commas. For a full list of available cipher suites, see Cipher Suites in the Oracle documentation.
Server host name
The stage can optionally verify the host name of the database server provided in the stage properties.
To verify the host name, select the Verify Hostname property and then specify the SSL/TLS distinguished name (DN) of the server. The stage verifies that the specified distinguished name matches the DN specified in the server certificate.
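For example, a server DN might look like the following: CN=orcl.example.com, OU=IT, O=Example Corp, C=US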

Offset Column and Value

The Oracle Query Consumer origin uses an offset column and initial offset value to determine where to start reading data within a table. Include both the offset column and the offset value in the WHERE clause of the SQL query.

The offset column must be a column in the table with unique non-null values, such as a primary key or indexed column. The initial offset value is a value within the offset column where you want the origin to start reading.

When the origin performs an incremental query, you must configure the offset column and offset value. For full queries, you can optionally configure them.

Full and Incremental Mode

The Oracle Query Consumer origin can perform queries in two modes:

Incremental mode
When the origin performs an incremental query, it uses the initial offset as the offset value in the first SQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
When the origin performs a subsequent query, it returns data based on the last-saved offset. When you restart the pipeline, you can configure the pipeline to start from the beginning to use the initial offset value.
Use incremental mode for append-only tables or when you do not need to capture changes to older rows. Incremental mode is the default.
Full mode
When the origin performs a full query, it runs the specified SQL query. If you optionally configure the offset column and initial offset value, the origin uses the initial offset as the offset value in the SQL query each time it requests data.
When the origin completes processing the results of the full query, it waits the specified query interval, and then performs the same query again.
Use full mode to capture all row updates. You might use a Record Deduplicator processor in the pipeline to minimize repeated rows. Full mode is not ideal for large tables because the origin reads the entire result set with each query.

Recovery

The Oracle Query Consumer origin supports recovery after a deliberate or unexpected stop when it performs incremental queries. Recovery is not supported for full queries.

In incremental mode, the origin uses offset values in the offset column to determine where to continue processing after a deliberate or unexpected stop. To ensure seamless recovery in incremental mode, use a primary key or indexed column as the offset column. As the origin processes data, it tracks the offset value internally. When the pipeline stops, the origin notes where it stopped processing data. When you restart the pipeline, it continues from the last-saved offset.

When the origin performs full queries, the origin runs the full query again after you restart the pipeline.

SQL Query

The SQL query defines the data returned from the database.

You define the query in the SQL Query property on the JDBC tab. The SQL query guidelines depend on whether you configure the origin to perform an incremental or full query.

SQL Query for Incremental Mode

When you define the SQL query for incremental mode, the Oracle Query Consumer origin requires a WHERE and ORDER BY clause in the query.

Use the following guidelines when you define the WHERE and ORDER BY clauses in the query:
In the WHERE clause, include the offset column and the offset value
The origin uses an offset column and value to determine the data that is returned. Include both in the WHERE clause of the query.
Use the OFFSET constant to represent the offset value
In the WHERE clause, use ${OFFSET} to represent the offset value.
For example, when you start a pipeline, the following query returns all data from the table where the data in the offset column is greater than the initial offset value:
SELECT * FROM <tablename> WHERE <offset column> > ${OFFSET}
Tip: When the offset values are strings, enclose ${OFFSET} in single quotation marks.
In the ORDER BY clause, include the offset column as the first column
To avoid returning duplicate data, use the offset column as the first column in the ORDER BY clause.
Note: Using a column that is not a primary key or indexed column in the ORDER BY clause can slow performance.
For example, the following query for incremental mode returns data from an Invoice table where the ID column is the offset column. The query returns all data where the ID is greater than the offset and orders the data by the ID:
SELECT * FROM invoice WHERE id > ${OFFSET} ORDER BY id
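If the offset column stores string values, you might write a similar query as follows, using a hypothetical invoice_number column and enclosing ${OFFSET} in single quotation marks:
SELECT * FROM invoice WHERE invoice_number > '${OFFSET}' ORDER BY invoice_number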

SQL Query for Full Mode

You can define any type of SQL query for full mode.

For example, you can run the following query to return all data from an Invoice table:
SELECT * FROM invoice

When you define the SQL query for full mode, you can optionally include the WHERE and ORDER BY clauses using the same guidelines as for incremental mode. However, using these clauses to read from large tables can cause performance issues.
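
For example, the following full-mode query limits the rows returned and orders the results by the ID column, assuming a hypothetical region column in the Invoice table:
SELECT * FROM invoice WHERE region = 'WEST' ORDER BY id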

Stored Procedures

You can call stored procedures from the SQL query when using the origin in full mode. Do not call stored procedures when using the origin in incremental mode.
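
For example, in full mode you might call a stored function within the query, such as the following hypothetical get_invoice_total function:
SELECT id, customer, get_invoice_total(id) AS total FROM invoice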

JDBC Header Attributes

By default, the Oracle Query Consumer origin generates JDBC record header attributes that provide additional information about each record, such as the original data type of a field or the source tables for the record.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.

JDBC header attributes include a user-defined prefix to differentiate the JDBC header attributes from other record header attributes. By default, the prefix is "jdbc".

You can change the prefix that the origin uses and you can configure the origin not to create JDBC header attributes using the Create JDBC Header Attributes and JDBC Header Prefix properties on the Advanced tab.

The origin can provide the following JDBC header attributes:
JDBC Header Attribute Description
<JDBC prefix>.tables Provides a comma-separated list of source tables for the fields in the record.
<JDBC prefix>.<column name>.jdbcType Provides the numeric value of the original SQL data type for each field in the record. See the Java documentation for a list of the data types that correspond to numeric values.
<JDBC prefix>.<column name>.precision Provides the original precision for all numeric and decimal fields.
<JDBC prefix>.<column name>.scale Provides the original scale for all numeric and decimal fields.
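
For example, with the default prefix, the following expression returns the comma-separated list of source tables for a record:
${record:attribute('jdbc.tables')}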

Event Generation

The Oracle Query Consumer origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified query. The origin also generates an event when a query completes successfully and when it fails to complete.

Oracle Query Consumer events can be used in any logical way. For example:
  • With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data. A sample precondition expression for this use case appears after this list.

    When you restart a pipeline stopped by the Pipeline Finisher executor, the origin processes data based on how you configured the origin. For example, if you configure the origin to run in incremental mode, the origin saves the offset when the executor stops the pipeline. Upon restart, the origin continues processing from the last-saved offset. In contrast, if you configure the origin to run in full mode, then upon restart, the origin uses the initial offset, if specified.

    For an example, see Stopping a Pipeline After Processing All Available Data.

  • With a destination to store event information.

    For an example, see Preserving an Audit Trail of Events.
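
When you use the Pipeline Finisher executor, you might add a precondition such as the following to the executor so that it acts only on the no-more-data event and ignores the query success and query failure events:
${record:eventType() == 'no-more-data'}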

For more information about dataflow triggers and the event framework, see Overview.

Event Record

Event records generated by the Oracle Query Consumer origin have the following event-related record header attributes:
Record Header Attribute Description
sdc.event.type Event type. Uses one of the following types:
  • no-more-data - Generated when the origin completes processing all data returned by a query.
  • jdbc-query-success - Generated when the origin successfully completes a query.
  • jdbc-query-failure - Generated when the origin fails to complete a query.
sdc.event.version Integer that indicates the version of the event record type.
sdc.event.creation_timestamp Epoch timestamp when the stage created the event.
The origin can generate the following types of event records:
No-more-data
The origin generates a no-more-data event record when it completes processing all data returned by a query.
You can configure the origin to delay the generation of the no-more-data event by a specified number of seconds. You might configure a delay to ensure that the query success or query failure events are generated and delivered to the pipeline before the no-more-data event record.

To use a delay, configure the No-more-data Event Generation Delay property.

No-more-data event records generated by the origin have the sdc.event.type set to no-more-data and include the following field:
Event Record Field Description
record-count Number of records successfully generated since the pipeline started or since the last no-more-data event was created.
Query success
The origin generates a query success event record when it successfully completes a query.
The query success event records have the sdc.event.type record header attribute set to jdbc-query-success and include the following fields:
Field Description
query Query that completed successfully.
timestamp Timestamp when the query completed.
row-count Number of processed rows.
source-offset Offset after the query completed.
Query failure
The origin generates a query failure event record when it fails to complete a query.
The query failure event records have the sdc.event.type record header attribute set to jdbc-query-failure and include the following fields:
Field Description
query Query that failed to complete.
timestamp Timestamp when the query failed to complete.
row-count Number of records from the query that were processed.
source-offset Origin offset after query failure.
error First error message.

Oracle Data Types

The Oracle Query Consumer origin converts Oracle data types into StreamSets Cloud data types.

The origin supports the following Oracle data types:
Oracle Data Type StreamSets Cloud Data Type
Blob Byte_array
Binary_double Double
Binary_float Float
Char String
Clob String
Date Datetime
Long String
Nchar String
Nclob String
Number Decimal
Nvarchar2 String
Timestamp Datetime
Timestamp with time zone, Timestamp with local time zone Zoned_datetime
Varchar, Varchar2 String
XMLType String

The stage does not support the Oracle UriType data type.

Configuring an Oracle Query Consumer Origin

Configure an Oracle Query Consumer origin to read data from an Oracle database using a user-defined SQL query. Before you use the origin in a pipeline, complete the required prerequisites.
Note: Some of these properties are advanced options, which the origin hides by default. To configure the advanced options, select Show Advanced Options at the top of the properties pane.
  1. In the properties pane, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. To securely connect to the database server through an SSH tunnel, configure the following properties on the SSH Tunnel tab:
    SSH Tunnel Property Description
    Use SSH Tunneling Enables the stage to connect to the database server through an SSH tunnel.

    When cleared, the stage must connect to the database server using SSL/TLS encryption.

    SSH Tunnel Host Host name for the SSH server.
    SSH Tunnel Port Port number for the SSH server.

    For example: 22.

    SSH Tunnel Host Fingerprint Host key fingerprint used to verify the identity of the SSH server. For example: 97:3c:ae:76:73:f3:ef:a7:18:02:6a:c6:57:43:82:f6.

    You can enter multiple fingerprints separated by commas. Be sure to properly generate the fingerprint.

    Enter the host key fingerprint when you want the stage to verify the identity of the SSH server. Leave blank when you want the stage to establish an SSH connection without any verification.

    SSH Tunnel Username SSH user account used to install the StreamSets Cloud public SSH key on the SSH server.
    SSH Public Key Public SSH key for your StreamSets Cloud account.

    You download and install the key on the SSH server when you set up the SSH server.

  3. On the JDBC tab, configure the following properties:
    JDBC Property Description
    Database Host Host name of the machine where the Oracle database server is installed.
    Database Secure Port Secure port number of the Oracle database server.

    The stage requires a secure connection to the database server using an SSH tunnel or SSL/TLS encryption.

    Database SID Name of the Oracle database on the server, also known as the service name or SID.
    Incremental Mode Defines how the origin queries the database. Select to perform incremental queries. Clear to perform full queries.

    Default is incremental mode.

    SQL Query SQL query to use when reading data from the database.
    Initial Offset Offset value to use when the pipeline starts.

    Required in incremental mode.

    Offset Column Column to use for the offset value.

    As a best practice, an offset column should be an incremental and unique column that does not contain null values. Having an index on this column is strongly encouraged since the underlying query uses an ORDER BY and inequality operators on this column.

    Required in incremental mode.

    Query Interval Amount of time to wait between queries. Enter an expression based on a unit of time. You can use SECONDS, MINUTES, or HOURS.

    Default is 10 seconds: ${10 * SECONDS}.

    Max Batch Size (records) Maximum number of records to include in a batch.
    Max Clob Size (characters) Maximum number of characters to be read in a Clob field. Larger data is truncated.
    Max Blob Size (bytes) Maximum number of bytes to be read in a Blob field.
    Number of Retries on SQL Error Number of times the origin tries to execute the query after receiving an SQL error. After retrying this number of times, the origin handles the error based on the error handling configured for the origin.

    Use to handle transient network or connection issues that prevent the origin from submitting a query.

    Default is 0.

    Convert Timestamp To String Enables the origin to write timestamps as string values rather than datetime values. Strings maintain the precision stored in the source database.

    When writing timestamps to StreamSets Cloud date or time data types that do not store nanoseconds, the origin stores any nanoseconds from the timestamp in a field attribute.

    Maximum Transaction Length Not supported at this time.
    No-more-data Event Generation Delay (seconds) Number of seconds to delay generating the no-more-data event. Use to allow the specified number of seconds to pass to verify that no additional data arrives before generating the no-more-data event.
    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.
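
    For example, you might add the oracle.jdbc.ReadTimeout property with a value such as 30000 to have the driver time out a read after 30 seconds.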

  4. On the Credentials tab, configure the following properties:
    Credentials Property Description
    Username User name for the connection to the database.
    Password Password for the account.
    Note: When you enter secrets such as user names and passwords, the stage encrypts the secret values.
  5. To use SSL/TLS encryption to securely connect to the database server, configure the following properties on the SSL/TLS Encryption tab:
    SSL/TLS Encryption Property Description
    Enable Encryption Uses SSL/TLS encryption to securely connect to the database server.

    When cleared, the stage must use an SSH tunnel to connect to the database server.

    Server Certificate PEM Server certificate in PEM format used to verify the SSL/TLS certificate of the database server.

    Use a text editor to open the PEM encoded certificate, and then copy and paste the full contents of the file into the property, including the header and footer.

    Cipher Suites Cipher suites used for encryption and data integrity of the certificate. For example, you might enter TLS_RSA_WITH_AES_256_CBC_SHA as the cipher suite.

    You can enter multiple cipher suites separated by commas. For a full list of available cipher suites, see Cipher Suites in the Oracle documentation.

    Verify Hostname Verifies the host name of the database server.
    SSL Distinguished Name SSL/TLS distinguished name (DN) of the database server that must match the DN specified in the server certificate.
  6. On the Advanced tab, optionally configure advanced properties.

    The defaults for these properties should work in most cases:

    Advanced Property Description
    Maximum Pool Size Maximum number of connections to create.

    Default is 1. The recommended value is 1.

    Minimum Idle Connections Minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Connection Timeout Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    When the entered value is close to or more than the maximum lifetime for a connection, the origin ignores the idle timeout.

    Default is 10 minutes, defined as follows:
    ${10 * MINUTES}
    Max Connection Lifetime Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime.

    When a maximum lifetime is set, the minimum valid value is 30 minutes.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Auto Commit Determines if auto-commit mode is enabled. In auto-commit mode, the database commits the data for each record.

    Default is disabled.

    Enforce Read-only Connection Creates read-only connections to avoid any type of write.

    Default is enabled. Disabling this property is not recommended.

    Transaction Isolation Transaction isolation level used to connect to the database.
    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:
    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Init Query SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed.
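
    For example, you might use the following statement to set the session time zone: ALTER SESSION SET TIME_ZONE = 'UTC'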

    Create JDBC Header Attributes Adds JDBC header attributes to records. The origin creates JDBC header attributes by default.
    JDBC Header Prefix Prefix for JDBC header attributes.
    Disable Query Validation Disables the query validation that occurs by default. Use to avoid time-consuming query validation, such as when querying Hive, which requires a MapReduce job to perform the validation.
    Warning: Query validation prevents running a pipeline with invalid queries. Use this option with care.
    On Unknown Type Action to take when the origin encounters a record with an unsupported data type:
    • Stop Pipeline - Stops the pipeline after it completes processing the previous records.
    • Convert to String - Converts the data to string and continues processing.