A step-by-step walkthrough of how Mac Noland implemented StreamSets to move away from hand-coded ETL and scale out an increasingly complex ingestion pipeline. Mac is a Solution Architect for phData, a Twin Cities services firm focused on Hadoop. He has spent 17 years as a software engineer and architect for projects in the legal, accounting, risk and medical device industries.
While many of our data loads into Hadoop come in batch, a few months back we were approached with an opportunity to provide a streaming data load capability for our Integration team. They are heavy users of traditional JMS messaging, and while that might change to Kafka in the long term, cutting them over didn’t align with project timelines. So, we went down the battle-tested Flume route.
The workflow started off pretty simple. We had a single JMS source, single file channel and single HDFS sink. Didn’t take long to implement and life was good. Then, like any project with success and momentum behind it, the requirements started to grow. Here’s how we built a streaming data ingestion pipeline.
The next thing we wanted to do was add a second sink for Solr so the team could provide a search capability across the data set. In doing this, they wanted to apply some transformation logic to a few fields (for example, transforming the ingest time format). We used Morphlines with the SolrMorphlineSink, which came together pretty nicely with one small exception: growing complexity.
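To give a feel for the kind of field transformation involved, here is a minimal Python sketch of an ingest-time conversion. It is not the Morphline we ran; the source format (`SOURCE_FORMAT`) and the assumption that incoming timestamps are UTC are hypothetical. The target is the `YYYY-MM-DDThh:mm:ssZ` form that Solr date fields expect.

```python
from datetime import datetime, timezone

# Hypothetical upstream format; the actual format is not stated in this post.
SOURCE_FORMAT = "%Y-%m-%d %H:%M:%S"


def to_solr_date(raw: str) -> str:
    """Convert a naive timestamp string (assumed to be UTC) to Solr's date form."""
    parsed = datetime.strptime(raw, SOURCE_FORMAT).replace(tzinfo=timezone.utc)
    return parsed.strftime("%Y-%m-%dT%H:%M:%SZ")


print(to_solr_date("2016-03-01 14:05:09"))
```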
Publishing to Solr required us to ensure that “id” and any other required fields were populated correctly. If a message was nonconforming, the Solr sink would reject the publish action and leave the record in the sink. To clean these out, we set “isProductionMode=true”, which sends them to the ether. However, this left us with messages that never got ingested into Solr. Moreover, we had no visibility into when this happened and no ability to replay the messages.
Lastly, while the Flume and Morphline solution was easy for the Hadoop team to implement, we struggled to get new team members up to speed on the Flume configuration and the Morphline syntax. Flume and Morphlines are admittedly verbose and resonate best with developers. To scale out our streaming ingestion pipeline, we needed something more user-friendly for non-developers. Enter StreamSets Data Collector.
When putting together the proposal for our management, we identified three differentiators that separated StreamSets from our Flume/Morphline setup:
- Elegant handling of nonconforming messages. For example, right now if we get a message that doesn’t have a ‘msgdate’ field on it, Solr will reject it and send it to the ether. StreamSets has a concept of staging those messages so they can be dealt with and re-published. This inherently gave us visibility into the issue as well.
- Drag-and-drop ingest process modeling. Right now it’s all done in Flume and Morphline configuration files, which, again, resonate best with developers. However, we’re not all developers, and the current solution presents a barrier to entry for non-developers trying to use Flume and Morphlines. StreamSets has made an investment in “designer” tools that break down that barrier. This immediately resonated with the group.
- Better operational support for handling multiple streams and splitting ones that need to go to both “data lake” and “tenant lake” locations. For example, if we get messages from integration and, based on a field type, need to publish specific values to the “tenant lake” locations (or possibly a different Solr collection), we’re going to have some operational constraints on doing that in Flume. It’s doable, but we’ll have a very large Flume configuration.
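The first differentiator, staging nonconforming messages so they can be fixed and re-published, can be sketched in plain Python. This is only an illustration of the idea, not how StreamSets implements it; the function names and the in-memory lists standing in for the index and the staging area are hypothetical.

```python
# "msgdate" comes from the example above; "id" is the field Solr requires.
REQUIRED_FIELDS = ("id", "msgdate")


def missing_fields(message):
    """Return the required fields that are absent or empty in a message."""
    return [name for name in REQUIRED_FIELDS if not message.get(name)]


def ingest(messages, index, staged):
    """Index conforming messages; stage the rest with a reason instead of dropping them."""
    for message in messages:
        missing = missing_fields(message)
        if missing:
            staged.append({"message": message, "reason": "missing " + ", ".join(missing)})
        else:
            index.append(message)


def replay(staged, index, fix):
    """Apply a fix to every staged message and push it through ingest again."""
    pending = [fix(entry["message"]) for entry in staged]
    staged.clear()
    ingest(pending, index, staged)


index, staged = [], []
ingest([{"id": "1", "msgdate": "2016-03-01T00:00:00Z"}, {"id": "2"}], index, staged)
print(len(index), staged[0]["reason"])

# Repair the staged message with a placeholder date, then replay it.
replay(staged, index, lambda m: {**m, "msgdate": "1970-01-01T00:00:00Z"})
print(len(index), len(staged))
```

The point of the sketch is that a rejected message keeps both its payload and the reason it failed, which is what gives you visibility and a replay path.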
We are currently in proof-of-concept mode with StreamSets, and it looks very promising as a replacement for our Flume/Morphline configuration.
Below I will walk through the pipeline. First note that the Solr index is completely empty:
Now we’ll start publishing messages to a JMS queue. They are simple text messages with random words. Periodically the program outputs two types of bad records: records without an id message header and records with empty content. We will use two of the StreamSets error handling facilities later on to catch these bad records.
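The publisher itself is not shown in this post. As a rough sketch of the message mix it produces, the Python generator below yields header/body pairs and makes every `bad_every`-th message bad, alternating between the two failure types. The word list, the `bad_every` parameter and the function name are assumptions, and the actual JMS send is left out.

```python
import random

# Stand-in vocabulary; the real program's word source is not described.
WORDS = ["alpha", "bravo", "charlie", "delta", "echo", "foxtrot"]


def make_messages(count, bad_every=10, seed=None):
    """Yield (headers, body) pairs; every `bad_every`-th message is deliberately bad."""
    rng = random.Random(seed)
    bad_count = 0
    for n in range(1, count + 1):
        headers = {"id": str(n)}
        body = " ".join(rng.choice(WORDS) for _ in range(5))
        if n % bad_every == 0:
            bad_count += 1
            if bad_count % 2 == 1:
                del headers["id"]  # bad record type 1: no id header
            else:
                body = ""  # bad record type 2: empty content
        yield headers, body


for headers, body in make_messages(3, seed=7):
    print(headers, repr(body))
```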
This example pipeline has a few “stages.” Now that I have learned more about StreamSets, I’ve found that I could remove both the “Set Id Field” and “Set Content Field” stages but for our purposes we’ll keep the pipeline as is.
First we must configure the origin or source. Here we configure a JMS Consumer origin to read from an ActiveMQ queue:
Next we copy the id attribute into a field. An attribute is like a header, and the JMS origin copies all headers from the JMS message into the attributes. This step could be removed by simply accessing the attribute through the expression language (EL) in the Solr configuration:
Next we rename the /text field to /content, as Solr expects. Again, this could be eliminated since the mapping can be done in the Solr config.
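Conceptually, the “Set Id Field” and “Set Content Field” stages do something like the following. This is a plain-Python sketch with a simplified record model (a dict holding `attributes` and `fields`), which is my assumption for illustration and not the StreamSets record API.

```python
def set_id_field(record):
    """Copy the 'id' header attribute into the record body as the id field."""
    fields = dict(record["fields"])
    fields["id"] = record["attributes"].get("id")  # None when the header is absent
    return {"attributes": record["attributes"], "fields": fields}


def set_content_field(record):
    """Rename the text field to content, leaving other fields untouched."""
    fields = dict(record["fields"])
    if "text" in fields:
        fields["content"] = fields.pop("text")
    return {"attributes": record["attributes"], "fields": fields}


record = {"attributes": {"id": "42"}, "fields": {"text": "hello world"}}
print(set_content_field(set_id_field(record))["fields"])
```

Each function returns a new record rather than mutating its input, so the stages can be chained or dropped independently.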
Now we configure the field selector, which I think of as a case statement. We’ll send records with null id fields to a separate HDFS location. Here we also use “preconditions” to mark records as errors. We could have done this with the null /id field as well, but I wanted to show both mechanisms. The error records can be configured to go to a separate location such as Kafka or a directory. As we’ll see later when we run the pipeline, errors show up visually as well.
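The two mechanisms can be sketched in Python as follows. This is an illustration of the routing logic only, assuming the precondition checks for non-empty content; the lane names (`hdfs_null_id`, `default`, `errors`) are hypothetical and the real conditions are written in the StreamSets expression language, not Python.

```python
def precondition_ok(record):
    """Precondition: the content field must be present and non-empty."""
    return bool(record.get("content"))


def select_stream(record):
    """Case-statement style routing: the first matching condition wins."""
    if record.get("id") is None:
        return "hdfs_null_id"  # separate HDFS location for records without an id
    return "default"  # valid records continue to Solr and HDFS


def route(records):
    """Send precondition failures to the error lane, everything else through the selector."""
    lanes = {"hdfs_null_id": [], "default": [], "errors": []}
    for record in records:
        if not precondition_ok(record):
            lanes["errors"].append(record)
        else:
            lanes[select_stream(record)].append(record)
    return lanes


lanes = route([
    {"id": "1", "content": "good record"},
    {"id": None, "content": "no id header"},
    {"id": "3", "content": ""},
])
print({name: len(items) for name, items in lanes.items()})
```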
Below is one of the HDFS sink configurations. We will write the data out as Avro. Note that the File Type is Text when you select Avro as the data format. This does not mean the output files will be text; they will be standard Avro files:
Below is the Avro schema for the error records. Note that id can be null:
Whereas it cannot be null for the HDFS sink for valid records:
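As a rough sketch of the difference between the two schemas, here they are as Python dictionaries serialized to Avro's JSON form. The record names (`ErrorMessage`, `Message`) and the exact field list are assumptions for illustration. The part that matters is the id type: a `["null", "string"]` union in the error schema versus a plain `"string"` in the schema for valid records.

```python
import json

# Assumed names; only the nullable-vs-required id distinction comes from the text.
ERROR_SCHEMA = {
    "type": "record",
    "name": "ErrorMessage",
    "fields": [
        {"name": "id", "type": ["null", "string"], "default": None},
        {"name": "content", "type": "string"},
    ],
}

VALID_SCHEMA = {
    "type": "record",
    "name": "Message",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "content", "type": "string"},
    ],
}


def is_nullable(schema, field_name):
    """Return True if the named field is a union that includes null."""
    for field in schema["fields"]:
        if field["name"] == field_name:
            field_type = field["type"]
            return isinstance(field_type, list) and "null" in field_type
    raise KeyError(field_name)


print(json.dumps(ERROR_SCHEMA, indent=2))
print(is_nullable(ERROR_SCHEMA, "id"), is_nullable(VALID_SCHEMA, "id"))
```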
Here is our Solr config:
After configuring the pipeline, I clicked Start and we were off to the races. Note the red oval on the field selector. Clicking it brings up recent errors, which is a great feature. As mentioned earlier, this error happened because one of the previously defined preconditions found rows with NULL in the ‘Content’ field. In our setup we are sending such rows to another HDFS location.
And now we have data in Solr and HDFS. One aspect I like about StreamSets is the alerting. Here I have an alert telling me that the pipeline is idle after it processed all the messages. Such alerts are a useful cue to go in and examine the data after running some test messages. StreamSets is designed for continuous operation, though, so it can remain on and continue to process JMS messages whenever they come in.
Maintaining hand-coded ETL pipelines developed with Flume and Morphlines would have been a tedious exercise for some of the non-developer members of our integration team. StreamSets provided us a simple way to deliver a complex solution that the team can maintain without a lot of effort, and it doesn’t hurt that the tooling is production-ready, so we don’t have to worry about scaling it up for real-world workloads.