Cluster Pipeline Overview

A cluster pipeline is a pipeline that runs in cluster execution mode. You can run a pipeline in standalone execution mode or cluster execution mode.

In standalone mode, a single Data Collector process runs the pipeline. A pipeline runs in standalone mode by default.

In cluster mode, the Data Collector uses a cluster manager and a cluster application to spawn additional workers as needed. Use cluster mode to read data from a Kafka cluster, MapR cluster, HDFS, or Amazon S3.

When would you choose standalone or cluster mode? Say you want to ingest logs from application servers and perform a computationally expensive transformation. To do this, you might use a set of standalone pipelines to stream log data from each application server to a Kafka or MapR cluster, and then use a cluster pipeline to process the data from the cluster and perform the expensive transformation.

Or, you might use cluster mode to move data from HDFS to another destination, such as Elasticsearch.

Cluster Batch and Streaming Execution Modes

Data Collector can run a cluster pipeline using cluster batch or cluster streaming execution mode.

The execution mode that Data Collector can use depends on the origin system that the cluster pipeline reads from:

Kafka cluster
Data Collector can process data from a Kafka cluster in cluster streaming mode. In cluster streaming mode, Data Collector processes data continuously until you stop the pipeline.
Data Collector runs as an application within Spark Streaming, an open source cluster-computing application.
Spark Streaming runs on either the Mesos or YARN cluster manager to process data from a Kafka cluster. The cluster manager and Spark Streaming spawn a Data Collector worker for each topic partition in the Kafka cluster. As a result, each partition has a Data Collector worker to process data. If you add a partition to the Kafka topic, you must restart the pipeline to enable the Data Collector to generate a new worker to read from the new partition.
When Spark Streaming runs on YARN, you can limit the number of workers spawned by configuring the Worker Count cluster pipeline property. You can also use the Extra Spark Configuration property to pass Spark configurations to the spark-submit script. In addition, you can configure the Kafka Consumer origin in a cluster streaming pipeline on YARN to connect securely through SSL/TLS, Kerberos, or both.

Use the Kafka Consumer origin to process data from a Kafka cluster in cluster streaming mode.
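As a rough illustration of the one-worker-per-partition rule, the following Python sketch estimates how many workers a cluster streaming pipeline starts. It assumes that the Worker Count property simply caps the total number of workers. expected_workers is a hypothetical helper, not part of Data Collector.

```python
from typing import Optional


def expected_workers(partition_count: int, worker_count: Optional[int] = None) -> int:
    """Estimate the Data Collector workers spawned for a Kafka topic.

    Assumption: one worker per topic partition, optionally capped by the
    Worker Count cluster pipeline property (YARN only). This is an
    illustration, not Data Collector's actual scheduling logic.
    """
    if partition_count < 0:
        raise ValueError("partition_count must be non-negative")
    if worker_count is None:
        return partition_count  # one worker per partition
    if worker_count <= 0:
        raise ValueError("worker_count must be positive")
    return min(partition_count, worker_count)


# A topic with 12 partitions and no Worker Count limit.
print(expected_workers(12))     # 12
# The same topic on YARN with Worker Count set to 4.
print(expected_workers(12, 4))  # 4
```

Because the worker set is determined when the pipeline starts, a partition added to the topic afterward is not read until you restart the pipeline.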

MapR cluster
Data Collector can process data from a MapR cluster in both execution modes:
  • Cluster batch mode - In cluster batch mode, Data Collector processes all available data and then stops the pipeline. Data Collector runs as an application on top of MapReduce, an open-source cluster-computing framework. MapReduce runs on a YARN cluster manager. YARN and MapReduce generate additional worker nodes as needed. MapReduce creates one map task for each MapR FS block.

    Use the MapR FS origin to process data from MapR in cluster batch mode.

  • Cluster streaming mode - In cluster streaming mode, Data Collector processes data continuously until you stop the pipeline. Data Collector runs as an application within Spark Streaming, an open source cluster-computing application.

    Spark Streaming runs on a YARN cluster manager to process data from a MapR cluster. The cluster manager and Spark Streaming spawn a Data Collector worker for each topic partition in the MapR cluster. As a result, each partition has a Data Collector worker to process data. If you add a partition to the MapR topic, you must restart the pipeline to enable Data Collector to generate a new worker to read from the new partition. You can limit the number of workers spawned by configuring the Worker Count cluster pipeline property.

    Use the MapR Streams Consumer origin to process data from a MapR cluster in cluster streaming mode.

HDFS
Data Collector can process data from HDFS in cluster batch mode. In cluster batch mode, Data Collector processes all available data and then stops the pipeline.
Data Collector runs as an application on top of MapReduce, an open-source cluster-computing framework. MapReduce runs on a YARN cluster manager. YARN and MapReduce generate additional worker nodes as needed. MapReduce creates one map task for each HDFS block.

Use the Hadoop FS origin to process data from HDFS in cluster batch mode.
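Because MapReduce creates one map task per file system block, you can estimate the parallelism of a cluster batch pipeline from the sizes of the input files. The following Python sketch assumes a 128 MB block size (the usual dfs.blocksize default in Hadoop 2 and later); your cluster may use a different value, and MapReduce split settings can change the actual task count. map_task_count is a hypothetical helper.

```python
# Assumed block size; check dfs.blocksize on your cluster.
DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024


def map_task_count(file_sizes, block_size=DEFAULT_BLOCK_SIZE):
    """Estimate map tasks for a cluster batch pipeline: one per block."""
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    # Each file occupies ceil(size / block_size) blocks (integer arithmetic).
    return sum((size + block_size - 1) // block_size for size in file_sizes)


MB = 1024 * 1024
# A 300 MB file spans 3 blocks and a 100 MB file spans 1 block.
print(map_task_count([300 * MB, 100 * MB]))  # 4
```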

Amazon S3
Data Collector can process data from Amazon S3 in the following cluster batch modes:
  • Cluster EMR batch mode - In cluster EMR batch mode, Data Collector runs on an Amazon EMR cluster to process Amazon S3 data. Data Collector can run on an existing EMR cluster or on a new EMR cluster that is provisioned when the pipeline starts. When you provision a new EMR cluster, you can configure whether the cluster remains active or terminates when the pipeline stops.
  • Cluster batch mode - In cluster batch mode, Data Collector runs on a Cloudera distribution of Hadoop (CDH) or Hortonworks Data Platform (HDP) cluster to process Amazon S3 data.

In either mode, Data Collector processes all available data and then stops the pipeline.

Data Collector runs as an application on top of MapReduce in the EMR, CDH, or HDP cluster. MapReduce runs on a YARN cluster manager. MapReduce creates one map task for each HDFS block.

Use the Hadoop FS origin to process data from Amazon S3 in cluster EMR or cluster batch mode.
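The combinations of origin system, cluster execution mode, and origin described above can be summarized as a lookup table. The following Python sketch encodes only what this section states; the dictionary layout and the origin_for helper are illustrative, not a Data Collector API.

```python
# Origin system -> cluster execution mode -> origin to use, as described above.
CLUSTER_MODES = {
    "Kafka cluster": {"cluster streaming": "Kafka Consumer"},
    "MapR cluster": {
        "cluster batch": "MapR FS",
        "cluster streaming": "MapR Streams Consumer",
    },
    "HDFS": {"cluster batch": "Hadoop FS"},
    "Amazon S3": {
        "cluster EMR batch": "Hadoop FS",
        "cluster batch": "Hadoop FS",
    },
}


def origin_for(system: str, mode: str) -> str:
    """Return the origin for a system and mode, or raise if unsupported."""
    modes = CLUSTER_MODES.get(system)
    if modes is None:
        raise ValueError(f"{system} cannot be read by a cluster pipeline")
    if mode not in modes:
        supported = ", ".join(sorted(modes))
        raise ValueError(f"{system} supports only: {supported}")
    return modes[mode]


print(origin_for("MapR cluster", "cluster streaming"))  # MapR Streams Consumer
```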

Data Collector Configuration

When you run cluster pipelines, the Data Collector configuration file defined on the gateway node, $SDC_CONF/sdc.properties, is propagated to the worker nodes, with the exception of the following properties:
  • sdc.base.http.url
  • http.bindHost

If you modify the sdc.base.http.url and http.bindHost properties on the gateway node to configure a specific host name or port number or to configure a specific IP address that Data Collector binds to, the modified values are not propagated to the worker nodes. The worker nodes always use the default values for the sdc.base.http.url and http.bindHost properties so that the worker nodes can dynamically determine the host name and can bind to any IP address.

To prevent additional configuration properties from being propagated to the worker nodes, add the following property to the sdc.properties file on the gateway node:
cluster.slave.configs.remove=<property1>,<property2>
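The propagation rules can be sketched as a filter over the gateway properties. The following Python sketch is only a model of the behavior described above, not Data Collector's implementation; worker_properties is a hypothetical helper, and my.custom.property is a hypothetical property name used for illustration.

```python
# Properties that are never propagated; workers always use their defaults.
ALWAYS_EXCLUDED = {"sdc.base.http.url", "http.bindHost"}


def worker_properties(gateway_props: dict) -> dict:
    """Model which gateway sdc.properties entries reach the worker nodes."""
    removal = gateway_props.get("cluster.slave.configs.remove", "")
    # cluster.slave.configs.remove holds a comma-separated list of names.
    extra = {name.strip() for name in removal.split(",") if name.strip()}
    excluded = ALWAYS_EXCLUDED | extra
    return {k: v for k, v in gateway_props.items() if k not in excluded}


gateway = {
    "http.port": "18630",
    "sdc.base.http.url": "http://gateway.example.com:18630",
    "http.bindHost": "10.0.0.5",
    "my.custom.property": "gateway-only",  # hypothetical property
    "cluster.slave.configs.remove": "my.custom.property",
}
workers = worker_properties(gateway)
print("http.port" in workers, "my.custom.property" in workers)  # True False
```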

For more information on configuring the Data Collector configuration file, see Data Collector Configuration.

Enable HTTPS

You can enable Data Collector to use HTTPS when you run cluster pipelines. By default, Data Collector uses HTTP.

To configure HTTPS for cluster pipelines, you must first configure Data Collector to use HTTPS. Then you generate an SSL/TLS certificate for each worker node in the cluster. Because Data Collector runs on the master gateway node in the cluster, the gateway node uses the same keystore file configured for Data Collector.

You then specify the generated keystore file and keystore password file for the worker nodes in the Data Collector configuration file, $SDC_CONF/sdc.properties. You can optionally generate a truststore file for the gateway and worker nodes.

For more information, see Enabling HTTPS.
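One way to generate a certificate for each worker node is the JDK keytool utility. The following Python sketch only builds the keytool -genkeypair command lines; it does not run them. It assumes self-signed RSA certificates, one keystore per worker, and a shared password file; the host names, file paths, and the keytool_genkeypair_args helper are hypothetical. See Enabling HTTPS for how Data Collector expects the keystore and password files to be laid out.

```python
import shlex
from typing import List


def keytool_genkeypair_args(host: str, keystore_path: str, password_file: str) -> List[str]:
    """Build a keytool command that creates a self-signed key pair for one worker."""
    return [
        "keytool", "-genkeypair",
        "-alias", host,
        "-keyalg", "RSA",
        "-keysize", "2048",
        "-dname", f"CN={host}",
        "-validity", "365",
        "-keystore", keystore_path,
        # Read passwords from a file instead of prompting interactively.
        "-storepass:file", password_file,
        "-keypass:file", password_file,
    ]


# Hypothetical worker host names and file locations.
for host in ["worker1.example.com", "worker2.example.com"]:
    args = keytool_genkeypair_args(host, f"keystores/{host}.jks", "keystore-password.txt")
    print(shlex.join(args))
```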

Temporary Directory

Data Collector requires that the Java temporary directory on the gateway node in the cluster is writable.

The Java temporary directory is specified by the Java system property java.io.tmpdir. On UNIX, the default value of this property is typically /tmp and is writable.

Before running cluster pipelines, verify that the Java temporary directory on the gateway node is writable.
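A quick way to verify writability is to create and delete a file in the directory. The following Python sketch does that for a given path. Python cannot read the java.io.tmpdir value of the gateway JVM, so the path you pass in is an assumption: /tmp is the typical UNIX default, and for a given Java installation you can print the default with java -XshowSettings:properties -version. directory_is_writable is a hypothetical helper.

```python
import os
import tempfile


def directory_is_writable(path: str) -> bool:
    """Return True if a file can actually be created in path."""
    if not os.path.isdir(path):
        return False
    try:
        # Creating (and automatically deleting) a real file is a stricter
        # check than os.access, which can be misleading on some file systems.
        with tempfile.NamedTemporaryFile(dir=path):
            pass
        return True
    except OSError:
        return False


# Pass the java.io.tmpdir value used by Data Collector on the gateway node.
print(directory_is_writable("/tmp"))
```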

Logs

Because cluster pipelines run as either MapReduce or Spark applications, each Data Collector worker in the cluster manages its own log. 

The Data Collector workers send log messages to different locations based on the cluster execution mode:

Cluster batch mode pipelines
For cluster batch mode pipelines, each Data Collector worker sends log messages to the syslog file on the worker node. You can use the YARN Resource Manager UI to view the syslog file for each MapReduce task.
Cluster streaming mode pipelines
For cluster streaming mode pipelines, each Data Collector worker sends log messages to stderr on the worker node. You can use the Spark UI to view stderr for each Spark application.

Cluster pipeline logs can grow in size over time, particularly for cluster streaming pipelines that run continuously. You can optionally configure the Data Collector installed on the gateway node to use the log4j rolling file appender to write log messages to an sdc.log file. This configuration is propagated to the worker nodes such that each Data Collector worker writes log messages to an sdc.log file within the YARN application directory.

The log4j rolling file appender automatically rolls or archives the current log file and then resumes logging in another file. The $SDC_CONF/sdc-log4j.properties file configured for the Data Collector installed on the gateway node determines how frequently the rolling file appender rolls files. By default, it writes log messages to a maximum of 10 files, rolling over to the next file when the current file reaches a size of 256 MB.
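Data Collector configures this policy through log4j in sdc-log4j.properties. The following Python sketch applies the same size-based policy with Python's logging.handlers.RotatingFileHandler, only as an analog, and computes the approximate disk space each worker's logs can occupy. It assumes that the maximum of 10 files includes the active file.

```python
import logging
import logging.handlers
import os
import tempfile

MAX_BYTES = 256 * 1024 * 1024  # roll when the current file reaches 256 MB
MAX_FILES = 10                 # assumed: active file plus rolled files

# RotatingFileHandler keeps the active file plus backupCount rolled files,
# so backupCount = MAX_FILES - 1 caps the total at 10 files.
log_dir = tempfile.mkdtemp()
handler = logging.handlers.RotatingFileHandler(
    os.path.join(log_dir, "sdc.log"),
    maxBytes=MAX_BYTES,
    backupCount=MAX_FILES - 1,
)
logger = logging.getLogger("sdc-worker-sketch")
logger.addHandler(handler)

# Approximate upper bound on log disk usage per worker, in MB.
worst_case_mb = MAX_FILES * MAX_BYTES // (1024 * 1024)
print(worst_case_mb)  # 2560
```

Under these assumptions, each worker keeps roughly 2.5 GB of logs at most, which is worth checking against the free space in the YARN application directory on each worker node.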

When you configure Data Collector to use the rolling file appender, you can view the log files for each worker node by using the YARN Resource Manager UI to locate the sdc.log file within the YARN application directory.

To enable Data Collector to use the rolling file appender, add the following line to the Data Collector configuration file, $SDC_CONF/sdc.properties, defined on the gateway node:
cluster.pipelines.logging.to.stderr=false

Checkpoint Storage for Streaming Pipelines

When Data Collector runs a cluster streaming pipeline on either Mesos or YARN, it generates and stores checkpoint metadata. The checkpoint metadata provides the offset for the origin.

The Data Collector stores the checkpoint metadata in the following path on HDFS or Amazon S3:
/user/$USER/.streamsets-spark-streaming/<DataCollector ID>/<Kafka topic>/<consumer group>/<pipelineName>
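To locate the checkpoint metadata for a specific pipeline, you can fill in the path template with your own values. The following Python sketch does that; the checkpoint_path helper and the example values are hypothetical, and Data Collector might encode some components differently, so treat the result as a starting point for browsing HDFS or Amazon S3.

```python
import posixpath


def checkpoint_path(user: str, sdc_id: str, topic: str,
                    consumer_group: str, pipeline_name: str) -> str:
    """Build a checkpoint location following the documented path template."""
    parts = [user, sdc_id, topic, consumer_group, pipeline_name]
    if any(not p or "/" in p for p in parts):
        raise ValueError("components must be non-empty and contain no '/'")
    return posixpath.join("/user", user, ".streamsets-spark-streaming",
                          sdc_id, topic, consumer_group, pipeline_name)


# Hypothetical values for illustration.
print(checkpoint_path("sdc", "a1b2c3", "weblogs", "sdc-cluster", "LogIngest"))
# /user/sdc/.streamsets-spark-streaming/a1b2c3/weblogs/sdc-cluster/LogIngest
```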

When you run a cluster streaming pipeline on YARN, the Data Collector stores the metadata on HDFS.

When you run a cluster streaming pipeline on Mesos, the Data Collector can store the metadata on HDFS or Amazon S3.

Configuring the Location for Mesos

When you run a cluster streaming pipeline on Mesos, the Data Collector can write checkpoint information to either HDFS or Amazon S3.

To define the location for checkpoint storage:

  1. Configure the core-site.xml and hdfs-site.xml files to define where to write the checkpoint information.
    For more information about configuring the files, see https://wiki.apache.org/hadoop/AmazonS3.
  2. Store the files within the Data Collector resources directory.
  3. Enter the location of the files in the Cluster > Checkpoint Configuration Directory pipeline property.
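Hadoop *-site.xml files share a simple configuration/property/name/value layout. The following Python sketch renders such a file from name/value pairs. It assumes that the checkpoint path resolves against the default file system, so that setting fs.defaultFS to an s3a:// URI sends checkpoints to Amazon S3; this is an assumption, as are the bucket name and the hadoop_site_xml helper. Credentials and the S3 connector scheme depend on your Hadoop version; see the linked Hadoop wiki page.

```python
import xml.etree.ElementTree as ET


def hadoop_site_xml(properties: dict) -> str:
    """Render a Hadoop *-site.xml document from name/value pairs."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


# Assumption: checkpoints follow the default file system. Hypothetical bucket.
core_site = hadoop_site_xml({"fs.defaultFS": "s3a://example-checkpoint-bucket"})
print(core_site)
```

You would then save the output as core-site.xml in the Data Collector resources directory and enter that location in the Checkpoint Configuration Directory property.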

Error Handling Limitations

Cluster pipelines currently have the following limitations on pipeline configuration options:
  • Memory Limit Exceeded - Use either the Log option or the Log and Alert option. The Log, Alert, and Stop Pipeline option is not supported at this time.
  • Error Records - Write error records to Kafka or discard the records. Stopping the pipeline or writing records to file is not supported at this time.
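If you manage pipeline settings in scripts, you could check them against these limitations before running a cluster pipeline. The following Python sketch does that; the setting keys and value labels are hypothetical and mirror the wording above rather than Data Collector's internal identifiers.

```python
# Hypothetical labels mirroring the supported options listed above.
SUPPORTED = {
    "memory_limit_exceeded": {"Log", "Log and Alert"},
    "error_records": {"Write to Kafka", "Discard"},
}


def unsupported_options(config: dict) -> list:
    """Return (setting, value) pairs that a cluster pipeline cannot use."""
    problems = []
    for setting, allowed in SUPPORTED.items():
        value = config.get(setting)
        if value is not None and value not in allowed:
            problems.append((setting, value))
    return problems


print(unsupported_options({
    "memory_limit_exceeded": "Log, Alert, and Stop Pipeline",
    "error_records": "Discard",
}))
# [('memory_limit_exceeded', 'Log, Alert, and Stop Pipeline')]
```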

Monitoring and Snapshot

The Data Collector UI allows you to monitor each Data Collector worker.

After you start a pipeline, the Data Collector UI displays basic monitoring information for the pipeline and links to each Data Collector worker. For monitoring details for a Data Collector worker, click the worker link. You can then view metrics and alerts for the worker.

Metric and data alerts are defined for the pipeline, but triggered by individual workers. When you define a metric or data alert, each worker inherits the alert and triggers the alert based on the statistics for the worker.
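Because each worker evaluates an inherited alert against its own statistics, an alert can stay silent even when the combined total across workers exceeds its threshold. The following Python sketch models that behavior with a hypothetical error-record threshold; the MetricAlert class and triggered_workers helper are illustrative only.

```python
from dataclasses import dataclass


@dataclass
class MetricAlert:
    """A pipeline-level alert definition that every worker inherits."""
    name: str
    threshold: int  # trigger when a worker's own count exceeds this value


def triggered_workers(alert: MetricAlert, error_counts: dict) -> list:
    """Evaluate the inherited alert separately against each worker's stats."""
    return sorted(w for w, count in error_counts.items() if count > alert.threshold)


alert = MetricAlert(name="High error records", threshold=100)
# Hypothetical per-worker error record counts; the total is 270.
counts = {"worker-1": 40, "worker-2": 150, "worker-3": 80}
print(triggered_workers(alert, counts))  # ['worker-2']
```

Although the combined count of 270 exceeds the threshold, only worker-2 triggers the alert, so size thresholds for a single worker's share of the data.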
Note: You cannot take snapshots when monitoring cluster pipelines.