Oracle Producer

The Oracle Producer destination writes data to an Oracle database table.

When you configure the Oracle Producer destination, you specify connection information and the database table to write to. The destination requires a secure connection to the database server using an SSH tunnel or SSL/TLS encryption.

By default, the destination writes data to the table based on the matching field names. You can override the default field mappings by defining specific mappings.

The Oracle Producer destination can insert, update, or delete data in the database - based on the CRUD operation defined in the sdc.operation.type record header attribute. You can define a default operation for records without the header attribute or value. You can also configure whether to use multi-row operations for inserts and deletes, and how to handle records with unsupported operations.

When processing data from a CDC-enabled origin, you can specify the origin change log to aid record processing. For information about change data processing and a list of CDC-enabled origins, see Processing Changed Data.

Note: The destination includes advanced options with default values that should work in most cases. By default, the destination hides the advanced options. To configure the advanced options, select Show Advanced Options at the top of the properties pane.

Prerequisites

Before you can write to an Oracle database, you must set up the database server or an intermediary SSH server to accept incoming connections from StreamSets Cloud.

For more information, see Database Connections.

Secure Connections

The Oracle Producer destination requires a secure connection to the Oracle database server.

You must configure the origin to use an SSH tunnel or SSL/TLS encryption to securely connect to the database.

SSH Tunnel

You can secure the connection to the Oracle server by configuring the destination to use an SSH tunnel.

When you configure the stage to connect to the database through an SSH tunnel, the stage uses an encrypted connection to communicate with the SSH server. The SSH server then uses an unencrypted connection to communicate with the database server.

For additional information including instructions to set up the SSH server to accept incoming connections from StreamSets Cloud, see Use an SSH Tunnel.

Tip: If you also configure SSL/TLS encryption, then the stage uses SSL/TLS encryption from the SSH server to the database server.

SSL/TLS Encryption

You can secure the connection to the Oracle server by configuring the destination to use SSL/TLS encryption.

Before configuring the destination to use SSL/TLS encryption, verify that the Oracle server is correctly configured to use SSL/TLS. For more information, see the Oracle documentation.

When you enable encryption for the origin, the destination verifies the following information before it establishes an SSL/TLS connection to the database server:

Server certificate
The stage verifies the certificate of the database server provided in the stage properties. You must paste the full contents of the PEM encoded server certificate into the Server Certificate PEM property, including the header and footer in the file. Use a text editor to open the PEM encoded certificate, and then copy and paste the full contents of the file into the origin property, as follows:
-----BEGIN CERTIFICATE REQUEST-----
MIIB9TCCAWACAQAwgbgxGTAXBgNVHJoMEFF1b1ZlZGwzIEkpbWl0ZWLxHLAaBgNV
                        
......
                        
98TwDIK/39WEB/V607As+KoYajQL9drorw==
-----END CERTIFICATE REQUEST-----
You also specify the cipher suites used for encryption and data integrity of the certificate. For example, you might enter TLS_RSA_WITH_AES_256_CBC_SHA as the cipher suite. You can enter multiple cipher suites separated by commas. For a full list of available cipher suites, see Cipher Suites in the Oracle documentation.
Server host name
The stage can optionally verify the host name of the database server provided in the stage properties.
To verify the host name, select the Verify Hostname property and then specify the SSL/TLS distinguished name (DN) of the server. The stage verifies that the specified distinguished name matches the DN specified in the server certificate.

Define the CRUD Operation

The Oracle Producer destination can insert, update, or delete data. The destination writes the records based on the CRUD operation defined in a CRUD operation header attribute or in operation-related stage properties.

You can define the CRUD operation in the following ways:

CRUD operation header attribute
You can define the CRUD operation in a CRUD operation record header attribute. The destination looks for the CRUD operation to use in the sdc.operation.type record header attribute.
The attribute can contain one of the following numeric values:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
If your pipeline includes a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline uses a non-CDC origin, you can use the Expression Evaluator to define the record header attribute. For more information about changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
Operation stage properties
You define a default operation in the destination properties. The destination uses the default operation when the sdc.operation.type record header attribute is not set.
You can also define how to handle records with unsupported operations defined in the sdc.operation.type header attribute. The destination can discard them, send them to error, or use the default operation.

Update and Delete Operations

For update and delete operations, the Oracle Producer destination automatically detects the primary key of the table and uses that key in the WHERE clause that updates or deletes rows. The destination supports compound primary keys, keys that consist of more than one column.

For example, in the following database table, named customer, the id column is the primary key:
id first middle last
1 john f smith
2 john m doe
3 mary jane smith
Suppose the sdc.operation.type record header attribute for the following record is set to 2, to delete the record from the table:
{
 "id": 1,
 "first": "john",
 "middle": "m",
 "last": "doe"
}
Then, the destination matches the row with the same primary key and creates the following query:
DELETE FROM customer WHERE id = 1

Note that the destination matches the row based on the primary key and not the other fields in the record.

Single and Multi-row Operations

The Oracle Producer destination performs single-row operations by default. That is, it executes a SQL statement for each record. You can configure the Oracle Producer destination to perform multi-row operations. Depending on the sequence of the data, multi-row operations can improve pipeline performance.

When performing multi-row operations, the destination creates a single SQL statement for sequential insert rows and for sequential delete rows. The destination does not perform multi-row update operations. You can configure the Statement Parameter Limit property to limit the number of parameters in an insert operation - that is, you can limit the number of records included in an insert statement.

For example, say the pipeline generates three insert records, followed by two update records, and four delete records. If you enable multi-row operations and do not set a statement parameter limit, the destination generates a single insert SQL statement for the three insert records, two update statements - one for each of the update records, and a single delete statement for the four delete records. On the other hand, if you enable multi-row operations and set the statement parameter limit to two, the destination generates two insert SQL statements - one for two insert records and one for the third insert record, two update statements - one for each of the update records, and a single delete statement for the four delete records.

For multi-row inserts, the destination uses the following SQL statement:
INSERT INTO <table name> (<col1>, <col2>, <col3>) 
     VALUES (<record1 field1>,<record1 field2>,<record1 field3>), 
     (<r2 f1>,<r2 f2>,<r2 f3>), (<r3 f1>,<r3 f2>,<r3 f3>),...;
For multi-row deletes, the destination uses the following SQL statement for tables with a single primary key:
DELETE FROM <table name> WHERE <primary key> IN (<key1>, <key2>, <key3>,...);
For multi-row deletes, the destination uses the following SQL statement for tables with multiple primary keys:
DELETE FROM <table name> WHERE (<pkey1>, <pkey2>, <pkey3>)
      IN ((<key1-1>, <key1-2>, <key1-3>),(<key2-1>, <key2-2>, <key2-2>),...);

Oracle Data Types

The Oracle Producer destination converts StreamSets Cloud data types into Oracle data types before writing data to Oracle.

The destination converts each StreamSets Cloud data type into one of the following Oracle data types, according to the data type of the Oracle column:
StreamSets Cloud Data Type Oracle Type
Byte_Array Blob
Datetime Date, Timestamp
Decimal Number
Double Binary_double
Float Binary_float
String Char, Clob, Long, Nchar, Nclob, NvarChar2, Varchar, Varchar2, XMLType
Zoned_datetime Timestamp with time zone, Timestamp with local time zone

The stage does not support the Oracle UriType data type.

Configuring an Oracle Producer Destination

Configure an Oracle Producer destination to write data to an Oracle database. Before you use the destination in a pipeline, complete the required prerequisites.
Note: Some of these properties are advanced options, which the destination hides by default. To configure the advanced options, select Show Advanced Options at the top of the properties pane.
  1. In the properties pane, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. To securely connect to the database server through an SSH tunnel, configure the following properties on the SSH Tunnel tab:
    SSH Tunnel Property Description
    Use SSH Tunneling Enables the stage to connect to the database server through an SSH tunnel.

    When cleared, the stage must connect to the database server using SSL/TLS encryption.

    SSH Tunnel Host Host name for the SSH server.
    SSH Tunnel Port Port number for the SSH server.

    For example: 22.

    SSH Tunnel Host Fingerprint Host key fingerprint used to verify the identity of the SSH server. For example: 97:3c:ae:76:73:f3:ef:a7:18:02:6a:c6:57:43:82:f6.

    You can enter multiple fingerprints separated by commas. Be sure to properly generate the fingerprint.

    Enter the host key fingerprint when you want the stage to verify the identity of the SSH server. Leave blank when you want the stage to establish an SSH connection without any verification.

    SSH Tunnel Username SSH user account used to install the StreamSets Cloud public SSH key on the SSH server.
    SSH Public Key Public SSH key for your StreamSets Cloud account.

    You download and install the key on the SSH server when you set up the SSH server.

  3. On the JDBC tab, configure the following properties:
    JDBC Property Description
    Database Host Host name of the machine where the Oracle database server is installed.
    Database Secure Port Secure port number of the Oracle database server.

    The stage requires a secure connection to the database server using an SSH tunnel or SSL/TLS encryption.

    Database SID Name of the Oracle database on the server, also known as the service name or SID.
    Schema Name Database or schema name to use.
    Note: Oracle uses all caps for schema, table, and column names by default. Names can be lower- or mixed-case only if the schema, table, or column was created with quotation marks around the name.

    To use a lower- or mixed-case schema name, enter the name and enable the Enclosed Object Names property.

    Table Name Database table name to use.
    Note: Oracle uses all caps for schema, table, and column names by default. Names can be lower- or mixed-case only if the schema, table, or column was created with quotation marks around the name.

    To use a lower- or mixed-case schema name, enter the name and enable the Enclosed Object Names property.

    Field to Column Mapping Use to override the default field to column mappings. By default, fields are written to columns of the same name.
    When you override the mappings, you can define parameterized values to apply SQL functions to the field values before writing them to columns. For example, to convert a field value to an integer, enter the following for the parameterized value:
    CAST(? AS INTEGER)

    The question mark (?) is substituted with the value of the field. Leave the default value of ? if you do not need to apply a SQL function.

    Using simple or bulk edit mode, click the Add icon to create additional field to column mappings.

    Enclose Object Names Encloses the database or schema name, table name, and column names in quotation marks when writing to the database.

    Enables using case-sensitive names or names with special characters. Select only when the database or tables were created with quotation marks around the names.

    Change Log Format Format of change capture data. Use when processing change capture data.
    Default Operation Default CRUD operation to perform if the sdc.operation.type record header attribute is not set.
    Unsupported Operation Handling Action to take when the CRUD operation type defined in the sdc.operation.type record header attribute is not supported:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Use Default Operation - Writes the record to the destination system using the default operation.
    Use Multi-Row Operation Combines sequential insert operations and sequential delete operations into single statements. Select to insert and delete multiple records at the same time.

    By default, the stage performs single-row operations.

    Statement Parameter Limit Number of parameters allowed in the prepared statement for multi-row inserts.

    Use -1 to disable the parameter limit. Default is -1.

    Rollback Batch on Error Rolls back the entire batch when an error occurs within the batch.
    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

  4. On the Credentials tab, configure the following properties:
    Credentials Property Description
    Username User name for the connection to the database.
    Password Password for the account.
    Note: When you enter secrets such as user names and passwords, the stage encrypts the secret values.
  5. To use SSL/TLS encryption to securely connect to the database server, configure the following properties on the SSL/TLS Encryption tab:
    SSL/TLS Encryption Property Description
    Enable Encryption Uses SSL/TLS encryption to securely connect to the database server.

    When cleared, the stage must use an SSH tunnel to connect to the database server.

    Server Certificate PEM Server certificate in PEM format used to verify the SSL/TLS certificate of the database server.

    Use a text editor to open the PEM encoded certificate, and then copy and paste the full contents of the file into the property, including the header and footer.

    Cipher Suites Cipher suites used for encryption and data integrity of the certificate. For example, you might enter TLS_RSA_WITH_AES_256_CBC_SHA as the cipher suite.

    You can enter multiple cipher suites separated by commas. For a full list of available cipher suites, see Cipher Suites in the Oracle documentation.

    Verify Hostname Verifies the host name of the database server.
    SSL Distinguished Name SSL/TLS distinguished name (DN) of the database server that must match the DN specified in the server certificate.
  6. On the Advanced tab, optionally configure advanced properties.

    The defaults for these properties should work in most cases:

    Advanced Property Description
    Maximum Pool Size Maximum number of connections to create.

    Default is 1. The recommended value is 1.

    Minimum Idle Connections Minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Connection Timeout Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    When the entered value is close to or more than the maximum lifetime for a connection, the origin ignores the idle timeout.

    Default is 10 minutes, defined as follows:
    ${10 * MINUTES}
    Max Connection Lifetime Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime.

    When a maximum lifetime is set, the minimum valid value is 30 minutes.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Transaction Isolation Transaction isolation level used to connect to the database.
    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:
    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Init Query

    SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed.

    For example, the following query sets the time zone for the session for a MySQL database: SET time_zone = timezone;

    Data SQLSTATE Codes List of SQLSTATE codes to treat as data errors. The destination applies error record handling to records that trigger a listed code.

    When a record triggers a SQLSTATE code not listed, the destination generates a stage error that stops the pipeline.

    To add a code, click Add and enter the code.