Create a Custom Expression Language Function for StreamSets Data Collector

Posted in Data Integration, April 28, 2017

[Image: Custom Expression Language Snapshot]

One of the most powerful features in StreamSets Data Collector Engine is support for Expression Language, or ‘EL’ for short. EL was introduced in JavaServer Pages (JSP) 2.0 as a mechanism for accessing Java code from JSP. The Expression Evaluator and Stream Selector stages rely heavily on EL, but you can use Expression Language in configuring almost every SDC stage. In this blog entry I’ll explain a little about EL and show you how to write your own EL functions.

EL Basics

As its name implies, Expression Language allows you to do more than just access Java code – you can write expressions such as

${str:length(record:value('/id')) > 10}

This evaluates to true if the /id field’s value is more than 10 characters long, and to false otherwise.

SDC includes a wide variety of EL functions for purposes such as accessing a record’s fields and attributes, detecting drift in record structure, and performing standard math and string operations. You can get a long way with these ‘off-the-shelf’ functions and, when you want to go further, it’s really straightforward to create custom EL functions.

Custom Expression Language Functions

Let’s say you’re processing web server log data and you want to filter out any requests from clients in the local ‘private’ network address ranges; for example, the 192.168.1.0 – 192.168.1.255 range. Java helpfully provides an isSiteLocalAddress() method on the InetAddress class, and Google’s Guava library allows us to create an InetAddress object from a string containing an IP address without hitting the network, so we can easily sketch out a class with an isPrivate() method:

package com.streamsets.el.example;

import com.google.common.net.InetAddresses;

public class DomainNameEL {
  public static boolean isPrivate(String address) {
    return InetAddresses.forString(address).isSiteLocalAddress();
  }
}
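
To see which addresses isSiteLocalAddress() treats as private, here is a small standalone sketch. It assumes only the JDK: instead of Guava's InetAddresses.forString() it uses InetAddress.getByName(), which only validates the format when it is given a literal IP address, so none of the inputs below should trigger a lookup. The class name SiteLocalDemo and the sample addresses are illustrative and not part of the original example.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative class name; not part of the original example.
final class SiteLocalDemo {

  // Pass literal IP addresses only: a host name here would cause a DNS lookup.
  static boolean isSiteLocal(String literalIp) {
    try {
      return InetAddress.getByName(literalIp).isSiteLocalAddress();
    } catch (UnknownHostException e) {
      // Malformed literal (or unresolvable name): surface it as an unchecked error.
      throw new IllegalArgumentException("Not a usable IP literal: " + literalIp, e);
    }
  }

  public static void main(String[] args) {
    String[] samples = {
      "10.0.0.1", "172.16.0.1", "192.168.1.10",  // inside the private IPv4 ranges
      "172.32.0.1", "8.8.8.8", "127.0.0.1"       // outside them
    };
    for (String ip : samples) {
      System.out.println(ip + " -> " + isSiteLocal(ip));
    }
  }
}
```

For IPv4, the first three samples fall in 10.0.0.0/8, 172.16.0.0/12 and 192.168.0.0/16 and report true; the rest report false. Note that the loopback address 127.0.0.1 is not site-local.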

What if we pass something that isn’t an IP address at all? InetAddresses.forString() will throw an IllegalArgumentException, which we don’t want to happen while we’re running our pipeline, so let’s catch that, along with any other exception such as the NullPointerException caused by a null argument:

package com.streamsets.el.example;

import com.google.common.net.InetAddresses;

public class DomainNameEL {
  public static boolean isPrivate(String address) {
    try {
      return InetAddresses.forString(address).isSiteLocalAddress();
    } catch (Exception e) {
      return false;
    }
  }
}

Note – when you use Expression Language in a processor stage, SDC evaluates it using a dummy record as part of pipeline validation, so you should take care that your EL functions don’t throw an exception on empty or unexpected input.
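
One way to check this before deploying is to call the function with the kinds of values a dummy or malformed record might produce. The sketch below is a stand-in, not the class from this post: to stay JDK-only it swaps Guava's InetAddresses.forString() for a hand-written dotted-quad parser (IPv4 only), and the class name SafeIsPrivateDemo is hypothetical. The catch-all structure mirrors isPrivate() above.

```java
import java.net.InetAddress;

// Hypothetical stand-in for the post's class; IPv4 dotted-quad input only.
final class SafeIsPrivateDemo {

  static boolean isPrivate(String address) {
    try {
      // A null address throws NullPointerException here, caught below.
      String[] parts = address.split("\\.", -1);
      if (parts.length != 4) {
        return false;
      }
      byte[] bytes = new byte[4];
      for (int i = 0; i < 4; i++) {
        // Non-numeric or empty octets throw NumberFormatException, caught below.
        int octet = Integer.parseInt(parts[i]);
        if (octet < 0 || octet > 255) {
          return false;
        }
        bytes[i] = (byte) octet;
      }
      // getByAddress() builds the object from raw bytes without any lookup.
      return InetAddress.getByAddress(bytes).isSiteLocalAddress();
    } catch (Exception e) {
      // Never let bad input propagate out of the function.
      return false;
    }
  }

  public static void main(String[] args) {
    String[] inputs = {"192.168.1.10", "8.8.8.8", "", "not-an-ip", "300.1.1.1", null};
    for (String input : inputs) {
      System.out.println(input + " -> " + isPrivate(input));
    }
  }
}
```

Only the first input yields true; the empty string, the non-address text, the out-of-range octet and null all yield false rather than an exception.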

To make this available as an EL function we just need to add some annotations:

package com.streamsets.el.example;

import com.google.common.net.InetAddresses;
import com.streamsets.pipeline.api.ElFunction;
import com.streamsets.pipeline.api.ElParam;
import com.streamsets.pipeline.api.ElDef;

// @ElDef marks this as a class that contains EL functions
@ElDef
public class DomainNameEL {
  private static final String DNS = "dns";

  // This is an EL function - it must be public static
  @ElFunction(
    prefix = DNS,
    name = "isPrivate",
    description = "Returns true if this is a private IPv4 address."
  )
  public static boolean isPrivate(
    // @ElParam assigns a UI name to a parameter
    @ElParam("address") String address
  ) {
    try {
      return InetAddresses.forString(address).isSiteLocalAddress();
    } catch (Exception e) {
      return false;
    }
  }
}

And that’s it! We can use Maven to build this into a JAR using a very standard pom.xml file, copy the JAR to $SDC_DIST/libs-common-lib, and use it in a pipeline:

[Image: Custom Expression Language Pipeline]

Let’s preview the Expression Language pipeline on some test data:

[Image: Custom EL Preview]

Success! Note that this is a deliberately simple, but still useful, example. EL functions can accept any number of arguments and be arbitrarily complex. What functionality are you going to build into a custom Expression Language function? Let us know in the comments!
