Kafka Cluster Requirements

Cluster mode pipelines that read from a Kafka cluster have the following requirements:

  • Spark Streaming for cluster streaming modes - Spark version 2.1 or later.
  • Apache Kafka - Spark Streaming on YARN requires a Cloudera or Hortonworks distribution of an Apache Kafka cluster, version 0.10.0.0 or later. Spark Streaming on Mesos requires Apache Kafka on Apache Mesos.

Note: By default, a Cloudera CDH cluster sets the Kafka-Spark integration version to 0.9. However, Data Collector cluster streaming pipelines require version 0.10 of the Kafka-Spark integration. As a result, the SPARK_KAFKA_VERSION environment variable is set to 0.10 by default in the Data Collector environment configuration file - sdc-env.sh or sdcd-env.sh. Do not change this environment variable value.
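The variable is set with a standard shell export, so the default entry in the environment configuration file looks like the following:

export SPARK_KAFKA_VERSION=0.10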

Kafka Consumer Maximum Batch Size

When using a Kafka Consumer origin in cluster mode, the Max Batch Size property is ignored. Instead, the effective batch size for each partition is <Batch Wait Time> x <Rate Limit Per Partition>.

For example, if Batch Wait Time is 60 seconds and Rate Limit Per Partition is 1000 messages/second, then the effective batch size from the Spark Streaming perspective is 60 x 1000 = 60,000 messages. In this example, there is only one partition, so only one cluster pipeline is spawned, and the batch size for that pipeline is 60,000 messages.

If there are two partitions, then the effective batch size from the Spark Streaming perspective is 60 x 1000 x 2 = 120,000 messages. By default, two cluster pipelines are created. If the messages are evenly distributed across the partitions, then each pipeline receives 60,000 messages in one batch. If, however, all 120,000 messages are in a single partition, then the cluster pipeline processing that partition receives all 120,000 messages.

To reduce the maximum batch size, either reduce the wait time or reduce the rate limit per partition. Similarly, to increase the maximum batch size, either increase the wait time or increase the rate limit per partition.

Configuring Cluster YARN Streaming for Kafka

Complete the following steps to configure a cluster pipeline to read from a Kafka cluster on YARN:

  1. Verify the installation of Kafka, Spark Streaming, and YARN as the cluster manager.
  2. Install the Data Collector on a Spark and YARN gateway node.
  3. To enable checkpoint metadata storage, grant the user defined in the user environment variable write permission on /user/$SDC_USER.
    The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start.
    For example, say the user environment variable is defined as sdc and the cluster does not use Kerberos. Then you might use the following commands to create the directory and configure the necessary write permissions:
    $ sudo -u hdfs hadoop fs -mkdir /user/sdc
    $ sudo -u hdfs hadoop fs -chown sdc /user/sdc
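    If the cluster does use Kerberos, you would typically first authenticate with a principal that has HDFS superuser privileges, for example (a sketch; the principal is hypothetical):
    $ kinit hdfs@EXAMPLE.COM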
  4. If necessary, specify the location of the spark-submit script that points to Spark version 2.1 or later.
    Data Collector assumes that the spark-submit script used to submit job requests to Spark Streaming is located in the following directory:
    /usr/bin/spark-submit
    If the script is not in this directory, use the SPARK_SUBMIT_YARN_COMMAND environment variable to define the location of the script.
    The location of the script may differ depending on the Spark version and distribution that you use.
    For example, when using CDH Spark 2.1, the spark-submit script is located at /usr/bin/spark2-submit by default. In that case, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_YARN_COMMAND=/usr/bin/spark2-submit
    Or, if using Hortonworks Data Platform (HDP) 2.6, which includes Spark 2.2.0, the spark-submit script is located at /usr/hdp/2.6/spark2/bin/spark-submit by default. In that case, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_YARN_COMMAND=/usr/hdp/2.6/spark2/bin/spark-submit
    Note: If you change the location of the spark-submit script, you must restart Data Collector to capture the change.
  5. To enable Data Collector to submit YARN jobs, perform one of the following tasks:
    • On YARN, set the min.user.id property to a value equal to or lower than the user ID associated with the Data Collector user, typically named "sdc".
    • On YARN, add the Data Collector user name, typically "sdc", to the allowed.system.users property.
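    For example, on clusters that use the Linux container executor, these settings are typically defined in the container-executor.cfg file or through the equivalent cluster manager properties. A sketch, assuming the sdc user has user ID 995:
    # Either lower the minimum user ID to include the sdc user...
    min.user.id=995
    # ...or explicitly allow the sdc user to submit YARN jobs.
    allowed.system.users=sdc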
  6. On YARN, verify that the Spark logging level is set to a severity of INFO or lower.
    YARN sets the Spark logging level to INFO by default. To change the logging level:
    1. Edit the log4j.properties file, located in the following directory:
      <spark-home>/conf/log4j.properties
    2. Set the log4j.rootCategory property to a severity of INFO or lower, such as DEBUG or TRACE.
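      For example, the default Spark log4j template uses a console appender, so the relevant line might look like this:
      log4j.rootCategory=INFO, console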
  7. If YARN is configured to use Kerberos authentication, configure Data Collector to use Kerberos authentication.
    When you configure Kerberos authentication for Data Collector, you enable Data Collector to use Kerberos and define the principal and keytab.
    Important: For cluster pipelines, enter an absolute path to the keytab when configuring Data Collector. Standalone pipelines do not require an absolute path.
    Once enabled, Data Collector automatically uses the Kerberos principal and keytab to connect to any YARN cluster that uses Kerberos. For more information about enabling Kerberos authentication for Data Collector, see Kerberos Authentication.
  8. In the pipeline properties, on the General tab, set the Execution Mode property to Cluster YARN Streaming.
  9. On the Cluster tab, configure the following properties:
    • Worker Count - Number of workers used in a Cluster YARN Streaming pipeline. Use to limit the number of workers spawned for processing. By default, one worker is spawned for every partition in the topic.
      Default is 0 for one worker for each partition.
    • Worker Java Options - Additional Java properties for the pipeline. Separate properties with a space.
      The following properties are set by default:
      • -XX:+UseConcMarkSweepGC and -XX:+UseParNewGC enable the Concurrent Mark Sweep (CMS) garbage collector.
      • -Dlog4j.debug enables debug logging for log4j.
      Changing the default properties is not recommended. You can add any valid Java property.
    • Launcher Env Configuration - Additional configuration properties for the cluster launcher. Using simple or bulk edit mode, click the Add icon and define the property name and value.
    • Worker Memory (MB) - Maximum amount of memory allocated to each Data Collector worker in the cluster.
      Default is 1024 MB.
    • Extra Spark Configuration - Additional Spark configurations to pass to the spark-submit script. Enter the Spark configuration name and the value to use.
      The specified configurations are passed to the spark-submit script as follows:
      spark-submit --conf <key>=<value>
      For example, to limit the off-heap memory allocated to each executor, you can use the spark.yarn.executor.memoryOverhead configuration and set it to the number of MB that you want to use.
      Data Collector does not validate the property names or values.
      For details on additional Spark configurations that you can use, see the Spark documentation for the Spark version that you are using.
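    For instance, configuring the name spark.yarn.executor.memoryOverhead with an illustrative value of 2048 results in the following addition to the job request:
    spark-submit --conf spark.yarn.executor.memoryOverhead=2048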

  10. In the pipeline, use a Kafka Consumer origin.
    If necessary, select a cluster mode stage library on the General tab of the origin.
    Note: Max Batch Size is ignored for the Kafka Consumer origin in cluster mode. For more information, see Kafka Consumer Maximum Batch Size.
  11. If the Kafka cluster is configured to use SSL/TLS, Kerberos, or both, configure the Kafka Consumer origin to securely connect to the cluster, as described in Enabling Security for Cluster YARN Streaming.

Enabling Security for Cluster YARN Streaming

When using a cluster pipeline to read from a Kafka cluster on YARN, you can configure the Kafka Consumer origin to connect securely through SSL/TLS, Kerberos, or both.

Enabling SSL/TLS

Perform the following steps to enable the Kafka Consumer origin in a cluster streaming pipeline on YARN to use SSL/TLS to connect to Kafka.

  1. To use SSL/TLS to connect, first make sure Kafka is configured for SSL/TLS as described in the Kafka documentation.
  2. On the General tab of the Kafka Consumer origin in the cluster pipeline, set the Stage Library property to Apache Kafka 0.10.0.0 or a later version.
  3. On the Kafka tab, add the security.protocol Kafka configuration property and set it to SSL.
  4. Then add and configure the following SSL Kafka properties:
    • ssl.truststore.location
    • ssl.truststore.password
    When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
    • ssl.keystore.location
    • ssl.keystore.password
    • ssl.key.password
    Some brokers might require adding the following properties as well:
    • ssl.enabled.protocols
    • ssl.truststore.type
    • ssl.keystore.type

    For details about these properties, see the Kafka documentation.

  5. Store the SSL truststore and keystore files in the same location on the Data Collector machine and on each node in the YARN cluster.

For example, the following properties allow the stage to use SSL/TLS to connect to Kafka with client authentication:
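The example below is a sketch: the file paths are illustrative and the passwords are placeholders.

security.protocol=SSL
ssl.truststore.location=/etc/security/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>
ssl.keystore.location=/etc/security/ssl/kafka.client.keystore.jks
ssl.keystore.password=<keystore password>
ssl.key.password=<key password>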

Enabling Kerberos (SASL)

When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab to connect to Kafka.

Perform the following steps to enable the Kafka Consumer origin in a cluster streaming pipeline on YARN to use Kerberos to connect to Kafka:

  1. To use Kerberos, first make sure Kafka is configured for Kerberos as described in the Kafka documentation.
  2. Make sure that Kerberos authentication is enabled for Data Collector, as described in Kerberos Authentication.
  3. Add the Java Authentication and Authorization Service (JAAS) configuration properties required for Kafka clients based on your installation and authentication type:
    • RPM, tarball, or Cloudera Manager installation without LDAP authentication - If Data Collector does not use LDAP authentication, create a separate JAAS configuration file on the Data Collector machine. Add the following KafkaClient login section to the file:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="<keytab path>"
          principal="<principal name>/<host name>@<realm>";
      };
      For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="/etc/security/keytabs/sdc.keytab"
          principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };
      Then modify the SDC_JAVA_OPTS environment variable to include the following option that defines the path to the JAAS configuration file:
      -Djava.security.auth.login.config=<JAAS config path>

      Modify environment variables using the method required by your installation type.
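      For example, assuming the JAAS configuration file was saved as /etc/sdc/kafka-client-jaas.conf (a hypothetical path), you might add:
      export SDC_JAVA_OPTS="${SDC_JAVA_OPTS} -Djava.security.auth.login.config=/etc/sdc/kafka-client-jaas.conf"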

    • RPM or tarball installation with LDAP authentication - If LDAP authentication is enabled in an RPM or tarball installation, add the properties to the JAAS configuration file used by Data Collector - the $SDC_CONF/ldap-login.conf file. Add the following KafkaClient login section to the end of the ldap-login.conf file:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="<keytab path>"
          principal="<principal name>/<host name>@<realm>";
      };
      For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="/etc/security/keytabs/sdc.keytab"
          principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };
    • Cloudera Manager installation with LDAP authentication - If LDAP authentication is enabled in a Cloudera Manager installation, enable the LDAP Config File Substitutions (ldap.login.file.allow.substitutions) property for the StreamSets service in Cloudera Manager.

      If the Use Safety Valve to Edit LDAP Information (use.ldap.login.file) property is enabled and LDAP authentication is configured in the Data Collector Advanced Configuration Snippet (Safety Valve) for ldap-login.conf field, then add the JAAS configuration properties to the same ldap-login.conf safety valve.

      If LDAP authentication is configured through the LDAP properties rather than the ldap-login.conf safety valve, add the JAAS configuration properties to the Data Collector Advanced Configuration Snippet (Safety Valve) for generated-ldap-login-append.conf field.

      Add the following KafkaClient login section to the appropriate field as follows:

      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="_KEYTAB_PATH"
          principal="<principal name>/_HOST@<realm>";
      };
      For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="_KEYTAB_PATH"
          principal="sdc/_HOST@EXAMPLE.COM";
      };

      Cloudera Manager generates the appropriate keytab path and host name.

  4. Store the JAAS configuration and Kafka keytab files in the same locations on the Data Collector machine and on each node in the YARN cluster.
  5. On the General tab of the Kafka Consumer origin in the cluster pipeline, set the Stage Library property to Apache Kafka 0.10.0.0 or a later version.
  6. On the Kafka tab, add the security.protocol Kafka configuration property, and set it to SASL_PLAINTEXT.
  7. Then, add the sasl.kerberos.service.name configuration property, and set it to kafka.

For example, the following Kafka properties enable connecting to Kafka with Kerberos:
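security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka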

Enabling SSL/TLS and Kerberos

You can enable the Kafka Consumer origin in a cluster streaming pipeline on YARN to use SSL/TLS and Kerberos to connect to Kafka.

To use SSL/TLS and Kerberos, combine the required steps to enable each and set the security.protocol property as follows:

  1. Make sure Kafka is configured to use SSL/TLS and Kerberos (SASL), as described in the Kafka documentation.
  2. Make sure that Kerberos authentication is enabled for Data Collector, as described in Kerberos Authentication.
  3. Add the Java Authentication and Authorization Service (JAAS) configuration properties required for Kafka clients based on your installation and authentication type:
    • RPM, tarball, or Cloudera Manager installation without LDAP authentication - If Data Collector does not use LDAP authentication, create a separate JAAS configuration file on the Data Collector machine. Add the following KafkaClient login section to the file:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="<keytab path>"
          principal="<principal name>/<host name>@<realm>";
      };
      For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="/etc/security/keytabs/sdc.keytab"
          principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };
      Then modify the SDC_JAVA_OPTS environment variable to include the following option that defines the path to the JAAS configuration file:
      -Djava.security.auth.login.config=<JAAS config path>

      Modify environment variables using the method required by your installation type.

    • RPM or tarball installation with LDAP authentication - If LDAP authentication is enabled in an RPM or tarball installation, add the properties to the JAAS configuration file used by Data Collector - the $SDC_CONF/ldap-login.conf file. Add the following KafkaClient login section to the end of the ldap-login.conf file:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="<keytab path>"
          principal="<principal name>/<host name>@<realm>";
      };
      For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="/etc/security/keytabs/sdc.keytab"
          principal="sdc/sdc-01.streamsets.net@EXAMPLE.COM";
      };
    • Cloudera Manager installation with LDAP authentication - If LDAP authentication is enabled in a Cloudera Manager installation, enable the LDAP Config File Substitutions (ldap.login.file.allow.substitutions) property for the StreamSets service in Cloudera Manager.

      If the Use Safety Valve to Edit LDAP Information (use.ldap.login.file) property is enabled and LDAP authentication is configured in the Data Collector Advanced Configuration Snippet (Safety Valve) for ldap-login.conf field, then add the JAAS configuration properties to the same ldap-login.conf safety valve.

      If LDAP authentication is configured through the LDAP properties rather than the ldap-login.conf safety valve, add the JAAS configuration properties to the Data Collector Advanced Configuration Snippet (Safety Valve) for generated-ldap-login-append.conf field.

      Add the following KafkaClient login section to the appropriate field as follows:

      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="_KEYTAB_PATH"
          principal="<principal name>/_HOST@<realm>";
      };
      For example:
      KafkaClient {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          keyTab="_KEYTAB_PATH"
          principal="sdc/_HOST@EXAMPLE.COM";
      };

      Cloudera Manager generates the appropriate keytab path and host name.

  4. Store the JAAS configuration and Kafka keytab files in the same locations on the Data Collector machine and on each node in the YARN cluster.
  5. On the General tab of the Kafka Consumer origin in the cluster pipeline, set the Stage Library property to Apache Kafka 0.10.0.0 or a later version.
  6. On the Kafka tab, add the security.protocol property and set it to SASL_SSL.
  7. Then, add the sasl.kerberos.service.name configuration property, and set it to kafka.
  8. Then add and configure the following SSL Kafka properties:
    • ssl.truststore.location
    • ssl.truststore.password
    When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
    • ssl.keystore.location
    • ssl.keystore.password
    • ssl.key.password
    Some brokers might require adding the following properties as well:
    • ssl.enabled.protocols
    • ssl.truststore.type
    • ssl.keystore.type

    For details about these properties, see the Kafka documentation.

  9. Store the SSL truststore and keystore files in the same location on the Data Collector machine and on each node in the YARN cluster.
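For example, the following properties combine the Kerberos and SSL/TLS settings to connect to Kafka with client authentication (a sketch: the file paths are illustrative and the passwords are placeholders):

security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/etc/security/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>
ssl.keystore.location=/etc/security/ssl/kafka.client.keystore.jks
ssl.keystore.password=<keystore password>
ssl.key.password=<key password>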

Configuring Cluster Mesos Streaming for Kafka

Complete the following steps to configure a cluster pipeline to read from a Kafka cluster on Mesos:

  1. Verify the installation of Kafka, Spark Streaming, and Mesos as the cluster manager.
  2. Install the Data Collector on a Spark and Mesos gateway node.
  3. To enable checkpoint metadata storage, grant the user defined in the user environment variable write permission on /user/$SDC_USER.
    The user environment variable defines the system user used to run Data Collector as a service. The file that defines the user environment variable depends on your operating system. For more information, see User and Group for Service Start.
    For example, say $SDC_USER is defined as sdc. Then you might use the following commands to create the directory and configure the necessary write permissions:
    $ sudo -u hdfs hadoop fs -mkdir /user/sdc
    $ sudo -u hdfs hadoop fs -chown sdc /user/sdc
  4. If necessary, specify the location of the spark-submit script that points to Spark version 2.1 or later.
    Data Collector assumes that the spark-submit script used to submit job requests to Spark Streaming is located in the following directory:
    /usr/bin/spark-submit
    If the script is not in this directory, use the SPARK_SUBMIT_MESOS_COMMAND environment variable to define the location of the script.
    The location of the script may differ depending on the Spark version and distribution that you use.
    For example, when using CDH Spark 2.1, the spark-submit script is located at /usr/bin/spark2-submit by default. In that case, you might use the following command to define the location of the script:
    export SPARK_SUBMIT_MESOS_COMMAND=/usr/bin/spark2-submit
    Note: If you change the location of the spark-submit script, you must restart Data Collector to capture the change.
  5. In the pipeline properties, on the General tab, set the Execution Mode property to Cluster Mesos Streaming.
  6. On the Cluster tab, configure the following properties:
    • Mesos Dispatcher URL - Master URL of the Mesos dispatcher. For example:
      mesos://dispatcher:7077
    • Checkpoint Configuration Directory - Location of the HDFS configuration files that specify whether to write checkpoint metadata to HDFS or Amazon S3.
      Use a directory or symlink within the Data Collector resources directory.
      The directory should include the following files:
      • core-site.xml
      • hdfs-site.xml
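    As a sketch, a minimal core-site.xml that directs checkpoint metadata to an HDFS cluster might look like the following (the namenode host name is hypothetical):
    <configuration>
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://namenode:8020</value>
      </property>
    </configuration>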
  7. In the pipeline, use a Kafka Consumer origin for cluster mode.
    If necessary, select a cluster mode stage library on the General tab of the origin.
    Note: Max Batch Size is ignored for the Kafka Consumer origin in cluster mode. For more information, see Kafka Consumer Maximum Batch Size.