Scala

The Scala processor runs custom Scala code to transform data. You develop the custom code using the Spark APIs for the version of Spark installed on your cluster, which must be compliant with Scala 2.11.x. Complete the prerequisite tasks before using the processor in a pipeline.

The processor can have one or more input streams and a single output stream.

The Scala processor receives a Spark DataFrame from each input stream, runs your custom Scala code to transform the DataFrames, and then returns a single DataFrame as output. By default, the processor does not run the code on empty DataFrames.
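
For example, the simplest possible custom code passes the DataFrame from the first input stream to the output unchanged:

// Pass the first input DataFrame straight through to the output stream
output = inputs(0)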

When you configure the Scala processor, you specify the code to run and whether to run the code on empty DataFrames.

Scala Code

The custom code that you develop for the Scala processor can include any code valid for the Spark APIs for the version of Spark installed on your cluster, which must be compliant with Scala 2.11.x.

The Scala processor can have one or more input streams and a single output stream.

When the processor has multiple input streams, it receives a DataFrame from each stream. The custom code must call Scala operations to transform the DataFrames and produce a single output DataFrame.
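
For example, a minimal sketch that merges the DataFrames from two input streams into a single output DataFrame, assuming both inputs share the same schema and column order:

// Combine rows from both input streams; union requires matching schemas
output = inputs(0).union(inputs(1))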

In the custom code, use the inputs and output variables to interact with DataFrames. Reference fields in the DataFrames using the same notation required by Spark.

When you validate a pipeline, invalid code generates compilation errors. You can find compiler output in the Transformer logs.

For more information about the Spark APIs, see the Spark Scala API documentation for a supported version of Spark.

Input and Output Variables

In the custom code, use the following variables to interact with DataFrames:
inputs
Use the inputs variable to access input DataFrames.
Because the processor can receive multiple DataFrames, the inputs variable is an array. Use parentheses notation, inputs(#), to indicate the position in the array: use 0 to access the DataFrame from the first input stream connected to the processor, 1 for the second input stream, and so on.
For example, when the processor receives a single DataFrame, use inputs(0) to access it. When the processor receives two DataFrames, use inputs(0) to access the DataFrame from the first input stream and inputs(1) to access the DataFrame from the second.
output
Use the output variable to pass the transformed DataFrame to the next stage.
After the custom code performs transformations on the DataFrame, assign the transformed DataFrame to the output variable. The processor passes the contents of the output variable to the output stream.
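
For example, the following sketch joins the DataFrames from two input streams and assigns the result to the output variable. The join column, id, is a hypothetical field name used only for illustration:

// Join the first and second input DataFrames on a shared column
// ("id" is a hypothetical field name used for illustration)
output = inputs(0).join(inputs(1), Seq("id"))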

Examples

Here is an example of custom Scala code developed for the Scala processor.

Increment Value in Field

The following code increments the value of the first field in the DataFrame by 1, and then saves the result to the output variable:
// Implicit conversions, needed for .toDF() on the transformed Dataset
import spark.implicits._

// Increment the first field of each row by 1, then convert the result
// back to a DataFrame and assign it to the output variable
output = inputs(0).map(r => r.getLong(0) + 1).toDF()
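
A similar sketch increments a named field rather than the first field by using the Spark column API. The column name, total, is an assumption used for illustration:

import org.apache.spark.sql.functions.col

// Increment a named column; "total" is a hypothetical field name
output = inputs(0).withColumn("total", col("total") + 1)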

Configuring a Scala Processor

Configure a Scala processor to transform data based on custom Scala code.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
  2. On the Scala tab, configure the following properties:
    Scala Property - Description
    Scala Code - Scala code to run. Develop the code using the Spark APIs for the version of Spark installed on your cluster, which must be compliant with Scala 2.11.x.
    Note: When you validate a pipeline, invalid code generates compilation errors. You can find compiler output in the Transformer logs.
    Skip Empty Batches - Skips running the custom code on empty DataFrames. Empty DataFrames are passed directly to downstream stages.
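
When you clear Skip Empty Batches, the custom code also runs on empty DataFrames, so it can be useful to guard against them explicitly. A minimal sketch, assuming the same single-field DataFrame as the earlier example:

import spark.implicits._

// head(1) returns an empty array when the DataFrame has no rows,
// so the transformation runs only when input data is present
output = if (inputs(0).head(1).isEmpty) inputs(0)
         else inputs(0).map(r => r.getLong(0) + 1).toDF()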