
Fun with FileRefs – Manipulating Whole File Data

As well as parsing incoming data into records, many StreamSets Data Collector (SDC) origins can be configured to ingest Whole Files. The blog entry Whole File Transfer with StreamSets Data Collector provides a basic introduction to the concept.

Although the initial release of the Whole File feature did not allow file content to be accessed in the pipeline, we soon added the ability for Script Evaluator processors to read the file, a feature exploited in the custom processor tutorial to read metadata from incoming image files. In this blog post, I'll show you how a custom processor can both create new records with Whole File content, and replace the content in existing records.

One of our community members recently asked a question in the Slack channel:

I can read data via the whole file data format, obtain a stream from /fileRef and convert it into a file. I can then process this file as I need. Is there any way to create a new record with a new changed file?

My first reaction was “You can't do that!”, but a moment's thought suggested that it is, in fact, possible to create records with new whole file content. The /fileRef field is just an object that extends the FileRef abstract class. A custom processor, written in Java, can provide a class extending FileRef to do just about anything, as long as it can return a Java InputStream.

Creating Whole File Content

As a quick example, I implemented a custom processor with a FileRef storing its data in a String, and posted it to Ask StreamSets.

In this example, the processor creates a StringFileRef holding some sample data, adds it to a new record and inserts that record into the batch:
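Here's a sketch of what that looks like inside a SingleLaneRecordProcessor subclass; the sample text, record source id and /fileInfo entries shown here are illustrative:

@Override
protected void process(Record record, SingleLaneBatchMaker batchMaker) throws StageException {
  // Sample content for the new whole file
  String data = "Hello from a custom FileRef implementation!";

  // Build a new record whose /fileRef field wraps the string content
  Record newRecord = getContext().createRecord("string-file-ref-example");
  LinkedHashMap<String, Field> root = new LinkedHashMap<>();
  root.put("fileRef", Field.create(Field.Type.FILE_REF, new StringFileRef(data)));

  // Whole File records also carry a /fileInfo map describing the 'file'
  LinkedHashMap<String, Field> fileInfo = new LinkedHashMap<>();
  fileInfo.put("filename", Field.create("sample.txt"));
  fileInfo.put("size", Field.create((long) data.getBytes(StandardCharsets.UTF_8).length));
  root.put("fileInfo", Field.create(fileInfo));

  newRecord.set(Field.createListMap(root));

  // Pass the original record through, and add the new one to the batch
  batchMaker.addRecord(record);
  batchMaker.addRecord(newRecord);
}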

StringFileRef itself is almost trivially simple:
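Here's a sketch of the class, assuming the FileRef base class's three abstract methods – getSupportedStreamClasses(), supportsDirectAccess() and createInputStream() – and a constructor that takes a buffer size; check the FileRef Javadoc for the exact signatures in your SDC version:

import com.streamsets.pipeline.api.FileRef;
import com.streamsets.pipeline.api.Stage;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;

// FileRef implementation that serves its content from an in-memory String
public class StringFileRef extends FileRef {
  private final String data;

  public StringFileRef(String data) {
    // The superclass constructor takes the buffer size to use when streaming
    super(data.getBytes(StandardCharsets.UTF_8).length);
    this.data = data;
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T extends AutoCloseable> Set<Class<T>> getSupportedStreamClasses() {
    // Content is only available as a plain InputStream
    return (Set<Class<T>>) (Set<?>) Collections.singleton(InputStream.class);
  }

  @Override
  public boolean supportsDirectAccess() {
    // There is no file on disk to point at, so stream access only
    return false;
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T extends AutoCloseable> T createInputStream(Stage.Context context, Class<T> streamClassType)
      throws IOException {
    return (T) new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
  }
}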

Using this processor in a pipeline with an appropriate destination results in files (or S3 objects) with the expected content.

Replacing Whole File Content

Let's look at another use case – a processor that needs to replace whole file content, and on a more realistic scale than 43 characters!

Our use case is that our incoming files need some custom processing before they are written to the destination. For the purposes of our example, this processing is going to be ROT13 – a simple substitution cipher that replaces each letter of the incoming plaintext data with the letter 13 letters later in the alphabet, rotating past ‘Z' back to ‘A'. ROT13 is often used in online forums to hide punchlines to jokes, spoilers etc. It is not an encryption mechanism that should be used to protect production data!
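In Java, the per-character transformation is just a little arithmetic; this helper is used by the process() code shown later:

// Rotate each ASCII letter 13 places; leave everything else untouched
private static char rot13(char c) {
  if (c >= 'A' && c <= 'Z') {
    return (char) ('A' + (c - 'A' + 13) % 26);
  } else if (c >= 'a' && c <= 'z') {
    return (char) ('a' + (c - 'a' + 13) % 26);
  }
  return c;
}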

The complete custom processor project is on GitHub, but I'll walk through the highlights here.

As before, the FileRef implementation is trivial, just a wrapper around a File object:
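Something like this – as with StringFileRef, the FileRef method signatures here are an approximation, so check the Javadoc for your SDC version:

import com.streamsets.pipeline.api.FileRef;
import com.streamsets.pipeline.api.Stage;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Set;

// FileRef implementation that serves its content from a file on local disk
public class OutputFileRef extends FileRef {
  private final File file;

  public OutputFileRef(String directory, String fileName) {
    super(8192); // buffer size to use when streaming the file
    this.file = new File(directory, fileName);
  }

  public File getFile() {
    return file;
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T extends AutoCloseable> Set<Class<T>> getSupportedStreamClasses() {
    return (Set<Class<T>>) (Set<?>) Collections.singleton(InputStream.class);
  }

  @Override
  public boolean supportsDirectAccess() {
    // Keep things simple and always serve the content via the stream
    return false;
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T extends AutoCloseable> T createInputStream(Stage.Context context, Class<T> streamClassType)
      throws IOException {
    return (T) new FileInputStream(file);
  }
}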

Rot13DProcessor allows configuration of a directory in which to create files:
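Following the pattern from the custom processor tutorial, that's a single @ConfigDef-annotated field in Rot13DProcessor, implementing an abstract getDirectory() getter assumed to be declared in the base processor class; the default value and group name below are illustrative:

@ConfigDef(
    required = true,
    type = ConfigDef.Type.STRING,
    defaultValue = "/tmp/out",
    label = "Output Directory",
    description = "Directory in which to create the transformed files",
    displayPosition = 10,
    group = "ROT13"
)
public String directory;

/** Lets the processor implementation read the configured value. */
@Override
public String getDirectory() {
  return directory;
}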

The processor's init() method checks that it's possible to write to the configured directory:
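A sketch of that check; the Groups and Errors enums are assumed to be defined elsewhere in the project, as in the custom processor tutorial:

@Override
protected List<ConfigIssue> init() {
  List<ConfigIssue> issues = super.init();

  File dir = new File(getDirectory());
  if (!dir.isDirectory() || !dir.canWrite()) {
    // Report a configuration problem rather than failing at runtime
    issues.add(getContext().createConfigIssue(
        Groups.ROT13.name(), "directory", Errors.ROT13_00,
        "Can't write to directory: " + getDirectory()));
  }

  return issues;
}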

The process() method uses the incoming filename and the configured directory to initialize an OutputFileRef:
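Something along these lines, assuming the incoming record follows the standard Whole File layout with the original name at /fileInfo/filename:

// Inside process(Record record, SingleLaneBatchMaker batchMaker)
// Whole File records carry the original file name in the /fileInfo map
String fileName = record.get("/fileInfo/filename").getValueAsString();

// The transformed content will live in a file of the same name
// in the configured output directory
OutputFileRef outputFileRef = new OutputFileRef(getDirectory(), fileName);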

Now we create a writer for the OutputFileRef, get the incoming file's input stream and transform the data. Note that we have to close the output writer before reading the output file's metadata, so that we get the correct file size.
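Here's a sketch of that sequence, using the rot13() helper from earlier; Errors.ROT13_01 is an illustrative error code:

File outputFile = outputFileRef.getFile();
FileRef inputFileRef = record.get("/fileRef").getValueAsFileRef();

// The try-with-resources block closes the writer before we read the output
// file's metadata below, so the reported size is correct
try (Reader reader = new BufferedReader(new InputStreamReader(
         inputFileRef.createInputStream(getContext(), InputStream.class),
         StandardCharsets.UTF_8));
     Writer writer = new BufferedWriter(new OutputStreamWriter(
         new FileOutputStream(outputFile), StandardCharsets.UTF_8))) {
  int c;
  while ((c = reader.read()) != -1) {
    writer.write(rot13((char) c));
  }
} catch (IOException e) {
  throw new OnRecordErrorException(record, Errors.ROT13_01, e);
}

// Point the record at the new content and its metadata
record.set("/fileRef", Field.create(Field.Type.FILE_REF, outputFileRef));
record.set("/fileInfo", createFieldForMetadata(getFileMetadata(outputFile)));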

The createFieldForMetadata() and getFileMetadata() methods are utility functions based on the built-in FileRef implementations in SDC.

Since the processor is accessing the disk, we need to set up appropriate permissions in ${SDC_CONF}/sdc-security.policy:
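For example, something like the following – the code base and output directory paths are placeholders, so adjust them to match where your custom stage library is installed and where it writes its files:

// Allow the custom stage library to manage files in its output directory
grant codebase "file://${sdc.dist.dir}/user-libs/-" {
  permission java.io.FilePermission "/tmp/out/*", "read, write, delete";
};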

Now that the record has new values for /fileRef and /fileInfo, we just write the record to the batchMaker as usual, and it will be passed on down the pipeline. Subsequent processors and destinations see the record just as if it were a standard Whole File record.

Running the pipeline on some handy input files results in the expected output files.

Let's take a look at some of the file content:

Pretty unintelligible! Fortunately, there's a handy shell command that can rot13 the data back to its original state:
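On most Unix-like systems, tr does the job (the file name here is just a placeholder):

$ tr 'A-Za-z' 'N-ZA-Mn-za-m' < output-file.txt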

All is, indeed, well that ends well!

Note – the output files are left in place in the configured directory, so you'll need to clean them up periodically.

Conclusion

StreamSets Data Collector's Whole File data format allows you to build pipelines that read files unparsed from a variety of origins – Amazon S3, Directory, SFTP/FTP Client and, soon, Google Cloud Storage. Script Evaluators can access Whole File content via the FileRef field's createInputStream method; custom processors written in Java can both read incoming Whole File data and write outgoing Whole File data destined for Amazon S3, Azure Data Lake Store, Hadoop FS, Local FS, MapR FS and, again soon, Google Cloud Storage.

What use cases do you have for processing Whole File data? Let us know in the comments!

Pat Patterson