Databricks Bulk Ingest of Salesforce Data Into Delta Lake

Posted in Engineering, February 24, 2020

Learn how to use the newly released Databricks COPY command for bulk ingest into Delta Lake with the hosted StreamSets Cloud service.

StreamSets is proud to announce an expansion of its partnership with Databricks by participating in Databricks’ newly launched Data Ingestion Network. As part of the expanded partnership, StreamSets is offering additional functionality for StreamSets Cloud with a new connector for Delta Lake, an open source project that provides reliable data lakes at scale. The StreamSets Cloud service provides an integrated, cloud-based user experience for designing, deploying and monitoring your pipelines across your entire organization. A key component of this integration is leveraging the newly released Databricks COPY command for bulk ingest into Delta Lake using the StreamSets Cloud service.

Let’s consider a simple example of ingesting account information from Salesforce and storing it in a queryable format in a Delta Lake table.

Pipeline Overview

If you’d like to follow along, here are the details to get you started and here’s the GitHub link to the pipeline JSON that you can import in your environment.

Prerequisites

Here are the steps for designing our dataflow pipeline:

  • Configure the Salesforce origin to read account information
  • Configure the Expression Evaluator processor to transform an attribute of the input data
  • Configure the Amazon S3 destination to load data to a staging location
  • Configure the Databricks Delta Lake executor to run a Spark SQL query that copies the data from Amazon S3 into the Delta Lake table

Salesforce—Origin

The configuration attribute of interest is SOQL Query, which retrieves account details from Salesforce.

SELECT Id, Name, Type, BillingStreet, BillingCity, BillingState, BillingPostalCode, BillingCountry, Website, PhotoUrl, AccountNumber, Industry, Rating, AnnualRevenue, NumberOfEmployees 
FROM Account 
WHERE Id > '${OFFSET}' 
ORDER BY Id
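
The combination of the ${OFFSET} placeholder and the ordering by Id lets the origin read accounts in batches and resume after the last Id it has seen. The Python sketch below illustrates only that idea; it is not how the Salesforce origin is implemented. The in-memory ACCOUNTS list, the fetch_batch helper, the shortened column list, and the INITIAL_OFFSET value are all hypothetical stand-ins.

```python
# Illustrative only: simulates offset-based reads over records sorted by Id.
# ACCOUNTS, fetch_batch, and INITIAL_OFFSET are hypothetical stand-ins.

SOQL_TEMPLATE = "SELECT Id, Name FROM Account WHERE Id > '{offset}' ORDER BY Id"

ACCOUNTS = [
    {"Id": "001A", "Name": "Acme"},
    {"Id": "001B", "Name": "Globex"},
    {"Id": "001C", "Name": "Initech"},
]

INITIAL_OFFSET = "000"  # assumed to sort before every real Id


def build_query(offset):
    """Substitute the current offset into the query, as ${OFFSET} would be."""
    return SOQL_TEMPLATE.format(offset=offset)


def fetch_batch(offset, batch_size):
    """Stand-in for running the query: rows with Id > offset, ordered by Id."""
    rows = sorted(
        (row for row in ACCOUNTS if row["Id"] > offset),
        key=lambda row: row["Id"],
    )
    return rows[:batch_size]


def read_all(batch_size=2):
    """Read every account in batches, advancing the offset after each batch."""
    offset = INITIAL_OFFSET
    seen = []
    while True:
        batch = fetch_batch(offset, batch_size)
        if not batch:
            break
        seen.extend(row["Id"] for row in batch)
        offset = batch[-1]["Id"]  # resume after the last Id read
    return seen
```

Because each batch starts strictly after the previous batch's last Id, no record is read twice, which is why the query needs both the WHERE clause and the ordering.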

Expression Evaluator—Processor

The configuration attribute of interest is Field Expression, which uses a regular expression to remove the redundant prefix text “Customer -” from the account Type field.

${str:regExCapture(record:value('/Type'),'(.*) - (.*)',2)}
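
To see what the pattern does, here is a rough Python equivalent of the expression above. It is a sketch for experimenting with the regular expression, not the StreamSets implementation: the strip_type_prefix name is made up, and returning None for values that do not match is an assumption of this sketch rather than documented behavior of str:regExCapture.

```python
import re

# Same pattern as in the Field Expression: group 2 is the text after " - ".
PATTERN = re.compile(r"(.*) - (.*)")


def strip_type_prefix(account_type):
    """Return capture group 2 of the pattern, or None when nothing matches."""
    match = PATTERN.search(account_type)
    return match.group(2) if match else None


print(strip_type_prefix("Customer - Direct"))   # Direct
print(strip_type_prefix("Customer - Channel"))  # Channel
print(strip_type_prefix("Prospect"))            # None (no " - " separator)
```

Note that the pattern matches any text before " - ", not only the literal word “Customer”, and that values without the separator do not match at all, so it is worth checking in preview how your pipeline treats such records.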

Amazon S3—Destination

The configuration attribute of interest is Data Format (set to JSON), which determines the format in which the account detail objects are stored on Amazon S3.

Databricks Delta Lake—Executor

As noted earlier, Databricks provides the COPY command to efficiently bulk load large amounts of data into Delta Lake. To use it, a Databricks Delta Lake executor has been added to the pipeline. The executor can run one or more Spark SQL queries on a Delta Lake table each time it receives an event.

In our example, the Amazon S3 destination is configured to generate an event each time it finishes writing an object. Each object-written event records the bucket where the object is located and the key of the object that was written. The COPY command uses these bucket and object key attributes to load the data from Amazon S3 into an existing Delta Lake table. (See the Spark SQL query below.)

Configuration attributes of interest:

  • JDBC Connection String
    jdbc:spark://dbc-5a9fba6c-c704.cloud.databricks.com:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/8214879852432560/1123-001005-tow71;AuthMech=3;

Note: The JDBC connection string specifies which Databricks cluster the executor connects to in order to run the COPY command. In our case, that’s 1123-001005-tow71, the last segment of httpPath.
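
If you want to check which cluster a given connection string points at, the string can be taken apart with a few lines of Python. This is a minimal sketch that assumes the layout shown above (a jdbc:spark:// prefix followed by semicolon-separated key=value properties, with the cluster ID as the last segment of httpPath); the function names are made up for illustration.

```python
def parse_jdbc_url(url):
    """Split a jdbc:spark:// URL into its host part and ;key=value properties."""
    prefix = "jdbc:spark://"
    if not url.startswith(prefix):
        raise ValueError("not a jdbc:spark URL")
    location, _, rest = url[len(prefix):].partition(";")
    props = {}
    for item in rest.split(";"):
        if item:  # skip the empty piece after a trailing semicolon
            key, _, value = item.partition("=")
            props[key] = value
    return location, props


def cluster_id(url):
    """Return the last path segment of httpPath (assumed to be the cluster ID)."""
    _, props = parse_jdbc_url(url)
    return props["httpPath"].rsplit("/", 1)[-1]


URL = (
    "jdbc:spark://dbc-5a9fba6c-c704.cloud.databricks.com:443/default;"
    "transportMode=http;ssl=1;"
    "httpPath=sql/protocolv1/o/8214879852432560/1123-001005-tow71;AuthMech=3;"
)
print(cluster_id(URL))  # 1123-001005-tow71
```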

  • Spark SQL Query
    COPY INTO accounts_d 
    FROM (SELECT Id, Name, Type, BillingStreet, BillingCity, BillingState, BillingCountry, CAST(BillingPostalCode AS INT), Website, PhotoUrl, AccountNumber, Industry, Rating, CAST(NumberOfEmployees AS INT), AnnualRevenue 
    FROM "s3a://${record:value('/bucket')}/${record:value('/objectKey')}") 
    FILEFORMAT = JSON 
    FORMAT_OPTIONS ('header' = 'true')
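
The query above is effectively a template: for every event, the bucket and objectKey fields of the event record are substituted into the S3 path. The Python sketch below mimics that substitution so you can see the statement that results for one event. The helper names, the shortened column list, and the sample bucket and key are hypothetical; the executor performs this substitution itself through the expression language.

```python
def s3_uri(event):
    """Build the staging object's URI from an object-written event record."""
    return "s3a://{bucket}/{objectKey}".format(**event)


def build_copy_statement(table, columns, event):
    """Render a COPY INTO statement for the object named in the event."""
    return (
        f"COPY INTO {table}\n"
        f"FROM (SELECT {', '.join(columns)}\n"
        f"FROM \"{s3_uri(event)}\")\n"
        "FILEFORMAT = JSON"
    )


# Hypothetical event record; real values come from the Amazon S3 destination.
event = {"bucket": "my-staging-bucket", "objectKey": "accounts/object-0001.json"}

print(build_copy_statement("accounts_d", ["Id", "Name", "Type"], event))
```

Since each event names exactly one object, each run of the query loads only the file that was just written rather than rescanning the whole staging location.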

Preview Pipeline

Previewing the pipeline is a great way to see the transformations applied to attribute values. With the Expression Evaluator processor selected, the preview shows the before and after values of the Type attribute.

Query Delta Lake

If everything looks good in preview mode, running the pipeline should bulk copy account information from Salesforce to Delta Lake.

Once the Delta Lake table is populated, you can start analyzing the data. For example, the following query shows the total annual revenue broken down by account rating and account type.

Total Revenue by Rating and Account Type

SELECT Rating, Type, format_number(SUM(AnnualRevenue),2) AS Total_Revenue 
FROM accounts_d
WHERE Rating IS NOT NULL AND Type IS NOT NULL
GROUP BY Rating, Type
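
If you would like to try the shape of this query without a Databricks cluster, the sketch below runs an equivalent aggregation against an in-memory SQLite table using Python's standard library. The rows are made up purely for illustration and are not data from this post. SQLite has no format_number function, so the two-decimal formatting with thousands separators is done in Python instead, and an ORDER BY is added only to make the output order predictable.

```python
import sqlite3

# Made-up sample rows, for illustration only (not data from the post).
ROWS = [
    ("Hot", "Direct", 1000.0),
    ("Hot", "Direct", 2500.5),
    ("Warm", "Channel", 300.0),
    (None, "Direct", 999.0),  # dropped by the filter: Rating is NULL
    ("Cold", None, 50.0),     # dropped by the filter: Type is NULL
]


def total_revenue_by_rating_and_type(rows):
    """Sum AnnualRevenue per (Rating, Type), skipping rows with NULLs."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE accounts_d (Rating TEXT, Type TEXT, AnnualRevenue REAL)"
        )
        conn.executemany("INSERT INTO accounts_d VALUES (?, ?, ?)", rows)
        cursor = conn.execute(
            "SELECT Rating, Type, SUM(AnnualRevenue) "
            "FROM accounts_d "
            "WHERE Rating IS NOT NULL AND Type IS NOT NULL "
            "GROUP BY Rating, Type "
            "ORDER BY Rating, Type"
        )
        # Format totals with thousands separators and two decimals.
        return [(rating, typ, f"{total:,.2f}") for rating, typ, total in cursor]
    finally:
        conn.close()


for row in total_revenue_by_rating_and_type(ROWS):
    print(row)
```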

Summary

In this blog post, you’ve learned how to use the Databricks COPY command for bulk ingest into a Delta Lake table with the StreamSets Cloud service.

Learn more about StreamSets for Databricks, which is available on Microsoft Azure Marketplace and AWS Marketplace.
