Dataflow Performance Blog

Ingest Data into Splunk with StreamSets Data Collector

Splunk indexes and correlates log and machine data, providing a rich set of search, analysis and visualization capabilities. In this blog post, I'll explain how to efficiently send high volumes of data to Splunk's HTTP Event Collector via the StreamSets Data Collector Jython Evaluator. I'll present a Jython script with which you'll be able to build pipelines to read records from just about anywhere and send them to Splunk for indexing, analysis and visualization.

The Splunk HTTP Event Collector

The Splunk HTTP Event Collector (HEC) allows you to send data directly into Splunk Enterprise or Splunk Cloud over HTTP or HTTPS. To use HEC you must enable its endpoint (it is not enabled by default) and generate an HEC token. Applications can use the HEC token to POST event data to the HEC endpoint. Events are indexed on receipt and may be accessed via the Splunk browser interface.
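
As a rough illustration of what such a POST looks like, the sketch below builds (but does not send) a single-event HEC request using only the Python 3 standard library, outside SDC. The endpoint URL and the `Authorization: Splunk <token>` header are the ones used by the script later in this post; the helper name `build_hec_request`, the token value, the `sourcetype` and the sample event are placeholders of my own.

```python
import json
import urllib.request

def build_hec_request(url, token, event, **metadata):
    """Build (without sending) a POST request carrying one HEC event."""
    # Metadata keys sit at the top level; the data itself goes under 'event'
    payload = dict(metadata)
    payload['event'] = event
    body = json.dumps(payload).encode('utf-8')
    return urllib.request.Request(
        url,
        data=body,
        headers={'Authorization': 'Splunk ' + token},
        method='POST',
    )

req = build_hec_request(
    'http://localhost:8088/services/collector',  # endpoint used in this post
    'YOUR-HEC-TOKEN-VALUE',                      # placeholder token
    {'message': 'hello from a sketch'},          # made-up event
    sourcetype='demo',                           # made-up metadata value
)
print(req.get_method(), req.full_url)
print(req.get_header('Authorization'))
print(req.data.decode('utf-8'))
```

Passing `req` to `urllib.request.urlopen` would actually submit the event, assuming HEC is enabled and the token is valid.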

Using StreamSets Data Collector with the Splunk HTTP Event Collector

Follow the Splunk documentation to enable HEC (if you are a managed Splunk Cloud customer, you must file a request ticket with Splunk Support) and generate a token. Save the token in the SDC resources directory:

$ echo -n YOUR-HEC-TOKEN-VALUE > /path/to/your/sdc/resources/splunkToken

Note the use of the -n option to omit the trailing newline character.

We'll use SDC's Jython Evaluator to send a single API request to Splunk for each batch of records. I'm going to use the pipeline from the SDC taxi transactions tutorial as an example, but you can adapt the same script for use in just about any pipeline.

Add a Jython Evaluator to your pipeline, copy the Jython script for Splunk from here, and paste it into the evaluator. I've attached a 'Trash' stage to the evaluator's output, since I don't need the records once they're in Splunk.

Running the pipeline writes the 5000+ taxi transaction records to Splunk in just a few seconds; we can then query Splunk for the top credit card types for transactions with payment type of 'CRD':

Splunk Jython Evaluator script

The script uses several useful techniques; let's take a closer look:

We'll be using the Requests library to access Splunk, so we add its location to the system module search path, and import it:

import json  # used further down to serialize each payload
import sys
# Set to wherever the requests package lives on your machine
sys.path.append('/Library/Python/2.7/site-packages')
import requests

You will need to configure the appropriate endpoint for HEC. For simplicity, I'm using HTTP, but you can also configure HTTPS:

# Endpoint for Splunk HTTP Event Collector
url = 'http://localhost:8088/services/collector'

HEC recognizes several metadata keys, defined in the Format events for HTTP Event Collector document:

# Splunk metadata fields
metadata = ['time', 'host', 'source', 'sourcetype', 'index']

Including credentials such as usernames and passwords in source code is a BAD THING, so we read the Splunk token from its resource file. We don't want to do this for every batch, so we save it in the state object:

# Read Splunk token from file and cache in state
if state.get('headers') is None:
  state['headers'] = {'Authorization': 'Splunk ${runtime:loadResource('splunkToken', false)}'}
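
The caching pattern can be tried outside SDC with a plain dictionary standing in for the state object. In this minimal sketch, `load_token` and `process_batch` are hypothetical helpers, and the counter only exists to show that the token is loaded once, however many batches run.

```python
load_count = [0]  # how many times the token has been loaded

def load_token():
    """Hypothetical stand-in for reading the token from its resource file."""
    load_count[0] += 1
    return 'YOUR-HEC-TOKEN-VALUE'

def process_batch(state):
    # Build the headers only if an earlier batch has not already done so
    if state.get('headers') is None:
        state['headers'] = {'Authorization': 'Splunk ' + load_token()}
    return state['headers']

state = {}  # plain dict standing in for the state object
for _ in range(3):
    headers = process_batch(state)

print(load_count[0])  # 1
```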

Now we initialize a buffer string, and loop through the records in the batch:

buffer = ''

# Loop through batch, building request payload
for record in records:

For each record, we build a payload dictionary containing the metadata keys, such as host and time, that Splunk recognizes. Any fields in the record with matching keys will be copied to the top level of the payload.

  # Metadata fields are passed as top level properties
  payload = dict((key, record.value[key]) for key in record.value if key in metadata)

The remaining fields are added to an event dictionary within the payload. Note that the entire content of the record is used 'as-is'. If you want to rename or exclude fields you can do so via processors such as the Field Renamer and Field Remover, or manipulate them in the script as required.

  # Everything else is passed in the 'event' property
  payload['event'] = dict((key, record.value[key]) for key in record.value if key not in metadata)

Now the JSON representation of the payload is added to the buffer:

  buffer += json.dumps(payload) + '\n'

We write the record to the processor's output, so we could send it to a destination if we chose to do so:

  # Write record to processor output
  output.write(record)
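
To see what the loop produces, the following standalone sketch applies the same metadata/event split to plain dictionaries rather than SDC records. The function name `build_hec_body` and the sample field names and values are made up for illustration, and `sort_keys=True` is added only to make the printed output deterministic.

```python
import json

# Same metadata keys as in the script above
metadata = ['time', 'host', 'source', 'sourcetype', 'index']

def build_hec_body(values):
    """Serialize a list of record-value dicts into one HEC request body."""
    buffer = ''
    for value in values:
        # Metadata fields become top-level properties
        payload = dict((k, value[k]) for k in value if k in metadata)
        # Everything else goes under 'event'
        payload['event'] = dict((k, value[k]) for k in value if k not in metadata)
        # One JSON object per line
        buffer += json.dumps(payload, sort_keys=True) + '\n'
    return buffer

# Made-up sample records
sample = [
    {'host': 'taxi-01', 'payment_type': 'CRD', 'fare': 12.5},
    {'payment_type': 'CSH', 'fare': 7.0},
]
body = build_hec_body(sample)
print(body)
```

The first record's `host` ends up beside `event` at the top level, while the second record, which has no metadata keys, yields a payload containing only `event`.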

If there is data in the buffer, we send it to Splunk, and decode the JSON response:

if len(buffer) > 0:
  # Now submit a single request for the entire batch
  r = requests.post(url,
                    headers=state['headers'],
                    data=buffer).json()

We need to check that Splunk correctly received the data, and raise an exception if it did not. This ensures that, in the event of an error, the pipeline will be stopped and the data can be reprocessed once the error is rectified:

  # Check for errors from Splunk
  if r['code'] != 0:
    log.error('Splunk error: {}: {}', r['code'], r['text'])
    raise Exception('Splunk API error {0}: {1}'.format(r['code'], r['text']))

Finally, we log the status message we received from Splunk:

  # All is good
  log.info('Splunk API response: {}', r['text'])
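
The response handling can be exercised without a Splunk server by pulling it into a small function. In this sketch, `check_hec_response` is a hypothetical helper, and the two response dictionaries are illustrative values in the `code`/`text` shape the script expects, not output captured from a real server.

```python
def check_hec_response(r):
    """Return the status text, or raise if the response carries a non-zero code."""
    if r['code'] != 0:
        raise Exception('Splunk API error {0}: {1}'.format(r['code'], r['text']))
    return r['text']

# Illustrative success response
print(check_hec_response({'code': 0, 'text': 'Success'}))

# Illustrative error response
try:
    check_hec_response({'code': 4, 'text': 'Invalid token'})
except Exception as e:
    print(e)
```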

Conclusion

StreamSets Data Collector's Jython Evaluator allows you to efficiently integrate with APIs such as Splunk's HTTP Event Collector, where you want to make a single HTTP request per batch of records. While the script presented above sends records to Splunk, the same techniques can be used with any web service API.

Pat Patterson