Kafka Consumer

The Kafka Consumer origin reads data from an Apache Kafka cluster.

When you configure a Kafka Consumer, you configure the consumer group name, topic, and ZooKeeper connection information.

When using Kafka version 0.8.2 or later to consume messages in the Avro format, you can configure the Kafka Consumer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas which uses Kafka as its underlying storage mechanism.

You can add additional Kafka configuration properties as needed. When using Kafka version 0.9.0.0 or later, you can also configure the origin to use Kafka security features.

Kafka Consumer includes record header attributes that enable you to use the origins of a record in pipeline processing.

Initial and Subsequent Offsets

When you start a pipeline for the first time, the Kafka Consumer becomes a new consumer group for the topic.

By default, the origin reads only incoming data, processing data from all partitions and ignoring any existing data in the topic. After the origin passes data to destinations, it saves the offset with Kafka or ZooKeeper. When you stop and restart the pipeline, processing continues based on the offset.

For versions before Kafka 0.9.0.0, the offset is stored with Kafka or ZooKeeper based on the offsets.storage Kafka property. For Kafka version 0.9.0.0 or later, the offset is stored with Kafka.

Processing All Unread Data

You can configure the Kafka Consumer origin to read all unread data in a topic. By default, the Kafka Consumer origin reads only incoming data.

To read all unread data in the topic, add the auto.offset.reset Kafka configuration property to the origin:
  1. On the Kafka tab, click the Add icon to add a new Kafka configuration property.
  2. For the property name, enter auto.offset.reset.
  3. Define the value for the auto.offset.reset property:
    • When using versions before Kafka 0.9.0.0, set the property to smallest.
    • When using Kafka version 0.9.0.0 or later, set the property to earliest.
    For more information about auto.offset.reset, see the Apache Kafka documentation.

For more information about adding custom Kafka configuration properties, see Additional Kafka Properties.
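
The version-dependent choice of the auto.offset.reset value can be sketched in Python as follows. This is an illustration only, not Data Collector code: the helper names parse_version and auto_offset_reset_for_all_unread are hypothetical, and the sketch assumes the Kafka version is available as a dotted string.

```python
def parse_version(version):
    """Turn a dotted version string such as '0.9.0.0' into a 4-part tuple."""
    parts = [int(p) for p in version.split(".")]
    # Pad short versions (for example '0.9') with zeros so tuples compare evenly.
    parts += [0] * (4 - len(parts))
    return tuple(parts)


def auto_offset_reset_for_all_unread(kafka_version):
    """Return the auto.offset.reset value that reads all unread data."""
    if parse_version(kafka_version) < (0, 9, 0, 0):
        return "smallest"   # versions before Kafka 0.9.0.0
    return "earliest"       # Kafka 0.9.0.0 or later


def kafka_config_for_all_unread(kafka_version):
    """Build the single property to add on the Kafka tab (hypothetical helper)."""
    return {"auto.offset.reset": auto_offset_reset_for_all_unread(kafka_version)}


for version in ("0.8.2", "0.9.0.0"):
    print(version, kafka_config_for_all_unread(version))
```

The resulting name and value pair is what you would enter as a Kafka configuration property in the stage.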

Additional Kafka Properties

You can add custom Kafka configuration properties to the Kafka Consumer.

When you add the Kafka configuration property, enter the exact property name and the value. The Kafka Consumer does not validate the property names or values.

Note: The Kafka Consumer origin uses the following Kafka configuration properties. The origin ignores user-defined values for these properties:
  • auto.commit.enable
  • group.id
  • zookeeper.connect
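
As a rough sketch of the effect described in the note, the following Python separates custom properties into those that take effect and those that the origin ignores. The function name split_custom_properties and the sample property values are hypothetical; the actual handling inside the origin is not shown in this documentation.

```python
# Properties that the origin sets itself, as listed in the note above.
RESERVED_PROPERTIES = frozenset(
    {"auto.commit.enable", "group.id", "zookeeper.connect"}
)


def split_custom_properties(custom):
    """Return (effective, ignored) for a dict of user-defined properties."""
    effective = {
        name: value
        for name, value in custom.items()
        if name not in RESERVED_PROPERTIES
    }
    ignored = sorted(name for name in custom if name in RESERVED_PROPERTIES)
    return effective, ignored


# Sample input, invented for illustration.
effective, ignored = split_custom_properties(
    {"auto.offset.reset": "earliest", "group.id": "my-group"}
)
print("effective:", effective)
print("ignored:", ignored)
```

Because the origin does not validate custom properties, a check like this can be useful when reviewing a pipeline configuration by hand.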

Record Header Attributes

The Kafka Consumer origin creates record header attributes that include information about the origins of the record. You can use record:attribute functions to use the attribute information in pipeline processing. For more information about record header attributes, see Record Header Attributes.

The Kafka Consumer origin creates the following record header attributes:
  • offset - The offset where the record originated.
  • partition - The partition where the record originated.
  • topic - The topic where the record originated.
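
To illustrate what these attributes carry, the following Python models them as a plain dictionary and combines them into a label that identifies where a record came from. This is a conceptual sketch, not the Data Collector API: the function names and sample values are hypothetical, and storing the values as strings is an assumption.

```python
def kafka_header_attributes(topic, partition, offset):
    """Model the three attributes the origin attaches to each record."""
    # Values are kept as strings here; that representation is an assumption.
    return {
        "topic": topic,
        "partition": str(partition),
        "offset": str(offset),
    }


def source_label(attributes):
    """Combine the attributes into a single label for the record's origin."""
    return "{topic}/{partition}@{offset}".format(**attributes)


# Sample values, invented for illustration.
attrs = kafka_header_attributes("orders", 2, 1057)
print(source_label(attrs))
```

In a pipeline, you would read the same values with record:attribute functions rather than with code like this.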

Enabling Security

When using Kafka version 0.9.0.0 or later, you can configure the Kafka Consumer origin to connect securely through SSL/TLS, Kerberos, or both.

These versions provide features to support secure connections through SSL/TLS or Kerberos (SASL). The Kafka community considers these features beta quality.

Earlier versions of Kafka do not support security.

Enabling SSL/TLS

Perform the following steps to enable Kafka Consumer to use SSL/TLS to connect to Kafka version 0.9.0.0 or later. You can use the same steps to configure a Kafka Producer.

  1. To use SSL/TLS to connect, first make sure Kafka is configured for SSL/TLS as described in the Kafka documentation: http://kafka.apache.org/documentation.html#security_ssl.
  2. On the General tab of the stage, set the Stage Library property to Apache Kafka 0.9.0.0 or a later version.
  3. On the Kafka tab, add the security.protocol Kafka configuration property and set it to SSL.
  4. Then, add the following SSL Kafka configuration properties:
    • ssl.truststore.location
    • ssl.truststore.password
    When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
    • ssl.keystore.location
    • ssl.keystore.password
    • ssl.key.password
    Some brokers might require adding the following properties as well:
    • ssl.enabled.protocols
    • ssl.truststore.type
    • ssl.keystore.type

    For details about these properties, see the Kafka documentation.

For example, the following properties allow the stage to use SSL/TLS to connect to Kafka 0.9.0.0 with client authentication:
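
A minimal sketch of such a property set, written as a Python dictionary builder, might look like this. The function name ssl_properties is hypothetical, and the file paths and passwords are placeholders invented for illustration; substitute the values for your environment.

```python
def ssl_properties(truststore_location, truststore_password,
                   keystore_location=None, keystore_password=None,
                   key_password=None):
    """Build the Kafka configuration properties for an SSL/TLS connection."""
    props = {
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore_location,
        "ssl.truststore.password": truststore_password,
    }
    # Keystore properties are needed only when the broker requires client
    # authentication (ssl.client.auth set to "required" on the broker).
    if keystore_location is not None:
        props["ssl.keystore.location"] = keystore_location
        props["ssl.keystore.password"] = keystore_password
        props["ssl.key.password"] = key_password
    return props


# Placeholder paths and passwords, not real values.
example = ssl_properties(
    "/path/to/client.truststore.jks", "truststore-password",
    keystore_location="/path/to/client.keystore.jks",
    keystore_password="keystore-password",
    key_password="key-password",
)
for name, value in example.items():
    print(f"{name} = {value}")
```

Each printed name and value pair corresponds to one Kafka configuration property added on the Kafka tab.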

Enabling Kerberos (SASL)

When you use Kerberos authentication, Data Collector uses the Kerberos principal and keytab to connect to Kafka version 0.9.0.0 or later. Perform the following steps to enable the Kafka Consumer origin to use Kerberos to connect to Kafka.

  1. To use Kerberos, first make sure Kafka is configured for Kerberos as described in the Kafka documentation: http://kafka.apache.org/documentation.html#security_sasl.
  2. In the Data Collector configuration file, $SDC_CONF/sdc.properties, make sure the following Kerberos properties are configured:
    • kerberos.client.enabled
    • kerberos.client.principal
    • kerberos.client.keytab
  3. On the General tab of the stage, set the Stage Library property to Apache Kafka 0.9.0.0 or a later version.
  4. On the Kafka tab, add the security.protocol Kafka configuration property, and set it to SASL_PLAINTEXT.
  5. Then, add the sasl.kerberos.service.name configuration property, and set it to the Kerberos principal name that Kafka runs as.

For example, the following Kafka properties enable connecting to Kafka 0.9.0.0 with Kerberos:
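
As a hedged sketch, the two properties from steps 4 and 5 can be expressed as a Python dictionary. The function name kerberos_properties is hypothetical, and the service name "kafka" is only an assumed example of the principal name that Kafka runs as.

```python
def kerberos_properties(service_name):
    """Build the Kafka configuration properties for a Kerberos connection."""
    if not service_name:
        raise ValueError("the Kerberos service name must not be empty")
    return {
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.kerberos.service.name": service_name,
    }


# "kafka" is an assumed principal name; use the one configured for your brokers.
example = kerberos_properties("kafka")
for name, value in example.items():
    print(f"{name} = {value}")
```

The Kerberos principal and keytab themselves are not part of these properties; they come from the Data Collector configuration file, as described in step 2.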

Enabling SSL/TLS and Kerberos

You can enable Kafka Consumer to use SSL/TLS and Kerberos to connect to Kafka version 0.9.0.0 or later.

To use SSL/TLS and Kerberos, combine the steps required to enable each and set the security.protocol property as follows:
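  1. Make sure Kafka is configured to use SSL/TLS and Kerberos (SASL) as described in the Kafka documentation: http://kafka.apache.org/documentation.html#security_ssl and http://kafka.apache.org/documentation.html#security_sasl.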
  2. In the Data Collector configuration file, $SDC_CONF/sdc.properties, make sure the following Kerberos properties are configured:
    • kerberos.client.enabled
    • kerberos.client.principal
    • kerberos.client.keytab
  3. On the General tab of the stage, set the Stage Library property to Apache Kafka 0.9.0.0 or a later version.
  4. On the Kafka tab, add the security.protocol property and set it to SASL_SSL.
  5. Then, add the sasl.kerberos.service.name configuration property, and set it to the Kerberos principal name that Kafka runs as.
  6. Then, add the following SSL Kafka configuration properties:
    • ssl.truststore.location
    • ssl.truststore.password
    When the Kafka broker requires client authentication - when the ssl.client.auth broker property is set to "required" - add and configure the following properties:
    • ssl.keystore.location
    • ssl.keystore.password
    • ssl.key.password
    Some brokers might require adding the following properties as well:
    • ssl.enabled.protocols
    • ssl.truststore.type
    • ssl.keystore.type

    For details about these properties, see the Kafka documentation.
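
The combined property set from steps 4 through 6 can be sketched in Python as follows. The function name sasl_ssl_properties, the service name, and all paths and passwords are hypothetical placeholders, not values from this documentation.

```python
def sasl_ssl_properties(service_name, truststore_location,
                        truststore_password, keystore=None):
    """Build the Kafka properties for SSL/TLS combined with Kerberos.

    keystore is an optional (location, password, key_password) tuple,
    supplied only when the broker requires client authentication.
    """
    props = {
        "security.protocol": "SASL_SSL",
        "sasl.kerberos.service.name": service_name,
        "ssl.truststore.location": truststore_location,
        "ssl.truststore.password": truststore_password,
    }
    if keystore is not None:
        location, password, key_password = keystore
        props["ssl.keystore.location"] = location
        props["ssl.keystore.password"] = password
        props["ssl.key.password"] = key_password
    return props


# Placeholder values, invented for illustration.
example = sasl_ssl_properties(
    "kafka", "/path/to/client.truststore.jks", "truststore-password",
    keystore=("/path/to/client.keystore.jks", "keystore-password", "key-password"),
)
for name, value in example.items():
    print(f"{name} = {value}")
```

The only difference from enabling each feature separately is the security.protocol value, which is SASL_SSL instead of SSL or SASL_PLAINTEXT.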

Data Formats

The Kafka Consumer origin processes data differently based on the data format. Kafka Consumer can process the following types of data:

Avro
Generates a record for every message.
You can use one of the following methods to specify the location of the Avro schema definition:
  • Message/Data Includes Schema - Use the schema in the message.
  • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas. You can configure the origin to look up the schema in the Confluent Schema Registry by the schema ID embedded in the message or by the schema ID or subject specified in the stage configuration.

    You must specify the method that the Kafka Consumer uses to deserialize the message. If the Avro schema ID is embedded in each message, set the key and value deserializers to Confluent on the Kafka tab.

Using a schema in the stage configuration or retrieving a schema from the Confluent Schema Registry overrides any schema that might be included in the message and can improve performance.
Binary
Generates a record with a single byte array field at the root of the record.
When the data exceeds the user-defined maximum data size, the origin cannot process the data. Because the record is not created, the origin cannot pass the record to the pipeline to be written as an error record. Instead, the origin generates a stage error.
Datagram
Generates a record for every message. The origin can read collectd messages, NetFlow Version 5 messages, and syslog messages.
Delimited
Generates a record for each delimited line. You can use the following delimited format types:
  • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
  • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
  • MS Excel CSV - Microsoft Excel comma-separated file.
  • MySQL CSV - MySQL comma-separated file.
  • Tab-Separated Values - File that includes tab-separated values.
  • Custom - File that uses user-defined delimiter, escape, and quote characters.
You can use a list or list-map root field type for delimited data, optionally including the header information when available.
You can also replace a string constant with null values.
When a record exceeds the maximum record length defined for the origin, the origin processes the record based on the error handling configured for the stage.
For more information about the root field types, see Delimited Data Root Field Type.
JSON
Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
When an object exceeds the maximum object length defined for the origin, the origin processes the object based on the error handling configured for the stage.
Log
Generates a record for every log line.
When a line exceeds the user-defined maximum line length, the origin truncates the line.
You can include the processed log line as a field in the record. If the log line is truncated, and you request the log line in the record, the origin includes the truncated line.
You can define the log format or type to be read.
Protobuf
Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
Protobuf messages must match the specified message type and be described in the descriptor file.
When the data for a record exceeds 1 MB, the origin cannot continue processing data in the message. The origin handles the message based on the stage error handling property and continues reading the next message.
For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
SDC Record
Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
For error records, the origin provides the original record as read from the origin in the original pipeline, as well as error information that you can use to correct the record.
When processing error records, the origin expects the error file names and contents as generated by the original pipeline.
Text
Generates a record for each line of text or for each section of text based on a custom delimiter.
When a line or section exceeds the maximum line length defined for the origin, the origin truncates it. The origin adds a boolean field named Truncated to indicate if the line was truncated.
For more information about processing text with a custom delimiter, see Text Data Format with Custom Delimiters.
XML
Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
Generated records include XML attributes and namespace declarations as fields in the record by default. You can configure the stage to include them in the record as field attributes.
You can include XPath information for each parsed XML element and XML attribute in field attributes. This also places each namespace in an xmlns record header attribute.
Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
When a record exceeds the user-defined maximum record length, the origin skips the record and continues processing with the next record. It sends the skipped record to the pipeline for error handling.
Use the XML data format to process valid XML documents. For more information about XML processing, see XML Data Format and Data Processing.
Tip: If you want to process invalid XML documents, you can try using the text data format with custom delimiters. For more information, see Processing XML Data with Custom Delimiters.
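
The truncation behavior described for the Text data format can be sketched in Python as follows. This is an illustration of the described behavior, not the origin's implementation: the function name text_to_records and the field name text are assumptions, and the sketch adds the Truncated field to every record, which may differ from what the origin does for lines that are not truncated.

```python
def text_to_records(message, max_line_length, delimiter="\n"):
    """Split a message into one record per line or delimited section."""
    records = []
    for section in message.split(delimiter):
        truncated = len(section) > max_line_length
        records.append({
            # Keep at most max_line_length characters of the section.
            "text": section[:max_line_length],
            # Boolean flag that indicates whether the text was cut short.
            "Truncated": truncated,
        })
    return records


# Sample message, invented for illustration.
for record in text_to_records("short\nthis line is long", max_line_length=5):
    print(record)
```

Passing a different delimiter, such as a semicolon, models the custom delimiter option in the same way.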

Log Formats

When you use an origin to read log data, you define the format of the log files to be read.

You can read log files that use the following log formats:

Common Log Format
A standardized text format used by web servers to generate log files. Also known as the NCSA (National Center for Supercomputing Applications) Common Log format.
Combined Log Format
A standardized text format based on the common log format that includes additional information. Also known as the Apache/NCSA Combined Log Format.
Apache Error Log Format
The standardized error log format generated by the Apache HTTP Server 2.2.
Apache Access Log Custom Format
A customizable access log generated by the Apache HTTP Server 2.2. Use the Apache HTTP Server version 2.2 syntax to define the format of the log file.
Regular Expression
Use a regular expression to define the structure of log data, and then assign the field or fields represented by each group.
Use any valid regular expression.
Grok Pattern
Use a grok pattern to define the structure of log data. You can use the grok patterns supported by Data Collector. You can also define a custom grok pattern and then use it as part of the log format.
For more information about supported grok patterns, see Defining Grok Patterns.
Log4j
A customizable format generated by the Apache Log4j 1.2 logging utility. You can use the default format or specify a custom format. Use the Apache Log4j version 1.2 syntax to define the format of the log file.
You can also specify the action to take when the origin encounters an error when parsing a line. You can skip the line and optionally log an error. If you know that the unparsable information is part of a stack trace, you can have the origin include the unparsable information as a stack trace to the previous parsable line.
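
To illustrate the Regular Expression log format, the following Python sketch parses a line in the Common Log Format and assigns a field name to each regular expression group. The pattern, the field names, and the sample line are assumptions made for this example, and the code uses the Python re module rather than anything in Data Collector.

```python
import re

# One capturing group per piece of the log line.
PATTERN = re.compile(
    r'(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+)'
)

# Mapping from group number to field name (names are hypothetical).
FIELD_FOR_GROUP = {
    1: "clientip",
    2: "ident",
    3: "auth",
    4: "timestamp",
    5: "request",
    6: "response",
    7: "bytes",
}


def parse_log_line(line):
    """Return a dict of fields, or None when the line does not match."""
    match = PATTERN.match(line)
    if match is None:
        return None
    return {field: match.group(i) for i, field in FIELD_FOR_GROUP.items()}


sample = ('127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '
          '"GET /apache_pb.gif HTTP/1.0" 200 2326')
print(parse_log_line(sample))
```

In the stage, the equivalent configuration is the regular expression itself plus a mapping of each group number to the field that should receive its value.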

Configuring a Kafka Consumer

Configure a Kafka Consumer to read data from a Kafka cluster.

When you configure the Kafka Consumer, you configure the general properties, including Kafka and ZooKeeper details. Configure additional data format properties as needed. You can optionally add custom Kafka properties.
  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Stage Library Library version that you want to use.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the Kafka tab, configure the following properties:
    Kafka Property Description
    Broker URI Connection string for the Kafka broker. Use the following format: <host>:<port>.

    To ensure a connection, enter a comma-separated list of additional broker URIs.

    ZooKeeper URI Connection string for the ZooKeeper of the Kafka cluster. Use the following format: <host>:<port>.

    To use a ZooKeeper quorum, enter a comma-separated list.

    To use a ZooKeeper chroot path, add the path at the end of the list as follows:
    <host>:<port>, <host2>:<port2>, .../<chroot_path>
    Consumer Group Kafka consumer group that the Data Collector belongs to.
    Topic Kafka topic to read.
    Produce Single Record For each partition, generates a single record for records that include multiple objects.

    When not selected, the origin generates multiple records when a record includes multiple objects.

    Max Batch Size (records) Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector maximum batch size also defaults to 1000.

    Batch Wait Time (ms) Number of milliseconds to wait before sending a partial or empty batch.
    Kafka Configuration

    Additional Kafka configuration properties to use. To add properties, click Add and define the Kafka property name and value.

    Use the property names and values as expected by Kafka.

    For information about enabling secure connections to Kafka, see Enabling Security.

  3. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Type of data to be read. Use one of the following options:
    • Avro
    • Binary
    • Datagram
    • Delimited
    • JSON
    • Log
    • Text
    • Protobuf
    • SDC Record
    • XML
  4. For Avro data, on the Data Format tab, configure the following properties:
    Avro Property Description
    Avro Schema Location Location of the Avro schema definition to use when processing data:
    • Message/Data Includes Schema - Use the schema in the message.
    • In Pipeline Configuration - Use the schema provided in the stage configuration.
    • Confluent Schema Registry - Retrieve the schema from the Confluent Schema Registry.

    Using a schema in the stage configuration or in the Confluent Schema Registry can improve performance.

    Avro Schema Avro schema definition used to process the data. Overrides any existing schema definitions associated with the data.

    You can optionally use the runtime:loadResource function to use a schema definition stored in a runtime resource file.

    Schema Registry URLs Confluent Schema Registry URLs used to look up the schema. To add a URL, click Add. Use the following format to enter the URL:
    http://<host name>:<port number>
    Lookup Schema By Method used to look up the schema in the Confluent Schema Registry:
    • Subject - Look up the specified Avro schema subject.
    • Schema ID - Look up the specified Avro schema ID.
    • Embedded Schema ID - Look up the Avro schema ID embedded in each message.
    Overrides any existing schema definitions associated with the message.
    Schema Subject Avro schema subject to look up in the Confluent Schema Registry.

    If the specified subject has multiple schema versions, the origin uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Lookup Schema By property to Schema ID.

    Schema ID Avro schema ID to look up in the Confluent Schema Registry.
  5. For binary data, on the Data Format tab, configure the following property:
    Binary Property Description
    Max Data Size (bytes) Maximum number of bytes in the message. Larger messages cannot be processed or written to error.
  6. For datagram data, on the Data Format tab, configure the following properties:
    Datagram Property Description
    Data Format Message type:
    • collectd
    • NetFlow
    • syslog
    TypesDB File Path Path to a user-provided types.db file. Overrides the default types.db file.

    For collectd data only.

    Convert Hi-Res Time & Interval Converts the collectd high resolution time format interval and timestamp to UNIX time, in milliseconds.

    For collectd data only.

    Exclude Interval Excludes the interval field from output record.

    For collectd data only.

    Auth File Path to an optional authentication file. Use an authentication file to accept signed and encrypted data.

    For collectd data only.

    Charset Character encoding of the messages to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  7. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property Description
    Delimiter Format Type Delimiter format type. Use one of the following options:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma-separated file.
    • Tab-Separated Values - File that includes tab-separated values.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.
    Header Line Indicates whether a file contains a header line, and whether to use the header line.
    Max Record Length (chars) Maximum length of a record in characters. Longer records are not read.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Delimiter Character Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Escape Character Escape character for a custom file type.
    Quote Character Quote character for a custom file type.
    Root Field Type Root field type to use:
    • List-Map - Generates an indexed list of data. Enables you to use standard functions to process data. Use for new pipelines.
    • List - Generates a record with an indexed list with a map for header and value. Requires the use of delimited data functions to process data. Use only to maintain pipelines created before 1.1.0.
    Lines to Skip Lines to skip before reading data.
    Parse NULLs Replaces the specified string constant with null values.
    NULL Constant String constant to replace with null values.
    Charset Character encoding of the files to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  8. For JSON data, on the Data Format tab, configure the following properties:
    JSON Property Description
    JSON Content Type of JSON content. Use one of the following options:
    • Array of Objects
    • Multiple Objects
    Maximum Object Length (chars) Maximum number of characters in a JSON object.

    Longer objects are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  9. For log data, on the Data Format tab, configure the following properties:
    Log Property Description
    Log Format Format of the log files. Use one of the following options:
    • Common Log Format
    • Combined Log Format
    • Apache Error Log Format
    • Apache Access Log Custom Format
    • Regular Expression
    • Grok Pattern
    • Log4j
    Max Line Length Maximum length of a log line. The origin truncates longer lines.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Retain Original Line Determines how to treat the original log line. Select to include the original log line as a field in the resulting record.

    By default, the original line is discarded.

    Charset Character encoding of the files to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
    • When you select Apache Access Log Custom Format, use Apache log format strings to define the Custom Log Format.
    • When you select Regular Expression, enter the regular expression that describes the log format, and then map the fields that you want to include to each regular expression group.
    • When you select Grok Pattern, you can use the Grok Pattern Definition field to define custom grok patterns. You can define a pattern on each line.

      In the Grok Pattern field, enter the pattern to use to parse the log. You can use a predefined grok pattern or create a custom grok pattern using patterns defined in Grok Pattern Definition.

      For more information about defining grok patterns and supported grok patterns, see Defining Grok Patterns.

    • When you select Log4j, define the following properties:
      Log4j Property Description
      On Parse Error Determines how to handle information that cannot be parsed:
      • Skip and Log Error - Skips reading the line and logs a stage error.
      • Skip, No Error - Skips reading the line and does not log an error.
      • Include as Stack Trace - Includes information that cannot be parsed as a stack trace to the previously-read log line. The information is added to the message field for the last valid log line.
      Use Custom Log Format Allows you to define a custom log format.
      Custom Format Use log4j variables to define a custom log format.
  10. For protobuf data, on the Data Format tab, configure the following properties:
    Protobuf Property Description
    Protobuf Descriptor File Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.

    For information about generating the descriptor file, see Protobuf Data Format Prerequisites. For more information about environment variables, see Data Collector Environment Configuration.

    Message Type The fully-qualified name for the message type to use when reading data.

    Use the following format: <package name>.<message type>.

    Use a message type defined in the descriptor file.
    Delimited Messages Indicates if a message might include more than one protobuf message.
  11. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Max Line Length Maximum number of characters allowed for a line. Longer lines are truncated.

    Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Use Custom Delimiter Uses custom delimiters to define records instead of line breaks.
    Custom Delimiter One or more characters to use to define records.
    Include Custom Delimiter Includes delimiter characters in the record.
    Charset Character encoding of the files to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  12. For XML data, on the Data Format tab, configure the following properties:
    XML Property Description
    Delimiter Element
    Delimiter to use to generate records. Omit a delimiter to treat the entire XML document as one record. Use one of the following:
    • An XML element directly under the root element.

      Use the XML element name without surrounding angle brackets ( < > ). For example, msg instead of <msg>.

    • A simplified XPath expression that specifies the data to use.

      Use a simplified XPath expression to access data deeper in the XML document or data that requires a more complex access method.

      For more information about valid syntax, see Simplified XPath Syntax.

    Include Field XPaths Includes the XPath to each parsed XML element and XML attribute in field attributes. Also includes each namespace in an xmlns record header attribute.

    When not selected, this information is not included in the record. By default, the property is not selected.

    Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
    Namespaces Namespace prefix and URI to use when parsing the XML document. Define namespaces when the XML element being used includes a namespace prefix or when the XPath expression includes namespaces.

    For information about using namespaces with an XML element, see Using XML Elements with Namespaces.

    For information about using namespaces with XPath expressions, see Using XPath Expressions with Namespaces.

    Use the Add icon to add additional namespaces.

    Output Field Attributes Includes XML attributes and namespace declarations in the record as field attributes. When not selected, XML attributes and namespace declarations are included in the record as fields.
    Note: Field attributes are automatically included in records written to destination systems only when you use the SDC RPC data format in the destination. For more information about working with field attributes, see Field Attributes.

    By default, the property is not selected.

    Max Record Length (chars)

    The maximum number of characters in a record. Longer records are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.