Kafka

The Kafka destination writes data to a Kafka cluster.

The destination writes each record as a Kafka message to the specified topic. The Kafka cluster determines the number of partitions that the destination uses to write the data.

When you configure the Kafka destination, you define connection information, the Kafka topic to write to, and the data format to use.

For pipelines that run locally or that run on a Hadoop cluster, you can also configure the destination to connect securely to Kafka.

Generated Messages

Each Kafka message contains two parts: an optional key and a required value. The Kafka destination generates a null value for the message key and writes the record data to the message value.

For example, let's say that a batch contains the following data:

order_id    customer_id    amount
1075623     2              34.56
1076645     47             234.67
1050945     342            126.05

When you configure the destination to use JSON as the data format, the destination writes the following messages to Kafka:

Key     Value
null    {"order_id":1075623,"customer_id":2,"amount":34.56}
null    {"order_id":1076645,"customer_id":47,"amount":234.67}
null    {"order_id":1050945,"customer_id":342,"amount":126.05}
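To verify the generated messages, you can read the topic back with the console consumer that ships with Kafka. A minimal sketch, assuming a local broker and a topic named orders (both placeholder values):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic orders --property print.key=true --from-beginning

With print.key=true, the consumer prints each message key before its value, so each output line begins with null followed by the JSON record.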

Enabling Security

For pipelines that run locally or that run on a Hadoop cluster, you can configure the Kafka destination to use one of the following security options to connect securely to Kafka:

  • SSL/TLS encryption
  • SSL/TLS encryption and authentication
  • Kerberos authentication
  • Kerberos authentication on SSL/TLS

At this time, you cannot enable Kafka security when the pipeline runs on a Databricks cluster.

Enabling SSL/TLS Encryption

When the Kafka cluster uses the Kafka SSL security protocol, enable the Kafka destination to use SSL/TLS encryption.

  1. Make sure that the Kafka cluster is configured for SSL/TLS as described in the Kafka documentation.
  2. Store the truststore file created for Kafka clients in the same location on the Transformer machine and on each node in the cluster.

    For example, you might store the file in the following location on each machine:

    /var/private/ssl/kafka.client.truststore.jks

  3. On the Kafka tab of the stage, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  4. On the Security tab, configure the following properties:

    • Security Option - Set to SSL/TLS Encryption (Security Protocol=SSL).
    • Truststore File - Absolute path to the truststore file stored in the same location on the Transformer machine and on each node in the cluster.

      For example, you might enter the following path:

      /var/private/ssl/kafka.client.truststore.jks

    • Truststore Password - Password to the truststore file.

      Tip: To secure sensitive information, you can use credential stores or runtime resources.
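
    The Kafka documentation describes how to build the client truststore with the JDK keytool utility. A minimal sketch, assuming the cluster's CA certificate has been saved locally as ca-cert (a placeholder file name):

    keytool -keystore /var/private/ssl/kafka.client.truststore.jks -alias CARoot -import -file ca-cert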

Enabling SSL/TLS Encryption and Authentication

When the Kafka cluster uses the Kafka SSL security protocol and requires client authentication, enable the Kafka destination to use SSL/TLS encryption and authentication.

  1. Make sure that the Kafka cluster is configured for SSL/TLS and client authentication as described in the Kafka documentation.
  2. Store the truststore and keystore files created for Kafka clients in the same location on the Transformer machine and on each node in the cluster.

    For example, you might store the files in the following locations on each machine:

    • /var/private/ssl/kafka.client.truststore.jks
    • /var/private/ssl/kafka.client.keystore.jks
  3. On the Kafka tab of the stage, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  4. On the Security tab, configure the following properties:

    • Security Option - Set to SSL/TLS Encryption and Authentication (Security Protocol=SSL).
    • Truststore File - Absolute path to the truststore file stored in the same location on the Transformer machine and on each node in the cluster.

      For example, you might enter the following path:

      /var/private/ssl/kafka.client.truststore.jks

    • Truststore Password - Password to the truststore file.

      Tip: To secure sensitive information, you can use credential stores or runtime resources.

    • Keystore File - Absolute path to the keystore file stored in the same location on the Transformer machine and on each node in the cluster.

      For example, you might enter the following path:

      /var/private/ssl/kafka.client.keystore.jks

    • Keystore Password - Password to the keystore file.

      Tip: To secure sensitive information, you can use credential stores or runtime resources.
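
    The Kafka documentation also covers creating the client keystore: generate a key pair, have the certificate signed by the cluster's CA, then import the CA certificate and the signed certificate. A condensed sketch that omits the signing step, where ca-cert and cert-signed are placeholder file names for the CA certificate and the signed client certificate:

    keytool -keystore /var/private/ssl/kafka.client.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
    keytool -keystore /var/private/ssl/kafka.client.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore /var/private/ssl/kafka.client.keystore.jks -alias localhost -import -file cert-signed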

Enabling Kerberos Authentication

When the Kafka cluster uses the Kafka SASL_PLAINTEXT security protocol, enable the Kafka destination to use Kerberos authentication.

  1. Make sure that the Kafka cluster is configured for Kerberos (SASL) as described in the Kafka documentation.
  2. For pipelines that run on a Hadoop YARN cluster configured for Kerberos, make sure that Kerberos authentication is enabled for Transformer.
  3. Create a Java Authentication and Authorization Service (JAAS) configuration file that contains the configuration properties required for Kafka clients.
    Add the following KafkaClient login section to the file:
    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="<keytab path>"
       storeKey=true
       useTicketCache=false
       principal="<principal name>/<host name>@<realm>";
    };
    For example:
    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="/etc/security/keytabs/kafka_client.keytab"
       storeKey=true
       useTicketCache=false
       principal="kafka/node-1.cluster@EXAMPLE.COM";
    };
  4. Store the JAAS file and the keytab file created for Kafka clients in the same location on the Transformer machine and on each node in the cluster.
  5. Edit the $TRANSFORMER_DIST/libexec/transformer-env.sh file so that the TRANSFORMER_JAVA_OPTS environment variable defines the path to the JAAS configuration file.
    Modify the environment variable as follows:
    export TRANSFORMER_JAVA_OPTS="-Xmx1024m -Xms1024m -server -XX:-OmitStackTraceInFastThrow -Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf ${TRANSFORMER_JAVA_OPTS}"

    Restart Transformer from the command prompt to enable the changes to the transformer-env.sh file.

  6. On the Security tab of the stage, configure the following properties:

    • Security Option - Set to Kerberos Authentication (Security Protocol=SASL_PLAINTEXT).

      When selected, Transformer sets the Kafka SASL mechanism to GSSAPI. Transformer does not support PLAIN (username/password) for the SASL mechanism.

    • Kerberos Service Name - Kerberos service principal name that the Kafka brokers run as.
  7. In the pipeline properties, on the Cluster tab, add the following additional configuration properties under Extra Spark Configuration:
    • spark.driver.extraJavaOptions
    • spark.executor.extraJavaOptions

    Set both properties to define the path to the JAAS configuration file on the Transformer machine. For example:

    -Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf

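    Before running the pipeline, you can sanity-check the keytab and principal referenced in the JAAS file with standard MIT Kerberos tools (klist and kinit), assuming they are installed on the Transformer machine:

    klist -kt /etc/security/keytabs/kafka_client.keytab
    kinit -kt /etc/security/keytabs/kafka_client.keytab kafka/node-1.cluster@EXAMPLE.COM
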
Enabling Kerberos Authentication on SSL/TLS

When the Kafka cluster uses the Kafka SASL_SSL security protocol, enable the Kafka destination to use Kerberos authentication on SSL/TLS.

  1. Make sure that the Kafka cluster is configured for SSL/TLS and Kerberos (SASL) as described in the Kafka SSL/TLS documentation and the Kafka Kerberos documentation.
  2. For pipelines that run on a Hadoop YARN cluster configured for Kerberos, make sure that Kerberos authentication is enabled for Transformer.
  3. Create a Java Authentication and Authorization Service (JAAS) configuration file that contains the configuration properties required for Kafka clients.
    Add the following KafkaClient login section to the file:
    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="<keytab path>"
       storeKey=true
       useTicketCache=false
       principal="<principal name>/<host name>@<realm>";
    };
    For example:
    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="/etc/security/keytabs/kafka_client.keytab"
       storeKey=true
       useTicketCache=false
       principal="kafka/node-1.cluster@EXAMPLE.COM";
    };
  4. Store the following files in the same location on the Transformer machine and on each node in the cluster:
    • JAAS configuration file
    • Keytab file created for Kafka clients
    • Truststore file created for Kafka clients
  5. Edit the $TRANSFORMER_DIST/libexec/transformer-env.sh file so that the TRANSFORMER_JAVA_OPTS environment variable defines the path to the JAAS configuration file.
    Modify the environment variable as follows:
    export TRANSFORMER_JAVA_OPTS="-Xmx1024m -Xms1024m -server -XX:-OmitStackTraceInFastThrow -Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf ${TRANSFORMER_JAVA_OPTS}"

    Restart Transformer from the command prompt to enable the changes to the transformer-env.sh file.

  6. On the Kafka tab of the stage, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  7. On the Security tab, configure the following properties:

    • Security Option - Set to Kerberos Authentication on SSL/TLS (Security Protocol=SASL_SSL).
    • Kerberos Service Name - Kerberos service principal name that the Kafka brokers run as.
    • Truststore File - Absolute path to the truststore file stored in the same location on the Transformer machine and on each node in the cluster.

      For example, you might enter the following path:

      /var/private/ssl/kafka.client.truststore.jks

    • Truststore Password - Password to the truststore file.

      Tip: To secure sensitive information, you can use credential stores or runtime resources.
  8. In the pipeline properties, on the Cluster tab, add the following additional configuration properties under Extra Spark Configuration:
    • spark.driver.extraJavaOptions
    • spark.executor.extraJavaOptions

    Set both properties to define the path to the JAAS configuration file on the Transformer machine. For example:

    -Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf

Data Formats

The Kafka destination writes records based on the specified data format.

The destination can write using the following data formats:
Avro
The destination writes records based on the Avro schema.
Note: To use the Avro data format, Apache Spark version 2.4 or later must be installed on the Transformer machine and on each node in the cluster.
You can use one of the following methods to specify the location of the Avro schema definition:
  • In Pipeline Configuration - Use the schema defined in the stage properties. Optionally, you can configure the destination to register the specified schema with Confluent Schema Registry at a URL with a schema subject.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. Confluent Schema Registry is a distributed storage layer for Avro schemas. You specify the URL to Confluent Schema Registry and whether to look up the schema by the schema ID or subject.

You can also compress data with an Avro-supported compression codec.

Delimited
The destination writes a delimited message for every record. You can specify a custom delimiter, quote, and escape character to use in the data.

JSON
The destination writes a JSON Lines message for every record. For more information, see the JSON Lines website.

Text
The destination writes a message with a single String field for every record. When you configure the destination, you select the field to use.
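
For example, with the Delimited data format and a comma delimiter, the first record from the sample batch shown in Generated Messages would be written as the following message value:

1075623,2,34.56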

Configuring a Kafka Destination

Configure a Kafka destination to write data to a Kafka cluster.

  1. On the Properties panel, on the General tab, configure the following properties:

    • Name - Stage name.
    • Description - Optional description.
    • Stage Library - Stage library to use to connect to Kafka:
      • Kafka cluster-provided libraries - The cluster where the pipeline runs has Kafka libraries installed, and therefore has all of the necessary libraries to run the pipeline.
      • Kafka Transformer-provided libraries - Transformer passes the necessary libraries with the pipeline to enable running the pipeline.

        Use when running the pipeline locally or when the cluster where the pipeline runs does not include the Kafka libraries.

      Note: When using additional Kafka stages in the pipeline, ensure that they use the same stage library.
  2. On the Kafka tab, configure the following properties:

    • Broker URIs - List of comma-separated pairs of hosts and ports used to establish the initial connection to the Kafka cluster. Use the following format:

      <host1>:<port1>,<host2>:<port2>,…

      Once a connection is established, the stage discovers the full set of available brokers.

    • Topic - Kafka topic to write to.

      If the topic doesn't exist, the destination creates the topic using the default configurations defined in the Kafka cluster.

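    Because an auto-created topic uses the cluster's default partition and replication settings, you might prefer to create the topic ahead of time with the topic tool that ships with recent Kafka versions. A minimal sketch, with placeholder values:

    kafka-topics.sh --create --bootstrap-server <host1>:<port1> --topic <topic> --partitions 6 --replication-factor 3
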
  3. On the Security tab, configure the security properties to enable the destination to securely connect to Kafka.

    For information about the security options and additional steps required to enable security, see Enabling Security.

  4. On the Data Format tab, configure the following properties:

    • Data Format - Format of the data to write to messages. Select one of the following formats:
      • Avro
      • Delimited
      • JSON
      • Text
  5. For Avro data, click the Schema tab and configure the following properties:

    • Avro Schema Location - Location of the Avro schema definition used to write the data:
      • In Pipeline Configuration - Use the schema specified in the Avro Schema property.
      • Confluent Schema Registry - Use the schema retrieved from Confluent Schema Registry.
    • Avro Schema - Avro schema definition used to write the data.

      You can optionally use the runtime:loadResource function to use a schema definition stored in a runtime resource file.

    • Register Schema - Registers the specified Avro schema with Confluent Schema Registry.
    • Schema Registry URLs - Confluent Schema Registry URLs used to look up the schema or to register a new schema. To add a URL, click Add. Use the following format:

      http://<host name>:<port number>

    • Look Up Schema By - Method used to look up the schema in Confluent Schema Registry:
      • Subject - Look up the specified Avro schema subject.
      • Schema ID - Look up the specified Avro schema ID.
    • Schema Subject - Avro schema subject to look up or to register in Confluent Schema Registry.

      If the specified subject to look up has multiple schema versions, the destination uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Look Up Schema By property to Schema ID.

    • Schema ID - Avro schema ID to look up in Confluent Schema Registry.
    • Avro Compression Codec - Avro compression type to use.
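
    When troubleshooting schema lookups, the Confluent Schema Registry REST API shows what is registered. Two illustrative requests, assuming the registry runs at the URL configured above:

    curl http://<host name>:<port number>/subjects
    curl http://<host name>:<port number>/subjects/<schema subject>/versions/latest
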
  6. For delimited data, on the Data Format tab, configure the following properties:

    • Delimiter Character - Delimiter character to use in the data. Select one of the available options or select Other to enter a custom character.

      You can enter a Unicode control character using the format \uNNNN, where each N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    • Quote Character - Quote character to use in the data.
    • Escape Character - Escape character to use in the data.
  7. For text data, on the Data Format tab, configure the following property:

    • Text Field - String field in the record that contains the data to be written. All data must be incorporated into the specified field.