Kafka Producer

The Kafka Producer destination writes data to a Kafka cluster.

When you configure a Kafka Producer, you define connection information, the partition strategy, and data format to use. You can also configure Kafka Producer to determine the topic to write to at runtime.

The Kafka Producer passes data to partitions in the Kafka topic based on the partition strategy that you choose. You can optionally write a batch of records to the Kafka cluster as a single message.

You can add additional Kafka configuration properties as needed. When using Kafka version 0.9.0.0 or later, you can also configure the origin to use Kafka security features.

When using Kafka version 0.8.2 or later to write messages in the Avro format, you can configure the Kafka Producer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas which uses Kafka as its underlying storage mechanism.

Broker List

The Kafka Producer connects to Kafka based on the topic and associated brokers that you specify. To ensure a connection in case a specified broker goes down, list as many brokers as possible.

Runtime Topic Resolution

Kafka Producer can write a record to the topic based on an expression. When Kafka Producer evaluates a record, it calculates the expression based on record values and writes the record to the resulting topic.

When performing runtime topic resolution, Kafka Producer can write to any topic by default. You can create a white list of topics to limit the number of topics Kafka Producer attempts to use. When you create a white list, any record that resolves to an unlisted topic is sent to the stage for error handling. Use a white list when record data might resolve to invalid topic names.

Partition Strategy

The partition strategy determines how to write data to Kafka partitions. You can use a partition strategy to balance the work load or to write data semantically.

The Kafka Producer provides the following partition strategies:
Round-Robin
Writes each record to a different partition using a cyclical order. Use for load balancing.
Random
Writes each record to a different partition using a random order. Use for load balancing.
Expression
Writes each record to a partition based on the results of the partition expression. Use to perform semantic partitioning.
When you configure the partition expression, define the expression to evaluate to the partition where you want each record written. For example, the following expression writes records to two partitions based on the value in the Age field:
${record:value('/Age') < 21 ? 0 : 1}
The following example writes to three partitions based on the value of the Age field:
${record:value('/a') < 21 ? 0 : record:value('/a') < 55 ? 1 : 2}
Default
Writes statistics using the default partition strategy that Kafka provides.

Additional Kafka Properties

You can add custom Kafka configuration properties to the Kafka Producer destination.

When you add a Kafka configuration property, enter the exact property name and the value. The stage does not validate the property names or values.

Several properties are defined by default, you can edit or remove the properties as necessary.

Note: Because the stage uses several configuration properties, it ignores user-defined values for the following properties:
  • key.serializer.class
  • metadata.broker.list
  • partitioner.class
  • producer.type
  • serializer.class

Enabling Security

When using Kafka version 0.9.0.0 or later, you can configure the Kafka Producer to connect securely through SSL/TLS, Kerberos, or both.

These versions provide features to support secure connections through SSL/TLS or Kerberos (SASL). The Kafka community considers these features beta quality.

Earlier versions of Kafka do not support security.

Enabling SSL/TLS

Perform the following steps to enable the Kafka Producer to use SSL/TLS to connect to Kafka version 0.9.0.0 or later. You can use the same steps to configure a Kafka Consumer.

  1. To use SSL/TLS to connect, first make sure Kafka is configured for SSL/TLS as described in the Kafka documentation: http://kafka.apache.org/documentation.html#security_ssl.
  2. On the General tab of the stage, set the Stage Library property to Apache Kafka 0.9.0.0 or a later version.
  3. On the Kafka tab, add the security.protocol Kafka configuration property and set it to SSL.
  4. Then, add the following SSL Kafka configuration properties:
    • ssl.truststore.location
    • ssl.truststore.password
    When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
    • ssl.keystore.location
    • ssl.keystore.password
    • ssl.key.password
    Some brokers might require adding the following properties as well:
    • ssl.enabled.protocols
    • ssl.truststore.type
    • ssl.keystore.type

    For details about these properties, see the Kafka documentation.

For example, the following properties allow the stage to use SSL/TLS to connect to Kafka 0.0.9.0 with client authentication:

Enabling Kerberos (SASL)

When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab to connect to Kafka version 0.9.0.0 or later. Perform the following steps to enable the Kafka Producer destination to use Kerberos to connect to Kafka.

  1. To use Kerberos, first make sure Kafka is configured for Kerberos as described in the Kafka documentation: http://kafka.apache.org/documentation.html#security_sasl.
  2. In the Data Collector configuration file, $SDC_CONF/sdc.properties, make sure the following Kerberos properties are configured:
    • kerberos.client.enabled
    • kerberos.client.principal
    • kerberos.client.keytab
  3. On the General tab of the stage, set the Stage Library property to Apache Kafka 0.9.0.0 or a later version.
  4. On the Kafka tab, add the security.protocol Kafka configuration property, and set it to SASL_PLAINTEXT.
  5. Then, add the sasl.kerberos.service.name configuration property, and set it to the Kerberos principal name that Kafka runs as.

For example, the following Kafka properties enable connecting to Kafka 0.0.9.0 with Kerberos:

Enabling SSL/TLS and Kerberos

You can enable Kafka Producer to use SSL/TLS and Kerberos to connect to Kafka version 0.9.0.0 or later.

To use SSL/TLS and Kerberos, combine the steps required to enable each and set the security.protocol property as follows:
  1. Make sure Kafka is configured to use SSL/TLS and Kerberos (SASL) as described in the following Kafka documentation:
  2. In the Data Collector configuration file, $SDC_CONF/sdc.properties, make sure the following Kerberos properties are configured:
    • kerberos.client.enabled
    • kerberos.client.principal
    • kerberos.client.keytab
  3. On the General tab of the stage, set the Stage Library property to Apache Kafka 0.9.0.0 or a later version.
  4. On the Kafka tab, add the security.protocol property and set it to SASL_SSL.
  5. Then, add the sasl.kerberos.service.name configuration property, and set it to the Kerberos principal name that Kafka runs as.
  6. Then, add the following SSL Kafka configuration properties:
    • ssl.truststore.location
    • ssl.truststore.password
    When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
    • ssl.keystore.location
    • ssl.keystore.password
    • ssl.key.password
    Some brokers might require adding the following properties as well:
    • ssl.enabled.protocols
    • ssl.truststore.type
    • ssl.keystore.type

    For details about these properties, see the Kafka documentation.

Data Formats

Kafka Producer writes data to Kafka based on the data format that you select. You can use the following data formats:
Avro
The destination writes records based on the Avro schema.
You can use one of the following methods to specify the location of the Avro schema definition:
  • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  • In Record Header - Use the schema included in the avroSchema record header attribute.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas. You can configure the destination to look up the schema in the Confluent Schema Registry by the schema ID or subject.

    You must specify the method that the Kafka Producer uses to serialize the messages in the Avro format. To embed the Avro schema ID in each message that the destination writes, set the key and value serializers to Confluent on the Kafka tab.

    If using the Avro schema in the stage or in the record header attribute, you can optionally configure the destination to register the Avro schema with the Confluent Schema Registry. You can also optionally include the schema definition in the message.

You can compress data with an Avro-supported compression codec. When using Avro compression, avoid using other compression available in the destination.
Binary
The destination writes binary data from a single field in the record.
Delimited
The destination writes records as delimited data. When you use this data format, the root field must be list or list-map.
JSON
The destination writes records as JSON data. You can use one of the following formats:
  • Array - Each file includes a single array. In the array, each element is a JSON representation of each record.
  • Multiple objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
Protobuf
Writes one record in a message. Uses the user-defined message type and the definition of the message type in the descriptor file to generate the message.
For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
SDC Record
The destination writes records in the SDC Record data format.
Text
The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use. When necessary, merge record data into the field earlier in the pipeline.
You can configure the characters to use as record separators. By default, the destination uses a Unix-style line ending (\n) to separate records.
When a record contains no data in the text field, you can configure the destination to write the record separator characters, creating an empty line. By default, the destination discards the record.

Configuring a Kafka Producer

Configure a Kafka Producer to write data to a Kafka cluster.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Stage Library Library version that you want to use.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Kafka tab, configure the following properties:
    Kafka Properties Description
    Broker URI Connection string for the Kafka broker. Use the following format: <host>:<port>.

    To ensure a connection, enter a comma-separated list of additional broker URI.

    Runtime Topic Resolution Evaluates an expression at runtime to determine the topic to use for each record.
    Topic Topic to use.

    Not available when using runtime topic resolution.

    Topic Expression Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name.
    Topic White List List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling.

    Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid.

    Partition Strategy Strategy to use to write to partitions:
    • Round Robin - Takes turns writing to different partitions.
    • Random - Writes to partitions randomly.
    • Expression - Uses an expression to write data to different partitions.
    • Default - Uses the default partition strategy that Kafka provides.
    Partition Expression Expression to use when using the expression partition strategy.

    Define the expression to evaluate to the partition where you want each record written. Partition numbers start with 0.

    Optionally, click Ctrl + Space Bar for help with creating the expression.

    One Message per Batch For each batch, writes the records to each partition as a single message.
    Kafka Configuration Additional Kafka properties to use. Click the Add icon and define the Kafka property name and value.

    Use the property names and values as expected by Kafka. Do not use the broker.list property.

    For information about enabling secure connections to Kafka, see Enabling Security.

    Key Serializer Method used to serialize the Kafka message key when the configured data format is Avro.

    Set to Confluent to embed the Avro schema ID in each message that Kafka Producer writes.

    Value Serializer Method used to serialize the Kafka message value when the configured data format is Avro.

    Set to Confluent to embed the Avro schema ID in each message that Kafka Producer writes.

  3. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Data format for messages:
    • Avro
    • Binary
    • Delimited
    • JSON
    • Protobuf
    • SDC Record
    • Text
  4. For Avro data, on the Data Format tab, configure the following properties:
    Avro Property Description
    Avro Schema Location Location of the Avro schema definition to use when writing data:
    • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
    • In Record Header - Use the schema in the avroSchema record header attribute. Use only when the avroSchema attribute is defined for all records.
    • Confluent Schema Registry - Retrieve the schema from the Confluent Schema Registry.
    Avro Schema Avro schema definition used to write the data.

    You can optionally use the runtime:loadResource function to use a schema definition stored in a runtime resource file.

    Register Schema Select to register a new Avro schema with the Confluent Schema Registry.
    Schema Registry URLs Confluent Schema Registry URLs used to look up the schema or to register a new schema. To add a URL, click Add. Use the following format to enter the URL:
    http://<host name>:<port number>
    Look Up Schema By Method used to look up the schema in the Confluent Schema Registry:
    • Subject - Look up the specified Avro schema subject.
    • Schema ID - Look up the specified Avro schema ID.
    Schema Subject Avro schema subject to look up or to register in the Confluent Schema Registry.

    If the specified subject to look up has multiple schema versions, the origin uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Look Up Schema By property to Schema ID.

    Schema ID Avro schema ID to look up in the Confluent Schema Registry.
    Include Schema Includes the schema in each message.
    Note: If you configured Kafka Producer to embed the Avro schema ID in each message that it writes, clear this property.
    Avro Compression Codec The Avro compression type to use.

    When using Avro compression, do not enable other compression available in the destination.

  5. For binary data, on the Data Format tab, configure the following property:
    Binary Property Description
    Binary Field Path Field that contains the binary data.
  6. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property Description
    Delimiter Format Format for delimited data:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma separated file.
    • Tab-Separated Values - File that includes tab-separated values.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.
    Header Line Indicates whether to create a header line.
    Replace New Line Characters Replaces new line characters with the configured string.

    Recommended when writing data as a single line of text.

    New Line Character Replacement String to replace each new line character. For example, enter a space to replace each new line character with a space.

    Leave empty to remove the new line characters.

    Delimiter Character Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where ‚ÄčN is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Escape Character Escape character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    Default is the backslash character ( \ ).

    Quote Character Quote character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    Default is the quotation mark character ( " ).

    Charset Character set to use when writing data.
  7. For JSON data, on the Data Format tab, configure the following property:
    JSON Property Description
    JSON Content Determines how JSON data is written:
    • JSON Array of Objects - Each file includes a single array. In the array, each element is a JSON representation of each record.
    • Multiple JSON Objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
    Charset Character set to use when writing data.
  8. For protobuf data, on the Data Format tab, configure the following properties:
    Protobuf Property Description
    Protobuf Descriptor File Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration. For information about generating the descriptor file, see Protobuf Data Format Prerequisites.

    Message Type The fully-qualified name for the message type to use when reading data.

    Use the following format: <package name>.<message type>.

    Use a message type defined in the descriptor file.
  9. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Text Field Path Field that contains the text data to be written. All data must be incorporated into the specified field.
    Record Separator Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records.

    By default, the destination uses \n.

    Insert Record Separator if No Text When a record does not include the text field, inserts the configured record separator string to create an empty line.

    When not selected, records without the text field are discarded.

    Charset Character set to use when writing data.