Loading Data into Databricks Delta Lake

You can use several StreamSets Cloud solutions to load data into a Delta Lake table on Databricks.

Before continuing with one of the solutions, ensure that you have completed all of the required prerequisites in Databricks: generating a personal access token, configuring and starting a Databricks cluster that runs version 6.3 or later, and locating the JDBC URL used to access the cluster.

For detailed prerequisite steps, see one of the following Databricks articles depending on your storage location:

If you haven't already, verify that you meet all StreamSets Cloud requirements. Requirements include using a supported browser and whitelisting all StreamSets Cloud IP addresses in the origin systems that you want to read data from.

Then use one of the following solutions to build a pipeline that loads data into a Delta Lake table on Databricks:
  • Bulk load data into a Delta Lake table

    Build a pipeline that reads Salesforce data and then loads the data into a storage location in Amazon S3. After the Amazon S3 destination completes writing to an object, the event triggers the Databricks Delta Lake executor to run a Spark SQL query that copies the data from the storage location into a Delta Lake table.

  • Merge changed data into a Delta Lake table

    Build a pipeline that processes change data capture (CDC) data using the MySQL Binary Log origin. The pipeline first loads data into a storage location in Amazon S3. After the Amazon S3 destination completes writing to an object, the event triggers the Databricks Delta Lake executor to run multiple Spark SQL queries that merge the changed data into a Delta Lake table.

Bulk Loading Data into a Delta Lake Table

This solution describes how to build a pipeline that bulk loads Salesforce data into a Delta Lake table on Databricks.

Tip: You can download the sample pipeline named Salesforce to Delta Lake.json from the StreamSets Cloud pipelines GitHub repository, import the pipeline into StreamSets Cloud, and then follow these steps for more details on the solution.

Let's say that you want to bulk load Salesforce account data into Databricks Delta Lake for further analysis. You'd like the pipeline to clean some of the account data before loading it into a temporary Amazon S3 storage location. After the Amazon S3 destination completes writing to an object, the event triggers the Databricks Delta Lake executor to run a Spark SQL query that copies the data from the storage location into a Delta Lake table.

To build this pipeline, you first must create the Delta Lake table in Databricks.

Then you complete the following tasks in StreamSets Cloud:
  • Create the pipeline and configure a Salesforce origin to read account data from Salesforce.
  • Configure an Expression Evaluator processor to clean the input data.
  • Configure an Amazon S3 destination to load data into a storage location.
  • Configure a Databricks Delta Lake executor that runs a Spark SQL query to copy the data from the Amazon S3 storage location into a Delta Lake table.
  • Run the pipeline to move the data from Salesforce to Delta Lake.

Create the Delta Lake Table

Before you build a StreamSets Cloud pipeline that bulk loads data into a Delta Lake table on Databricks, you must create the Delta Lake table that you want to load data into. The Databricks Delta Lake executor can copy data to an existing Delta Lake table.

For example, run the following command in a notebook on Databricks to create an accounts table in the sales database:

create table sales.accounts
(
Id string,
Name string,
Type string,
BillingStreet string,
BillingCity string,
BillingState string,
BillingPostalCode string,
BillingCountry string,
ShippingStreet string,
ShippingCity string,
ShippingState string,
ShippingPostalCode string,
ShippingCountry string,
Phone string,
Fax string
)
using delta location '/sales/accounts'

Note that this command creates an accounts table for only some attributes of the Salesforce account object. When you configure the Salesforce origin to read the account data, you similarly configure the origin to read only these attributes.
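
If the sales database does not already exist in your workspace, create it before running the CREATE TABLE statement, and then confirm the table definition. The following commands are a minimal sketch; adjust the database name if yours differs:

create database if not exists sales;

describe table sales.accounts;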

Create the Pipeline and Configure the Salesforce Origin

Create the pipeline and then configure the Salesforce origin to read account data from Salesforce.

Important: StreamSets Cloud reads data from Salesforce using a set of well-known IP addresses. To ensure that StreamSets Cloud can connect to your Salesforce data, you must whitelist all StreamSets Cloud IP addresses in your Salesforce organization.

For more detailed information about this origin, see Salesforce origin.

  1. On the Pipelines page in StreamSets Cloud, click Create.
  2. Enter a name for the pipeline, such as BulkLoadDeltaLake, and then click Create.
    The pipeline opens in the pipeline canvas.
  3. From the stage type list, select Origins to filter the stages by origins, as follows:
  4. In the Search field, type sale, and then select the Salesforce origin.

    The origin is added to the canvas.

  5. On the Salesforce tab, enter your Salesforce user name and password.
    Note: When you enter secrets such as user names and passwords, the stage encrypts the secret values.
  6. Clear the Subscribe for Notifications checkbox.

    This way, the origin runs a query to process existing data and is not subscribed to notifications.

  7. Leave the default values for the remaining properties.

    The Salesforce tab should be configured as follows:

  8. Click the Query tab and enter the following query for the SOQL Query property so that the origin reads only these attributes from the Salesforce account object:
    SELECT Id,
    Name,
    Type,
    BillingStreet,
    BillingCity,
    BillingState,
    BillingPostalCode,
    BillingCountry,
    ShippingStreet,
    ShippingCity,
    ShippingState,
    ShippingPostalCode,
    ShippingCountry,
    Phone,
    Fax
    FROM Account
    WHERE Id > '${OFFSET}'
    ORDER BY Id
  9. Leave the default values for the remaining properties.
  10. In the toolbar above the pipeline canvas, click the Preview icon.

    When you preview the pipeline, you can verify that you correctly entered the Salesforce connection information and you can view several records of data read from Salesforce.

  11. In the Preview Configuration dialog box, accept the defaults and then click Confirm.

    It can take about a minute for the preview to start. If the Salesforce connection information is valid, the preview displays several records of Salesforce data, as follows:

  12. Click the Preview icon again to close the preview and continue building the pipeline.

Configure the Expression Evaluator Processor

Next you add and configure the Expression Evaluator processor to clean some of the account data.

The Type field read from Salesforce contains either Customer - Direct or Customer - Channel as the value. You'd like to clean this data by keeping only Direct or Channel as the value before loading the data into a Delta Lake table.

So you add an Expression Evaluator processor to the pipeline and define an expression that uses the str:regExCapture() function to replace the value of the Type field with only Direct or Channel.
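
If you want to see the effect of the capture group outside of the pipeline, the following Spark SQL statement is a rough, illustration-only equivalent that you can run in a Databricks notebook; the second capture group in regexp_extract corresponds to the second group used in the expression:

select raw_type,
       regexp_extract(raw_type, '(.*) - (.*)', 2) as cleaned_type
from values ('Customer - Direct'), ('Customer - Channel') as t(raw_type)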

Note: The Expression Evaluator processor performs calculations using the StreamSets expression language and writes the results to new or existing fields. For more detailed information about this processor, see Expression Evaluator processor.
  1. From the stage type list, select Processors to filter the stages by processors.
  2. In the Search field, type exp, and then select the Expression Evaluator processor.

    The processor is added to the canvas.

  3. Use your cursor to connect the origin to the processor so that the pipeline looks like this:

  4. Select the Expression Evaluator processor in the pipeline canvas, and then click the Expressions tab.
  5. In the Field Expressions section, enter /Type for the Output Field and then enter the following expression for the Field Expression:
    ${str:regExCapture(record:value('/Type'),'(.*) - (.*)',2)}

    The Expressions tab should be configured as follows:

  6. To verify that the expression cleans the data as expected, click the Preview icon and then click Confirm in the dialog box.
  7. When the preview starts, select the Expression Evaluator processor in the pipeline canvas.

    The preview displays the input and output data of the processor, highlighting the changed data in the Type field and confirming that the expression correctly removes the string Customer - from field values, as follows:

  8. Click the Preview icon again to close the preview and continue configuring the next stage in the pipeline.

Configure the Amazon S3 Destination

When loading data to Delta Lake, the Databricks Delta Lake executor connects to an Amazon S3 or Azure Data Lake Storage Gen2 storage location and then copies data from that location into an existing Delta Lake table.

This solution uses Amazon S3 as the storage location, so it instructs you to add and configure an Amazon S3 destination. If you prefer, you can configure an Azure Data Lake Storage Gen2 destination instead.
Note: For more detailed information about either destination, see Amazon S3 destination or Azure Data Lake Storage Gen2 destination.
  1. From the stage type list, select Destinations, type ama, and then select the Amazon S3 destination.

    The destination is added to the canvas.

  2. Use your cursor to connect the Expression Evaluator processor to the destination.
  3. Select the Amazon S3 destination in the pipeline canvas, and then click the Data Format tab.
  4. Configure the following properties with the required values:
    • Data Format: Select Delimited.
    • Delimiter Format: Select PostgreSQL CSV. This format produces double quoted field values.
    • Header Line: Select With Header Line.
  5. Leave the default values for the remaining properties.

    The Data Format tab should be configured as follows:

  6. Click the Amazon S3 tab, and then configure the following properties with the required values:
    • Access Key ID: Enter your AWS access key ID.
    • Secret Access Key: Enter your AWS secret access key.
    • Region: Select the Amazon S3 region for the bucket that you want to write to.
    • Bucket: Enter the name of the bucket to write to.
    • Common Prefix: Enter a common prefix that determines where objects are written.
  7. Leave the default values for the remaining properties.

    The Amazon S3 tab should be configured as follows:

Configure the Databricks Delta Lake Executor

Databricks provides the COPY INTO command, which you can use to quickly bulk load large amounts of data from Amazon S3 into a Delta Lake table. To use the command, you add the Databricks Delta Lake executor to the pipeline and then define a Spark SQL query that copies the data from the storage location into the Delta Lake table.

The Databricks Delta Lake executor runs one or more Spark SQL queries on a Delta Lake table on Databricks each time it receives an event record.

The Amazon S3 destination can generate events each time it completes writing to an object. When it generates an object-written event, the destination records the bucket where the object is located and the object key name that was written. So first, you'll enable the Amazon S3 destination to produce events, and then you'll add and configure the Databricks Delta Lake executor to consume these events.
Note: For more detailed information about this executor, see Databricks Delta Lake executor.
  1. Select the Amazon S3 destination in the pipeline canvas, click the General tab, and then select Produce Events.

    The event output stream becomes available for the destination, as follows:

  2. From the stage type list, select Executors, type del, and then select the Databricks Delta Lake executor.

    The executor is added to the canvas.

  3. Use your cursor to connect the event output stream to the executor, as follows:

  4. Select the Databricks Delta Lake executor, and then click the JDBC tab.
  5. For the JDBC Connection String property, enter the JDBC URL that the executor uses to connect to the Databricks cluster.

    For example: jdbc:spark://dbc-7g9hba4d-a123.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/89266567230988377/1123-1001003-abc1;AuthMech=3;

  6. Click Add next to the Spark SQL Query property, and then enter the following query:
    COPY INTO sales.accounts
    FROM 's3a://${record:value('/bucket')}/${record:value('/objectKey')}'
    FILEFORMAT = CSV
    FORMAT_OPTIONS ('header' = 'true')

    Notice how the query uses the s3a URI scheme to connect to Amazon S3. The query also uses the record:value() function available with the StreamSets expression language to dynamically determine the storage location based on the event record fields generated by the Amazon S3 destination. A resolved example of this query appears after these steps.

    The JDBC tab should be configured as follows:

  7. Click the Credentials tab, and then configure the following properties with the required values:
    • Username: Enter token.
    • User Token: Enter the personal access token that you generated as a prerequisite in Databricks.
  8. Click the Storage tab, and then set the Storage Location to None.

    The Storage tab defines how the executor connects to the storage location in Amazon S3 or Azure Data Lake Storage Gen2. This solution assumes that you correctly configured the Databricks cluster to connect to Amazon S3 when you completed the prerequisites in Databricks. In this case, the Databricks cluster connects to the storage location in Amazon S3 when it runs the Spark SQL query.

    If you did not configure the Databricks cluster to connect to Amazon S3, enter the storage location details as described in Storage Connection.
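
For reference, here is what the Spark SQL query from step 6 might look like after the expressions are resolved. The bucket and object key values below are hypothetical; the actual values come from the event record generated by the Amazon S3 destination:

COPY INTO sales.accounts
FROM 's3a://my-sales-bucket/sales/sdc-0a1b2c3d.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true')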

Run the Pipeline

Run the pipeline to move the data from Salesforce to Delta Lake.

  1. In the toolbar above the pipeline canvas, click the Run icon.
  2. In the Run Configuration dialog box, accept the defaults, and then click Run Pipeline.

    It takes a few minutes for the pipeline to allocate the required resources and then to deploy and start, as displayed in the following messages:

    When the pipeline successfully starts, the monitor pane displays the Monitoring tab. The monitor pane lets you monitor the health and performance of the pipeline by viewing real-time statistics and errors as data moves through the pipeline, as displayed in the following image:

    Because the Salesforce origin is configured to read all account data in bulk, the pipeline automatically stops after reading all account data.

  3. Verify that the pipeline loaded data into the Delta Lake table by running a SQL query in your Databricks notebook.
    For example, if you run the following SQL query:
    select * from sales.accounts

    Databricks displays the following results:
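
As an additional check, you can run a row count against the table. This is a quick sketch; the expected count depends on the number of accounts in your Salesforce organization:

select count(*) from sales.accounts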

Merging Changed Data into a Delta Lake Table

This solution describes how to design a pipeline that reads change data capture (CDC) data from a database and replicates the changes to a Delta Lake table on Databricks.

Tip: You can download the sample pipeline named MySQL CDC to DeltaLake.json from the StreamSets Cloud pipelines GitHub repository, import the pipeline into StreamSets Cloud, and then follow these steps for more details on the solution.

Let's say that you want to track customer transactions in a MySQL table and apply those changes to a Delta Lake table for further analysis. That is, you need to apply the same set of updates, deletes, and inserts made to the MySQL table to the Delta Lake table. You first design and run a pipeline to bulk load the initial set of transactions in the MySQL table into the Delta Lake table. Then you design the CDC pipeline that processes subsequent changes.

In the CDC pipeline, you use a MySQL Binary Log origin to capture the changes from the MySQL master database. Due to the structure of the MySQL binary log records, you need to add processors to the pipeline to restructure the record and keep only the necessary fields. You then configure the pipeline to load the data into a temporary Amazon S3 storage location. After the Amazon S3 destination completes writing to an object, the event triggers the Databricks Delta Lake executor to run multiple Spark SQL queries that merge the changed data into a Delta Lake table.

To build this CDC pipeline, you first must create two Delta Lake tables in Databricks - the staging table and the target table.

Then you complete the following tasks in StreamSets Cloud:
  • Create the pipeline and configure a MySQL Binary Log origin to read CDC information provided by MySQL in binary logs.
  • Configure several processors to restructure the record based on the type of operation performed: INSERT, UPDATE, or DELETE.
  • Configure an Amazon S3 destination to load data into a storage location.
  • Configure a Databricks Delta Lake executor that runs multiple Spark SQL queries. The queries copy the data from the Amazon S3 storage location to the Delta Lake staging table, merge the data from the staging table into the Delta Lake target table, and then truncate the staging table to reset for the next set of queries.
  • Run the pipeline to replicate data from MySQL binary logs to the Delta Lake target table.

Create Delta Lake Tables

Before you build a StreamSets Cloud pipeline that merges CDC data to a Delta Lake table on Databricks, you must create the staging and target tables that you want to load data into. The Databricks Delta Lake executor can load data into existing Delta Lake tables.

To merge changed data into a Delta Lake table using an INSERT, UPDATE, or DELETE operation, you must create the following tables in Delta Lake:
Staging table
The staging table is a temporary table that contains all the changes that need to be applied. You'll use this staging table to apply a bulk merge into the target table.
Target table
The target table is the table where you want to apply the changed data.
To create the target table, simply create a replica of the MySQL table. For example, run the following command in a notebook on Databricks to create a customers_cdc table in the default database:
create table customers_cdc
(
customer_id bigint,
customer_name string,
address string,
city string,
state string,
zip_code string
) 
using delta location '/delta/customers_cdc';
To create the staging table, create a replica of the MySQL table with an additional column that stores the transaction type: insert, update, or delete. For example, run the following command in a notebook on Databricks to create a customers_cdc_staging table in the default database:
create table customers_cdc_staging
(
customer_id bigint,
customer_name string,
address string,
city string,
state string,
zip_code string,
Type string
) 
using delta location '/delta/customers_cdc_staging';
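
To confirm that both tables were created before you build the pipeline, you can run a quick check in the same notebook. This is a minimal sketch:

show tables like 'customers_cdc*';

describe table customers_cdc_staging;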

Create the Pipeline and Configure the MySQL Binary Log Origin

Create the pipeline and then configure the MySQL Binary Log origin to read CDC information provided by MySQL in binary logs.

Important: The MySQL Binary Log origin requires a secure connection to the MySQL database using an SSH tunnel or SSL/TLS encryption and requires that the MySQL database uses row-based logging. Before you use the origin, complete the required prerequisites. This solution assumes that you use SSL/TLS encryption, but you can use an SSH tunnel if you prefer.

For more detailed information about this origin, see MySQL Binary Log origin.
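
If you want to confirm that the MySQL server meets the logging prerequisites before you build the pipeline, you can run the following statements on the MySQL server. This is an optional sketch; the origin requires row-based logging, and this solution also assumes that GTID is enabled:

show variables like 'binlog_format';   -- expected value: ROW
show variables like 'gtid_mode';       -- ON if the database is enabled for GTID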

  1. On the Pipelines page in StreamSets Cloud, click Create.
  2. Enter a name for the pipeline, such as CDCDeltaLake, and then click Create.
    The pipeline opens in the pipeline canvas.
  3. From the stage type list, select Origins to filter the stages by origins, as follows:
  4. In the Search field, type my, and then select the MySQL Binary Log origin.

    The origin is added to the canvas.

  5. At the top of the stage properties pane, select Show Advanced Options.
  6. On the MySQL Binary Log tab, enter the MySQL server host name and port number.
  7. Optionally enter the replication server ID that the origin uses to connect to the master MySQL server.

    This solution assumes that the MySQL database is enabled for GTID, which means that you do not need to configure the server ID.

  8. Select Start from Beginning.
  9. Leave the default values for the remaining properties.

    The MySQL Binary Log tab should be configured as follows:

  10. Click the Credentials tab and enter the user name and password to connect to MySQL.
    Note: When you enter secrets such as user names and passwords, the stage encrypts the secret values.
  11. Click the SSL/TLS Encryption tab and set the SSL mode to Required (trust server).

    The origin requires that you use an SSH tunnel or SSL/TLS encryption to securely connect to the database. This solution assumes that MySQL is correctly configured to use SSL/TLS and configures the stage to establish an SSL/TLS connection without any verification. To set up an SSH tunnel or a more secure SSL/TLS connection for the origin, see Secure Connections.

  12. In the toolbar above the pipeline canvas, click the Preview icon.

    When you preview the pipeline, you can verify that you correctly entered the MySQL connection information and you can view several records of data read from the binary logs.

  13. In the Preview Configuration dialog box, accept the defaults and then click Confirm.

    It can take about a minute for the preview to start. If the MySQL connection information is valid and if the binary log contains pending transactions, the preview displays the pending transactions, as follows:

  14. Click the Preview icon again to close the preview and continue building the pipeline.

Configure Processors to Restructure the Record

Due to the structure of the MySQL binary log records, you need to add several processors to the pipeline to restructure the record and keep only the necessary fields.

Each record generated by the MySQL Binary Log origin includes the following information:

  • CRUD operation type in the Type field: INSERT, UPDATE, or DELETE.
  • Change data capture information such as the table, server ID, and timestamp in various fields.
  • New data to be inserted or updated in the Data map field.
  • Old data to be deleted in the OldData map field.

For example, the origin might generate the following record for data that needs to be inserted:

You'll need to restructure the records differently, based on the operation type. You add a Stream Selector processor to the pipeline to route records with a DELETE operation in the Type field to one processing stream and to route records with an INSERT or UPDATE operation in the Type field to another processing stream. Then for each stream, you add a Field Remover processor to keep only the necessary fields and then a Field Flattener processor to flatten the fields in the Data or OldData map fields.

  1. From the stage type list, select Processors to filter the stages by processors.
  2. In the Search field, type str, and then select the Stream Selector processor.

    The processor is added to the canvas.

  3. Use your cursor to connect the origin to the processor.
  4. Select the Stream Selector processor in the pipeline canvas, and then click the Conditions tab.
  5. Click Add to add a condition.
  6. Enter the following expression for the condition:
    ${record:value('/Type') == 'DELETE'}

    This condition uses the StreamSets expression language to route records with a DELETE operation in the Type field to the first output stream of the processor. All other records, with an INSERT or UPDATE operation in the Type field, route to the default output stream.

    The configured Conditions tab and the pipeline should look like this. Note that the Stream Selector processor has two output streams:

  7. Add a Field Remover processor, and connect the first output stream of the Stream Selector processor to the new processor.
  8. Select the Field Remover processor in the pipeline canvas, and then on the General tab, enter Keep OldData Fields to DELETE for the processor name.
  9. Click the Remove/Keep tab.
  10. For Action, select Keep Listed Fields, and then enter the following field paths for the Fields property:
    • /OldData
    • /Type

    This configuration keeps only the OldData and Type fields for records with a DELETE operation, and removes all other fields. The pipeline and the configured Remove/Keep tab should look like this:

  11. Add another Field Remover processor, connecting the second output stream of the Stream Selector processor to this processor.
  12. Select the second Field Remover processor in the pipeline canvas, and then on the General tab, enter Keep Data Fields to INSERT/UPDATE for the processor name.
  13. Click the Remove/Keep tab.
  14. For Action, select Keep Listed Fields, and then enter the following field paths for the Fields property:
    • /Data
    • /Type

    This configuration keeps only the Data and Type fields for records with an INSERT or UPDATE operation, and removes all other fields. The configured Remove/Keep tab and the pipeline should look like this:

  15. Add two Field Flattener processors to the pipeline, connecting each to one of the Field Remover processors.
  16. Select the Field Flattener processor in the stream that keeps the OldData field, and then click the Flatten tab.
  17. Configure the following properties with the required values:
    • Flatten: Select Flatten specific fields.
    • Fields: Enter /OldData.
    • Flatten in Place: Clear the property.
    • Target Field: Enter / to write the flattened data to the root field.
  18. Leave the default values for the remaining properties.

    The Flatten tab should be configured as follows:

  19. Select the second Field Flattener processor in the stream that keeps the Data field, and then configure it the same way as the first Field Flattener processor, except enter /Data for the Fields property.
  20. To verify that you've restructured the data as expected, click the Preview icon and then click Confirm in the dialog box.
  21. Assuming that the binary log contains pending insert or update transactions, select the Field Remover processor that keeps the Data field.

    The preview displays the input and output data of the processor, highlighting that only the Data and Type fields are included in the output, as follows:

  22. Next, select the Field Flattener processor connected to this Field Remover processor.

    The preview displays the input and output data of the Field Flattener processor, showing that the fields in the Data map field have been flattened to the root field, as follows:

  23. Click the Preview icon again to close the preview and continue configuring the next stage in the pipeline.

Configure the Amazon S3 Destination

When loading data to Delta Lake, the Databricks Delta Lake executor connects to an Amazon S3 or Azure Data Lake Storage Gen2 storage location and then copies data from that location into an existing Delta Lake table.

This solution uses Amazon S3 as the storage location, so it instructs you to add and configure an Amazon S3 destination. If you prefer, you can configure an Azure Data Lake Storage Gen2 destination instead.
Note: For more detailed information about either destination, see Amazon S3 destination or Azure Data Lake Storage Gen2 destination.
  1. From the stage type list, select Destinations, type ama, and then select the Amazon S3 destination.

    The destination is added to the canvas.

  2. Use your cursor to connect both Field Flattener processors to the destination.
  3. Select the Amazon S3 destination in the pipeline canvas, click the Data Format tab, and then select JSON as the data format.
  4. Leave the default values for the remaining properties.

    The pipeline and Data Format tab should be configured as follows:

  5. Click the Amazon S3 tab, and then configure the following properties with the required values:
    • Access Key ID: Enter your AWS access key ID.
    • Secret Access Key: Enter your AWS secret access key.
    • Region: Select the Amazon S3 region for the bucket that you want to write to.
    • Bucket: Enter the name of the bucket to write to.
    • Common Prefix: Enter a common prefix that determines where objects are written.
  6. Leave the default values for the remaining properties.

    The Amazon S3 tab should be configured as follows:

Configure the Databricks Delta Lake Executor

Databricks provides the MERGE INTO command, which you can use to merge changed data into Delta Lake. The command merges data from a source or staging table into a target Delta Lake table.

To merge changed data, you add the Databricks Delta Lake executor to the pipeline and then define multiple Spark SQL queries that the executor runs. The queries copy the data from the Amazon S3 storage location to the Delta Lake staging table, merge the data from the staging table into the Delta Lake target table, and then truncate the staging table so that it can receive the next batch of data.

The Databricks Delta Lake executor runs one or more Spark SQL queries on a Delta Lake table on Databricks each time it receives an event record.

The Amazon S3 destination can generate events each time it completes writing to an object. When it generates an object-written event, the destination records the bucket where the object is located and the object key name that was written. So first, you'll enable the Amazon S3 destination to produce events, and then you'll add and configure the Databricks Delta Lake executor to consume these events.
Note: For more detailed information about this executor, see Databricks Delta Lake executor.
  1. Select the Amazon S3 destination in the pipeline canvas, click the General tab, and then select Produce Events.

    The event output stream becomes available for the destination, as follows:

  2. From the stage type list, select Executors, type del, and then select the Databricks Delta Lake executor.

    The executor is added to the canvas.

  3. Use your cursor to connect the event output stream to the executor, as follows:

  4. Select the Databricks Delta Lake executor, and then click the JDBC tab.
  5. For the JDBC Connection String property, enter the JDBC URL that the executor uses to connect to the Databricks cluster.

    For example: jdbc:spark://dbc-7g9hba4d-a123.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/89266567230988377/1123-1001003-abc1;AuthMech=3;

  6. Click Add next to the Spark SQL Query property, and then enter the following query which copies the data in the Amazon S3 storage location into the staging table in Delta Lake:
    COPY INTO customers_cdc_staging
    FROM 's3a://${record:value('/bucket')}/${record:value('/objectKey')}'
    FILEFORMAT = JSON

    Notice how the query uses the s3a URI scheme to connect to Amazon S3. The query also uses the record:value() function available with the StreamSets expression language to dynamically determine the storage location based on the event record fields generated by the Amazon S3 destination.

  7. Click Add to add the second Spark SQL query, and then enter the following query which uses the staging table to merge the changed data into the target table in Delta Lake:
    MERGE INTO customers_cdc USING customers_cdc_staging ON customers_cdc_staging.customer_id = customers_cdc.customer_id
    WHEN MATCHED AND customers_cdc_staging.Type=="DELETE" THEN DELETE 
    WHEN MATCHED THEN UPDATE SET 
    customers_cdc.customer_name = customers_cdc_staging.customer_name,
    customers_cdc.address = customers_cdc_staging.address,
    customers_cdc.city = customers_cdc_staging.city,
    customers_cdc.state = customers_cdc_staging.state,
    customers_cdc.zip_code = customers_cdc_staging.zip_code 
    WHEN NOT MATCHED THEN INSERT *
  8. Click Add to add the third Spark SQL query, and then enter the following query which truncates the staging table in Delta Lake so that the table is ready to receive the next batch of data:
    truncate table customers_cdc_staging

    The JDBC tab should be configured as follows:

  9. Click the Credentials tab, and then configure the following properties with the required values:
    • Username: Enter token.
    • User Token: Enter the personal access token that you generated as a prerequisite in Databricks.
  10. Click the Storage tab, and then set the Storage Location to None.

    The Storage tab defines how the executor connects to the storage location in Amazon S3 or Azure Data Lake Storage Gen2. This solution assumes that you correctly configured the Databricks cluster to connect to Amazon S3 when you completed the prerequisites in Databricks. In this case, the Databricks cluster connects to the storage location in Amazon S3 when it runs the Spark SQL query.

    If you did not configure the Databricks cluster to connect to Amazon S3, enter the storage location details as described in Storage Connection.

Run the Pipeline

Run the pipeline to move the data from MySQL binary logs to Delta Lake.

  1. In the toolbar above the pipeline canvas, click the Run icon.
  2. In the Run Configuration dialog box, accept the defaults, and then click Run Pipeline.

    It takes a few minutes for the pipeline to allocate the required resources and then to deploy and start, as displayed in the following messages:

    When the pipeline successfully starts, the monitor pane displays the Monitoring tab. The monitor pane lets you monitor the health and performance of the pipeline by viewing real-time statistics and errors as data moves through the pipeline, as displayed in the following image:

  3. Next, verify that the pipeline loaded the data into the target table in Delta Lake by running a SQL query in your Databricks notebook.
    For example, if you run the following SQL query:
    select * from customers_cdc

    Databricks displays the following results:

  4. Verify that the pipeline successfully applies update operations to the Delta Lake table by running the following command on the MySQL database to update one of the rows:
    update retail.customers_cdc set address='10 Downing ST' where customer_id=6;

    Then in your Databricks notebook, verify that the Delta Lake table has been updated with the changed address for customer_id 6 (a sample query appears after these steps):

  5. Verify that the pipeline successfully applies delete operations to the Delta Lake table by running the following command on the MySQL database to delete one of the rows:
    delete from retail.customers_cdc where customer_id=7;

    Then in your Databricks notebook, verify that the customer_id 7 row has been deleted from the Delta Lake table (a sample query appears after these steps):

  6. Click the Stop icon to stop the pipeline.
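
The verification queries for steps 4 and 5 might look like the following in your Databricks notebook. These are sketches; the customer_id values match the sample update and delete commands above:

select * from customers_cdc where customer_id = 6;

select * from customers_cdc where customer_id = 7;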