Kafka

The Kafka origin reads data from one or more topics in an Apache Kafka cluster. All messages in a batch must use the same schema. The origin supports Apache Kafka 0.10 and later.

The Kafka origin can read messages from a list of Kafka topics or from topics that match a pattern defined in a Java-based regular expression. When reading topics in the first batch, the origin can start from the first message, the last message, or a particular position in a partition. In subsequent batches, the origin starts from the last-saved offset.

When configuring the Kafka origin, you specify the Kafka brokers that the origin can initially connect to, the topics the origin reads, and where to start reading each topic. You also specify the maximum number of messages to read from any partition in each batch.

You select the data format of the data and configure related properties. When processing delimited or JSON data, you can define a custom schema for reading the data and configure related properties.

For pipelines that run locally or that run on a Hadoop cluster, you can also configure the origin to connect securely to Kafka.

You can configure the origin to load data only once and cache the data for reuse throughout the pipeline run. Or, you can configure the origin to cache each batch of data so the data can be passed to multiple downstream batches efficiently. You can also configure the origin to skip tracking offsets, which enables reading the entire data set each time you start the pipeline.

Partitioning

Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel. Spark determines how to split pipeline data into initial partitions based on the origins in the pipeline.

For a Kafka origin, Spark determines the partitioning based on the number of partitions in the Kafka topics being read.

For example, if a Kafka origin is configured to read from 10 topics that each have 5 partitions, Spark creates a total of 50 partitions to read from Kafka.

Spark uses these partitions throughout the pipeline unless a processor causes Spark to shuffle the data. When you need to change the partitioning in the pipeline, use the Repartition processor.
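
Transformer handles this partitioning for you, but as a rough illustration of the underlying behavior, the following Scala sketch uses the plain Spark Kafka source to show that the initial partition count mirrors the Kafka topic partitions. The broker address and topic names are placeholders, and this is not the Transformer stage itself.

  import org.apache.spark.sql.SparkSession

  object KafkaPartitionSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("kafka-partition-sketch")
        .getOrCreate()

      // Batch read from Kafka: Spark creates one initial partition
      // for each Kafka topic partition that it reads.
      val df = spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker-1:9092")  // placeholder broker
        .option("subscribe", "orders_exp,orders_reg")         // placeholder topics
        .load()

      // With 10 topics of 5 partitions each, this would report 50.
      println(s"Initial partitions: ${df.rdd.getNumPartitions}")
    }
  }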

Topic Specification

The Kafka origin reads data in messages from one or more topics that you specify.

Use one of the following methods to specify the topics to read:
Topic list
Add a list of topics from your Kafka cluster.
For example, suppose you want the origin to read two topics named orders_exp and orders_reg. When configuring the origin, clear the Use Topic Pattern property and in the Topic List property, add the following two topics:
  • orders_exp
  • orders_reg
Topic pattern
Specify a Java-based regular expression that identifies topics from your Kafka cluster.

For example, suppose your cluster has four topics named cust_east, cust_west, orders_exp, and orders_reg. To read the two topics cust_east and cust_west, you can use a topic pattern. Select the Use Topic Pattern property, and in the Topic Pattern property, enter the Java expression cust.* to match both customer topics.

With this configuration, if you later add the topic cust_north to your cluster, the origin will automatically read the new topic.
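
Before configuring the origin, you can check which topic names a pattern selects by testing the expression with standard Java regular expressions. A minimal Scala sketch, using the topic names from the example above:

  import java.util.regex.Pattern

  object TopicPatternSketch {
    def main(args: Array[String]): Unit = {
      val topics  = Seq("cust_east", "cust_west", "orders_exp", "orders_reg")
      val pattern = Pattern.compile("cust.*")  // same expression as in the example above

      // Prints cust_east and cust_west; the orders topics do not match.
      // A topic added later, such as cust_north, would also match.
      topics.filter(t => pattern.matcher(t).matches()).foreach(println)
    }
  }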

Offsets

In a Kafka topic, an offset identifies a message in a partition. When configuring the Kafka origin, you define the starting offset to specify the first message to read in each partition of a topic.

Use one of the following methods to identify the starting offset:
Earliest
The origin reads all available messages, starting with the first message in each partition of each topic.
Latest
The origin reads the last message in each partition of each topic and any subsequent messages added to those topics after the pipeline starts.
Specific offsets
The origin reads messages starting from a specified offset for each partition in each topic. If an offset is not specified for a partition in a topic, the origin returns an error.

After the origin reads the last message in a batch, it saves the offset of that message. In the subsequent batch, the origin starts reading from the next message.

For example, suppose your Kafka topics orders_exp and orders_reg each have two partitions, 0 and 1. To have the origin start reading each partition from the third message, offset 2, set the Starting Offset property to Specific Offsets, and then define offset 2 for partitions 0 and 1 in both topics.
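
Transformer pipelines run on Spark, and the Spark Kafka source expresses per-partition starting offsets as a JSON map. As a rough sketch of the equivalent configuration in plain Spark, with a placeholder broker address and not the Transformer stage itself:

  import org.apache.spark.sql.SparkSession

  object SpecificOffsetsSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("specific-offsets-sketch")
        .getOrCreate()

      // Offsets are zero-based, so offset 2 is the third message in each partition.
      val startingOffsets =
        """{"orders_exp":{"0":2,"1":2},"orders_reg":{"0":2,"1":2}}"""

      val df = spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker-1:9092")  // placeholder broker
        .option("subscribe", "orders_exp,orders_reg")
        .option("startingOffsets", startingOffsets)
        .load()

      df.show(truncate = false)
    }
  }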

Enabling Security

For pipelines that run locally or that run on a Hadoop cluster, you can configure the Kafka origin to use one of the following security options to connect securely to Kafka:
  • SSL/TLS encryption
  • SSL/TLS encryption and authentication
  • Kerberos authentication
  • Kerberos authentication on SSL/TLS

At this time, you cannot enable Kafka security when the pipeline runs on a Databricks cluster.

Enabling SSL/TLS Encryption

When the Kafka cluster uses the Kafka SSL security protocol, enable the Kafka origin to use SSL/TLS encryption.

  1. Make sure that the Kafka cluster is configured for SSL/TLS as described in the Kafka documentation.
  2. Store the truststore file created for Kafka clients in the same location on the Transformer machine and on each node in the cluster.

    For example, you might store the file in the following location on each machine:

    /var/private/ssl/kafka.client.truststore.jks

  3. On the Kafka tab of the stage, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  4. On the Security tab, configure the following properties:
    • Security Option - Set to SSL/TLS Encryption (Security Protocol=SSL).
    • Truststore File - Absolute path to the truststore file stored in the same location on the Transformer machine and on each node in the cluster.

      For example, you might enter the following path:

      /var/private/ssl/kafka.client.truststore.jks

    • Truststore Password - Password to the truststore file.

      Tip: To secure sensitive information, you can use credential stores or runtime resources. For more information about runtime resources, see the Data Collector documentation.
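
For reference, the Security tab settings in step 4 roughly correspond to the standard Kafka client SSL properties shown in the following Scala sketch. The truststore path is the example path from above; the password is a placeholder.

  object SslConfigSketch {
    // Approximate Kafka client equivalents of the SSL/TLS Encryption settings.
    val sslProps: Map[String, String] = Map(
      "security.protocol"       -> "SSL",
      "ssl.truststore.location" -> "/var/private/ssl/kafka.client.truststore.jks",
      "ssl.truststore.password" -> "<truststore password>"  // placeholder
    )
  }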

Enabling SSL/TLS Encryption and Authentication

When the Kafka cluster uses the Kafka SSL security protocol and requires client authentication, enable the Kafka origin to use SSL/TLS encryption and authentication.

  1. Make sure that the Kafka cluster is configured for SSL/TLS and client authentication as described in the Kafka documentation.
  2. Store the truststore and keystore files created for Kafka clients in the same location on the Transformer machine and on each node in the cluster.

    For example, you might store the files in the following locations on each machine:

    • /var/private/ssl/kafka.client.truststore.jks
    • /var/private/ssl/kafka.client.keystore.jks
  3. On the Kafka tab of the stage, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  4. On the Security tab, configure the following properties:
    • Security Option - Set to SSL/TLS Encryption and Authentication (Security Protocol=SSL).
    • Truststore File - Absolute path to the truststore file stored in the same location on the Transformer machine and on each node in the cluster.

      For example, you might enter the following path:

      /var/private/ssl/kafka.client.truststore.jks

    • Truststore Password - Password to the truststore file.

      Tip: To secure sensitive information, you can use credential stores or runtime resources. For more information about runtime resources, see the Data Collector documentation.

    • Keystore File - Absolute path to the keystore file stored in the same location on the Transformer machine and on each node in the cluster.

      For example, you might enter the following path:

      /var/private/ssl/kafka.client.keystore.jks

    • Keystore Password - Password to the keystore file.

      Tip: To secure sensitive information, you can use credential stores or runtime resources. For more information about runtime resources, see the Data Collector documentation.
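
As with the previous procedure, these settings roughly correspond to standard Kafka client properties; client authentication adds the keystore entries. A sketch with the example paths from above and placeholder passwords:

  object SslAuthConfigSketch {
    // Approximate Kafka client equivalents of SSL/TLS Encryption and Authentication.
    val sslAuthProps: Map[String, String] = Map(
      "security.protocol"       -> "SSL",
      "ssl.truststore.location" -> "/var/private/ssl/kafka.client.truststore.jks",
      "ssl.truststore.password" -> "<truststore password>",  // placeholder
      "ssl.keystore.location"   -> "/var/private/ssl/kafka.client.keystore.jks",
      "ssl.keystore.password"   -> "<keystore password>"     // placeholder
    )
  }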

Enabling Kerberos Authentication

When the Kafka cluster uses the Kafka SASL_PLAINTEXT security protocol, enable the Kafka origin to use Kerberos authentication.

  1. Make sure that the Kafka cluster is configured for Kerberos (SASL) as described in the Kafka documentation.
  2. For pipelines that run on a Hadoop YARN cluster configured for Kerberos, make sure that Kerberos authentication is enabled for Transformer.
  3. Create a Java Authentication and Authorization Service (JAAS) configuration file that contains the configuration properties required for Kafka clients.
    Add the following KafkaClient login section to the file:
    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="<keytab path>"
       storeKey=true
       useTicketCache=false
       principal="<principal name>/<host name>@<realm>";
    };
    For example:
    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="/etc/security/keytabs/kafka_client.keytab"
       storeKey=true
       useTicketCache=false
       principal="kafka/node-1.cluster@EXAMPLE.COM";
    };
  4. Store the JAAS file and the keytab file created for Kafka clients in the same location on the Transformer machine and on each node in the cluster.
  5. Edit the $TRANSFORMER_DIST/libexec/transformer-env.sh file to modify the TRANSFORMER_JAVA_OPTS environment variable to define the path to the JAAS configuration file.
    Modify the environment variable as follows:
    export TRANSFORMER_JAVA_OPTS="-Xmx1024m -Xms1024m -server -XX:-OmitStackTraceInFastThrow -Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf ${TRANSFORMER_JAVA_OPTS}"

    Restart Transformer from the command prompt to enable the changes to the transformer-env.sh file.

  6. On the Security tab of the stage, configure the following properties:
    • Security Option - Set to Kerberos Authentication (Security Protocol=SASL_PLAINTEXT).

      When selected, Transformer sets the Kafka SASL mechanism to GSSAPI. Transformer does not support PLAIN (username/password) for the SASL mechanism.

    • Kerberos Service Name - Kerberos service principal name that the Kafka brokers run as.
  7. In the pipeline properties, on the Cluster tab, add the following additional configuration properties under Extra Spark Configuration:
    • spark.driver.extraJavaOptions
    • spark.executor.extraJavaOptions

    Set both properties to define the path to the JAAS configuration file on the Transformer machine. For example:

    -Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf
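
For reference, Kerberos authentication roughly corresponds to the standard Kafka client properties in the following sketch, with the JAAS configuration supplied through the java.security.auth.login.config system property set above. The service name matches the example principal and is a placeholder.

  object KerberosConfigSketch {
    // Approximate Kafka client equivalents of the Kerberos Authentication settings.
    val saslProps: Map[String, String] = Map(
      "security.protocol"          -> "SASL_PLAINTEXT",
      "sasl.mechanism"             -> "GSSAPI",  // Transformer sets GSSAPI; PLAIN is not supported
      "sasl.kerberos.service.name" -> "kafka"    // the Kerberos Service Name property
    )
  }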

Enabling Kerberos Authentication on SSL/TLS

When the Kafka cluster uses the Kafka SASL_SSL security protocol, enable the Kafka origin to use Kerberos authentication on SSL/TLS.

  1. Make sure Kafka is configured to use SSL/TLS and Kerberos (SASL) as described in the Kafka SSL/TLS documentation and the Kafka Kerberos documentation.
  2. For pipelines that run on a Hadoop YARN cluster configured for Kerberos, make sure that Kerberos authentication is enabled for Transformer.
  3. Create a Java Authentication and Authorization Service (JAAS) configuration file that contains the configuration properties required for Kafka clients.
    Add the following KafkaClient login section to the file:
    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="<keytab path>"
       storeKey=true
       useTicketCache=false
       principal="<principal name>/<host name>@<realm>";
    };
    For example:
    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="/etc/security/keytabs/kafka_client.keytab"
       storeKey=true
       useTicketCache=false
       principal="kafka/node-1.cluster@EXAMPLE.COM";
    };
  4. Store the following files in the same location on the Transformer machine and on each node in the cluster:
    • JAAS configuration file
    • Keytab file created for Kafka clients
    • Truststore file created for Kafka clients
  5. Edit the $TRANSFORMER_DIST/libexec/transformer-env.sh file to modify the TRANSFORMER_JAVA_OPTS environment variable to define the path to the JAAS configuration file.
    Modify the environment variable as follows:
    export TRANSFORMER_JAVA_OPTS="-Xmx1024m -Xms1024m -server -XX:-OmitStackTraceInFastThrow -Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf ${TRANSFORMER_JAVA_OPTS}"

    Restart Transformer from the command prompt to enable the changes to the transformer-env.sh file.

  6. On the Kafka tab of the stage, configure each Kafka broker URI to use the SSL/TLS port.

    The default SSL/TLS port number is 9093.

  7. On the Security tab, configure the following properties:
    • Security Option - Set to Kerberos Authentication on SSL/TLS (Security Protocol=SASL_SSL).
    • Kerberos Service Name - Kerberos service principal name that the Kafka brokers run as.
    • Truststore File - Absolute path to the truststore file stored in the same location on the Transformer machine and on each node in the cluster.

      For example, you might enter the following path:

      /var/private/ssl/kafka.client.truststore.jks

    • Truststore Password - Password to the truststore file.

      Tip: To secure sensitive information, you can use credential stores or runtime resources. For more information about runtime resources, see the Data Collector documentation.
  8. In the pipeline properties, on the Cluster tab, add the following additional configuration properties under Extra Spark Configuration:
    • spark.driver.extraJavaOptions
    • spark.executor.extraJavaOptions

    Set both properties to define the path to the JAAS configuration file on the Transformer machine. For example:

    -Djava.security.auth.login.config=<JAAS config path>/kafka_client_jaas.conf

Data Formats

The Kafka origin generates records based on the specified data format.

The origin can read the following data formats:
Avro
The origin generates a record for every message.
Note: To use the Avro data format, Apache Spark version 2.4 or later must be installed on the Transformer machine and on each node in the cluster.
You can use one of the following methods to specify the location of the Avro schema definition:
  • In Pipeline Configuration - Use the schema defined in the stage properties.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. Confluent Schema Registry is a distributed storage layer for Avro schemas. You specify the URL to Confluent Schema Registry and whether to look up the schema by the schema ID or subject.
Delimited
The origin generates a record for every message. You can specify a custom delimiter, quote, and escape character used in the data.
By default, the origin names the first field _c0, the second field _c1, and so on. The origin also infers data types from the data by default. You can rename the fields downstream with a Field Renamer processor, or you can specify a custom schema in the origin.
When you specify a custom schema, the origin uses the field names and data types defined in the schema, applying the first field in the schema to the first field in the record, and so on, as shown in the sketch after this list.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
JSON
The origin generates a record for every message.
By default, the origin uses the field names, field order, and data types in the message.
When you specify a custom schema, the origin matches the field names in the schema to those in the data, then applies the data types and field order defined in the schema.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
Text
The origin generates a record for every message.
The record includes a single field named Value where the origin writes the string data.
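
To make the difference in custom schema handling concrete, the following Scala sketch uses plain Spark (not the Kafka origin itself) to apply the same custom schema to delimited and JSON input. It is a hedged illustration only: the schema, field names, and file paths are placeholders.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types.StructType

  object CustomSchemaBehaviorSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("custom-schema-behavior-sketch")
        .getOrCreate()

      // Hypothetical custom schema with two fields.
      val schema: StructType = StructType.fromDDL("id INT, name STRING")

      // Delimited data: the schema is applied by position, so the first column
      // becomes id and the second becomes name, regardless of any header names.
      val delimited = spark.read
        .schema(schema)
        .option("sep", ",")
        .csv("/tmp/orders.csv")    // placeholder path

      // JSON data: fields are matched by name, so the id and name attributes
      // in each record are read with the data types defined in the schema.
      val json = spark.read
        .schema(schema)
        .json("/tmp/orders.json")  // placeholder path

      delimited.printSchema()
      json.printSchema()
    }
  }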

Configuring a Kafka Origin

Configure a Kafka origin to read data from topics in an Apache Kafka cluster.

  1. On the Properties panel, on the General tab, configure the following properties:
    • Name - Stage name.
    • Description - Optional description.
    • Stage Library - Stage library to use to connect to Kafka:
      • Kafka cluster-provided libraries - The cluster where the pipeline runs has Kafka libraries installed, and therefore has all of the necessary libraries to run the pipeline.
      • Kafka Transformer-provided libraries - Transformer passes the necessary libraries with the pipeline to enable running the pipeline.

        Use when running the pipeline locally or when the cluster where the pipeline runs does not include the Kafka libraries.

      Note: When using additional Kafka stages in the pipeline, ensure that they use the same stage library.
    • Load Data Only Once - Reads data in a single batch and caches the results for reuse. Use to perform lookups in streaming execution mode pipelines.

      When using the origin to perform lookups, do not limit the batch size. All lookup data should be read in a single batch.

      This property is ignored in batch execution mode.
    • Cache Data - Caches processed data so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

      Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

      Available when Load Data Only Once is not enabled. When the origin loads data once, it also caches data.
    • Skip Offset Tracking - Skips tracking offsets.

      In a streaming pipeline, the origin reads all available data with each batch.

      In a batch pipeline, the origin reads all available data each time the pipeline starts.

  2. On the Kafka tab, configure the following properties:
    • Broker URIs - List of comma-separated pairs of hosts and ports used to establish the initial connection to the Kafka cluster. Use the following format:

      <host1>:<port1>,<host2>:<port2>,…

      Once a connection is established, the stage discovers the full set of available brokers.
    • Use Topic Pattern - Enables the origin to find the topics to read based on a regular expression. When not selected, you enter a list of topics.
    • Topic Pattern - Java-based regular expression (regex) that specifies the topics to read.

      Available when you specify topics based on a pattern.
    • Topic List - List of Kafka topics to read. Click the Add icon to add additional topics.

      Available when you do not specify topics based on a pattern.
    • Starting Offset - Method to determine the first message to read:
      • Earliest - Reads messages starting with the first message in each partition of each topic.
      • Latest - Reads messages starting with the last message in each partition of each topic.
      • Specific Offsets - For each topic, reads messages from a specified partition and position.
    • Specific Offsets - The position of the first message to read in each topic and partition when the starting offset is set to Specific Offsets.

      For the first topic, enter the topic name, and then click the Add Partition icon as necessary to add fields for specifying the partition names and starting positions for the topic.

      For additional topics, click the Add Topic icon to add another topic field, and then click the Add Partition icon as necessary to add fields for specifying the partition names and starting positions in each topic.

      You must specify an offset for each partition in a topic.
    • Max Messages per Partition - The maximum number of messages that the origin reads from each partition in a topic in each batch.

      For a batch pipeline, this property determines the total number of messages processed in each pipeline run. For a streaming pipeline, this property determines the number of messages processed at one time.

  3. On the Security tab, configure the security properties to enable the origin to securely connect to Kafka.

    For information about the security options and additional steps required to enable security, see Enabling Security.

  4. On the Data Format tab, configure the following property:
    • Data Format - Format of data in Kafka messages. Select one of the following formats:
      • Avro (Spark 2.4 or later)
      • Delimited
      • JSON
      • Text
  5. For Avro data, click the Schema tab and configure the following properties:
    • Avro Schema Location - Location of the Avro schema definition to use when processing data:
      • In Pipeline Configuration - Use the schema specified in the Avro Schema property.
      • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry.
    • Avro Schema - Avro schema definition used to process the data. Overrides any existing schema definitions associated with the data.

      You can optionally use the runtime:loadResource function to use a schema definition stored in a runtime resource file.
    • Schema Registry URLs - Confluent Schema Registry URLs used to look up the schema. To add a URL, click Add. Use the following format to enter the URL:

      http://<host name>:<port number>

    • Lookup Schema By - Method used to look up the schema in Confluent Schema Registry:
      • Subject - Look up the specified Avro schema subject.
      • Schema ID - Look up the specified Avro schema ID.
    • Schema Subject - Avro schema subject to look up in Confluent Schema Registry.

      If the specified subject has multiple schema versions, the origin uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Lookup Schema By property to Schema ID.

    • Schema ID - Avro schema ID to look up in Confluent Schema Registry.
  6. For delimited data, on the Data Format tab, optionally configure the following properties:
    • Delimiter Character - Delimiter character used in the data. Select one of the available options or select Other to enter a custom character.

      You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    • Quote Character - Quote character used in the data.
    • Escape Character - Escape character used in the data.
    • Includes Header - Indicates that the data includes a header line. When selected, the origin uses the first line to create field names and begins reading with the second line.
  7. To use a custom schema for delimited or JSON data, click the Schema tab and configure the following properties:
    • Schema Mode - Mode that determines the schema to use when processing data:
      • Infer from Data
        The origin infers the field names and data types from the data.
      • Use Custom Schema - DDL Format
        The origin uses a custom schema defined in the DDL format.
      • Use Custom Schema - JSON Format
        The origin uses a custom schema defined in the JSON format.

      Note that the schema is applied differently depending on the data format of the data.
    • Schema - Custom schema to use to process the data.

      Enter the schema in DDL or JSON format, depending on the selected schema mode.
    • Error Handling - Determines how the origin handles parsing errors:
      • Permissive - When the origin encounters a problem parsing any field in the record, it creates a record with the field names defined in the schema, but with null values in every field.
      • Drop Malformed - When the origin encounters a problem parsing any field in the record, it drops the entire record from the pipeline.
      • Fail Fast - When the origin encounters a problem parsing any field in the record, it stops the pipeline.
    • Original Data Field - Field where the data from the original record is written when the origin cannot parse the record.

      When writing the original record to a field, you must add the field to the custom schema as a String field.

      Available when using permissive error handling.
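
To see how the same custom schema looks in the two formats, the following Scala sketch defines a hypothetical schema in DDL format and prints the JSON representation that Spark derives from it. This is a sketch of the underlying Spark types rather than the stage itself, and the field names and data types are placeholders.

  import org.apache.spark.sql.types.StructType

  object CustomSchemaSketch {
    def main(args: Array[String]): Unit = {
      // A custom schema in DDL format: comma-separated field names and data types.
      val ddlSchema = "order_id INT, customer STRING, amount DOUBLE"

      // Spark derives the equivalent JSON representation from the DDL definition.
      val schema: StructType = StructType.fromDDL(ddlSchema)
      println(schema.json)
    }
  }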