Dataflow Performance Blog

Upgrading From Apache Flume to StreamSets Data Collector

Apache Flume “is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data”. The typical use case is collecting log data and pushing it to a destination such as the Hadoop Distributed File System. In this blog entry, we'll look at a couple of Flume use cases and see how they can be implemented with StreamSets Data Collector.

As reliable as Flume is, it's not the easiest of systems to set up – even the simplest deployment demands a pretty arcane configuration file. For example:

# Sample Flume configuration to copy lines from
# log files to Hadoop FS

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /Users/pat/flumeSpool

a1.channels.c1.type = memory

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.path = /flume/events
a1.sinks.k1.hdfs.useLocalTimeStamp = true

A few minutes' study (with a couple of references to the Flume documentation) reveals that the config file defines a Flume agent that reads data from a directory and writes to Hadoop FS. The data will be written as plain text, with the default of 10 lines per Hadoop file.
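
Tweaking even a simple default means knowing exactly the right property names. For instance, to roll larger output files you would override the HDFS sink's roll settings with something like the following (the values here are purely illustrative):

# Roll a new HDFS file every 10,000 events, 128 MB,
# or 5 minutes, whichever comes first
a1.sinks.k1.hdfs.rollCount = 10000
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollInterval = 300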

While this example is relatively straightforward, things get complicated quickly as you add more functionality. For example, if you wanted to filter out log entries referencing a given IP address, you would add an interceptor:

# Throw away entries beginning with 1.2.3.4
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^1\.2\.3\.4
a1.sources.r1.interceptors.i1.excludeEvents = true
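
Interceptors also chain, so adding a second one (here Flume's built-in host interceptor, purely as an illustration) grows the source's property list again:

# Also tag each event with the agent's hostname
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname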

It soon becomes difficult to look at a Flume config file and understand exactly what it does, let alone make changes or additions, without investing considerable time in mentally parsing it.

StreamSets Data Collector is an any-to-any big data ingest tool that picks up where Flume runs out of steam. Data Collector can interface with files, databases of every flavor, message queues and, in fact, pretty much anything else. A wide variety of prebuilt processors handle most transformations, while script evaluators give you the flexibility to manipulate data in Python, JavaScript and Groovy.

Let's take a look at an equivalent pipeline, reading log data from a directory and writing to Hadoop FS, in Data Collector:

[Screenshot: the directory-to-Hadoop FS pipeline on the Data Collector canvas]

Clicking around the UI reveals the configuration for each pipeline stage, including the default values for every setting:

[Screenshot: configuration panel for a pipeline stage, with default values for each setting]

Running the pipeline, we can clearly see that a short test file is correctly sent to Hadoop FS:

[Screenshot: the running pipeline, with the test file's records delivered to Hadoop FS]
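
We can also verify from the command line that output has landed; assuming the pipeline's Hadoop FS destination writes to the /sdc/events directory used later in this post, a quick listing does the trick:

$ hdfs dfs -ls /sdc/events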

This is great, but it gets even better as we add more functionality. Let's replicate the Flume interceptor, filtering out records starting with ‘1.2.3.4'. Data Collector doesn't have a direct equivalent of ‘regex filter' – it's actually much more flexible than that. We can use a Stream Selector stage to separate out records that match a given condition. In this case, we'll send them to the trash. We also get to use Data Collector's Expression Language (based on JSP 2.0 expression language) to define conditions, so we can match records starting with a given string without having to construct a regular expression:

[Screenshot: the pipeline with a Stream Selector routing matching records to Trash]

Here are the first few lines of the input test file:

$ head -n 6 ~/sdcSpool/tester.txt
1.2.3.4 Trash
2.3.4.5 Keep
1.2.3.4 Trash
2.3.4.5 Keep
1.2.3.4 Trash
2.3.4.5 Keep 1.2.3.4

Let's run the pipeline with the Stream Selector:

[Screenshot: the running pipeline; the Stream Selector is sending every record to Hadoop FS]

Uh-oh – that doesn't look right! The pipeline should be filtering out those lines that start with 1.2.3.4, but the record count at the bottom of the display shows that the Stream Selector is sending everything to Hadoop FS.

The first few lines of output confirm the problem:

$ hdfs dfs -cat /sdc/events/sdc-20f17369-362f-42b7-a526-41d87aa3b21c_f9c32a2b-3996-450b-bdc7-d3a665d7f6ae | head -n 6
1.2.3.4 Trash
2.3.4.5 Keep
1.2.3.4 Trash
2.3.4.5 Keep
1.2.3.4 Trash
2.3.4.5 Keep 1.2.3.4

Let's debug the pipeline – preview mode reads the first few lines of input and shows us each record's state at every stage of the data flow:

[Screenshot: preview mode, showing every record, including the 1.2.3.4 records, routed to Hadoop FS]

All of the data is being sent to Hadoop FS, including the 1.2.3.4 records. Let's look at the Stream Selector configuration:

[Screenshot: the Stream Selector configuration, showing an extra dot in the condition]

How did that extra dot get in there? Never mind… Let's fix it and preview again, just to check:

[Screenshot: preview after the fix; 1.2.3.4 records go to Trash, everything else to Hadoop FS]

That's better! We can see that lines starting with ‘1.2.3.4' match the condition and are sent to trash via stream 1, while everything else is sent to Hadoop FS via stream 2.
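
For reference, the corrected Stream Selector condition is a one-line expression. Assuming the Directory origin is using the text data format, so that each line lands in a single field named /text, it looks something like this:

${str:startsWith(record:value('/text'), '1.2.3.4')}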

We can reset the directory origin, rerun the pipeline, and see what happens:

[Screenshot: the rerun pipeline, with records flowing to both Trash and Hadoop FS]

Looks good – records are being sent to both trash and Hadoop FS – let's check the output!

$ hdfs dfs -cat /sdc/events/sdc-20f17369-362f-42b7-a526-41d87aa3b21c_8095f7f2-fbb4-45c0-b827-66034ebbbc98 | head -n 6
2.3.4.5 Keep
2.3.4.5 Keep
2.3.4.5 Keep 1.2.3.4
2.3.4.5 Keep
2.3.4.5 Keep
2.3.4.5 Keep 1.2.3.4

Success – we see only the lines that do not have the prefix ‘1.2.3.4'!

Even as we extend the pipeline to do more and more, Data Collector's web UI allows us to easily comprehend the data flow. For example, this pipeline operates on transaction records, computing the credit card type and masking credit card numbers as appropriate:

[Screenshot: a pipeline that computes credit card type and masks credit card numbers]
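
The card-type logic in a pipeline like this can be handled by an Expression Evaluator (or one of the script evaluators). As a purely illustrative sketch, with a hypothetical /credit_card field, an expression tagging Visa cards by their leading 4 might read:

${str:startsWith(record:value('/credit_card'), '4') ? 'Visa' : 'Other'}

The masking itself is best left to a dedicated Field Masker stage rather than an expression.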

In these examples, we're running Data Collector in ‘standalone' mode, reading log files from local disk, but we could just as easily scale out with a cluster pipeline to work with batch or streaming data, and even integrate with Kerberos for authentication.

If your Flume configuration files are getting out of hand, download StreamSets Data Collector (it's open source!) and try it out. It's likely quicker and easier to recreate a Flume configuration from scratch in Data Collector than it is to extend your existing Flume agent!

Pat Patterson