Getting Started

What is StreamSets Transformer?

StreamSets Transformer is an execution engine that runs data processing pipelines on Apache Spark, an open-source cluster-computing framework. Because Transformer pipelines run on Spark deployed on a cluster, the pipelines can perform transformations that require heavy processing on the entire data set in batch or streaming mode.

Transformer is designed to run on any type of cluster. At this time, Transformer works with Hadoop distributions and Databricks.

You install Transformer on a machine that is configured to submit Spark jobs to a cluster, such as a Hadoop edge or data node or a cloud virtual machine. You then register Transformer to work with StreamSets Control Hub.

You use a web browser to access the Control Hub user interface (UI). Within Control Hub, you design Transformer pipelines and configure a job to run the pipeline. Transformer pipelines read data from one or more origins, transform the data by performing operations on the entire data set, and then write the transformed data to destinations. Transformer pipelines can run in batch or streaming mode.

When you start a job with a Transformer pipeline, Transformer submits the pipeline as a Spark application to the cluster. Spark handles all of the pipeline processing, including performing complex transformations on the data, such as joining, sorting, or aggregating the data. As the Spark application runs, you use the Control Hub UI to monitor the progress of the pipeline, including viewing real-time statistics and any errors that might occur.

With Transformer, you can leverage the performance and scale that Spark offers without having to write your own Spark application using Java, Scala, or Python.

Note: Transformer is not compatible with Data Protector. If your organization uses Data Protector, be aware that Transformer jobs are not protected by Data Protector policies.

Pipeline Processing on Spark

Transformer functions as a Spark client that launches distributed Spark applications.

When you start a pipeline on a Hadoop cluster, Transformer uses the Spark Launcher API to launch a Spark application. When you start a pipeline on a Databricks cluster, Transformer uses the Databricks REST API to run a Databricks job, which launches a Spark application.

Transformer passes the pipeline definition in JSON format as an application argument. Spark runs the application just as it runs any other application, distributing the processing across nodes in the cluster.
Note: To get started with Transformer in a development environment, you can simply install both Transformer and Spark on the same machine and run Spark locally on that machine. In this case, Spark runs each application on the single machine.
Each pipeline stage represents a Spark operation:
  • The Spark operation for an origin reads data from the origin system in a batch. The origin represents the data as a Spark DataFrame and passes the DataFrame to the next operation.
  • The Spark operation for each processor receives a DataFrame, operates on that data, and then returns a new DataFrame that is passed to the next operation.
  • The Spark operation for a destination receives a DataFrame, converts the DataFrame to the specified data format such as Avro, Delimited, JSON, or Parquet, and then writes the converted data to the destination system.
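Conceptually, each stage is a function that takes a DataFrame and returns a new one. The chain above can be sketched in plain Python, using a list of dicts as a stand-in for a Spark DataFrame; the stage functions and sample fields here are illustrative, not Transformer APIs:

```python
import json

# Plain-Python sketch of the origin -> processor -> destination chain.
# A list of dicts stands in for a Spark DataFrame; each "stage" takes
# a frame and returns a new one, as the Spark operations above do.

def origin_read():
    # Origin: read a batch from the source system as a "DataFrame".
    return [{"id": 1, "price": 10.0}, {"id": 2, "price": 5.5}]

def processor_double_price(frame):
    # Processor: receive a frame, return a *new* transformed frame.
    return [{**row, "price": row["price"] * 2} for row in frame]

def destination_write(frame):
    # Destination: convert each record to the output data format
    # (JSON here) and "write" it to the destination system.
    return [json.dumps(row) for row in frame]

output = destination_write(processor_double_price(origin_read()))
```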

As the Spark application runs, you use the Transformer UI to monitor the progress of the pipeline and troubleshoot any errors. When you stop the pipeline, Transformer stops the Spark application.

The following image shows how Transformer submits a pipeline to Spark as an application and how Spark runs that application:

Batch Case Study

Transformer can run pipelines in batch mode. A batch pipeline processes all available data in a single batch, and then stops.

A batch pipeline is typically used to process data that has already been stored over a period of time, often in a relational database or in a raw or staging area in a Hadoop Distributed File System (HDFS).

Let's say that you have an existing data warehouse in a relational database. You need to create a data mart for the sales team that includes a subset of the data warehouse tables. To create the data mart, you need to join data from the Retail and StoreDetails tables using the store zip code field. The Retail table includes transactional data for each order, including the product ID, unit price, store ID, and store zip code. The StoreDetails table includes demographic data for each store zip code, such as the city and population.

You also need to aggregate the data before sending it to the sales data mart to calculate the total revenue and total number of orders for each zip code.
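In Spark terms, this is an inner join followed by a group-by aggregation. The same logic can be illustrated in plain Python; the column names and sample values below are assumptions for the example, not taken from the actual tables:

```python
# Plain-Python illustration of the join-plus-aggregate logic that the
# pipeline delegates to Spark. Column names and values are assumed.
retail = [
    {"order_id": 1, "unit_price": 20.0, "store_zip": "94105"},
    {"order_id": 2, "unit_price": 15.0, "store_zip": "94105"},
    {"order_id": 3, "unit_price": 30.0, "store_zip": "10001"},
]
store_details = [
    {"store_zip": "94105", "city": "San Francisco", "population": 870000},
    {"store_zip": "10001", "city": "New York", "population": 8400000},
]

# Inner join on the matching store zip code field.
details_by_zip = {d["store_zip"]: d for d in store_details}
joined = [{**r, **details_by_zip[r["store_zip"]]}
          for r in retail if r["store_zip"] in details_by_zip]

# Aggregate: total revenue and total number of orders per zip code.
totals = {}
for row in joined:
    revenue, orders = totals.get(row["store_zip"], (0.0, 0))
    totals[row["store_zip"]] = (revenue + row["unit_price"], orders + 1)
```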

In addition, you need to send the same joined data from the Retail and StoreDetails tables to Parquet files so that data scientists can efficiently analyze the data. To increase the analytics performance, you need to create a surrogate key for the data and then write the data to a small set of Parquet files.

The following image shows a high-level design of the data flow and some of the sample data:

You can use Transformer to create and run a single batch pipeline to meet all of these needs.

Let's take a closer look at how you design the batch pipeline:
Set execution mode to batch
On the General tab of the pipeline, you set the execution mode to batch.
Join data from two source tables
You add two JDBC origins to the pipeline, configuring one to read from the Retail database table and the other to read from the StoreDetails table. You want both origins to read all rows in each table in a single batch, so you use the default value of -1 for the Max Rows per Batch property for the origins.
You add a Join processor to perform an inner join on the data read by the two origins, joining data by the matching store zip code field.
Aggregate data before writing to the data mart
You create one pipeline branch that performs the processing needed for the sales data mart.
After the Join processor, you add an Aggregate processor that calculates the total revenue and total number of orders for each zip code.
Finally, you add a JDBC destination to write the transformed data to the data mart.
Create a surrogate key and repartition the data before writing to Parquet files
You create a second pipeline branch to perform the processing needed for the Parquet files used by data scientists.
You add a Spark SQL Query processor to write a custom Spark SQL query that creates a surrogate key for each record in the input data. In the Spark SQL query, you call the Spark SQL function monotonically_increasing_id to generate a unique ID for each record.
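Spark documents monotonically_increasing_id as placing the partition ID in the upper 31 bits of a 64-bit integer and the record number within the partition in the lower 33 bits, so the generated IDs are unique and increasing but not consecutive. A plain-Python sketch of that encoding:

```python
# Illustration of how Spark's monotonically_increasing_id builds IDs:
# partition ID in the upper 31 bits, row number within the partition in
# the lower 33 bits. IDs are unique and increasing, not consecutive.
def monotonic_id(partition_id, row_in_partition):
    return (partition_id << 33) + row_in_partition

# Two partitions with three rows each: partition 0 yields 0, 1, 2;
# partition 1 starts at 2**33 = 8589934592.
ids = [monotonic_id(p, r) for p in range(2) for r in range(3)]
```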
The Join processor causes Spark to shuffle the data, splitting the data into a large number of partitions. However, since this branch writes to Parquet files, the data must be written to a small number of files for data scientists to efficiently analyze the data. So you add a Repartition processor to decrease the number of partitions to four.
Finally, you add a File destination to the branch to write the data to Parquet files. The File destination creates one output file for each partition, so this destination creates a total of four output files.
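The combined effect of the Repartition processor and the File destination can be sketched as distributing records across four buckets and writing one file per bucket; the hash-based assignment and file names below are illustrative, not how Spark names its output:

```python
# Sketch of repartitioning into 4 partitions and writing one output
# file per partition. Assignment and file names are illustrative.
NUM_PARTITIONS = 4
records = [{"key": i, "value": i * 10} for i in range(10)]

partitions = [[] for _ in range(NUM_PARTITIONS)]
for rec in records:
    partitions[hash(rec["key"]) % NUM_PARTITIONS].append(rec)

# One output file per partition -> four files in total.
files = {f"part-{i:05d}.parquet": part
         for i, part in enumerate(partitions)}
```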

The following image shows the complete design of this batch pipeline:

When you start this batch pipeline, the pipeline reads all available data in both database tables in a single batch. Each processor transforms the data, and then each destination writes the data to the data mart or to Parquet files. After processing all the data in a single batch, the pipeline stops.

Streaming Case Study

Transformer can run pipelines in streaming mode. A streaming pipeline maintains connections to origin systems and processes data at user-defined intervals. The pipeline runs continuously until you manually stop it.

A streaming pipeline is typically used to process data in stream processing platforms such as Apache Kafka.

Let's say that your website transactions are continuously being sent to Kafka. The website transaction data includes the customer ID, shipping address, product ID, quantity of items, price, and whether the customer accepted marketing campaigns.

You need to create a data mart for the sales team that includes aggregated data about the online orders, including the total revenue for each state by the hour. Because the transaction data continuously arrives, you need to produce one-hour windows of data before performing the aggregate calculations.

You also need to join the same website transaction data with detailed customer data from your data warehouse for the customers accepting marketing campaigns. You must send this joined customer data to Parquet files so that data scientists can efficiently analyze the data. To increase the analytics performance, you need to write the data to a small set of Parquet files.

The following image shows a high-level design of the data flow and some of the sample data:

You can use Transformer to create and run a single streaming pipeline to meet all of these needs.

Let's take a closer look at how you design the streaming pipeline:

Set execution mode to streaming
On the General tab of the pipeline, you set the execution mode to streaming. You also specify a trigger interval that defines the time that the pipeline waits before processing the next batch of data. Let's say you set the interval to 1000 milliseconds, which is 1 second.
Read from Kafka and then create one-hour windows of data
You add a Kafka origin to the Transformer pipeline, configuring the origin to read from the weborders topic in the Kafka cluster.
To create larger batches of data for more meaningful aggregate calculations, you add a Window processor. You configure the processor to create a tumbling window using one-hour windows of data.
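A tumbling window assigns each record to exactly one fixed-length, non-overlapping interval. The bucketing arithmetic behind one-hour tumbling windows can be sketched as:

```python
# Sketch of tumbling-window assignment: each event timestamp (seconds
# since the epoch) falls into exactly one non-overlapping 1-hour window.
WINDOW_SECONDS = 3600

def window_start(event_ts):
    # Floor the timestamp to the start of its one-hour window.
    return event_ts - (event_ts % WINDOW_SECONDS)

# Group sample events into their windows.
events = [0, 1799, 3600, 5400, 7201]
windows = {}
for ts in events:
    windows.setdefault(window_start(ts), []).append(ts)
```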
Aggregate data before writing to the data mart
You create one pipeline branch that performs the processing needed for the sales data mart.
You want to aggregate the data by the shipping address state and by the hour. After the Window processor, you add a Spark SQL Expression processor that uses the current_timestamp() Spark SQL function to calculate the current time and write the value to a new time field. Then you add an Aggregate processor that calculates the total revenue and total number of orders by each state and hour.
Finally, you add a JDBC destination to write the transformed data to the data mart.
Filter, join, and repartition the data before writing to Parquet files
You create a second pipeline branch to perform the processing needed for the Parquet files used by data scientists.
You add a Filter processor to pass only the records where the customer accepted marketing campaigns downstream. The Filter processor drops all records where the customer declined the campaigns.
You add a JDBC origin to the pipeline, configuring the origin to read from the Customers database table. You want the origin to read all rows in a single batch, so you use the default value of -1 for the Max Rows per Batch property. You configure the origin to load the data only once so that the origin reads from the table once and then stores the data on the Spark nodes. When processing subsequent batches, the pipeline looks up that data on the Spark nodes.
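The load-once behavior can be sketched as a lookup that reads the table on the first batch and serves every later batch from a cache; the class and names below are illustrative, not Transformer or Spark APIs:

```python
# Sketch of a "load data only once" lookup: the first batch triggers a
# read from the database table; subsequent batches reuse the cached
# rows stored on the cluster. Names are illustrative.
class CachedLookup:
    def __init__(self, read_table):
        self._read_table = read_table   # function that reads all rows
        self._cache = None
        self.reads = 0                  # count of real table reads

    def rows(self):
        if self._cache is None:
            self._cache = self._read_table()
            self.reads += 1
        return self._cache

lookup = CachedLookup(lambda: [{"customer_id": 7, "name": "Ana"}])
batch1 = lookup.rows()   # first batch: reads from the table
batch2 = lookup.rows()   # later batches: served from the cache
```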
You add a Join processor to perform an inner join on the data produced by the Filter processor and the data produced by the JDBC origin, joining data by the matching customer ID field.
The Join processor causes Spark to shuffle the data, splitting the data into a large number of partitions. However, since this branch writes to Parquet files, the data must be written to a small number of files for data scientists to efficiently analyze the data. So you add a Repartition processor to decrease the number of partitions to four.
Finally, you add a File destination to the branch to write the data to Parquet files. The File destination creates one output file for each partition, so this destination creates a total of four output files.

The following image shows the complete design of this streaming pipeline:

When you start this streaming pipeline, the pipeline reads the available online order data in Kafka. The pipeline reads customer data from the database once, storing the data on the Spark nodes for subsequent lookups.

Each processor transforms the data, and then each destination writes the data to the data mart or to Parquet files. After processing all the data in a single batch, the pipeline waits 1 second, then reads the next batch of data from Kafka and reads the database data stored on the Spark nodes. The pipeline runs continuously until you manually stop it.

Transformer for Data Collector Users

For users already familiar with StreamSets Data Collector pipelines, here's how Transformer pipelines are similar... and different.

At a high level, Data Collector pipelines can read from and write to a large number of heterogeneous origins and destinations. These pipelines perform lightweight data transformations that are applied to individual records or a single batch. In contrast, Transformer pipelines can perform heavyweight transformations such as joins, aggregates, and sorts on the entire data set.

Transformer pipelines are configured on the same canvas as Data Collector pipelines. The difference lies in the available functionality and in how the execution engine, Transformer or Data Collector, executes the pipeline.

Some functionality is exactly the same. For example, you can use runtime parameters in both Transformer and Data Collector pipelines. And you can use origin, processor, and destination stages to define processing in both pipeline types.

Transformer includes some stages with the same names as Data Collector stages. Though these stages generally do what you expect, they operate differently from their Data Collector counterparts because Transformer executes them using Apache Spark.

And of course, some functionality is unique to Transformer. For example, a Transformer pipeline can include multiple origins whereas Data Collector allows only one. A Transformer pipeline can also read all available data in a single batch. Because it can do that, it can also perform complex processing, such as ranking and deduplication, across the entire data set.

Here are some highlights of the similarities and differences between Transformer and Data Collector pipelines:
Execution engine
  • Transformer pipeline: Runs on a Spark cluster. Can run on a local Transformer machine for development.
  • Data Collector pipeline: Runs on a StreamSets open source engine as a single JVM on bare metal, a VM, a container, or in the cloud.

Control Hub job
  • Transformer pipeline: Runs a single pipeline instance on one Transformer for each job. Spark automatically scales out the pipeline processing across nodes in the cluster.
  • Data Collector pipeline: Can run multiple pipeline instances on multiple Data Collectors for each job. You manually scale out the pipeline processing by increasing the number of pipeline instances for a job.

Number of origins
  • Transformer pipeline: Allows multiple origins.
  • Data Collector pipeline: Allows one origin.

Schema
  • Transformer pipeline: Requires all records in a batch to have the same schema. File-based origins require that all files processed in the same pipeline run have the same schema. As a batch passes through the pipeline, the schema for the data can change, but all data must have the same schema. As a result, if a processor alters the schema of a subset of records in the batch, then the remaining records are similarly altered to ensure that they have the same schema. For example, if a processor generates a new field for a subset of records, that field is added to the remaining records with null values. This is expected Spark behavior.
  • Data Collector pipeline: Allows records within a batch to have different schemas.

Streaming pipeline execution
  • Transformer pipeline: Provides streaming execution mode to process streaming data. Streaming pipelines do not stop until you stop the pipeline.
  • Data Collector pipeline: Processes streaming data by default, not stopping until you stop the pipeline.

Batch pipeline execution
  • Transformer pipeline: Provides batch execution mode to process all available data in a single batch, then stop the pipeline. This is the default execution mode.
  • Data Collector pipeline: Enabled by configuring the pipeline to pass a dataflow trigger to a Pipeline Finisher executor to stop the pipeline after processing all data.

Dataflow triggers and executor stages
  • Transformer pipeline: Not available.
  • Data Collector pipeline: Available.

Calculations across records within a batch
  • Transformer pipeline: Available in stages such as the Aggregate, Deduplicate, Rank, and Sort processors.
  • Data Collector pipeline: Not available.

Merging streams
  • Transformer pipeline: Provides a Union processor to merge multiple data streams.
  • Data Collector pipeline: Allows merging multiple streams by simply connecting multiple stages to a single stage.

Joining data
  • Transformer pipeline: Provides a Join processor to join records from two data streams.
  • Data Collector pipeline: Provides lookup processors to enhance data in the primary data stream.

Expression language
  • Transformer pipeline: Supports using the Spark SQL query language and some StreamSets functions in expressions. See the individual stage documentation for details, such as the Filter processor or Spark SQL Expression processor.
  • Data Collector pipeline: Supports using the StreamSets expression language in expressions.

Merge consecutive batches
  • Transformer pipeline: Provides the Window processor to merge small streaming batches into larger batches for processing.
  • Data Collector pipeline: Consecutive batches cannot be merged in the pipeline.

Repartition data
  • Transformer pipeline: Provides the Repartition processor to repartition data.
  • Data Collector pipeline: Not available.

Stage library versions
  • Transformer pipeline: Requires all stages in a pipeline that access the same external system to use the same stage library version.
  • Data Collector pipeline: Allows using different stage library versions in a pipeline to access the same external system.

Preview display
  • Transformer pipeline: Lists records based on the input order of the stage. Processors can display records based on the input order or output order of the processor.
  • Data Collector pipeline: Lists records based on the input order of the stage.

Record format
  • Transformer pipeline: Records do not include record header attributes or field attributes.
  • Data Collector pipeline: Records include record header attributes and can include field attributes.

Internal record representation
  • Transformer pipeline: Spark DataFrame.
  • Data Collector pipeline: SDC record data format.

JSON file processing
  • Transformer pipeline: Files must include data in the JSON Lines format. JSON objects can be on a single line or span multiple lines. For more information, see "Data Formats" in the origin documentation.
  • Data Collector pipeline: Files can contain multiple JSON objects or a single JSON array.

Fields and field paths
  • Transformer pipeline: When referencing a field, you do not use a leading slash. For more information, see Referencing Fields.
  • Data Collector pipeline: When referencing a field, you typically use a leading slash to denote the field path.
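The JSON file processing difference can be illustrated with Python's json module: a JSON Lines file carries one complete JSON object per line (shown here in its simplest one-object-per-line form), while Data Collector can also read a file containing a single JSON array:

```python
import json

# JSON Lines: one complete JSON object per line.
json_lines = '{"id": 1}\n{"id": 2}\n'
records = [json.loads(line) for line in json_lines.splitlines()]

# A single JSON array spanning multiple lines: a form Data Collector
# can read, but which is not JSON Lines.
json_array = '[\n  {"id": 1},\n  {"id": 2}\n]'
array_records = json.loads(json_array)
```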