Automating Pipeline Development with the StreamSets SDK for Python

When it comes to creating and managing your dataflow pipelines, the graphical user interfaces of StreamSets Control Hub and StreamSets Data Collector put the complete power of our robust Data Operations Platform at your fingertips. There are times, however, when a more programmatic approach may be needed, and those times will be significantly more enjoyable with the release of version 3.2.0 of the StreamSets SDK for Python. In this post, I'll describe some of the SDK's new functionality and show examples of how you can use it to enable your own data use cases.

Note that the StreamSets SDK for Python is part of our subscription offering and requires a license key to work. For installation instructions, please see our documentation.

New Data Collector Control Functionality

The latest release of the StreamSets SDK for Python includes a number of new features for automating interactions with StreamSets Data Collector instances. To demonstrate these, we'll follow a typical development workflow: a user designs a pipeline, runs it, takes a snapshot to validate that the data is being processed as expected, confirms that neither the Data Collector instance nor the pipeline reports any errors, and then repeats the process until the expected results are achieved. We've picked just a handful of the new features to demonstrate below; see our SDK documentation for more details.

Building Pipelines

After opening a Python 3 interpreter and creating an instance of the DataCollector class to communicate with your Data Collector (SDC) instance, you build pipelines with a PipelineBuilder object (in the example below, we assume a Data Collector running on localhost:18630):
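Here's a minimal sketch of that flow. The helper names connect and build_and_add_pipeline are our own (hypothetical), and the builder calls (get_pipeline_builder, add_stage, the >> operator for connecting stages, build, and add_pipeline) follow the SDK's documented usage as we understand it, so check the SDK reference for your version before relying on them. The Dev Data Generator and Trash stages are just illustrative choices.

```python
def connect(server_url='http://localhost:18630'):
    """Return a DataCollector client for the SDC at server_url."""
    # Imported inside the function so the helper below can be reused
    # without the SDK being imported at module load time.
    from streamsets.sdk import DataCollector
    return DataCollector(server_url)


def build_and_add_pipeline(data_collector, title='My first pipeline'):
    """Build a Dev Data Generator >> Trash pipeline and add it to SDC.

    Hypothetical helper; returns the pipeline and the origin stage so the
    stage can be referenced later (e.g. when inspecting snapshots).
    """
    builder = data_collector.get_pipeline_builder()

    # Stage labels as shown in the Data Collector stage library (assumed
    # to be what add_stage expects).
    dev_data_generator = builder.add_stage('Dev Data Generator')
    trash = builder.add_stage('Trash')

    # The >> operator connects one stage's output to the next stage's input.
    dev_data_generator >> trash

    pipeline = builder.build(title)
    data_collector.add_pipeline(pipeline)
    return pipeline, dev_data_generator
```

In an interpreter session, calling data_collector = connect() followed by pipeline, dev_data_generator = build_and_add_pipeline(data_collector) would leave a new pipeline on your Data Collector, ready to start.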

Starting and Stopping Pipelines

This one's pretty self-explanatory. Note that the DataCollector.start_pipeline method will, by default, wait for the pipeline to reach a RUNNING or FINISHED state before returning. To make it return immediately, add wait=False to the argument list.
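A sketch of both variants, assuming pipeline has already been added to the Data Collector and that the stopping counterpart is DataCollector.stop_pipeline (an assumption on our part; the helper names are hypothetical):

```python
def start_and_stop(data_collector, pipeline):
    """Start a pipeline, then stop it (hypothetical helper)."""
    # By default, start_pipeline blocks until the pipeline reaches
    # RUNNING or FINISHED before returning.
    data_collector.start_pipeline(pipeline)
    # ... inspect the running pipeline here ...
    data_collector.stop_pipeline(pipeline)


def start_in_background(data_collector, pipeline):
    """Start a pipeline without waiting on its state (hypothetical helper)."""
    # wait=False makes start_pipeline return immediately instead of
    # waiting for RUNNING or FINISHED.
    data_collector.start_pipeline(pipeline, wait=False)
```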

Capturing and Examining Pipeline Snapshots

The DataCollector.capture_snapshot method returns a streamsets.sdk.sdc_api.SnapshotCommand instance, which lets you monitor the state of the snapshot asynchronously. Accessing this object's snapshot attribute waits for the snapshot to complete and then gives us access to its records (below, we're looking at the first record output by the Dev Data Generator stage). Also note that if your pipeline is stopped and you'd like to capture a snapshot of the first batch once it starts, simply add the start_pipeline=True argument when capturing the snapshot:
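The sketch below shows the snapshot flow, assuming dev_data_generator is the stage object returned by the pipeline builder and that a snapshot can be indexed by a stage's instance_name to reach that stage's output records; that indexing pattern is our reading of the SDK, and the helper name first_generated_record is hypothetical.

```python
def first_generated_record(data_collector, pipeline, dev_data_generator,
                           start_pipeline=False):
    """Capture a snapshot and return the first record a stage output.

    Hypothetical helper. Pass start_pipeline=True when the pipeline is
    stopped to start it and snapshot its first batch.
    """
    # capture_snapshot returns a SnapshotCommand for tracking the capture.
    snapshot_command = data_collector.capture_snapshot(
        pipeline, start_pipeline=start_pipeline)

    # Reading the snapshot attribute waits for the snapshot to complete.
    snapshot = snapshot_command.snapshot

    # Index by stage instance name (assumed), then take the first output record.
    return snapshot[dev_data_generator.instance_name].output[0]
```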

Getting Pipeline History

Again, pretty self-explanatory. In our case, we'll go a step further and query the latest entry in our pipeline history for metrics (e.g. to determine how many batches were processed by the pipeline):
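A sketch of that query, assuming the history is fetched with DataCollector.get_pipeline_history, that its latest attribute holds the most recent entry, and that the batch counter is named pipeline.batchCount.counter. These names reflect our reading of the SDK and of SDC's pipeline metrics and should be verified against your version; the helper batch_count is hypothetical.

```python
def batch_count(data_collector, pipeline):
    """Return the batch count from the latest pipeline history entry.

    Hypothetical helper; method, attribute, and metric names are assumed.
    """
    history = data_collector.get_pipeline_history(pipeline)

    # The most recent history entry carries the pipeline's metrics.
    metrics = history.latest.metrics

    # Counter name assumed to follow SDC's pipeline metric naming.
    return metrics.counter('pipeline.batchCount.counter').count
```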

Getting Data Collector Logs

The streamsets.sdk.sdc_models.Log instance returned when fetching Data Collector logs can either be printed directly or filtered by time using the Log.after_time and Log.before_time methods:
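A sketch, assuming the log is fetched with DataCollector.get_logs and that Log.after_time accepts a timestamp string such as '2018-05-01 12:00:00,000' (the exact format it expects is an assumption on our part); the helper recent_logs is hypothetical.

```python
def recent_logs(data_collector, since=None):
    """Fetch the Data Collector log, optionally keeping only newer entries.

    Hypothetical helper. With since=None, the whole Log is returned;
    printing it shows the full log.
    """
    log = data_collector.get_logs()
    if since is None:
        return log

    # after_time keeps entries after the given timestamp; before_time is
    # the mirror-image filter. Timestamp format is assumed.
    return log.after_time(since)
```

For example, print(recent_logs(data_collector, since='2018-05-01 12:00:00,000')) would show only the entries written after that moment, assuming the format matches what your SDK version expects.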

Support for StreamSets Control Hub-Registered Data Collector Instances

Users who want to interact with Data Collectors registered with StreamSets Control Hub can now do so. After instantiating the DataCollector class with a ControlHub instance, every command described above should work just as it does with any other Data Collector instance:
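A sketch of that setup, assuming ControlHub takes the Control Hub URL plus username and password keyword arguments and that DataCollector accepts the resulting object through a control_hub keyword argument. The URLs and credentials are placeholders, and connect_via_control_hub is a hypothetical helper name.

```python
def connect_via_control_hub(sch_url, username, password,
                            sdc_url='http://localhost:18630'):
    """Return a DataCollector client for an SDC registered with Control Hub.

    Hypothetical helper; constructor keyword arguments are assumed.
    """
    # Imported inside the function so this module loads without the SDK.
    from streamsets.sdk import ControlHub, DataCollector

    # Authenticate against Control Hub first.
    control_hub = ControlHub(sch_url, username=username, password=password)

    # Tie the Data Collector client to that Control Hub session.
    return DataCollector(sdc_url, control_hub=control_hub)
```

The returned object can then be used with start_pipeline, capture_snapshot, and the other calls described above.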

Clearer Compatibility Story

With this release, the StreamSets SDK for Python has moved to a versioning scheme that aligns it with releases of StreamSets Data Collector and StreamSets Control Hub. Moving forward, that means more clarity about which versions of StreamSets products are certified to work with each SDK release and more predictability about when new StreamSets features will be supported by the SDK.

What's Next?

We are committed to continuing to add functionality to the StreamSets SDK for Python to meet our customers' evolving data operations needs. Upcoming releases will greatly expand integration with StreamSets Control Hub, helping enterprises that want to take advantage of its rapidly growing feature set scale up their design, execution, and operations efforts.

We're also really excited about this release because it finally lets us pull back the curtain on the StreamSets Test Framework, which is built on top of the StreamSets SDK for Python. This is the platform we use to test the quality of our StreamSets product line, and we'll go into more detail about what it is, how it works, and how we use it in a future series of blog posts. Stay tuned!
