Dataflow Performance Blog

Transform Data in StreamSets Data Collector

I've written quite a bit over the past few months about the more advanced aspects of data manipulation in StreamSets Data Collector (SDC) – writing custom processors, calling Java libraries from JavaScript, Groovy & Python, and even using Java and Scala with the Spark Evaluator. As a developer, it's always great fun to break out the editor and get to work, but we should be careful not to jump the gun. Just because you can solve a problem with code doesn't mean you should. Using SDC's built-in processor stages is not only easier than writing code, it typically results in better performance. In this blog entry, I'll look at some of these stages, and the problems you can solve with them.

I've created a sample pipeline showing an extended use case using the Dev Raw Data origin and Trash destination so it will run no matter your configuration. You'll need SDC 2.4.0.0 or higher to import it. Run preview, and you'll see exactly what's going on. Experiment with the configuration and sample data to see how you can apply the same techniques to your data transformations.

We'll start with some company data in JSON format such as you might receive from an API call and proceed through a series of transformations resulting in records that are ready to be inserted in a database.

Field Pivoter

The Field Pivoter can split one record into many, ‘pivoting' on a collection field – that is, a list, map, or list-map. This is particularly useful when processing API responses, which frequently contain an array of results.

For example, let's say we have the following input:

We want to create a record for each result, with name, street, etc. as fields. Configure a Field Pivoter to pivot on the /results field and copy all of its fields to the root of the new record, /.

Field Pivoter

Let's take a look in preview:

Pivoted Fields
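Conceptually, the pivot turns one record holding a list into one record per list element. Here's a minimal Python sketch of that behavior – the sample data and the `pivot` helper are made up for illustration, not SDC's actual implementation:

```python
# Hypothetical input record, shaped like the API response described above:
# a "results" list whose entries should each become a record of their own.
record = {
    "results": [
        {"name": "StreamSets", "street": "2 Bryant St", "city": "San Francisco"},
        {"name": "Example Co", "street": "100 Main St", "city": "Oakland"},
    ]
}

def pivot(record, field):
    """Emit one new record per element of the collection field."""
    return [dict(item) for item in record[field]]

for r in pivot(record, "results"):
    print(r)
```

Each element of /results becomes a standalone record whose fields sit at the root, which is exactly what the preview shows.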

Field Flattener

Data formats such as Avro and JSON can represent hierarchical structures, where records contain fields that are themselves collections of other fields. For example, each record in our use case now has the following structure:

Many destinations, such as relational databases, however, require a ‘flat' record, where each field is simply a string, integer, etc. The Field Flattener, as its name implies, flattens the structure of the record. Configuration is straightforward – specify whether you want to flatten the entire record, or just a specific field, and the separator you would like to use. For example:

Field Flattener


Applying this to the sample data above results in a record with fields address.street, address.city, etc:

Field Flattener
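To make the flattening step concrete, here's a short Python sketch of the same idea – recursively joining nested key paths with a separator. This is an illustrative analogue, not SDC's code, and the sample record is hypothetical:

```python
def flatten(record, sep="."):
    """Recursively flatten nested dicts, joining key paths with `sep`."""
    flat = {}
    for key, value in record.items():
        if isinstance(value, dict):
            # Recurse into the nested collection and prefix its keys.
            for subkey, subvalue in flatten(value, sep).items():
                flat[f"{key}{sep}{subkey}"] = subvalue
        else:
            flat[key] = value
    return flat

record = {
    "name": "StreamSets",
    "address": {"street": "2 Bryant St", "city": "San Francisco"},
}
print(flatten(record))
# → {'name': 'StreamSets', 'address.street': '2 Bryant St', 'address.city': 'San Francisco'}
```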


Field Renamer

So we've got a nice flat record structure, but those field names don't match the columns in the database. It would be nice to rename those fields to street, city, etc. The Field Renamer has you covered here. Now, you could explicitly specify each field in the address, but that's a bit laborious, not to mention brittle in the face of data drift. We want to be able to handle new fields, such as latitude and longitude, appearing in the input data without having to stop the pipeline, reconfigure it, and restart it. Let's specify a regular expression to match field names with the prefix address.

Field Renamer

That regular expression – /'address\.(.*)' – is a little complex, so let's unpack it. The initial / is the field path – we want to match fields in the root of the record. We quote the field name since it contains what, for SDC, is a special character – a period. We include that period in the prefix that we want to match, escaping it with a backslash, since the period has a special meaning in regular expressions. Finally, we use parentheses to delimit the group of characters that we want to capture for the new field name: every remaining character in the existing name.

The target field expression specifies the new name for the field – a slash (its path), followed by whatever was in the delimited group. This was all a precise, if slightly roundabout, way to say “Look for fields that start with address. and rename them with whatever is after that prefix.”

Let's see it in action on the output from the Field Flattener:

Renamed Fields
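The capture-group mechanics can be sketched in a few lines of Python, where `m.group(1)` plays the role of the `$1` in the renamer's target expression. The record data here is hypothetical and this is just an illustration of the regex, not SDC's internals:

```python
import re

# Matches field names with the "address." prefix; the group captures
# everything after it, just like the capture group in the SDC config.
pattern = re.compile(r"address\.(.*)")

def rename(record):
    """Strip the 'address.' prefix from any matching field name."""
    renamed = {}
    for key, value in record.items():
        m = pattern.fullmatch(key)
        renamed[m.group(1) if m else key] = value
    return renamed

print(rename({
    "name": "StreamSets",
    "address.street": "2 Bryant St",
    "address.city": "San Francisco",
}))
# → {'name': 'StreamSets', 'street': '2 Bryant St', 'city': 'San Francisco'}
```

Because the match is driven by the prefix rather than an explicit field list, a new address.latitude field would be renamed to latitude without any reconfiguration – the data-drift resilience described above.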

Field Splitter

Our (imaginary!) destination needs the street number separately from the street name. No problem! The Field Splitter was designed for exactly this situation. It works in a similar way to the renamer, but on field values rather than names.

Field Splitter

We're splitting the /street field on sequences of one or more whitespace characters. This will ensure that 2  Bryant St (with two spaces) will be treated the same as 2 Bryant St (with a single space). We put the results of the split operation into two new fields: /street_number and /street_name. If we can't split the /street field in two, then we send the record to the error stream; if there are more than two results, we want to put the remaining text, for example, Bryant St in the last field. Finally, we want to remove the original /street field.

The results are as you might expect:

Split Fields
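The splitter's behavior can be modeled with Python's `re.split`, where `maxsplit=1` mirrors the "put the remaining text in the last field" setting. Again, this is a hypothetical sketch of the logic, not SDC's implementation:

```python
import re

def split_street(record):
    """Split /street on whitespace into /street_number and /street_name."""
    # maxsplit=1 keeps everything after the first run of whitespace
    # together, so "Bryant St" stays intact in the last field.
    parts = re.split(r"\s+", record["street"], maxsplit=1)
    if len(parts) < 2:
        # In SDC this record would be routed to the error stream.
        raise ValueError("cannot split /street into two fields")
    record["street_number"], record["street_name"] = parts
    del record["street"]  # remove the original field
    return record

print(split_street({"street": "2  Bryant St"}))
# → {'street_number': '2', 'street_name': 'Bryant St'}
```

Note that splitting on `\s+` means the double space in "2  Bryant St" is handled identically to a single space.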

Conclusion

Here is the final pipeline, which pivots, flattens, renames and splits fields in the incoming data:

Field Pipeline

I mentioned performance earlier. As a quick, albeit unscientific, test, I ran this pipeline on my MacBook Air for a minute or so; it processed about 512 records/second. I coded up the same functionality in Groovy, ran it for a similar length of time, and it only processed about 436 records/second.

Download StreamSets Data Collector, follow the installation docs, import the field manipulation pipeline, and try it out. Let us know in the comments if you find a (generally useful) transformation that we can't do ‘out of the box'!

Pat Patterson
  • Huzaira Bashir

    How can we use StreamSets to pick up CSV files and then convert them to JSON objects before storing them on the file system again?

    • http://blog.superpat.com/ Pat Patterson

      You would configure the Directory origin for CSV input and connect it to a Local FS destination configured for JSON output. It should be very straightforward – feel free to get in touch with our community if you need any help: https://streamsets.com/community/