skip to Main Content

Ingest Salesforce Data for Analysis Using StreamSets

By Posted in Data Integration April 29, 2016 origin allows ingest from SalesforceUPDATE – Salesforce origin and destination stages, as well as a destination for Salesforce Wave Analytics, were released in StreamSets Data Collector Use the supported, shipping Salesforce stages rather than the unsupported code mentioned below!

As I’ve mentioned a couple of times, my previous gig was as a developer evangelist at Salesforce, with particular focus on integration. A few weeks ago, I wrote a custom destination allowing StreamSets Data Collector (SDC) to write data to Salesforce Wave Analytics; today, I’ll show you how to ingest data from Salesforce and write it to any destination supported by SDC.

You’ll need to download the destination tarball from the StreamSets Data Collector Origin for project on GitHub – just click here to get the file. Extract the content to SDC’s user-libs directory, restart SDC, and you should see a new origin in the palette (see the screenshot on the left). It’s called the Origin to reflect the fact that integration APIs such as the Bulk API are considered part of

This new origin works very similarly to the existing JDBC origin; you specify credentials to connect to Salesforce, the query that you want to run, an offset field and an initial offset value. The query must of the form:

SELECT OffsetField [, OptionallyMoreFields] 
FROM SObjectName 
WHERE OffsetField > '${OFFSET}' [AND OptionallyOtherConditions] 
ORDER BY OffsetField [, OptionallyOtherFields]

Like the JDBC origin, the Origin issues periodic queries, with a configurable interval. There are two modes of operation: incremental and full. In incremental mode, the initial offset is used in the very first query, after which the value of the offset field from the last retrieved record is used. This mode is appropriate for records that are written to but not updated – the pipeline will receive newly created records, but no updates or deletions.

In full mode, each query uses the specified initial offset, so the pipeline receives the current state of all of the records every time. This can quickly result in large numbers of records, so use full mode with care.

Since Salesforce assigns a unique, monotonically increasing, identifier to each record, the Id field is ideal for use as the offset field; specify 000000000000000 as the initial offset to get all of the data for a given object.

This video shows the Origin in operation. For testing purposes, I’m writing data to a local text file, but you could send records to a Kafka queue, HBase, or any other supported destination.


It’s worth noting that this initial version of the origin should be considered ‘proof of concept’. Feel free to evaluate it and log issues to the GitHub project, but be cautious of putting it into production. A future version will use the Streaming API to get change data capture-style notifications from Salesforce, a more efficient mechanism than polling with queries, and will likely be added to the StreamSets Data Collector product.

Conduct Data Ingestion and Transformations In One Place

Deploy across hybrid and multi-cloud
Schedule a Demo
Back To Top