
Cache Salesforce Data in Redis with StreamSets Data Collector

Posted in Data Integration, July 6, 2017

Redis is an open-source, in-memory, NoSQL database implementing a networked key-value store with optional persistence to disk. Perhaps the most popular key-value database, Redis is widely used for caching web pages, sessions and other objects that require blazingly fast access – lookups are typically in the millisecond range.

At RedisConf 2017 I presented a session, Cache All The Things! Data Integration via Jedis (slides). It looked at how the open source Jedis library provides a small, sane, easy-to-use Java interface to Redis, and how a StreamSets Data Collector (SDC) pipeline can read data from a platform such as Salesforce, write it to Redis via Jedis, and keep Redis up to date by subscribing to notifications of changes in Salesforce. In this blog entry, I’ll describe how I built the SDC pipeline I showed during my session.

Reading Business Data from Salesforce

Salesforce is the de facto system-of-record for customer data in many enterprises. Salesforce offers a variety of APIs for reading data, presenting trade-offs between latency, throughput and convenience (e.g. REST vs SOAP), but access to data is certainly not in the millisecond range. Salesforce’s performance metrics show that the platform is currently serving around 5 billion transactions/day, with an average response time of about 150 ms. Salesforce API calls are also a limited resource; depending on the Salesforce edition, subscribers are allowed between 1000 and 5000 API calls/day/user licence, with further caps on the total number of API calls/day for some editions.

Given Salesforce’s position as the system-of-record, we’d like to be able to use it to look up information such as customer details, but its latency and limits can be an obstacle, particularly when lookups are part of an automated workflow. Fortunately, we can combine the Salesforce APIs, StreamSets Data Collector and Redis to build a cache that not only provides millisecond access time, but is continuously updated.
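
To make the obstacle concrete, the figures quoted above can be turned into a rough budget. This is illustrative arithmetic only: the licence count and lookup rate are hypothetical inputs, it uses the lower end of the quoted per-licence allowance, and it ignores the per-edition caps on total calls.

```python
# Rough budget arithmetic using the figures quoted above.
# The licence count and lookup rate below are hypothetical inputs.

AVG_RESPONSE_SECONDS = 0.150       # average response time quoted above
CALLS_PER_LICENCE_PER_DAY = 1000   # lower end of the quoted 1000-5000 range


def daily_budget(licences: int,
                 calls_per_licence: int = CALLS_PER_LICENCE_PER_DAY) -> int:
    """Total API calls per day for a given number of user licences."""
    return licences * calls_per_licence


def hours_until_exhausted(budget: int, lookups_per_second: float) -> float:
    """How long a steady stream of lookups takes to use up the daily budget."""
    return budget / lookups_per_second / 3600


def max_sequential_lookups_per_second(
        avg_response_seconds: float = AVG_RESPONSE_SECONDS) -> float:
    """Upper bound on lookups/second when each call waits for the previous one."""
    return 1 / avg_response_seconds


budget = daily_budget(licences=20)
hours = hours_until_exhausted(budget, lookups_per_second=5)
rate = max_sequential_lookups_per_second()
print(f"{budget} calls/day lasts {hours:.2f} h at 5 lookups/s; "
      f"at most {rate:.1f} sequential lookups/s")
```

Under these assumptions an automated workflow making five lookups per second would use a full day's allowance in a little over an hour, which is the kind of pressure a cache removes.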

StreamSets Data Collector Salesforce Origin

The Salesforce Origin, released last year in SDC 2.2.0.0, allows you to both configure a SOQL query to read existing data, and subscribe to a Salesforce Streaming API PushTopic for notifications as the data changes.

My example use case was looking up account details from an account number. The first step of building a pipeline, then, is to read the desired data from Salesforce. In StreamSets Data Collector, create a new pipeline, install the Salesforce stage library and restart, if necessary. Set the pipeline’s error handling, then drag a Salesforce origin onto the pipeline canvas.

Salesforce Origin

The Salesforce origin’s Salesforce tab needs a username & password; if you’re using a Salesforce sandbox you’ll also need to change the Auth Endpoint from login.salesforce.com to test.salesforce.com. Select Query Existing Data, leaving Subscribe for Notifications unchecked for now. The Salesforce Bulk API is very efficient for large volumes of data, but the SOAP API is more responsive for interactive use; we’ll want to preview the pipeline a few times as we get things working, so, on the Query tab, uncheck Use Bulk API. You can use the following SOQL query to get all of the fields that are accessible by the current user:

SELECT *
FROM Account
WHERE Id > '${OFFSET}'
ORDER BY Id

Although SOQL does not natively support SELECT *, SDC retrieves the account metadata and expands the * wildcard to the complete list of fields to which the current user has access.
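
As a rough illustration of what that expansion amounts to, here is a minimal Python sketch. It is not SDC's implementation: the function names, the short field list and the all-zeros starting offset are assumptions made for the example.

```python
# Illustrative only: SDC's real wildcard handling is not shown in this post.

def expand_wildcard(soql: str, field_names: list[str]) -> str:
    """Replace a leading 'SELECT *' with an explicit field list."""
    prefix = "SELECT *"
    stripped = soql.lstrip()
    if not stripped.upper().startswith(prefix):
        return soql  # nothing to expand
    return "SELECT " + ", ".join(field_names) + stripped[len(prefix):]


def bind_offset(soql: str, offset: str) -> str:
    """Substitute the ${OFFSET} placeholder with the last Id read so far."""
    return soql.replace("${OFFSET}", offset)


query = "SELECT *\nFROM Account\nWHERE Id > '${OFFSET}'\nORDER BY Id"

# Hypothetical subset of the fields a metadata call might return.
fields = ["Id", "Name", "AccountNumber", "Industry"]

# Hypothetical starting offset that sorts before any real record Id.
expanded = bind_offset(expand_wildcard(query, fields), "000000000000000")
print(expanded)
```

Because the query orders by Id and filters on Id greater than the offset, each batch can resume where the previous one stopped.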

Leave the remaining configuration items with their default values.

Hit the preview button and you should see some account data from Salesforce. I’m using a Developer Edition, so I see the standard Salesforce sample accounts:

Preview

StreamSets Data Collector Redis Destination

Now let’s write some data to Redis! Drop a Redis destination onto the canvas, connecting its input to the origin’s output.

Redis Destination

Configure the destination with your Redis URI; if you’re building a test/demo with a default Redis install on your machine, then redis://localhost:6379 should work. We want to index account details by account number, so set the Key field to /AccountNumber, the Value to / (i.e. the entire record), and the Data Type to Hash – we want Redis to store the entire account record as a collection of name-value pairs.
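
To picture what those settings mean, the sketch below turns one record into the arguments of a Redis HSET command, using the account number as the key and every field as a name-value pair. It only builds the argument list and does not talk to Redis; the helper names and the boolean formatting are assumptions, not the destination's actual code. Sending several pairs in one HSET needs Redis 4.0.0 or later.

```python
# Sketch: map one record to HSET arguments. Builds a list, sends nothing.

def to_text(value) -> str:
    """Redis hash values are strings, so convert each field value to text."""
    if value is None:
        raise ValueError("a Redis hash value cannot be null")
    if isinstance(value, bool):          # check bool before the generic case
        return "true" if value else "false"
    return str(value)


def hset_args(record: dict, key_field: str = "AccountNumber") -> list[str]:
    """Return ['HSET', key, field1, value1, field2, value2, ...]."""
    args = ["HSET", to_text(record[key_field])]
    for name, value in record.items():
        args += [name, to_text(value)]
    return args


# Sample values taken from the account used elsewhere in this post.
account = {
    "Id": "0013600000gnbjJAAQ",
    "IsDeleted": False,
    "Name": "StreamSets, Inc.",
    "AccountNumber": "AB123456",
    "Industry": "Technology",
}

args = hset_args(account)
print(args)
```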

If you run the pipeline right now, you’ll notice that there is a problem:

Errors

Clicking into the stack trace and scrolling down to the cause, we can see the issue – we are trying to write null hash values to Redis.

Error Detail

We can easily rectify this by adding a Field Remover in between the origin and destination, with Action set to Remove Listed Fields If Their Values Are Null and Fields set to /* – we want to remove any field if its value is null.
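
In plain Python terms, the effect of that Field Remover configuration is roughly the following. This is a sketch of the behaviour on top-level fields only, with hypothetical sample values, not the processor's code.

```python
# Sketch: drop every top-level field whose value is null (None in Python).

def remove_null_fields(record: dict) -> dict:
    """Return a copy of the record without its null-valued fields."""
    return {name: value for name, value in record.items() if value is not None}


# Hypothetical record with two empty fields.
raw = {
    "Id": "0013600000gnbjJAAQ",
    "AccountNumber": "AB123456",
    "Fax": None,
    "Industry": "Technology",
    "ParentId": None,
}

cleaned = remove_null_fields(raw)
print(cleaned)
```

The original record is left untouched; only the copy passed downstream loses its null fields.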

Run the pipeline now and you should see that all of the accounts are successfully written to Redis:

Remove Null Fields

In redis-cli, we can explore the data:

127.0.0.1:6379> INFO Keyspace
# Keyspace
db0:keys=14,expires=0,avg_ttl=0

127.0.0.1:6379> KEYS *
 1) "SF111111"
 2) "CD355120-B"
 3) "CD451796"
 4) "AB123456"
 5) "CC978213"
 6) "CC634267"
 7) "CD355119-A"
 8) "NW654321"
 9) "CD355118"
10) "CD439877"
11) "CC947211"
12) "CC213425"
13) "CD736025"
14) "CD656092"

127.0.0.1:6379> HGETALL AB123456
 1) "Id"
 2) "0013600000gnbjJAAQ"
 3) "IsDeleted"
 4) "false"
 5) "Name"
 6) "StreamSets, Inc."
...

127.0.0.1:6379> HGET AB123456 Industry
"Technology"

Great – we’ve populated our Redis cache, but how do we keep it up to date as the data in Salesforce changes?

Subscribing for Notifications from Salesforce

The Salesforce Origin can subscribe to notifications via the Salesforce Streaming API. You will need to create a PushTopic via the Salesforce Developer Console. Here is the Apex I used in the ‘Execute Anonymous’ window:

PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'AccountUpdates';
pushTopic.Query = 'SELECT Id FROM Account';
pushTopic.ApiVersion = 40.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'All';
insert pushTopic;

This Apex code creates a new PushTopic, named AccountUpdates, that will notify subscribers of any changes to account records, sending the Id of the relevant account. We could list all of the account fields in the query (Salesforce doesn’t natively support SELECT *), but then we would have to update the PushTopic if any account fields were later added or removed. A more maintainable option is to SELECT just the record Id and let the subscriber retrieve its desired fields in an API call.

We can tell the Salesforce origin to subscribe to this PushTopic by enabling Subscribe for Notifications on the Salesforce tab, then setting Push Topic to AccountUpdates on the Subscribe tab. Since we only receive the record Id in the notification, we need to add a Salesforce Lookup processor to our pipeline. Use a Stream Selector to separate out records from the initial load so that they do not trigger a lookup:

Stream Selector

Records received via the Streaming API have the attribute salesforce.cdc.type set to one of created, updated, deleted or undeleted, so the Stream Selector can separate out these records with the condition:

${str:length(record:attribute('salesforce.cdc.type')) > 0}
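
The same routing decision can be sketched in Python. The Record class and function names here are hypothetical stand-ins for SDC's record model; the sketch assumes a missing attribute behaves like an empty string, so only Streaming API records match.

```python
# Sketch of the Stream Selector decision: notifications go to the lookup,
# records from the initial query go straight on towards Redis.
from dataclasses import dataclass, field


@dataclass
class Record:
    fields: dict
    attributes: dict = field(default_factory=dict)  # record header attributes


def is_notification(record: Record) -> bool:
    """True when salesforce.cdc.type is present and non-empty."""
    return len(record.attributes.get("salesforce.cdc.type", "")) > 0


def route(records: list[Record]) -> tuple[list[Record], list[Record]]:
    """Split records into (needs_lookup, already_complete)."""
    needs_lookup, already_complete = [], []
    for record in records:
        target = needs_lookup if is_notification(record) else already_complete
        target.append(record)
    return needs_lookup, already_complete


initial = Record({"Id": "0013600000gnbjJAAQ", "AccountNumber": "AB123456"})
changed = Record({"Id": "0013600000gnbjJAAQ"},
                 {"salesforce.cdc.type": "updated"})

needs_lookup, already_complete = route([initial, changed])
print(len(needs_lookup), len(already_complete))
```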

The Salesforce Lookup processor simply reads all the available fields for the notified record:

SELECT * 
FROM Account
WHERE Id = '${record:value('/Id')}'
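
For illustration, here is the same substitution written as a small Python function. In the pipeline the expression language fills in the Id; the format check is an extra precaution added for this sketch and is not something the post's pipeline does. It relies on Salesforce record Ids being 15 or 18 alphanumeric characters.

```python
# Sketch: build the per-record lookup query from a notified Id.
import re

# Salesforce record Ids are 15 or 18 alphanumeric characters.
_SALESFORCE_ID = re.compile(r"[A-Za-z0-9]{15}(?:[A-Za-z0-9]{3})?")


def build_lookup_query(record_id: str) -> str:
    """Return the lookup SOQL for one account, rejecting malformed Ids."""
    if not _SALESFORCE_ID.fullmatch(record_id):
        raise ValueError(f"not a Salesforce record Id: {record_id!r}")
    return f"SELECT *\nFROM Account\nWHERE Id = '{record_id}'"


lookup_query = build_lookup_query("0013600000gnbjJAAQ")
print(lookup_query)
```

Rejecting anything that is not shaped like an Id keeps stray quotes out of the query string.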

Reset the origin, restart the pipeline, and you should see the initial records load into Redis. Leave the pipeline running, go to Salesforce, and make some modification to an account – create a new one, delete one, or just change any field on an existing account.

Edit StreamSets Account Record

You should see the pipeline’s record count increment. Go to redis-cli, retrieve the changed field via the account number and field name, and you should see the new value:

127.0.0.1:6379> HGET AB123456 Industry
"Telecommunications"

Success! We now have a high speed cache of Salesforce account data, automatically updated as data in Salesforce changes. Watch this short video to see the pipeline in action:

What data are you caching in Redis? Let us know in the comments!
