Apache Flume “is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data”. The typical use case is collecting log data and pushing it to a destination such as the Hadoop Distributed File System. In this blog entry we'll look at a couple of Flume use cases, and see how they can be implemented with StreamSets Data Collector.
As reliable as Flume is, it's not the easiest of systems to set up – even the simplest deployment demands a pretty arcane configuration file. For example:
# Sample Flume configuration to copy lines from
# log files to Hadoop FS
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /Users/pat/flumeSpool
a1.channels.c1.type = memory
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.path = /flume/events
a1.sinks.k1.hdfs.useLocalTimeStamp = true
A few minutes' study (with a couple of references to the Flume documentation) reveals that the config file defines a Flume agent that reads data from a spooling directory, buffers it in an in-memory channel, and writes it to Hadoop FS. The data will be written as plain text, with the default of 10 lines per Hadoop file.
While this example is relatively straightforward, things get complicated quickly as you add more functionality. For example, if you wanted to filter out log entries referencing a given IP address, you would add an interceptor:
# Throw away entries beginning with 1.2.3.4
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^1\.2\.3\.4
a1.sources.r1.interceptors.i1.excludeEvents = true
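To make the interceptor's behaviour concrete, here is a minimal Python sketch of what a regex filter with excludeEvents = true does to a stream of lines. It is an illustration of the semantics, not Flume's implementation: the function name `regex_filter` and the sample log lines are made up for this example, and it assumes the pattern is searched for within each event body rather than required to match the whole line.

```python
import re

# Same pattern as a1.sources.r1.interceptors.i1.regex in the config above.
PATTERN = re.compile(r"^1\.2\.3\.4")


def regex_filter(events, pattern=PATTERN, exclude_events=True):
    """Yield the events that survive the filter.

    exclude_events=True  -> drop events that match the pattern
    exclude_events=False -> keep only events that match the pattern
    """
    for event in events:
        matched = pattern.search(event) is not None
        if matched != exclude_events:
            yield event


# Hypothetical sample lines, for illustration only.
sample = [
    "1.2.3.4 GET /index.html",
    "5.6.7.8 GET /about.html",
    "11.2.3.4 GET /",
]
kept = list(regex_filter(sample))
print(kept)
```

Note that the pattern is anchored only at the start of the line, so an address such as 1.2.3.40 would be filtered out as well.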
It soon becomes difficult to look at a Flume config file and understand exactly what it does, let alone make changes or additions, without investing considerable time in mentally parsing it.
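As a rough picture of what that mental parsing involves, the following Python sketch reads the sample properties and prints the source, channel and sink wiring as a single line. The helper names `parse_properties` and `describe_agent` are hypothetical, and the parser handles only the simple `key = value` lines used in this post, not the full properties file format.

```python
# A subset of the sample configuration from earlier in this post.
CONFIG = """\
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /Users/pat/flumeSpool
a1.channels.c1.type = memory
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events
"""


def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def describe_agent(props, agent="a1"):
    """Return one 'source -> channel -> sink' string per wired path."""
    flows = []
    sources = props.get(f"{agent}.sources", "").split()
    sinks = props.get(f"{agent}.sinks", "").split()
    for src in sources:
        src_type = props.get(f"{agent}.sources.{src}.type")
        # A source may list several channels; a sink reads from exactly one.
        for ch in props.get(f"{agent}.sources.{src}.channels", "").split():
            ch_type = props.get(f"{agent}.channels.{ch}.type")
            for sink in sinks:
                if props.get(f"{agent}.sinks.{sink}.channel") == ch:
                    sink_type = props.get(f"{agent}.sinks.{sink}.type")
                    flows.append(
                        f"{src}({src_type}) -> {ch}({ch_type}) -> {sink}({sink_type})"
                    )
    return flows


flows = describe_agent(parse_properties(CONFIG))
for flow in flows:
    print(flow)
```

Even for this three-component agent, the wiring has to be reassembled from keys scattered across the file, which is the work a visual pipeline does for you.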
Let's take a look at an equivalent pipeline, reading log data from a directory and writing to Hadoop FS, in Data Collector:
Clicking around the UI reveals the configuration for each pipeline stage, including the default values for every setting:
Running the pipeline, we can clearly see that a short test file is correctly sent to Hadoop FS:
This is great, but it gets even better as we add more functionality. Let's replicate the Flume interceptor, filtering out records starting with '1.2.3.4'. Data Collector doesn't have a direct equivalent of 'regex filter' – it's actually much more flexible than that. We can use a Stream Selector stage to separate out records that match a given condition. In this case, we'll send them to the trash. We also get to use Data Collector's Expression Language (based on JSP 2.0 expression language) to define conditions, so we can match records starting with a given string without having to construct a regular expression:
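The routing idea can be sketched in a few lines of plain Python. This is a stand-in for the concept only: it is not Data Collector's Expression Language or its API, the function name `stream_selector` is invented for the example, the sample records are hypothetical, and the prefix is taken from the Flume regex shown earlier.

```python
def stream_selector(records, prefix="1.2.3.4"):
    """Split records into two streams using a plain prefix test.

    Records that start with the prefix go to the first stream (trash);
    everything else falls through to the default stream (Hadoop FS).
    """
    trash, hadoop_fs = [], []
    for record in records:
        if record.startswith(prefix):
            trash.append(record)
        else:
            hadoop_fs.append(record)
    return trash, hadoop_fs


# Hypothetical records, for illustration only.
records = ["1.2.3.4 Trash", "5.6.7.8 Keep", "1.2.3.4 Trash", "5.6.7.8 Keep"]
trash, hadoop_fs = stream_selector(records)
print(len(trash), "to trash;", len(hadoop_fs), "to Hadoop FS")
```

Because the condition is an ordinary string comparison, there are no dots to escape, which removes one common source of regex mistakes.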
Here are the first few lines of the input test file:
$ head -n 6 ~/sdcSpool/tester.txt
18.104.22.168 Trash 22.214.171.124 Keep 126.96.36.199 Trash 188.8.131.52 Keep 184.108.40.206 Trash 220.127.116.11 Keep 18.104.22.168
Let's run the pipeline with the Stream Selector:
Uh-oh – that doesn't look right! The pipeline should be filtering out those lines that start with 1.2.3.4, but the record count at the bottom of the display shows that the Stream Selector is sending everything to Hadoop FS.
The first few lines of output confirm the problem:
$ hdfs dfs -cat /sdc/events/sdc-20f17369-362f-42b7-a526-41d87aa3b21c_f9c32a2b-3996-450b-bdc7-d3a665d7f6ae | head -n 6
126.96.36.199 Trash 188.8.131.52 Keep 184.108.40.206 Trash 220.127.116.11 Keep 18.104.22.168 Trash 22.214.171.124 Keep 126.96.36.199
Let's debug the pipeline – preview mode reads the first few lines of input and shows us each record's state at every stage of the data flow. Let's take a look:
All of the data is being sent to Hadoop FS, including the 1.2.3.4 records. Let's look at the Stream Selector configuration:
How did that extra dot get in there? Never mind… Let's fix it and preview again, just to check:
That's better! We can see that lines starting with '1.2.3.4' match the condition and are sent to trash via stream 1, while everything else is sent to Hadoop FS via stream 2.
We can reset the directory origin, rerun the pipeline, and see what happens:
Looks good – records are being sent to both trash and Hadoop FS – let's check the output!
$ hdfs dfs -cat /sdc/events/sdc-20f17369-362f-42b7-a526-41d87aa3b21c_8095f7f2-fbb4-45c0-b827-66034ebbbc98 | head -n 6
220.127.116.11 Keep 18.104.22.168 Keep 22.214.171.124 Keep 126.96.36.199 188.8.131.52 Keep 184.108.40.206 Keep 220.127.116.11 Keep 18.104.22.168
Success – we see only the lines that do not have the prefix '1.2.3.4'!
Even as we extend the pipeline to do more and more, Data Collector's web UI allows us to easily comprehend the data flow. For example, this pipeline operates on transaction records, computing the credit card type and masking credit card numbers as appropriate:
In these examples, we're running Data Collector in 'standalone' mode, reading log files from local disk, but we could just as easily scale out with a cluster pipeline to work with batch or streaming data, and even integrate with Kerberos for authentication.
If your Flume configuration files are getting out of hand, download StreamSets Data Collector (it's open source!) and try it out. It's likely quicker and easier to recreate a Flume configuration from scratch in Data Collector than it is to extend your existing Flume agent!