Cluster Pipelines

Cluster Pipeline Overview

A cluster pipeline is a pipeline that runs in cluster execution mode. You can run a pipeline in standalone execution mode or cluster execution mode.

In standalone mode, a single Data Collector process runs the pipeline. A pipeline runs in standalone mode by default.

In cluster mode, the Data Collector uses a cluster manager and a cluster application to spawn additional workers as needed. Use cluster mode to read data from a Kafka cluster, MapR cluster, or HDFS.

When would you choose standalone or cluster mode? Say you want to ingest logs from application servers and perform a computationally expensive transformation. To do this, you might use a set of standalone pipelines to stream log data from each application server to a Kafka or MapR cluster, and then use a cluster pipeline to process the data from the cluster and perform the expensive transformation.

Or, you might use cluster mode to move data from HDFS to another destination, such as Elasticsearch.

Cluster Batch and Streaming Execution Modes

Data Collector can run a cluster pipeline using the cluster batch or the cluster streaming execution mode.

The execution mode that Data Collector can use depends on the origin system that the cluster pipeline reads from:

Kafka cluster
Data Collector can process data from a Kafka cluster in cluster streaming mode. In cluster streaming mode, Data Collector processes data continuously until you stop the pipeline.
Data Collector runs as an application within Spark Streaming, an open source cluster-computing application.
Spark Streaming runs on either the Mesos or YARN cluster manager to process data from a Kafka cluster. The cluster manager and Spark Streaming spawn a Data Collector worker for each topic partition in the Kafka cluster, so each partition has a Data Collector worker to process its data. When Spark Streaming runs on YARN, you can limit the number of workers spawned by configuring the Worker Count cluster pipeline property. You can also use the Extra Spark Configuration property to pass additional Spark configurations to the spark-submit script.
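
For example, to give each Spark executor more memory, you might add an Extra Spark Configuration entry named spark.executor.memory with a value such as 2g. The property name is a standard Spark configuration; the value is only an illustration. Data Collector passes the entry to the spark-submit script, which is roughly equivalent to adding the following option on the command line:
--conf spark.executor.memory=2g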

Use the Kafka Consumer origin to process data from a Kafka cluster in cluster streaming mode.

MapR cluster
Data Collector can process data from a MapR cluster in both execution modes:
  • Cluster batch mode - In cluster batch mode, Data Collector processes all available data and then stops the pipeline. Data Collector runs as an application on top of MapReduce, an open-source cluster-computing framework. MapReduce runs on a YARN cluster manager. YARN and MapReduce spawn additional workers as needed. MapReduce creates one map task for each MapR FS block.

    Use the MapR FS origin to process data from MapR in cluster batch mode.

  • Cluster streaming mode - In cluster streaming mode, Data Collector processes data continuously until you stop the pipeline. Data Collector runs as an application within Spark Streaming, an open source cluster-computing application.

    Spark Streaming runs on a YARN cluster manager to process data from a MapR cluster. The cluster manager and Spark Streaming spawn a Data Collector worker for each topic partition in the MapR cluster. So each partition has a Data Collector worker to process data. You can limit the number of workers spawned by configuring the Worker Count cluster pipeline property.

    Use the MapR Streams Consumer origin to process data from a MapR cluster in cluster streaming mode.

HDFS
Data Collector can process data from HDFS in cluster batch mode. In cluster batch mode, Data Collector processes all available data and then stops the pipeline.
Data Collector runs as an application on top of MapReduce, an open-source cluster-computing framework. MapReduce runs on a YARN cluster manager. YARN and MapReduce spawn additional workers as needed. MapReduce creates one map task for each HDFS block.
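
For example, with the common HDFS block size of 128 MB, a 1 GB input file spans 8 blocks, so MapReduce creates 8 map tasks and up to 8 workers can process the file in parallel.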

Use the Hadoop FS origin to process data from HDFS in cluster batch mode.

HTTP Protocols

You can configure Data Collector to use HTTP or HTTPS when you run cluster pipelines. By default, Data Collector uses HTTP.

To configure HTTPS when you run cluster pipelines, you must generate an SSL/TLS certificate for the gateway node and the worker nodes. You then specify the generated keystore file and keystore password file for the gateway and worker nodes in the Data Collector configuration file, sdc.properties. You can optionally generate a truststore file for the gateway and worker nodes.
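
For illustration only, the keystore entries in sdc.properties take roughly the following form. The file names are placeholders, and the exact property names for the gateway and worker nodes depend on your Data Collector version, so treat these entries as a sketch rather than a template:
# Illustrative sdc.properties entries - verify the property names for your version
https.keystore.path=keystore.jks
https.keystore.password=${file("keystore-password.txt")}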

For more information, see Configuring HTTPS for Cluster Pipelines.

Checkpoint Storage for Streaming Pipelines

When Data Collector runs a cluster streaming pipeline on either Mesos or YARN, it generates and stores checkpoint metadata. The checkpoint metadata provides the offset for the origin.

The Data Collector stores the checkpoint metadata in the following path on HDFS or Amazon S3:
/user/$USER/.streamsets-spark-streaming/<DataCollector ID>/<Kafka topic>/<consumer group>/<pipelineName>

When you run a cluster streaming pipeline on YARN, the Data Collector stores the metadata on HDFS.

When you run a cluster pipeline on Mesos, the Data Collector can store the metadata on HDFS or Amazon S3.
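
For example, assuming the Data Collector user is sdc and checkpoint metadata is stored on HDFS, you might confirm that checkpoints are being written with a command such as:
$ hadoop fs -ls /user/sdc/.streamsets-spark-streaming/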

Configuring the Location for Mesos

When you run a cluster pipeline on Mesos, the Data Collector can write checkpoint information to either HDFS or Amazon S3.

To define the location for checkpoint storage:

  1. Configure the core-site.xml and hdfs-site.xml files to define where to write the checkpoint information (a minimal example appears after these steps).
    For more information about configuring the files, see https://wiki.apache.org/hadoop/AmazonS3.
  2. Store the files within the Data Collector resources directory.
  3. Enter the location of the files in the Cluster > Checkpoint Configuration Directory pipeline property.
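
The following core-site.xml fragment is a minimal sketch for writing checkpoint information to Amazon S3 through the s3a connector. The bucket name and credentials are placeholders, and property names can vary with your Hadoop version, so verify them against the Hadoop documentation referenced in step 1:

<configuration>
  <!-- Placeholder bucket and credentials - adjust for your environment -->
  <property>
    <name>fs.defaultFS</name>
    <value>s3a://my-checkpoint-bucket</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_AWS_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_AWS_SECRET_KEY</value>
  </property>
</configuration>

To write checkpoint information to HDFS instead, point fs.defaultFS at the HDFS namenode and keep hdfs-site.xml consistent with the cluster configuration.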

Error Handling Limitations

Note the following limitations to pipeline error handling options at this time:
  • Memory Limit Exceeded - Use either the Log option or the Log and Alert option. The Log, Alert, and Stop Pipeline option is not supported at this time.
  • Error Records - Write error records to Kafka or discard the records. Stopping the pipeline or writing records to file is not supported at this time.

Monitoring and Snapshot

The Data Collector UI allows you to monitor each Data Collector worker.

After you start a pipeline, the Data Collector UI displays basic monitoring information for the pipeline and links to each Data Collector worker. To monitor a Data Collector worker, click the worker link. You can then view metrics and alerts for that worker.

Metric and data alerts are defined for the pipeline, but triggered by individual workers. When you define a metric or data alert, each worker inherits the alert and triggers the alert based on the statistics for the worker.
Note: You cannot take snapshots when monitoring cluster pipelines.

Kafka Cluster Requirements

Cluster mode pipelines that read from a Kafka cluster have the following requirements:
  • Spark Streaming (for cluster streaming mode) - Spark versions 1.3 through 1.6
  • Apache Kafka - Spark Streaming on YARN requires a Cloudera or Hortonworks distribution of an Apache Kafka cluster. Spark Streaming on Mesos requires Apache Kafka on Apache Mesos.

Note: When you add a partition to the Kafka topic, restart the pipeline to enable the Data Collector to generate a new worker to read from the new partition.

Configuring Cluster YARN Streaming for Kafka

Complete the following steps to configure a cluster pipeline to read from a Kafka cluster on YARN:

  1. Verify the installation of Kafka, Spark Streaming, and YARN as the cluster manager.
  2. Install the Data Collector on a Spark and YARN gateway node.
  3. To enable checkpoint metadata storage, grant the user defined in the user environment variable write permission on /user/$SDC_USER.
    The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start.
    For example, say the user environment variable is defined as sdc and the cluster does not use Kerberos. Then you might use the following commands to create the directory and configure the necessary write permissions:
    $ sudo -u hdfs hadoop fs -mkdir /user/sdc
    $ sudo -u hdfs hadoop fs -chown sdc /user/sdc
  4. If necessary, specify the location of the spark-submit script.
    Data Collector assumes that the spark-submit script used to submit job requests to Spark Streaming is located in the following directory:
    /usr/bin/spark-submit
    If the script is not in this directory, use the SPARK_SUBMIT_YARN_COMMAND environment variable to define the location of the script.
    The location of the script may differ depending on the Spark version and distribution that you use.
    For example, when using CDH Spark 2.1, the spark-submit script is in the following directory by default: /usr/bin/spark2-submit. Then, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_YARN_COMMAND=/usr/bin/spark2-submit
    Note: If you change the location of the spark-submit script, you must restart Data Collector to capture the change.
  5. To enable Data Collector to submit YARN jobs, perform one of the following tasks:
    • On YARN, set the min.user.id property to a value equal to or lower than the user ID associated with the Data Collector user, typically named "sdc".
    • On YARN, add the Data Collector user name, typically "sdc", to the allowed.system.users property.
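    For example, on clusters that use the LinuxContainerExecutor, these settings are typically defined in the container-executor.cfg file or through the cluster manager's configuration tools. The values below are placeholders, so adjust them to match the actual Data Collector user on your cluster:
    # container-executor.cfg (illustrative values)
    min.user.id=500
    allowed.system.users=sdc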
  6. On YARN, verify that the Spark logging level is set to a severity of INFO or lower.
    YARN sets the Spark logging level to INFO by default. To change the logging level:
    1. Edit the log4j.properties file, located in the following directory:
      <spark-home>/conf/log4j.properties
    2. Set the log4j.rootCategory property to a severity of INFO or lower, such as DEBUG or TRACE.
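    For example, you might set the property as follows, which matches the default value in the Spark log4j.properties template:
    log4j.rootCategory=INFO, console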
  7. If YARN is configured to use Kerberos authentication, configure Data Collector to use Kerberos authentication.
    When you configure Kerberos authentication for Data Collector, you enable Data Collector to use Kerberos and define the principal and keytab.
    Important: For cluster pipelines, enter an absolute path to the keytab when configuring Data Collector. Standalone pipelines do not require an absolute path.
    Once enabled, Data Collector automatically uses the Kerberos principal and keytab to connect to any YARN cluster that uses Kerberos. For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication.
  8. In the pipeline properties, on the General tab, set the Execution Mode property to Cluster YARN Streaming.
  9. On the Cluster tab, enter the required properties for YARN.
  10. In the pipeline, use a Kafka Consumer origin.
    If necessary, select a cluster mode stage library on the General tab of the origin.

Configuring Cluster Mesos Streaming for Kafka

Complete the following steps to configure a cluster pipeline to read from a Kafka cluster on Mesos:

  1. Verify the installation of Kafka, Spark Streaming, and Mesos as the cluster manager.
  2. Install the Data Collector on a Spark and Mesos gateway node.
  3. To enable checkpoint metadata storage, grant the user defined in the user environment variable write permission on /user/$SDC_USER.
    The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start.
    For example, say $SDC_USER is defined as sdc. Then you might use the following commands to create the directory and configure the necessary write permissions:
    $ sudo -u hdfs hadoop fs -mkdir /user/sdc
    $ sudo -u hdfs hadoop fs -chown sdc /user/sdc
  4. If necessary, specify the location of the spark-submit script.
    Data Collector assumes that the spark-submit script used to submit job requests to Spark Streaming is located in the following directory:
    /usr/bin/spark-submit
    If the script is not in this directory, use the SPARK_SUBMIT_MESOS_COMMAND environment variable to define the location of the script.
    The location of the script may differ depending on the Spark version and distribution that you use.
    For example, when using CDH Spark 2.1, the spark-submit script is in the following directory by default: /usr/bin/spark2-submit. Then, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_MESOS_COMMAND=/usr/bin/spark2-submit
    Note: If you change the location of the spark-submit script, you must restart Data Collector to capture the change.
  5. In the pipeline properties, on the General tab, set the Execution Mode property to Cluster Mesos Streaming.
  6. On the Cluster tab, enter the required properties for Mesos.
  7. In the pipeline, use a Kafka Consumer origin for cluster mode.
    If necessary, select a cluster mode stage library on the General tab of the origin.

MapR Requirements

Cluster mode pipelines that read from a MapR cluster have the following requirements:
  • Spark Streaming (for cluster streaming mode) - Spark versions 1.3 through 1.6
  • MapR - version 5.1, 5.2, or 6.0
Note: When you add a partition to the MapR topic, restart the pipeline to enable Data Collector to generate a new worker to read from the new partition.

Configuring Cluster Batch Mode for MapR

Complete the following steps to configure a cluster pipeline to read from MapR in cluster batch mode.

  1. Verify the installation of MapR and YARN.
  2. Install the Data Collector on a YARN gateway node.
  3. Grant the user defined in the user environment variable write permission on /user/$SDC_USER.
    The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start.
    For example, say the user environment variable is defined as sdc and the cluster does not use Kerberos. Then you might use the following commands to create the directory and configure the necessary write permissions:
    $ sudo -u hdfs hadoop fs -mkdir /user/sdc
    $ sudo -u hdfs hadoop fs -chown sdc /user/sdc
  4. To enable Data Collector to submit YARN jobs, perform one of the following tasks:
    • On YARN, set the min.user.id property to a value equal to or lower than the user ID associated with the Data Collector user, typically named "sdc".
    • On YARN, add the Data Collector user name, typically "sdc", to the allowed.system.users property.
    • After you create the pipeline, specify a Hadoop FS user in the MapR FS origin.

      For the Hadoop FS User property, enter a user with an ID that is higher than the min.user.id property, or with a user name that is listed in the allowed.system.users property.

  5. On YARN, verify that the Hadoop logging level is set to a severity of INFO or lower.
    YARN sets the Hadoop logging level to INFO by default. To change the logging level:
    1. Edit the log4j.properties file.
      By default, the file is located in the following directory:
      /opt/mapr/hadoop/<hadoop-version>/conf/
    2. Set the log4j.rootLogger property to a severity of INFO or lower, such as DEBUG or TRACE.
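    For example, you might set the property as follows; the console appender name comes from the stock Hadoop log4j.properties and may differ on your cluster:
    log4j.rootLogger=INFO,console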
  6. If YARN is configured to use Kerberos authentication, configure Data Collector to use Kerberos authentication.
    When you configure Kerberos authentication for Data Collector, you enable Data Collector to use Kerberos and define the principal and keytab.
    Important: For cluster pipelines, enter an absolute path to the keytab when configuring Data Collector. Standalone pipelines do not require an absolute path.
    Once enabled, Data Collector automatically uses the Kerberos principal and keytab to connect to any YARN cluster that uses Kerberos. For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication.
  7. In the pipeline properties, on the General tab, set the Execution Mode property to Cluster Batch.
  8. On the Cluster tab, enter the required properties for YARN.
  9. In the pipeline, use the MapR FS origin for cluster mode.
    If necessary, select a cluster mode stage library on the General tab of the origin.

Configuring Cluster Streaming Mode for MapR

Complete the following steps to configure a cluster pipeline to read from MapR in cluster streaming mode.

  1. Verify the installation of MapR, Spark Streaming, and YARN.
  2. Install the Data Collector on a Spark and YARN gateway node.
  3. To enable checkpoint metadata storage, grant the user defined in the user environment variable write permission on /user/$SDC_USER.
    The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start.
    For example, say the user environment variable is defined as sdc and the cluster does not use Kerberos. Then you might use the following commands to create the directory and configure the necessary write permissions:
    $ sudo -u hdfs hadoop fs -mkdir /user/sdc
    $ sudo -u hdfs hadoop fs -chown sdc /user/sdc
  4. If necessary, specify the location of the spark-submit script.
    Data Collector assumes that the spark-submit script used to submit job requests to Spark Streaming is located in the following directory:
    /usr/bin/spark-submit
    If the script is not in this directory, use the SPARK_SUBMIT_YARN_COMMAND environment variable to define the location of the script.
    The location of the script may differ depending on the Spark version and distribution that you use.
    For example, say the spark-submit script is in the following directory: /opt/mapr/spark/spark-1.6/bin/spark-submit. Then, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_YARN_COMMAND=/opt/mapr/spark/spark-1.6/bin/spark-submit
    Note: If you change the location of the spark-submit script, you must restart Data Collector to capture the change.
  5. To enable Data Collector to submit YARN jobs, perform one of the following tasks:
    • On YARN, set the min.user.id property to a value equal to or lower than the user ID associated with the Data Collector user, typically named "sdc".
    • On YARN, add the Data Collector user name, typically "sdc", to the allowed.system.users property.
  6. If necessary, set the Spark logging level to a severity of INFO or lower.
    By default, MapR sets the Spark logging level to WARN. To change the logging level:
    1. Edit the log4j.properties file, located in the following directory:
      <spark-home>/conf/log4j.properties
    2. Set the log4j.rootCategory property to a severity of INFO or lower, such as DEBUG or TRACE.
    For example, when using Spark 1.6.1, you would edit /opt/mapr/spark/spark-1.6.1/conf/log4j.properties, and you might set the property as follows:
    log4j.rootCategory=INFO
  7. If YARN is configured to use Kerberos authentication, configure Data Collector to use Kerberos authentication.
    When you configure Kerberos authentication for Data Collector, you enable Data Collector to use Kerberos and define the principal and keytab.
    Important: For cluster pipelines, enter an absolute path to the keytab when configuring Data Collector. Standalone pipelines do not require an absolute path.
    Once enabled, Data Collector automatically uses the Kerberos principal and keytab to connect to any YARN cluster that uses Kerberos. For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication.
  8. In the pipeline properties, on the General tab, set the Execution Mode property to Cluster YARN Streaming.
  9. On the Cluster tab, enter the required properties for YARN.
  10. In the pipeline, use the MapR Streams Consumer origin for cluster mode.
    If necessary, select a cluster mode stage library on the General tab of the origin.

HDFS Requirements

Cluster mode pipelines that read from HDFS require the Cloudera distribution of Hadoop (CDH) or Hortonworks Data Platform (HDP).

Complete the following steps to configure a cluster mode pipeline to read from HDFS:

  1. Verify the installation of HDFS and YARN.
  2. Install Data Collector on a YARN gateway node.
  3. Grant the user defined in the user environment variable write permission on /user/$SDC_USER.
    The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start.
    For example, say the user environment variable is defined as sdc and the cluster does not use Kerberos. Then you might use the following commands to create the directory and configure the necessary write permissions:
    $ sudo -u hdfs hadoop fs -mkdir /user/sdc
    $ sudo -u hdfs hadoop fs -chown sdc /user/sdc
  4. To enable Data Collector to submit YARN jobs, perform one of the following tasks:
    • On YARN, set the min.user.id property to a value equal to or lower than the user ID associated with the Data Collector user, typically named "sdc".
    • On YARN, add the Data Collector user name, typically "sdc", to the allowed.system.users property.
    • After you create the pipeline, specify a Hadoop FS user in the Hadoop FS origin.

      For the Hadoop FS User property, enter a user with an ID that is higher than the min.user.id property, or with a user name that is listed in the allowed.system.users property.

  5. On YARN, verify that the Hadoop logging level is set to a severity of INFO or lower.
    YARN sets the Hadoop logging level to INFO by default. To change the logging level:
    1. Edit the log4j.properties file.
      By default, the file is located in the following directory:
      /etc/hadoop/conf
    2. Set the log4j.rootLogger property to a severity of INFO or lower, such as DEBUG or TRACE.
  6. If YARN is configured to use Kerberos authentication, configure Data Collector to use Kerberos authentication.
    When you configure Kerberos authentication for Data Collector, you enable Data Collector to use Kerberos and define the principal and keytab.
    Important: For cluster pipelines, enter an absolute path to the keytab when configuring Data Collector. Standalone pipelines do not require an absolute path.
    Once enabled, Data Collector automatically uses the Kerberos principal and keytab to connect to any YARN cluster that uses Kerberos. For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication.
  7. In the pipeline properties, on the General tab, set the Execution Mode property to Cluster Batch.
  8. On the Cluster tab, enter the required properties to read from HDFS.
  9. In the pipeline, use the Hadoop FS origin for cluster mode.
    On the General tab of the origin, select the appropriate CDH or HDP stage library for cluster mode.
  10. If YARN is configured to use Kerberos authentication, in the origin, enable the Kerberos Authentication property on the Hadoop FS tab.

Cluster Pipeline Limitations

Please note the following limitations in cluster pipelines:
  • Non-cluster origins - Do not use non-cluster origins in cluster pipelines. For a description of the origins to use, see Cluster Batch and Streaming Execution Modes.
  • Pipeline events - You cannot use pipeline events in cluster pipelines.
  • Record Deduplicator processor - This processor is not supported in cluster pipelines at this time.
  • RabbitMQ Producer destination - This destination is not supported in cluster pipelines at this time.
  • Scripting processors - The state object is available only for the instance of the processor stage it is defined in. If the pipeline executes in cluster mode, the state object is not shared across nodes.
  • Spark Evaluator processor - Use in cluster streaming pipelines only. Do not use in cluster batch pipelines. You can also use the Spark Evaluator in standalone pipelines.
  • Spark Evaluator processor and Spark executor - When using Spark stages, the stages must use the same Spark version as the cluster. For example, if the cluster uses Spark 2.1, the Spark Evaluator must use a Spark 2.1 stage library.

    Both stages are available in several CDH and MapR stage libraries. To verify the Spark version that a stage library includes, see the CDH or MapR documentation. For more information about the stage libraries that include the Spark Evaluator, see Available Stage Libraries.