SQL Server Producer

The SQL Server Producer destination writes data to a Microsoft SQL Server database table.

When you configure the SQL Server Producer destination, you specify connection information and the database table to write to. The destination requires a secure connection to the database server, using either an SSH tunnel or SSL/TLS encryption.

By default, the destination writes data to the table based on the matching field names. You can override the default field mappings by defining specific mappings.

The SQL Server Producer destination can insert, update, or delete data in the database - based on the CRUD operation defined in the sdc.operation.type record header attribute. You can define a default operation for records without the header attribute or value. You can also configure whether to use multi-row operations for inserts and deletes, and how to handle records with unsupported operations.

When processing data from a CDC-enabled origin, you can specify the origin change log to aid record processing. For information about change data processing and a list of CDC-enabled origins, see Processing Changed Data.

Note: The destination includes advanced options with default values that should work in most cases. By default, the destination hides the advanced options. To configure the advanced options, select Show Advanced Options at the top of the properties pane.

Prerequisites

Before you can write to a Microsoft SQL Server database, you must set up the database server or an intermediary SSH server to accept incoming connections from StreamSets Cloud.

For more information, see Database Connections.

Secure Connections

The SQL Server Producer destination requires a secure connection to SQL Server.

You must configure the destination to use an SSH tunnel or SSL/TLS encryption to securely connect to the database.

SSH Tunnel

You can secure the connection to SQL Server by configuring the destination to use an SSH tunnel.

When you configure the stage to connect to the database through an SSH tunnel, the stage uses an encrypted connection to communicate with the SSH server. The SSH server then uses an unencrypted connection to communicate with the database server.

For additional information including instructions to set up the SSH server to accept incoming connections from StreamSets Cloud, see Use an SSH Tunnel.

Tip: If you also configure SSL/TLS encryption, then the stage uses SSL/TLS encryption from the SSH server to the database server. However, when using an SSH tunnel, the stage cannot verify the host name of the server in the connection string during SSL/TLS encryption. Instead, you must specify the server host name as an alternate host name to verify.

SSL/TLS Encryption

You can secure the connection to SQL Server by configuring the destination to use SSL/TLS encryption.

Before configuring the destination to use SSL/TLS encryption, verify that SQL Server is correctly configured to use SSL/TLS. For more information, see the SQL Server documentation.

After you enable encryption for the destination, you determine how the destination handles the following information before it establishes an SSL/TLS connection to the database server:
Server certificate
You can configure the stage to trust the certificate of the database server or to verify the certificate using the certificate provided in the stage properties.
When the certificate is signed by a public Certificate Authority (CA), you can configure the stage to trust the server certificate. The stage establishes an SSL/TLS connection after verifying the certificate of the database server using the CA certificate included in the default Java truststore file.
When the certificate is signed by a private CA, configure the stage not to trust the server certificate. The stage establishes an SSL/TLS connection using the certificate of the database server provided in the stage properties. You must paste the full contents of the PEM-encoded server certificate into the Server Certificate PEM property, including the header and footer. Use a text editor to open the PEM-encoded certificate, and then copy and paste the full contents of the file into the destination property, as follows:
-----BEGIN CERTIFICATE-----
MIIB9TCCAWACAQAwgbgxGTAXBgNVHJoMEFF1b1ZlZGwzIEkpbWl0ZWLxHLAaBgNV
......
98TwDIK/39WEB/V607As+KoYajQL9drorw==
-----END CERTIFICATE-----
Server host name
The stage always verifies that the host name of the database server matches the common name (CN) specified in the server certificate before it establishes an SSL/TLS connection to the server.
The stage can verify the host name specified in one of the following locations:
  • Connection string on the JDBC tab.

    To verify the host name specified in the connection string, select the Verify Hostname in Connection String property on the SSL/TLS Encryption tab.

    When also using an SSH tunnel, it is not possible to verify the host name of the server in the connection string. Instead, you must specify the host name in the alternate host name property.

  • Alternate host name property on the SSL/TLS Encryption tab.

    To verify another host name, clear the Verify Hostname in Connection String property on the SSL/TLS Encryption tab and then specify the host name in the Verify Alternate Hostname property.

    Use an alternate host name when the database server is not reachable using the original host name specified in the issued certificate. Or use an alternate host name to specify the actual host name when also using an SSH tunnel.

Define the CRUD Operation

The SQL Server Producer destination can insert, update, or delete data. The destination writes the records based on the CRUD operation defined in a CRUD operation header attribute or in operation-related stage properties.

You can define the CRUD operation in the following ways:

CRUD operation header attribute
You can define the CRUD operation in a CRUD operation record header attribute. The destination looks for the CRUD operation to use in the sdc.operation.type record header attribute.
The attribute can contain one of the following numeric values:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
If your pipeline includes a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline uses a non-CDC origin, you can use the Expression Evaluator to define the record header attribute. For more information about changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
Operation stage properties
You define a default operation in the destination properties. The destination uses the default operation when the sdc.operation.type record header attribute is not set.
You can also define how to handle records with unsupported operations defined in the sdc.operation.type header attribute. The destination can discard them, send them to error, or use the default operation.
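The resolution rules above can be sketched in a few lines. This is a minimal illustration, not the destination's actual implementation; the function name, the `default_op` and `unsupported` parameters, and the dictionary-style header access are all assumptions made for the example:

```python
# Illustrative sketch of resolving the CRUD operation for a record from the
# sdc.operation.type header attribute, with a default operation and
# unsupported-operation handling as described above.

INSERT, DELETE, UPDATE = 1, 2, 3
SUPPORTED_OPS = {INSERT, DELETE, UPDATE}

def resolve_operation(headers, default_op=INSERT, unsupported="USE_DEFAULT"):
    """Return the operation code to apply, or None to discard the record."""
    op = headers.get("sdc.operation.type")
    if op is None:
        return default_op            # no header attribute: use the default
    op = int(op)
    if op in SUPPORTED_OPS:
        return op
    # Unsupported operation: discard, send to error, or use the default.
    if unsupported == "DISCARD":
        return None
    if unsupported == "SEND_TO_ERROR":
        raise ValueError(f"Unsupported operation type: {op}")
    return default_op
```

For example, `resolve_operation({"sdc.operation.type": "2"})` resolves to the DELETE code, while a record with no header attribute falls back to the default operation.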

Update and Delete Operations

For update and delete operations, the SQL Server Producer destination automatically detects the primary key of the table and uses that key in the WHERE clause that updates or deletes rows. The destination supports compound primary keys - that is, keys that consist of more than one column.

For example, in the following database table, named customer, the id column is the primary key:
id   first   middle   last
1    john    f        smith
2    john    m        doe
3    mary    jane     smith
Suppose the sdc.operation.type record header attribute for the following record is set to 2, to delete the record from the table:
{
 "id": 1,
 "first": "john",
 "middle": "m",
 "last": "doe"
}
Then, the destination matches the row with the same primary key and creates the following query:
DELETE FROM customer WHERE id = 1

Note that the destination matches the row based on the primary key and not the other fields in the record.
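The key-only matching described above can be sketched as follows. This is a simplified illustration, not the destination's code: it inlines nothing and builds a parameterized statement, and the function name and argument shapes are assumptions. It assumes the primary key columns have already been read from the table metadata:

```python
# Illustrative sketch: build the WHERE clause from the table's primary key
# columns only, ignoring the record's other fields, as in the customer
# example above.

def build_delete(table, key_columns, record):
    """Build a parameterized DELETE that matches rows by primary key."""
    where = " AND ".join(f"{col} = ?" for col in key_columns)
    params = [record[col] for col in key_columns]
    return f"DELETE FROM {table} WHERE {where}", params

sql, params = build_delete(
    "customer", ["id"],
    {"id": 1, "first": "john", "middle": "m", "last": "doe"})
# sql    -> "DELETE FROM customer WHERE id = ?"
# params -> [1]
```

Even though the record's middle and last values differ from row 1 in the table, only the id value reaches the WHERE clause.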

Single and Multi-row Operations

The SQL Server Producer destination performs single-row operations by default. That is, it executes a SQL statement for each record. You can configure the SQL Server Producer destination to perform multi-row operations. Depending on the sequence of the data, multi-row operations can improve pipeline performance.

When performing multi-row operations, the destination creates a single SQL statement for sequential insert rows and for sequential delete rows. The destination does not perform multi-row update operations. You can configure the Statement Parameter Limit property to limit the number of parameters in an insert operation - that is, you can limit the number of records included in an insert statement.

For example, say the pipeline generates three insert records, followed by two update records and four delete records. If you enable multi-row operations and do not set a statement parameter limit, the destination generates four SQL statements: a single insert statement for the three insert records, one update statement for each of the two update records, and a single delete statement for the four delete records. If you instead set the statement parameter limit to two, the destination generates five SQL statements: one insert statement for the first two insert records, another insert statement for the third insert record, one update statement for each of the two update records, and a single delete statement for the four delete records.

When an error occurs during a multi-row operation, the SQL Server database does not report which record caused the error. As a result, the destination sends all records from the failed statement to the error stream.
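The batching rules above can be sketched as follows. This is an illustrative simplification, not the destination's implementation: it treats the limit as a per-record cap on insert statements rather than a true parameter count, and the function name and the `(operation, record count)` output shape are assumptions. Operation codes follow sdc.operation.type (1 = INSERT, 2 = DELETE, 3 = UPDATE):

```python
# Illustrative sketch: group sequential inserts and sequential deletes into
# one statement each, keep updates single-row, and cap the records per
# insert statement when a limit is set (-1 disables the limit).

def batch_operations(ops, insert_limit=-1):
    """Group a sequence of operation codes into (op, record_count) pairs."""
    statements = []
    for op in ops:
        if statements and statements[-1][0] == op and op in (1, 2):
            cap = insert_limit if (op == 1 and insert_limit > 0) else None
            if cap is None or statements[-1][1] < cap:
                statements[-1] = (op, statements[-1][1] + 1)
                continue
        statements.append((op, 1))
    return statements

# Three inserts, two updates, four deletes - as in the example above:
ops = [1, 1, 1, 3, 3, 2, 2, 2, 2]
print(batch_operations(ops))                  # [(1, 3), (3, 1), (3, 1), (2, 4)]
print(batch_operations(ops, insert_limit=2))  # [(1, 2), (1, 1), (3, 1), (3, 1), (2, 4)]
```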

For multi-row inserts, the destination uses the following SQL statement:
INSERT INTO <table name> (<col1>, <col2>, <col3>) 
     VALUES (<record1 field1>,<record1 field2>,<record1 field3>), 
     (<r2 f1>,<r2 f2>,<r2 f3>), (<r3 f1>,<r3 f2>,<r3 f3>),...;
For multi-row deletes, the destination uses the following SQL statement for tables with a single primary key:
DELETE FROM <table name> WHERE <primary key> IN (<key1>, <key2>, <key3>,...);
For multi-row deletes, the destination uses the following SQL statement for tables with multiple primary keys:
DELETE FROM <table name> WHERE (<pkey1>, <pkey2>, <pkey3>)
      IN ((<key1-1>, <key1-2>, <key1-3>),(<key2-1>, <key2-2>, <key2-3>),...);
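Generating the multi-row delete statements shown above can be sketched as follows. This is a readability-oriented illustration that inlines key values directly into the SQL text; the real destination uses a parameterized prepared statement, and the function name and argument shapes are assumptions:

```python
# Illustrative sketch: build one DELETE statement covering several primary
# key values, using the single-key IN form or the compound-key tuple form
# shown above.

def multi_row_delete(table, key_columns, keys):
    """Build one multi-row DELETE for a list of primary key tuples."""
    if len(key_columns) == 1:
        values = ", ".join(str(k[0]) for k in keys)
        return f"DELETE FROM {table} WHERE {key_columns[0]} IN ({values})"
    cols = ", ".join(key_columns)
    tuples = ", ".join(
        "(" + ", ".join(str(v) for v in k) + ")" for k in keys)
    return f"DELETE FROM {table} WHERE ({cols}) IN ({tuples})"

print(multi_row_delete("customer", ["id"], [(1,), (2,), (3,)]))
# DELETE FROM customer WHERE id IN (1, 2, 3)
```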

SQL Server Data Types

The SQL Server Producer destination converts StreamSets Cloud data types into SQL Server data types before writing data to SQL Server.

The destination converts each StreamSets Cloud data type into one of the following SQL Server data types, according to the data type of the SQL Server column:
StreamSets Cloud Data Type   SQL Server Data Type
Boolean                      Bit
Byte_Array                   Binary, Image, Varbinary
Date                         Date
Datetime                     Datetime, Datetime2, Smalldatetime
Decimal                      Decimal, Money, Numeric, Smallmoney
Double                       Float
Float                        Real
Integer                      Int
Long                         Bigint
Short                        Smallint, Tinyint
String                       Char, Datetimeoffset, Nchar, Ntext, Nvarchar, Text, Varchar, XML
Time                         Time
The stage does not support the following SQL Server data types:
  • Geography
  • Geometry
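The conversion table above amounts to a lookup from StreamSets Cloud data type to the SQL Server column types it can be written to. As a sketch, with a hypothetical `can_convert` helper that is not part of the product:

```python
# The data type conversions from the table above, as a lookup structure.
CONVERSIONS = {
    "Boolean":    {"Bit"},
    "Byte_Array": {"Binary", "Image", "Varbinary"},
    "Date":       {"Date"},
    "Datetime":   {"Datetime", "Datetime2", "Smalldatetime"},
    "Decimal":    {"Decimal", "Money", "Numeric", "Smallmoney"},
    "Double":     {"Float"},
    "Float":      {"Real"},
    "Integer":    {"Int"},
    "Long":       {"Bigint"},
    "Short":      {"Smallint", "Tinyint"},
    "String":     {"Char", "Datetimeoffset", "Nchar", "Ntext", "Nvarchar",
                   "Text", "Varchar", "XML"},
    "Time":       {"Time"},
}

def can_convert(field_type, column_type):
    """Hypothetical check: can this field type be written to this column type?"""
    return column_type in CONVERSIONS.get(field_type, set())

print(can_convert("Decimal", "Money"))    # True
print(can_convert("String", "Geometry"))  # False
```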

Configuring a SQL Server Producer Destination

Configure a SQL Server Producer destination to write data to a Microsoft SQL Server database. Before you use the destination in a pipeline, complete the required prerequisites.
Note: Some of these properties are advanced options, which the destination hides by default. To configure the advanced options, select Show Advanced Options at the top of the properties pane.
  1. In the properties pane, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. To securely connect to the database server through an SSH tunnel, configure the following properties on the SSH Tunnel tab:
    SSH Tunnel Property Description
    Use SSH Tunneling Enables the stage to connect to the database server through an SSH tunnel.

    When cleared, the stage must connect to the database server using SSL/TLS encryption.

    SSH Tunnel Host Host name for the SSH server.
    SSH Tunnel Port Port number for the SSH server.

    For example: 22.

    SSH Tunnel Host Fingerprint Host key fingerprint used to verify the identity of the SSH server. For example: 97:3c:ae:76:73:f3:ef:a7:18:02:6a:c6:57:43:82:f6.

    You can enter multiple fingerprints separated by commas. Be sure to properly generate the fingerprint.

    Enter the host key fingerprint when you want the stage to verify the identity of the SSH server. Leave blank when you want the stage to establish an SSH connection without any verification.

    SSH Tunnel Username SSH user account used to install the StreamSets Cloud public SSH key on the SSH server.
    SSH Public Key Public SSH key for your StreamSets Cloud account.

    You download and install the key on the SSH server when you set up the SSH server.

  3. On the JDBC tab, configure the following properties:
    JDBC Property Description
    Connection String Connection string used to connect to the SQL Server database.

    Use the connection string format required by SQL Server. For example:

    jdbc:sqlserver://<host_name>:<port_number>;DatabaseName=<database>
    Optionally, include credentials in the connection string as follows:
    jdbc:sqlserver://<host_name>:<port_number>;DatabaseName=<database>;user=<username>;password=<password>;
    Use Credentials Enables entering credentials on the Credentials tab. Use when you do not include credentials in the connection string.
    Schema Name Optional database or schema name to use.

    Use when the database requires a fully-qualified table name.

    Table Name Database table name to use. Use the table name format required by the database.
    Field to Column Mapping Use to override the default field to column mappings. By default, fields are written to columns of the same name.
    When you override the mappings, you can define parameterized values to apply SQL functions to the field values before writing them to columns. For example, to convert a field value to an integer, enter the following for the parameterized value:
    CAST(? AS INTEGER)

    The question mark (?) is substituted with the value of the field. Leave the default value of ? if you do not need to apply a SQL function.

    Using simple or bulk edit mode, click the Add icon to create additional field to column mappings.

    Enclose Object Names Encloses the database or schema name, table name, and column names in quotation marks when writing to the database.

    Enables using case-sensitive names or names with special characters. Select only when the database or tables were created with quotation marks around the names.

    Change Log Format Format of change capture data. Use when processing change capture data.
    Default Operation Default CRUD operation to perform if the sdc.operation.type record header attribute is not set.
    Unsupported Operation Handling Action to take when the CRUD operation type defined in the sdc.operation.type record header attribute is not supported:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Use Default Operation - Writes the record to the destination system using the default operation.
    Use Multi-Row Operation Combines sequential insert operations and sequential delete operations into single statements. Select to insert and delete multiple records at the same time.

    By default, the stage performs single-row operations.

    Statement Parameter Limit Number of parameters allowed in the prepared statement for multi-row inserts.

    Use -1 to disable the parameter limit. Default is -1.

    Rollback Batch on Error Rolls back the entire batch when an error occurs within the batch.
    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

  4. To enter credentials separately from the JDBC connection string, on the Credentials tab, configure the following properties:
    Credentials Property Description
    Username User name for the connection to the database.
    Password Password for the account.
    Note: When you enter secrets such as user names and passwords, the stage encrypts the secret values.
  5. To use SSL/TLS encryption to securely connect to the database server, configure the following properties on the SSL/TLS Encryption tab:
    SSL/TLS Encryption Property Description
    Enable Encryption Uses SSL/TLS encryption to securely connect to the database server.

    When cleared, the stage must use an SSH tunnel to connect to the database server.

    Trust Server Certificate Determines whether the stage trusts the SSL/TLS certificate of the database server.
    Server Certificate PEM Server certificate in PEM format used to verify the SSL/TLS certificate of the database server.

    Use a text editor to open the PEM encoded certificate, and then copy and paste the full contents of the file into the property, including the header and footer.

    Required when configured not to trust the server certificate.

    Verify Hostname in Connection String Verifies that the host name specified in the connection string on the JDBC tab matches the common name (CN) specified in the server certificate.
    Verify Alternate Hostname Alternate host name that must match the common name (CN) specified in the server certificate.

    Use an alternate host name when the database server is not reachable using the original host name specified in the issued certificate. Or use an alternate host name to specify the actual host name when also using an SSH tunnel.

  6. On the Advanced tab, optionally configure advanced properties.

    The defaults for these properties should work in most cases:

    Advanced Property Description
    Maximum Pool Size Maximum number of connections to create.

    Default is 1. The recommended value is 1.

    Minimum Idle Connections Minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Connection Timeout Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    When the entered value is close to or more than the maximum lifetime for a connection, the stage ignores the idle timeout.

    Default is 10 minutes, defined as follows:
    ${10 * MINUTES}
    Max Connection Lifetime Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime.

    When a maximum lifetime is set, the minimum valid value is 30 minutes.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Transaction Isolation Transaction isolation level used to connect to the database.
    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:
    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Init Query SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed.

    Data SQLSTATE Codes List of SQLSTATE codes to treat as data errors. The destination applies error record handling to records that trigger a listed code.

    When a record triggers a SQLSTATE code not listed, the destination generates a stage error that stops the pipeline.

    To add a code, click Add and enter the code.