Databricks Delta Lake

The Databricks Delta Lake executor runs one or more Spark SQL queries on a Delta Lake table on Databricks each time it receives an event record. Use the executor as part of an event stream in the pipeline.

Important: The Databricks Delta Lake executor requires a Databricks cluster version 6.3 or later.

You might use the Databricks Delta Lake executor for the following use cases:
Bulk load data into a Delta Lake table
Build a pipeline that first loads data into a storage location in Amazon S3 or Azure Data Lake Storage Gen2. After the Amazon S3 or Azure Data Lake Storage Gen2 destination closes an output file, the event triggers the Databricks Delta Lake executor to run a Spark SQL query that copies the data from the storage location into a Delta Lake table.
For a detailed solution that explains how to design this pipeline, see Bulk Loading Data into a Delta Lake Table.
Merge changed data into a Delta Lake table
Build a pipeline that processes change data capture (CDC) data using a CDC-enabled origin. The pipeline first loads data into a storage location in Amazon S3 or Azure Data Lake Storage Gen2. After the Amazon S3 or Azure Data Lake Storage Gen2 destination closes an output file, the event triggers the Databricks Delta Lake executor to run multiple Spark SQL queries that merge the changed data into a Delta Lake table.
For a detailed solution that explains how to design this pipeline, see Merging Changed Data into a Delta Lake Table.

The executor can load data into a single Delta Lake table. To load data into multiple Delta Lake tables, create a separate pipeline for each Delta Lake table.

The Databricks Delta Lake executor uses a JDBC connection to connect to the Databricks cluster. When you configure the executor, you specify the JDBC connection string and credentials to use to connect to the Databricks cluster, and then you define the Spark SQL queries to run.

You also define the connection information that the executor uses to connect to the storage location in Amazon S3 or Azure Data Lake Storage Gen2.

You can also configure the executor to generate events for another event stream.

Before you use the Databricks Delta Lake executor, you must complete several prerequisite tasks.

Note: The executor includes advanced options with default values that should work in most cases. By default, the executor hides the advanced options. To configure the advanced options, select Show Advanced Options at the top of the properties pane.

Prerequisites

Before you configure the Databricks Delta Lake executor, prepare your Databricks cluster and create the tables that you want to load data into. The executor can load data into an existing Delta Lake table.

  1. In Databricks, configure and start a Databricks cluster version 6.3 or later, generate a personal access token, and locate the JDBC URL used to access the cluster.
    For detailed prerequisite steps, see the Databricks documentation for your storage location.
  2. In Delta Lake on Databricks, create the required tables based on your use case (example table definitions follow this list):
    • Bulk load data into a table

      Create the target table that you want to load data into.

    • Merge changed data into a table

      Create the staging table and the target table. The staging table is a temporary table that contains all the changes that need to be applied. The target table is the table where you want to apply the changed data. The executor uses the staging table to apply a bulk merge into the target table.

    Important: The schemas of the Delta Lake tables must match the incoming records. Databricks does not support using the mergeSchema option with the COPY command.
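
For example, for the merge use case, you might define the target and staging tables with DDL similar to the following. This is a minimal sketch, assuming the customer schema used in the examples later in this section; the table names and columns are illustrative, and for the bulk load use case you create only the target table:
-- Target table that the changed data is merged into.
CREATE TABLE customers_cdc (
  customer_id INT,
  customer_name STRING,
  address STRING,
  city STRING,
  state STRING,
  zip_code STRING,
  Type STRING)
USING DELTA;

-- Staging table with the same schema. The final query in the merge example
-- truncates this table after each merge.
CREATE TABLE customers_cdc_staging (
  customer_id INT,
  customer_name STRING,
  address STRING,
  city STRING,
  state STRING,
  zip_code STRING,
  Type STRING)
USING DELTA;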

Spark SQL Queries

You define one or more Spark SQL queries that the Databricks Delta Lake executor runs on a Delta Lake table each time it receives an event record.

The Databricks Delta Lake executor waits for each query to complete before continuing with the next query for the same event record. It also waits for all queries to complete before starting the queries for the next event record. Depending on the speed of the pipeline and the complexity of the queries, the wait for query completion can slow pipeline performance.

When running multiple queries for an event record, if one query fails, the executor skips the remaining queries for that record.

You can include the following elements in each query that you define for the executor:

Spark SQL
Use Spark SQL syntax in a query, such as:
COPY INTO <table identifier>
FROM <location>
FILEFORMAT = <format type>

You can use any Spark SQL syntax that is valid for Delta Lake on Databricks, as described in the Databricks documentation. However, in most cases, you'll use the COPY or MERGE command in a Spark SQL query to load data into a Delta Lake table. A skeleton of the MERGE syntax appears after this list, and complete examples follow in the Examples section.

StreamSets expression language functions
You can include a subset of the functions provided with the StreamSets expression language in a query. For example, when you define the location to copy the data from, you might use the record:value() function to define the location as follows:
FROM 's3a://${record:value('/bucket')}/${record:value('/objectKey')}'

When entering the query in the executor, press Ctrl + Space Bar to view the list of valid functions you can use.
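
For reference, here is a minimal sketch of the MERGE INTO syntax as you might structure it in an executor query; the placeholders stand in for your own table, key, and column names, and a complete worked example appears in the Examples section below:
MERGE INTO <target table> USING <staging table>
ON <staging table>.<key column> = <target table>.<key column>
WHEN MATCHED THEN UPDATE SET <target table>.<column> = <staging table>.<column>
WHEN NOT MATCHED THEN INSERT *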

Examples

Here are a few examples of Spark SQL queries developed for the main use cases for the Databricks Delta Lake executor.

Bulk Load Data from an Amazon S3 Storage Location

Let's say that your pipeline loads customer data into an Amazon S3 bucket and then copies the data from that Amazon S3 location into an existing Delta Lake table. The Amazon S3 destination generates an object-written event record that includes a bucket field indicating where the object is located and an objectKey field indicating the object key name that was written. So when you define the Amazon S3 location to copy the data from, you can use the record:value() function included with the StreamSets expression language to dynamically determine the storage location based on the event record fields.

You define the following Spark SQL query to copy the data from Amazon S3 into the existing Delta Lake table named customers in the default database using the CSV file format:
COPY INTO customers
FROM 's3a://${record:value('/bucket')}/${record:value('/objectKey')}'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true')

Notice how the query uses the s3a URI scheme to connect to Amazon S3.

Bulk Load Data from an Azure Data Lake Storage Gen2 Storage Location

Let's say that your pipeline loads sales data into Azure Data Lake Storage Gen2 and then copies the data from that Azure location into an existing Delta Lake table. The Azure Data Lake Storage Gen2 destination generates a file closure event record that includes a filepath field that indicates the absolute path to the closed file. So when you define the Azure Data Lake Storage Gen2 location to copy the data from, you can use the record:value() function included with the StreamSets expression language to dynamically determine the storage location based on the event record field.

You define the following Spark SQL query to copy the data from Azure Data Lake Storage Gen2 into the existing Delta Lake table named sales in the default database using the CSV file format:
COPY INTO sales
FROM 'abfss://my-data@storageaccount.dfs.core.windows.net/${record:value('/filepath')}'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true')

Notice how the query uses the abfss URI scheme to connect to Azure Data Lake Storage Gen2.

Merge CDC Data

Let's say that your pipeline processes change data capture (CDC) data using the MySQL Binary Log origin. The pipeline first loads the data into an Amazon S3 bucket. The Databricks Delta Lake executor then copies the data from that Amazon S3 location into an existing Delta Lake table that serves as the staging table for all changes that need to be applied to the target table. The executor then uses the staging table to merge the changed data into the target Delta Lake table, using the appropriate operation: delete, update, or insert. Finally, the executor truncates the staging table so that the table can receive the next batch of data.

To merge this CDC data, you define three Spark SQL queries for the Databricks Delta Lake executor. The first query copies the data from the Amazon S3 storage location into the existing Delta Lake table named customers_cdc_staging using the JSON file format. This table is used as the staging table:
COPY INTO customers_cdc_staging
FROM 's3a://${record:value('/bucket')}/${record:value('/objectKey')}'
FILEFORMAT = JSON

Notice how the query uses the s3a URI scheme to connect to Amazon S3.

The second query uses the staging table to merge the changed data into the existing target table in Delta Lake using the appropriate delete, update, or insert operation:
MERGE INTO customers_cdc USING customers_cdc_staging ON customers_cdc_staging.customer_id = customers_cdc.customer_id
WHEN MATCHED AND customers_cdc_staging.Type=="DELETE" THEN DELETE 
WHEN MATCHED THEN UPDATE SET 
customers_cdc.customer_name = customers_cdc_staging.customer_name,
customers_cdc.address = customers_cdc_staging.address,
customers_cdc.city = customers_cdc_staging.city,
customers_cdc.state = customers_cdc_staging.state,
customers_cdc.zip_code = customers_cdc_staging.zip_code 
WHEN NOT MATCHED THEN INSERT *

Finally, the last query truncates the staging table in Delta Lake:
TRUNCATE TABLE customers_cdc_staging

Storage Connection

Depending on your use case, the Databricks Delta Lake executor connects to a storage location and then copies data from that location into an existing Delta Lake table.

You configure one of the following storage locations:
None
When using the executor to copy or merge data from a storage location into a Delta Lake table, set the storage location to None if the Databricks cluster already has the necessary Amazon S3 or Azure storage information configured. In this case, the Databricks cluster connects to the storage location when it runs the Spark SQL query.
Amazon S3
When using the executor to copy or merge data from Amazon S3 into a Delta Lake table, you can specify the Amazon S3 connection information in the executor properties. Any connection information specified in the executor properties takes precedence over the connection information configured in the Databricks cluster.
After selecting Amazon S3 as the storage location, you specify the AWS access key pair that the executor uses to connect to Amazon S3.
ADLS Gen2
When using the executor to copy or merge data from Azure Data Lake Storage Gen2 into a Delta Lake table, you can specify the Azure connection information in the executor properties. Any connection information specified in the executor properties takes precedence over the connection information configured in the Databricks cluster.
After selecting ADLS Gen2 as the storage location, you configure the executor to use the appropriate authentication method to connect to Azure Data Lake Storage Gen2.

ADLS Gen2 Authentication Information

When you configure the executor to connect to an ADLS Gen2 storage location, you select the authentication method that the executor uses to connect to Azure Data Lake Storage Gen2.

Select one of the following authentication methods:

OAuth Token
Connections made with OAuth Token authentication require the following information:
  • Application ID - Application ID for the Azure Active Directory application for StreamSets Cloud. Also known as the client ID.

    For information on accessing the application ID from the Azure portal, see the Azure documentation.

  • Auth Token Endpoint - OAuth 2.0 token endpoint for the Azure Active Directory v1.0 application for StreamSets Cloud. For example: https://login.microsoftonline.com/<uuid>/oauth2/token.
  • Application Key - Authentication key for the Azure Active Directory application for StreamSets Cloud. Also known as the client key.

    For information on accessing the application key from the Azure portal, see the Azure documentation.

Shared Key
Connections made with Shared Key authentication require the following information:
  • Account FQDN - Fully qualified domain name of the Azure Data Lake Storage Gen2 account. For example: <account-name>.dfs.core.windows.net.
  • Account Shared Key - Shared access key that Azure generated for the storage account.

    For more information on accessing the shared access key from the Azure portal, see the Azure documentation.

Manually Purging Data from Storage

When using the Databricks Delta Lake executor to copy or merge data from a storage location into a Delta Lake table, the executor does not automatically remove the data from Amazon S3 or from Azure Data Lake Storage Gen2. You must manually purge or remove the data after the executor successfully loads the data.

Event Generation

The Databricks Delta Lake executor can generate events that you can use in an event stream. When you enable event generation, the executor generates events for each successful or failed query.

Databricks Delta Lake events can be used with a destination to store event information. For an example, see Preserving an Audit Trail of Events.

For more information about dataflow triggers and the event framework, see Overview.

Event Records

Event records generated by the Databricks Delta Lake executor have the following event-related record header attributes. Record header attributes are stored as String values.
Record Header Attribute Description
sdc.event.type Event type. Uses the following event types:
  • successful-query - Generated after a query successfully completes.
  • failed-query - Generated after a query fails.
sdc.event.version Integer that indicates the version of the event record type.
sdc.event.creation_timestamp Epoch timestamp when the stage created the event.
The Databricks Delta Lake executor can generate the following types of event records:
Successful query

The executor generates a successful-query event record after successfully completing a query.

Successful-query event records have the sdc.event.type record header attribute set to successful-query and include the following fields:
Event Field Name Description
query Query completed.
query-result Number of rows affected by query. Included if the Include Query Result Count in Events property is selected.
Failed query

The executor generates a failed-query event record after failing to complete a query.

Failed-query event records have the sdc.event.type record header attribute set to failed-query and include the following field:
Event Field Name Description
query Query attempted.

Configuring a Databricks Delta Lake Executor

Configure a Databricks Delta Lake executor to run a Spark SQL query on a Delta Lake table on Databricks upon receiving an event.

Before you use the executor, you must perform some prerequisite tasks.
Note: Some of these properties are advanced options, which the executor hides by default. To configure the advanced options, select Show Advanced Options at the top of the properties pane.
  1. In the properties pane, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the JDBC tab, configure the following properties:
    JDBC Property Description
    JDBC Connection String JDBC connection string used to connect to the Databricks cluster.

    For example: jdbc:spark://dbc-7g9hba4d-a123.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/89266567230988377/1123-1001003-abc1;AuthMech=3;

    Tip: In Databricks, you can locate the JDBC connection string for your cluster on the JDBC/ODBC tab in the cluster configuration details.
    Use Credentials Enables entering credentials on the Credentials tab. Use when you do not include credentials in the connection string.
    Spark SQL Query One or more Spark SQL queries to run each time the executor receives an event record.

    Using simple or bulk edit mode, click the Add icon to add additional queries.

    The executor processes multiple queries in order, and waits for each query to complete before continuing to the next query.

    Include Query Result Count in Events Includes the number of rows impacted by a query in generated event records.
    Additional JDBC Configuration Properties Additional JDBC configuration properties to use. To add properties, click Add and define the JDBC property name and value.

    Use the property names and values as expected by JDBC.

  3. To enter credentials separately from the JDBC connection string, on the Credentials tab, configure the following properties:
    Credentials Property Description
    Username Enter token.
    User Token Enter your personal access token used to connect to the Databricks cluster.
    Note: When you enter secrets such as user names and passwords, the stage encrypts the secret values.
  4. On the Storage tab, configure the following properties:
    Storage Property Description
    Storage Location Storage location to connect to and copy or merge data from:
    • None - Does not connect to a storage location.

      Select when copying or merging data from a storage location and the Databricks cluster has the necessary storage information configured.

    • Amazon S3 - Connects to an Amazon S3 storage location.

      Select when copying or merging data from Amazon S3 and you need to specify the connection information in the executor properties.

    • ADLS Gen2 - Connects to an Azure Data Lake Storage Gen2 storage location.

      Select when copying or merging data from Azure Data Lake Storage Gen2 and you need to specify the connection information in the executor properties.

    AWS Access Key AWS access key ID.

    Available when using the Amazon S3 storage location.

    AWS Secret Key AWS secret access key.
    Available when using the Amazon S3 storage location.
    Note: When you enter secrets such as access key pairs, the stage encrypts the secret values.
    Azure Authentication Method Authentication method used to connect to Azure:
    • OAuth 2.0
    • Shared Key

    Available when using the ADLS Gen2 storage location.

    Application ID Application ID for the Azure Active Directory application for StreamSets Cloud. Also known as the client ID.

    For information on accessing the application ID from the Azure portal, see the Azure documentation.

    Available when using the OAuth 2.0 authentication method for Azure.

    Application Key Authentication key for the Azure Active Directory application for StreamSets Cloud. Also known as the client key.

    For information on accessing the application key from the Azure portal, see the Azure documentation.

    Available when using the OAuth 2.0 authentication method for Azure.

    Auth Token Endpoint OAuth 2.0 token endpoint for the Azure Active Directory v1.0 application for StreamSets Cloud. For example: https://login.microsoftonline.com/<uuid>/oauth2/token.

    Available when using the OAuth 2.0 authentication method for Azure.

    Account FQDN Fully qualified domain name of the Azure Data Lake Storage Gen2 account. For example: <account-name>.dfs.core.windows.net.

    Available when using the Shared Key authentication method for Azure.

    Account Shared Key Shared access key that Azure generated for the storage account.

    For more information on accessing the shared access key from the Azure portal, see the Azure documentation.

    Available when using the Shared Key authentication method for Azure.

  5. On the Advanced tab, optionally configure advanced properties.

    The defaults for these properties should work in most cases:

    Advanced Property Description
    Maximum Pool Size Maximum number of connections to create.

    Default is 1. The recommended value is 1.

    Minimum Idle Connections Minimum number of connections to create and maintain. To define a fixed connection pool, set to the same value as Maximum Pool Size.

    Default is 1.

    Enable Parallel Queries Runs insert and delete queries in parallel to improve throughput. When selected, the executor runs insert and delete queries simultaneously on all the configured connections to the database, but continues to run other queries serially. The executor runs insert queries, followed by other queries, and then delete queries. The executor commits data after each statement.
    Connection Timeout Maximum time to wait for a connection. Use a time constant in an expression to define the time increment.
    Default is 30 seconds, defined as follows:
    ${30 * SECONDS}
    Idle Timeout Maximum time to allow a connection to idle. Use a time constant in an expression to define the time increment.

    Use 0 to avoid removing any idle connections.

    When the entered value is close to or more than the maximum lifetime for a connection, the executor ignores the idle timeout.

    Default is 10 minutes, defined as follows:
    ${10 * MINUTES}
    Max Connection Lifetime Maximum lifetime for a connection. Use a time constant in an expression to define the time increment.

    Use 0 to set no maximum lifetime.

    When a maximum lifetime is set, the minimum valid value is 30 minutes.

    Default is 30 minutes, defined as follows:
    ${30 * MINUTES}
    Transaction Isolation Transaction isolation level used to connect to the database.
    Default is the default transaction isolation level set for the database. You can override the database default by setting the level to any of the following:
    • Read committed
    • Read uncommitted
    • Repeatable read
    • Serializable
    Init Query SQL query to perform immediately after the stage connects to the database. Use to set up the database session as needed.