Kafka Consumer
Supported pipeline types:
- Data Collector
When you configure a Kafka Consumer, you configure the consumer group name, topic, and ZooKeeper connection information.
You can configure the Kafka Consumer to work with the Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas that uses Kafka as its underlying storage mechanism.
You can add additional Kafka configuration properties as needed. You can configure the origin to use Kafka security features. You can also configure the origin to capture Kafka message keys and store them in the record.
Kafka Consumer includes record header attributes that enable you to use information about the record in pipeline processing.
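Because the origin reads from Kafka and can resolve Avro schemas through the Confluent Schema Registry, the same concepts can be illustrated with the plain Kafka client API. The following is a minimal sketch only, not how the origin is implemented; the broker address, consumer group, topic name, and Schema Registry URL are placeholders.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AvroTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list, consumer group, and topic are placeholders.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sdc-kafka-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Confluent's Avro deserializer resolves writer schemas from the Schema Registry.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://schema-registry:8081");

        try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.printf("key=%s value=%s%n", r.key(), r.value()));
        }
    }
}
```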
Offset Management
The first time that a Kafka Consumer origin identified by a consumer group receives messages from a topic, an offset entry is created for that consumer group and topic. The offset entry is created in ZooKeeper or Kafka, depending on your Kafka version and broker configuration.
- No stored offset
- When the consumer group and topic combination does not have a previously stored offset, the Kafka Consumer origin uses the Auto Offset Reset property to determine the first message to read. You can set the origin to read messages in the topic starting from the earliest message, latest message, or a particular timestamp. The default setting is the earliest message, which results in the origin reading all existing messages in the topic.
- Previously stored offset
- When the consumer group and topic combination has a previously stored offset, the Kafka Consumer origin receives messages starting with the next unprocessed message after the stored offset. For example, when you stop and restart the pipeline, processing resumes from the last committed offset.
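In plain Kafka terms, this behavior maps to the consumer's auto.offset.reset setting and the committed offsets stored for the consumer group. The sketch below assumes a hypothetical broker, group, and topic and only demonstrates the underlying Kafka mechanics, not the origin itself.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetResetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // No stored offset for this group and topic: auto.offset.reset decides where to start.
        // "earliest" reads all existing messages; "latest" reads only new ones.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"));                        // placeholder topic
            consumer.poll(Duration.ofSeconds(5));
            // Committing stores the offset for the group; a restarted consumer in the same
            // group resumes after the last committed offset instead of resetting.
            consumer.commitSync();
        }
    }
}
```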
Additional Kafka Properties
You can add custom Kafka configuration properties to the Kafka Consumer.
When you add a Kafka configuration property, enter the exact property name and the value. The Kafka Consumer does not validate the property names or values (see the sketch after this list).
Note: The Kafka Consumer origin uses the following Kafka configuration properties internally and ignores any user-defined values for them:
- auto.commit.enable
- group.id
- zookeeper.connect
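The following sketch illustrates the idea, not the origin's actual implementation: extra properties are passed through exactly as entered, unvalidated, while the reserved properties above are managed by the stage. The example property values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ExtraKafkaProps {
    // Properties the stage manages itself; user-supplied values for these are ignored.
    private static final Set<String> RESERVED =
            Set.of("auto.commit.enable", "group.id", "zookeeper.connect");

    public static void main(String[] args) {
        // Hypothetical extra properties, entered exactly as Kafka expects them.
        // Neither names nor values are validated, so a typo would pass through silently.
        Map<String, String> extra = new LinkedHashMap<>();
        extra.put("max.poll.records", "500");
        extra.put("session.timeout.ms", "30000");
        extra.put("group.id", "ignored-by-the-stage");   // reserved, dropped below

        extra.keySet().removeIf(RESERVED::contains);
        extra.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```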
Record Header Attributes
The Kafka Consumer origin creates record header attributes that include information about the originating Kafka message for the record. When the origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
- avroSchema - When processing Avro data, provides the Avro schema.
- Kafka timestamp - The timestamp from the header of the Kafka message. Created if the Include Timestamps property is enabled.
- Kafka timestamp type - The timestamp type from the header of the Kafka message. Created if the Include Timestamps property is enabled.
- offset - The offset where the record originated.
- partition - The partition where the record originated.
- topic - The topic where the record originated.
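These attributes correspond to metadata that the Kafka client exposes on each consumed message. The sketch below, which assumes a hypothetical broker, group, and topic, shows where the offset, partition, topic, and timestamp values come from in the plain Kafka consumer API.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HeaderAttributeSources {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "metadata-demo");           // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic"));                        // placeholder topic
            for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(5))) {
                // The same metadata the origin exposes as record header attributes.
                System.out.printf("topic=%s partition=%d offset=%d timestamp=%d timestampType=%s%n",
                        r.topic(), r.partition(), r.offset(), r.timestamp(), r.timestampType());
            }
        }
    }
}
```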
Kafka Security
You can configure the Kafka Consumer origin to connect securely to Kafka through SSL/TLS, Kerberos, or both. For more information about these methods and how to configure them, see Security in Kafka Stages.
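As a point of reference, a secure connection that combines SSL/TLS and Kerberos maps to the standard Kafka client properties shown in the sketch below. The broker address, truststore path, and password are placeholders; the exact configuration for the origin is described in Security in Kafka Stages.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class SecureConsumerProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093");   // placeholder TLS listener

        // SSL/TLS plus Kerberos: SASL (GSSAPI) over an encrypted channel.
        props.put("security.protocol", "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
        props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");

        // Truststore holding the broker's CA certificate; path and password are placeholders.
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/ssl/kafka.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");

        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```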
Data Formats
The Kafka Consumer origin processes data differently based on the data format. Kafka Consumer can process the following types of data:
- Avro
- Generates a record for every message. Includes a precision and scale field attribute for each Decimal field.
- Binary
- Generates a record with a single byte array field at the root of the record.
- Datagram
- Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and syslog messages.
- Delimited
- Generates a record for each delimited line. You can use the following delimited format types:
- Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
- RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
- MS Excel CSV - Microsoft Excel comma-separated file.
- MySQL CSV - MySQL comma-separated file.
- Tab-Separated Values - File that includes tab-separated values.
- PostgreSQL CSV - PostgreSQL comma-separated file.
- PostgreSQL Text - PostgreSQL text file.
- Custom - File that uses user-defined delimiter, escape, and quote characters.
- Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
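To try out a data format, you can write sample messages to a topic and point the origin at it. The following producer sketch uses hypothetical topic names and payloads: one message holding a single JSON object for the JSON data format, and one comma-separated line for the Delimited data format with the Default CSV type.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SampleMessageProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One JSON object per message: parsed with the JSON data format.
            producer.send(new ProducerRecord<>("json-topic", "{\"id\": 1, \"name\": \"test\"}"));
            // Comma-separated values: parsed with the Delimited data format (Default CSV).
            producer.send(new ProducerRecord<>("csv-topic", "1,test,2024-01-01"));
            producer.flush();
        }
    }
}
```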
Configuring a Kafka Consumer Origin
Configure a Kafka Consumer origin to read messages from a Kafka cluster.