Salesforce

The Salesforce origin reads data from Salesforce.

When you configure the Salesforce origin, you define the connection information that the origin uses to connect to Salesforce, including the API type and version.

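You can configure the origin to read data in one or both of the following ways:
  • Query existing data - Execute a SOQL query to read existing data from Salesforce.
  • Subscribe to notifications - Subscribe to the Force.com Streaming API to receive notifications for changes to Salesforce data.

When processing existing data, you can configure the origin to query Salesforce at regular intervals. You cannot use repeating queries when subscribed to notifications. When repeating a query, the origin can perform a full or incremental read.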

The origin generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field. When the origin reads changed data from notifications, it also includes the CRUD operation type in a record header attribute so that CRUD-enabled destinations can easily process the generated records. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

You can optionally use an HTTP proxy to connect to Salesforce.

The origin can also generate events for an event stream. For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Query Existing Data

The Salesforce origin can execute a query to read existing data from Salesforce. Use the Salesforce Object Query Language (SOQL) to write the query.

When you configure the origin to query existing data, you specify whether the origin uses the Salesforce Bulk API or SOAP API to read from Salesforce. The Bulk API is optimized to process large sets of data. The SOAP API supports more complex queries, but is less practical when processing large sets of data. For more information about when to use the Bulk or SOAP API, see the Salesforce Developer documentation.

If the origin uses the SOAP API or uses the Bulk API version 39.0 or later, you can configure the origin to retrieve deleted records from the Salesforce recycle bin.

The Salesforce origin uses an offset field and initial offset value to determine where to start reading data within an object. Include both the offset field and the offset value in the WHERE clause of the SOQL query. By default, the offset field is defined as the Salesforce Id field. The Id system field contains a unique identifier for each record in a Salesforce object. The initial offset value is a value within the offset field where you want the Salesforce origin to start reading.

If you configure the origin to query existing data but not to subscribe to notifications, you can configure the origin to run the query once or to repeat the query. When running the query once, the pipeline stops when it finishes reading all data from the Salesforce object. If you start the pipeline again, the origin uses the initial offset value to start reading, reading the entire set of existing data again.

If the pipeline stops before it finishes reading all data, the Salesforce origin saves the last read offset value. When the pipeline starts again, the origin uses the last read offset value to continue processing from where it stopped. You can reset the origin to process all requested objects.

When you configure the origin to run the query more than once, the pipeline runs continuously so that it can repeat the query at regular intervals. You can choose how the origin repeats the query. For more information, see Repeat Query.

When the origin is configured to subscribe to notifications, the pipeline runs continuously so that it can receive change events. If you configure the origin to both query existing data and subscribe to notifications, the origin first subscribes to changes and then queries existing data. The origin queues the changes and processes them after the query completes. As a result, some changes might be processed twice.

SOQL Query

When you configure the Salesforce origin to query existing data, you define the SOQL query that the origin uses to return data from Salesforce. The Salesforce origin requires WHERE and ORDER BY clauses in the query.

Use the following guidelines when you define the WHERE and ORDER BY clauses:

In the WHERE clause, include the offset field and the offset value
The origin uses an offset field and value to determine the data that is returned. Include both in the WHERE clause of the query.
Use the OFFSET constant to represent the offset value
In the WHERE clause, use ${OFFSET} to represent the offset value.
For example, when you start a pipeline, the following query returns all data from the object where the data in the offset field is greater than the initial offset value:
SELECT Id, Name FROM <object> WHERE <offset field> > ${OFFSET}
Tip: When the offset values are strings, enclose ${OFFSET} in single quotation marks.
In the ORDER BY clause, include the offset field as the first field
To avoid returning duplicate data, use the offset field as the first field in the ORDER BY clause.
Note: Using a field that is not the Id field in the ORDER BY clause can slow performance.

The complete SOQL query must use the following syntax:

SELECT <offset field>, <field name1>, <field name2>, ... FROM <object> WHERE <offset field> > ${OFFSET} ORDER BY <offset field>
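The following minimal sketch shows how the ${OFFSET} placeholder could be filled in, assuming a plain text substitution. The function name render_query and its validation checks are hypothetical illustrations, not Data Collector's actual implementation. Note that quoting for string offsets, such as Salesforce Ids, stays in the query template, as the tip above describes.

```python
OFFSET_PLACEHOLDER = "${OFFSET}"


def render_query(template: str, offset: str) -> str:
    """Replace every ${OFFSET} placeholder with the current offset value.

    Hypothetical helper: assumes plain text substitution, so any single
    quotation marks around string offsets must already be in the template.
    """
    if OFFSET_PLACEHOLDER not in template:
        raise ValueError("the WHERE clause must reference ${OFFSET}")
    if "ORDER BY" not in template.upper():
        raise ValueError("the query must include an ORDER BY clause")
    return template.replace(OFFSET_PLACEHOLDER, offset)


template = "SELECT Id, Name, AccountNumber FROM Account WHERE Id > '${OFFSET}' ORDER BY Id"
print(render_query(template, "000000000000000"))
# prints: SELECT Id, Name, AccountNumber FROM Account WHERE Id > '000000000000000' ORDER BY Id
```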

If you specify SELECT * FROM <object> in the SOQL query, the origin expands * to all fields in the Salesforce object that are accessible to the configured user. Note that the origin adds components of compound fields to the query, rather than adding the compound fields themselves. For example, the origin adds BillingStreet, BillingCity, etc., rather than adding BillingAddress. Similarly, it adds Location__Latitude__s and Location__Longitude__s rather than Location__c.
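The expansion of * described above can be sketched as follows. This sketch assumes that the field metadata is available as a list of name and type pairs and that compound fields report the Salesforce field types "address" or "location". The helpers expand_star and build_select are hypothetical and only illustrate the idea of keeping component fields while skipping compound fields.

```python
from typing import Dict, List

# Assumed Salesforce field types for compound fields.
COMPOUND_TYPES = {"address", "location"}


def expand_star(fields: List[Dict[str, str]]) -> List[str]:
    """Return the field names to select in place of '*'.

    Each entry is assumed to have 'name' and 'type' keys that describe one
    field accessible to the configured user. Compound fields are skipped;
    their component fields (BillingStreet, BillingCity, ...) are kept.
    """
    return [f["name"] for f in fields if f["type"].lower() not in COMPOUND_TYPES]


def build_select(object_name: str, fields: List[Dict[str, str]]) -> str:
    return "SELECT " + ", ".join(expand_star(fields)) + " FROM " + object_name


# Hypothetical, abbreviated field metadata for the Account object.
account_fields = [
    {"name": "Id", "type": "id"},
    {"name": "Name", "type": "string"},
    {"name": "BillingStreet", "type": "textarea"},
    {"name": "BillingCity", "type": "string"},
    {"name": "BillingAddress", "type": "address"},
]
print(build_select("Account", account_fields))
# prints: SELECT Id, Name, BillingStreet, BillingCity FROM Account
```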

Example

Let's assume that you want to read all names and account numbers from the Salesforce Account object. The object contains a large number of records, so you choose to use the Salesforce Bulk API. You use the default value of Id for the offset field. You use the default value of 000000000000000 for the offset value to ensure that the origin reads all records in the object.

You define the SOQL query as follows, including the offset field and offset value in the WHERE clause and the offset field in the ORDER BY clause, as required:

SELECT Id, Name, AccountNumber FROM Account WHERE Id > '${OFFSET}' ORDER BY Id

The configured Query tab looks like so:

Repeat Query

When the Salesforce origin processes existing data and is not subscribed to notifications, it can repeat the specified query at regular intervals. You can configure the origin to repeat a full or incremental query:

Full query
When the origin repeats a full query, it runs the defined query using the initial offset as the offset value in the query each time it requests data.
Repeat a full query to capture all record updates. You might use a Record Deduplicator in the pipeline to minimize repeated records. A full query is not ideal for objects with large numbers of records.
Incremental query
When the Salesforce origin repeats an incremental query, it uses the initial offset as the offset value in the first query.
As the origin completes processing the results of the first query, it saves the last offset value that it processes. When it repeats the query, it uses the last-saved offset to perform an incremental query. The incremental query processes only the subset of data that arrived after the last query. When necessary, you can reset the origin to use the initial offset value.
Repeat an incremental query for append-only objects or when you do not need to capture changes to older records.
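The difference between the two modes can be simulated with a minimal sketch. It assumes an in-memory list of records and a string offset field, and it saves the offset only after each complete query, whereas the origin itself tracks the offset as it processes batches. The names run_query and RepeatingQuery are hypothetical.

```python
from typing import Dict, List

Record = Dict[str, str]


def run_query(records: List[Record], offset: str, offset_field: str = "Id") -> List[Record]:
    """Emulate WHERE <offset field> > offset ORDER BY <offset field>."""
    matching = [r for r in records if r[offset_field] > offset]
    return sorted(matching, key=lambda r: r[offset_field])


class RepeatingQuery:
    """Hypothetical tracker for the offset used between repeated queries."""

    def __init__(self, initial_offset: str, incremental: bool, offset_field: str = "Id"):
        self.initial_offset = initial_offset
        self.incremental = incremental
        self.offset_field = offset_field
        self.saved_offset = initial_offset

    def next_run(self, records: List[Record]) -> List[Record]:
        # A full query always starts from the initial offset; an incremental
        # query continues from the last-saved offset.
        offset = self.saved_offset if self.incremental else self.initial_offset
        results = run_query(records, offset, self.offset_field)
        if results:
            self.saved_offset = results[-1][self.offset_field]
        return results

    def reset(self) -> None:
        # Equivalent to resetting the origin: go back to the initial offset.
        self.saved_offset = self.initial_offset


records = [{"Id": "001A"}, {"Id": "001B"}]
full = RepeatingQuery("000", incremental=False)
incremental = RepeatingQuery("000", incremental=True)
full.next_run(records)
incremental.next_run(records)
records.append({"Id": "001C"})  # a record arrives between queries
print(len(full.next_run(records)), len(incremental.next_run(records)))
# prints: 3 1
```

The full query returns every record again on each run, while the incremental query returns only the record with an offset greater than the last-saved value.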

Subscribe to Notifications

The Salesforce origin can subscribe to the Force.com Streaming API to receive notifications for changes to Salesforce data.

To configure the origin to subscribe to notifications, you must first create a PushTopic in Salesforce based on an SOQL query. The PushTopic query defines which record create, update, delete, or undelete events generate a notification. If the record changes match the criteria of the PushTopic query, a notification is generated and received by the subscribed clients.

The Salesforce origin is the client that subscribes to the PushTopic. In the origin configuration, you specify the name of the PushTopic, which subscribes the origin to the PushTopic channel.

When you start a pipeline configured to subscribe to Salesforce notifications, the pipeline runs continuously, and the origin receives change events as records.

Note: The Streaming API stores events for 24 hours. If the pipeline stops and then restarts within 24 hours, the origin can receive notifications about past events. However, if the pipeline is inactive for more than 24 hours, the origin might miss some events.

For more information about creating PushTopic queries, see the Salesforce Streaming API developer documentation.
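As an illustration, a PushTopic can be created as a record through the Salesforce REST API. The minimal sketch below only builds the request without sending it. The instance URL and access token are placeholders that you would obtain from your own Salesforce authentication flow, and the topic name and query are example values chosen to match the AccountUpdates channel in the notification format example. Verify the PushTopic fields against the Salesforce Streaming API documentation for your API version.

```python
import json
import urllib.request


def pushtopic_request(instance_url: str, access_token: str, api_version: str = "39.0") -> urllib.request.Request:
    """Build (but do not send) a REST request that creates a PushTopic.

    instance_url and access_token are placeholders for values from your
    own authentication flow; the topic name and query are examples.
    """
    body = {
        "Name": "AccountUpdates",  # subscribers use the channel /topic/AccountUpdates
        "Query": "SELECT Id, Name, AccountNumber FROM Account",  # the field list must include Id
        "ApiVersion": float(api_version),
        "NotifyForOperationCreate": True,
        "NotifyForOperationUpdate": True,
        "NotifyForOperationDelete": True,
        "NotifyForOperationUndelete": True,
        "NotifyForFields": "Referenced",
    }
    url = f"{instance_url}/services/data/v{api_version}/sobjects/PushTopic/"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
        method="POST",
    )


request = pushtopic_request("https://example.my.salesforce.com", "ACCESS_TOKEN")
print(request.get_method(), request.full_url)
# Send with urllib.request.urlopen(request) once real credentials are in place.
```

After the PushTopic exists, enter its name, AccountUpdates in this example, in the Push Topic property of the origin.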

Notification Record Format

When the PushTopic encounters a change event that generates a notification, it sends the event to the subscribing Salesforce origin as a JSON message in the following format:

{
  "channel": "/topic/AccountUpdates",
  "clientId": "j24ylcz8l0t0fyp0pze6uzpqlt",
  "data": {
    "event": {
      "createdDate": "2016-09-15T06:01:40.000+0000",
      "type": "updated"
    },
    "sobject": {
      "AccountNumber": "3221320",
      "Id": "0013700000dC9xLAAS",
      "Name": "StreamSets",
      ...more fields...
    }
  }
}

The data/event/type property indicates the type of change - created, updated, deleted, or undeleted.

When the Salesforce origin receives the data, it creates a record with field names and values corresponding to the data/sobject property of the message.

The record also includes record header attributes corresponding to the data/event property of the message, as described in Salesforce Header Attributes.
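The mapping from the notification message to a record can be sketched as follows, assuming the default "salesforce." attribute prefix. The function notification_to_record is hypothetical and returns plain dictionaries rather than Data Collector records. The sample message is the example above without its "...more fields..." placeholder.

```python
import json
from typing import Dict, Tuple


def notification_to_record(message: str, prefix: str = "salesforce.") -> Tuple[Dict, Dict[str, str]]:
    """Split a PushTopic notification into record fields and header attributes."""
    data = json.loads(message)["data"]
    fields = dict(data["sobject"])  # data/sobject becomes the record fields
    event = data["event"]           # data/event becomes the header attributes
    headers = {
        prefix + "cdc.createdDate": event["createdDate"],
        prefix + "cdc.type": event["type"],
    }
    return fields, headers


sample = json.dumps({
    "channel": "/topic/AccountUpdates",
    "clientId": "j24ylcz8l0t0fyp0pze6uzpqlt",
    "data": {
        "event": {"createdDate": "2016-09-15T06:01:40.000+0000", "type": "updated"},
        "sobject": {"AccountNumber": "3221320", "Id": "0013700000dC9xLAAS", "Name": "StreamSets"},
    },
})
fields, headers = notification_to_record(sample)
print(fields["Name"], headers["salesforce.cdc.type"])
# prints: StreamSets updated
```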

Reading Custom Objects or Fields

If the origin reads custom Salesforce objects or fields, you might want to use a Field Renamer in the pipeline to rename the custom fields.

When you create custom Salesforce objects or fields, Salesforce appends the suffix __c to their names. For example, if you create a custom Transaction object, Salesforce names the object Transaction__c. The Transaction object might contain custom fields named Credit_Card__c, Fare_Amount__c, and Payment_Type__c.

Instead of using field names appended with the suffix __c throughout the rest of the pipeline, you can add a Field Renamer to remove the suffix from the field names.
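The renaming that a Field Renamer performs here can be illustrated with a minimal Python sketch that operates on a plain dictionary. The function strip_custom_suffix is hypothetical and is not how the Field Renamer is configured. It refuses to rename a field when the new name would overwrite an existing field, for example when a record contains both Status and Status__c.

```python
def strip_custom_suffix(record: dict, suffix: str = "__c") -> dict:
    """Return a copy of the record with the custom suffix removed from field names."""
    renamed = {}
    for name, value in record.items():
        new_name = name[: -len(suffix)] if name.endswith(suffix) else name
        if new_name in renamed:
            # Avoid silently overwriting a standard field with a custom one.
            raise ValueError(f"renaming {name} would overwrite {new_name}")
        renamed[new_name] = value
    return renamed


transaction = {"Id": "a01", "Credit_Card__c": "VISA", "Fare_Amount__c": 12.5, "Payment_Type__c": "card"}
print(strip_custom_suffix(transaction))
# prints: {'Id': 'a01', 'Credit_Card': 'VISA', 'Fare_Amount': 12.5, 'Payment_Type': 'card'}
```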

For more information about Salesforce custom objects, see the Salesforce documentation.

Salesforce Attributes

The Salesforce origin generates Salesforce record header attributes and Salesforce field attributes that provide additional information about each record and field. The origin receives these details from Salesforce.

Salesforce attributes include a user-defined prefix to differentiate the Salesforce attributes from other attributes. By default, the prefix is "salesforce.". You can change the prefix that the origin uses and you can configure the origin not to create Salesforce attributes.

Salesforce Header Attributes

The Salesforce origin generates Salesforce record header attributes that provide additional information about each record, such as the source object for the record. The origin receives these details from Salesforce.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attribute.

The Salesforce origin can provide the following Salesforce header attributes:

Salesforce Header Attribute Description
<Salesforce prefix>sobjectType Provides the Salesforce source object for the record.

Generated when the origin executes a query or subscribes to notifications.

<Salesforce prefix>cdc.createdDate Provides the date that the Salesforce PushTopic encountered the change event.

Generated when the origin subscribes to notifications.

<Salesforce prefix>cdc.type Provides the type of change that the Salesforce PushTopic encountered - created, updated, deleted, or undeleted.

Generated when the origin subscribes to notifications.

For more information about record header attributes, see Record Header Attributes.

CRUD Operation Header Attribute

When the Salesforce origin subscribes to notifications and reads changed data from a PushTopic, the origin includes the CRUD operation type for a record in the sdc.operation.type header attribute.

If you use a CRUD-enabled destination in the pipeline such as JDBC Producer or Kudu, the destination can use the operation type when writing to destination systems. When necessary, you can use an Expression Evaluator or scripting processors to manipulate the value in the sdc.operation.type header attribute. For an overview of Data Collector changed data processing and a list of CRUD-enabled destinations, see Processing Changed Data.

The Salesforce origin uses the following values in the sdc.operation.type record header attribute to represent the operation type:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 5 for unsupported operations
  • 6 for UNDELETE
Tip: Records that are undeleted contain only the record ID. If you need the record data, you can use the Salesforce Lookup processor to retrieve it.
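The correspondence between the cdc.type header attribute and the sdc.operation.type values listed above can be written as a simple lookup. This is a minimal sketch of the mapping, not the origin's source code, and the name operation_type is hypothetical. Any change type outside the four listed maps to the unsupported code.

```python
# cdc.type value -> sdc.operation.type code, as listed above.
OPERATION_CODES = {
    "created": 1,    # INSERT
    "deleted": 2,    # DELETE
    "updated": 3,    # UPDATE
    "undeleted": 6,  # UNDELETE
}
UNSUPPORTED = 5


def operation_type(cdc_type: str) -> int:
    """Return the sdc.operation.type code for a PushTopic change type."""
    return OPERATION_CODES.get(cdc_type, UNSUPPORTED)


print(operation_type("updated"), operation_type("merged"))
# prints: 3 5
```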

Salesforce Field Attributes

The Salesforce origin generates Salesforce field attributes that provide additional information about each field, such as the data type of the Salesforce field. The origin receives these details from Salesforce.

You can use the record:fieldAttribute or record:fieldAttributeOrDefault functions to access the information in the attribute.

The Salesforce origin can provide the following Salesforce field attributes:

Salesforce Field Attribute Description
<Salesforce prefix>salesforceType Provides the original Salesforce data type for the field.
<Salesforce prefix>length Provides the original length for all string and textarea fields.
<Salesforce prefix>precision Provides the original precision for all double fields.
<Salesforce prefix>scale Provides the original scale for all double fields.
<Salesforce prefix>digits Provides the maximum number of digits for all integer fields.
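The rules in the table can be summarized with a minimal sketch that derives the attributes for one field. It assumes the default "salesforce." prefix, string-valued attributes, and the Salesforce type names string, textarea, double, and int. The function field_attributes and its parameters are hypothetical.

```python
from typing import Dict, Optional


def field_attributes(
    sf_type: str,
    length: Optional[int] = None,
    precision: Optional[int] = None,
    scale: Optional[int] = None,
    digits: Optional[int] = None,
    prefix: str = "salesforce.",
) -> Dict[str, str]:
    """Return the field attributes described in the table for one field."""
    attrs = {prefix + "salesforceType": sf_type}
    if sf_type in ("string", "textarea") and length is not None:
        attrs[prefix + "length"] = str(length)
    elif sf_type == "double":
        if precision is not None:
            attrs[prefix + "precision"] = str(precision)
        if scale is not None:
            attrs[prefix + "scale"] = str(scale)
    elif sf_type == "int" and digits is not None:
        attrs[prefix + "digits"] = str(digits)
    return attrs


print(field_attributes("double", precision=18, scale=2))
# prints: {'salesforce.salesforceType': 'double', 'salesforce.precision': '18', 'salesforce.scale': '2'}
```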

For more information about field attributes, see Field Attributes.

Event Generation

The Salesforce origin can generate events that you can use in an event stream. When you enable event generation, the origin generates an event when it completes processing the data returned by the specified query.

Salesforce events can be used in any logical way. For example:
  • With the Pipeline Finisher executor to stop the pipeline and transition the pipeline to a Finished state when the origin completes processing available data.

    When you restart a pipeline stopped by the Pipeline Finisher executor, the origin processes data based on how you configured the origin. For example, if you configure the origin to repeat an incremental query, the origin saves the offset when the executor stops the pipeline. When it restarts, the origin continues processing from the last-saved offset. If you configure the origin to repeat a full query, when you restart the pipeline, the origin uses the initial offset.

    For an example, see Case Study: Stop the Pipeline.

  • With the Email executor to send a custom email after receiving an event.

    For an example, see Case Study: Sending Email.

For more information about dataflow triggers and the event framework, see Dataflow Triggers Overview.

Event Record

Event records generated by the Salesforce origin have the following event-related record header attributes:
Record Header Attribute Description
sdc.event.type Event type. Uses the following type:
  • no-more-data - Generated when the origin completes processing all data returned by a query.
sdc.event.version An integer that indicates the version of the event record type.
sdc.event.creation_timestamp Epoch timestamp when the stage created the event.

The no-more-data event record includes no record fields.
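To reason about downstream event handling, such as a Pipeline Finisher that should react only to no-more-data events, it can help to model the event record. In the following minimal sketch, the dictionary shape, the version value of 1, and the use of epoch milliseconds for the timestamp are assumptions, and the names no_more_data_event and should_finish are hypothetical.

```python
import time
from typing import Dict


def no_more_data_event(version: int = 1) -> Dict:
    """Model a no-more-data event record: header attributes and no fields."""
    headers = {
        "sdc.event.type": "no-more-data",
        "sdc.event.version": str(version),  # assumed version value
        "sdc.event.creation_timestamp": str(int(time.time() * 1000)),  # assumed epoch milliseconds
    }
    return {"headers": headers, "fields": {}}


def should_finish(record: Dict) -> bool:
    """Return True when the record is the origin's no-more-data event."""
    return record["headers"].get("sdc.event.type") == "no-more-data"


event = no_more_data_event()
print(should_finish(event), event["fields"])
# prints: True {}
```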

Changing the API Version

Data Collector ships with version 39.0 of the Salesforce Web Services Connector libraries. You can use a different Salesforce API version if you need to access functionality not present in version 39.0.

  1. On the Salesforce tab, set the API Version property to the version that you want to use, for example 36.0.
  2. Download the relevant version of the following JAR files from Salesforce Web Services Connector (WSC):
    • WSC JAR file - force-wsc-<version>.0.0.jar

    • Partner API JAR file - force-partner-api-<version>.0.0.jar

    Where <version> is the API version number, for example, 36.

    For information about downloading libraries from Salesforce WSC, see https://developer.salesforce.com/page/Introduction_to_the_Force.com_Web_Services_Connector.

  3. In the following Data Collector directory, replace the default force-wsc-39.0.0.jar and force-partner-api-39.0.0.jar files with the versioned JAR files that you downloaded:
    $SDC_DIST/streamsets-libs/streamsets-datacollector-salesforce-lib/lib/
  4. Restart Data Collector for the changes to take effect.
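Step 3 can be scripted. The following minimal sketch assumes that you already downloaded both JAR files to a local directory and that Data Collector is stopped while the files are replaced. The function names jar_names and swap_jars are hypothetical, and you still restart Data Collector yourself afterward.

```python
import shutil
from pathlib import Path
from typing import List

LIB_SUBDIR = Path("streamsets-libs/streamsets-datacollector-salesforce-lib/lib")


def jar_names(api_version: str) -> List[str]:
    """Return the WSC and Partner API JAR names for an API version such as '36.0'."""
    major = api_version.split(".")[0]
    return [f"force-wsc-{major}.0.0.jar", f"force-partner-api-{major}.0.0.jar"]


def swap_jars(sdc_dist: Path, download_dir: Path, api_version: str, current_version: str = "39.0") -> List[str]:
    """Replace the current Salesforce JAR files with the downloaded versions."""
    lib_dir = sdc_dist / LIB_SUBDIR
    new_jars = [download_dir / name for name in jar_names(api_version)]
    missing = [str(p) for p in new_jars if not p.is_file()]
    if missing:
        raise FileNotFoundError(f"download these JAR files first: {missing}")
    for old in jar_names(current_version):
        (lib_dir / old).unlink(missing_ok=True)  # remove the default JAR files
    for jar in new_jars:
        shutil.copy2(jar, lib_dir / jar.name)
    return sorted(p.name for p in lib_dir.glob("force-*.jar"))
```

For example, swap_jars(Path("/opt/sdc"), Path("/tmp/wsc"), "36.0") would replace the 39.0 JAR files with the 36.0 versions, where both paths are hypothetical values for $SDC_DIST and your download directory.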

Configuring a Salesforce Origin

Configure a Salesforce origin to read data from Salesforce.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Produce Events Generates event records when events occur. Use for event handling.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Salesforce tab, configure the following properties:
    Salesforce Property Description
    Username Salesforce username in the following email format: <text>@<text>.com.
    Password Salesforce password.
    Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or Hashicorp Vault secrets. For more information, see Using Runtime Resources or Accessing Hashicorp Vault Secrets.
    Auth Endpoint Salesforce SOAP API authentication endpoint. Enter one of the following values:
    • login.salesforce.com - Use to connect to a Production or Developer Edition organization.
    • test.salesforce.com - Use to connect to a sandbox organization.

    Default is login.salesforce.com.

    API Version Salesforce API version to use to connect to Salesforce.

    Default is 39.0. If you change the version, you must also download the relevant JAR files from Salesforce Web Services Connector (WSC).

    Query Existing Data Determines whether to execute a query to read existing data from Salesforce.
    Subscribe to Notifications Determines whether to subscribe to the Force.com Streaming API to receive notifications for changes to Salesforce data.
    Max Batch Size (records) Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector default is 1000.

    Batch Wait Time (ms) Number of milliseconds to wait before sending a partial or empty batch.
  3. To query existing data, on the Query tab, configure the following properties:
    Query Property Description
    Use Bulk API Determines whether the stage uses the Salesforce Bulk API or SOAP API to read from Salesforce. Select to use the Bulk API. Clear to use the SOAP API.
    SOQL Query SOQL query to use when reading existing data from Salesforce.
    Include Deleted Records Determines whether the SOQL query also retrieves deleted records from the Salesforce recycle bin.

    The query can retrieve deleted records when the stage uses the Salesforce SOAP API or the Bulk API version 39.0 or later. Earlier versions of the Bulk API do not support retrieving deleted records.

    Repeat Query Determines whether the origin runs the query more than once. Available when the origin processes existing data and is not subscribed to notifications. Use one of the following options:
    • Repeat Full Query - Repeats the query using the initial offset in each query.
    • Repeat Incremental Query - Repeats the query using the initial offset for the first query and then using the last-saved offset for subsequent queries.
    Query Interval Amount of time to wait between queries. Enter an expression based on a unit of time. You can use SECONDS, MINUTES, or HOURS.

    Default is 1 minute: ${1 * MINUTES}.

    Initial Offset Offset value to use when the pipeline starts.

    Default is 000000000000000.

    Offset Field Field to use for the offset value.

    Default is the Id field.

  4. To subscribe to notifications, on the Subscribe tab, configure the following property:
    Subscribe Property Description
    Push Topic Name of the existing Salesforce PushTopic to subscribe to.
  5. On the Advanced tab, configure the following properties:
    Advanced Property Description
    Create Salesforce Attributes Adds Salesforce header attributes to records and field attributes to fields. The origin creates Salesforce attributes by default.
    Salesforce Attribute Prefix Prefix for Salesforce attributes.
    Use Proxy Specifies whether to use an HTTP proxy to connect to Salesforce.
    Proxy Hostname Proxy host.
    Proxy Port Proxy port.
    Proxy Requires Credentials Specifies whether the proxy requires a user name and password.
    Proxy Username User name for proxy credentials.
    Proxy Password Password for proxy credentials.
    Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or Hashicorp Vault secrets. For more information, see Using Runtime Resources or Accessing Hashicorp Vault Secrets.