
Analyzing Salesforce Data with StreamSets, Elasticsearch, and Kibana

Posted in Advanced Analytics, June 3, 2016

UPDATE – Salesforce origin and destination stages, as well as a destination for Salesforce Wave Analytics, were released in StreamSets Data Collector 2.2.0.0. Use the supported, shipping Salesforce stages rather than the unsupported code mentioned below!

After I published a proof-of-concept Salesforce Origin for StreamSets Data Collector (SDC), I noticed an article on the Elastic blog, Analyzing Salesforce Data with Logstash, Elasticsearch, and Kibana. In the blog entry, Elastic systems architect Russ Savage (now at Cask Data) explains the motivation for ingesting Salesforce data into Elasticsearch:

Working directly with sales and marketing operations, we outlined a number of challenges they had that might be solved with this solution. Those included:

  • Interactive time-series snapshot analysis across a number of dimensions. By sales rep, by region, by campaign and more.
  • Which sales reps moved the most pipeline the day before the end of month/quarter? What was the progression of Stage 1 opportunities over time?
  • Correlating data outside of Salesforce (like web traffic) to pipeline building and demand. By region/country/state/city and associated pipeline.

It’s very challenging to look back in time and see trends in the data. Many companies have configured Salesforce to save reporting snapshots, but if you’re like me, you want to see the data behind the aggregate report. I want the ability to drill down to any level of detail, for any timeframe, and find any metric. We found that Salesforce snapshots just aren’t flexible enough for that.

Since we have first-class support for Elasticsearch as a destination in SDC, I decided to recreate the use case with the Salesforce Origin and see if we could fulfill those same requirements while taking advantage of StreamSets’ interactive pipeline IDE and ability to continuously monitor origins for new data.

Querying Salesforce from StreamSets Data Collector

Currently, the Salesforce Origin is available via a GitHub repo, but the plan is to move it into SDC proper at some point in the near future. To install the origin, extract the tarball into the SDC user-libs directory:

$ cd path-to-sdc/user-libs
$ tar xvfz force-lib-1.0-SNAPSHOT.tar.gz
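
The archive should unpack into a force-lib directory (the path the security policy below grants permissions to); a quick listing confirms it's in place:

$ ls force-lib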

Since the origin needs to connect to the Salesforce API, we must edit the SDC security policy file. Add a section to path-to-sdc/etc/sdc-security.policy:

grant codebase "file://${sdc.dist.dir}/user-libs/force-lib/-" {
    permission java.net.SocketPermission "*", "connect, resolve";
};

(Curious about why we need the * wildcard in the policy? Here’s the answer)

Once you’ve saved sdc-security.policy, restart SDC, and you should see the new origin in the processor palette:

Force.com Origin

Drop it into a new pipeline and you'll be able to configure it with your Salesforce credentials, the SOQL query you wish to use, and so on:

Force.com Origin Config

For this example, we want to continuously retrieve existing records from Salesforce, updating their state in Elasticsearch, so we'll run the origin in ‘full’ mode, rather than ‘incremental’, and poll Salesforce every few minutes. We'll also specify a subset of fields to retrieve from Salesforce in the SOQL query:

Force Origin Config Detail
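
As an example, a SOQL query covering just the fields we'll map into Elasticsearch below (all standard Opportunity fields; adjust the list for your own org) might look like this:

SELECT Id, AccountId, Amount, CloseDate, LeadSource, Name,
       OwnerId, StageName, TotalOpportunityQuantity, Type
FROM Opportunity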

Pushing the Data to Elasticsearch

To keep things simple for this example, we’ll just read from Salesforce and write to Elasticsearch, though we could drop any number of processors into the pipeline to filter or enrich the data:

Force and Elasticsearch

Before we run the pipeline, we need to send index and mapping metadata to Elasticsearch, so it knows how to work with our opportunity records. Here’s the metadata for our example:

curl -XPUT 'http://localhost:9200/opportunities' -d '{
  "mappings": {
    "opportunities" : {
      "properties" : {
        "Id": {
          "type": "string",
          "index": "not_analyzed"
        },
        "AccountId": {
          "type": "string",
          "index": "not_analyzed"
        },
        "Amount": {
          "type": "double"
        },
        "CloseDate": {
          "type": "date"
        },
        "LeadSource": {
          "type": "string",
          "index": "not_analyzed"
        },
        "Name": {
          "type": "string"
        },
        "OwnerId": {
          "type": "string",
          "index": "not_analyzed"
        },
        "StageName": {
          "type": "string",
          "index": "not_analyzed"
        },
        "TotalOpportunityQuantity": {
          "type": "double"
        },
        "Type": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}'

Note that we set picklist fields such as StageName and LeadSource to not_analyzed. We want to segment the data on these fields’ exact values, so we don’t want Elasticsearch to tokenize them as free text.
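
If you want to confirm that the index and mapping were created as intended, Elasticsearch's standard mapping API will echo them back:

$ curl -X GET 'http://localhost:9200/opportunities/_mapping?pretty'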

We configure the Elasticsearch Destination with opportunities as its index and mapping, matching the metadata we uploaded, and set the Document ID field to ${record:value('/Id')}. Notice that Document ID is an expression, not a field name. If you set it to /Id, you’ll end up with a single document in Elasticsearch whose ID is the literal string /Id. I know this from first-hand experience!

Finally, we check Enable Upsert, so records will automatically be updated or inserted into Elasticsearch based on the Document ID:

Elasticsearch Config

Now we can run the pipeline, reading data from Salesforce and writing it to Elasticsearch:

Pipeline Running

As a quick check, we can run a count query on Elasticsearch and see that all of our 483 opportunities have indeed been received:

$ curl -X GET 'http://localhost:9200/opportunities/_count'
{"count":483,"_shards":{"total":5,"successful":5,"failed":0}}

Visualizing the Data with Kibana

Once the data is in Elasticsearch, we can use Kibana for visualization. Here’s a vertical bar chart of opportunities from the last six months, segmented by stage:

Kibana 1
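
Kibana builds this kind of chart from an Elasticsearch aggregation under the hood. A hand-written approximation (not the exact request Kibana issues) that produces the same by-month, by-stage breakdown looks like this:

$ curl -X GET 'http://localhost:9200/opportunities/_search?pretty' -d '{
  "size": 0,
  "aggs": {
    "by_month": {
      "date_histogram": { "field": "CloseDate", "interval": "month" },
      "aggs": {
        "by_stage": { "terms": { "field": "StageName" } }
      }
    }
  }
}'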

Since my pipeline is running continuously, I can add a new opportunity in Salesforce and quickly see the results in Kibana. The new record shows up as the purple bar on the far right:

Kibana 2
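
You can also confirm the new record from the command line. Assuming the new opportunity has the most recent CloseDate, a sorted search (again, plain Elasticsearch) will surface it first:

$ curl -X GET 'http://localhost:9200/opportunities/_search?sort=CloseDate:desc&size=1&pretty'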

Conclusion

StreamSets Data Collector and its Salesforce Origin let you quickly extract data from Salesforce and ingest it into a wide range of destinations. In this example, we focused on writing opportunity data to Elasticsearch, but we could have filtered or enriched the data in the pipeline, or even sent it to multiple destinations, all from the drag-and-drop UI. Download StreamSets Data Collector today and build your next big data ingest pipeline in minutes!
