Pulsar Consumer
The Pulsar Consumer origin subscribes to Pulsar topics, processes incoming messages, and then sends acknowledgements back to Pulsar as the messages are read.
When you configure a Pulsar Consumer origin, you define the URL to connect to Pulsar. You also define the subscription name and consumer name to use for the origin and the topics to subscribe to. When the pipeline starts, Pulsar creates a consumer with the specified consumer name. If the subscription and topics do not exist, Pulsar also creates the subscription and topics.
You can configure the origin to use Pulsar security features. You can also configure advanced properties as needed, such as the type of subscription to create or the initial offset to begin reading from.
The Pulsar Consumer origin can include record header attributes that enable you to use information about the record in pipeline processing.
For more information about Pulsar topics, subscriptions, and consumers, see the Apache Pulsar documentation.
Topics Selector
The Pulsar Consumer origin can subscribe to a single topic or to multiple topics. In either case, the origin uses one thread for the read.
To define the topics that the origin subscribes to, configure the Topics Selector property on the Pulsar tab.
The origin provides the following methods of subscribing to topics:
- Single Topic
- Subscribes to a single topic. Use the following format to specify the topic name:
{persistent|non-persistent}://<tenant>/<namespace>/<topic name>
- Topics List
- Subscribes to multiple topics defined in a list of topic names. Use the Add icon to add additional topic names. Define each topic name using the same format required for a single topic.
- Topics Pattern
- Subscribes to multiple topics defined by a naming pattern. Use the following format to specify the pattern:
{persistent|non-persistent}://<tenant>/<namespace>/<regular expression>
For more information about defining topic names and about subscribing to multiple topics, see the Apache Pulsar documentation.
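The following Python sketch makes the naming formats concrete. It splits a fully qualified topic name into its parts and applies a Topics Pattern value to a list of candidate topic names. It is an illustration only, not the way the origin or Pulsar resolves patterns. It assumes that the regular expression must fully match the topic segment and that the domain, tenant, and namespace must match exactly. The topic names `orders-eu`, `orders-us`, and `payments` are hypothetical. `public/default` is Pulsar's default tenant and namespace.

```python
import re
from typing import List, Tuple

# Shape of a fully qualified topic name:
# {persistent|non-persistent}://<tenant>/<namespace>/<topic name>
TOPIC_RE = re.compile(r"^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$")


def parse_topic(name: str) -> Tuple[str, ...]:
    """Split a topic name into (domain, tenant, namespace, topic)."""
    match = TOPIC_RE.match(name)
    if match is None:
        raise ValueError(f"not a fully qualified Pulsar topic name: {name!r}")
    return match.groups()


def topics_matching(pattern: str, candidates: List[str]) -> List[str]:
    """Return the candidate topics selected by a Topics Pattern value.

    Simplifying assumption: domain, tenant, and namespace must be identical,
    and the regular expression must fully match the topic segment.
    """
    p_domain, p_tenant, p_namespace, p_regex = parse_topic(pattern)
    regex = re.compile(p_regex)
    selected = []
    for name in candidates:
        domain, tenant, namespace, topic = parse_topic(name)
        same_scope = (domain, tenant, namespace) == (p_domain, p_tenant, p_namespace)
        if same_scope and regex.fullmatch(topic):
            selected.append(name)
    return selected


if __name__ == "__main__":
    topics = [
        "persistent://public/default/orders-eu",
        "persistent://public/default/orders-us",
        "persistent://public/default/payments",
    ]
    print(topics_matching("persistent://public/default/orders-.*", topics))
    # ['persistent://public/default/orders-eu', 'persistent://public/default/orders-us']
```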
Parallel Reads of a Topic
The Pulsar Consumer origin can subscribe to a single topic or to multiple topics. In either case, the origin uses one thread to read from the topic or topics. To perform parallel reads from the same topic, you can configure multiple pipelines with a Pulsar Consumer origin that subscribe to the same topic.
To configure multiple origins to subscribe to the same topic, you'll need to determine whether the origins use a single shared subscription, multiple exclusive subscriptions, or multiple failover subscriptions. A shared subscription allows multiple consumers to attach to the same subscription. An exclusive subscription allows only a single consumer to attach to the subscription. A failover subscription allows multiple consumers to attach to the same subscription, but only one consumer receives messages at a time. If the initial master consumer disconnects, messages are delivered to the next consumer.
For more information about the Pulsar subscription modes, see Subscription Modes in the Pulsar documentation.
You can configure multiple Pulsar Consumer origins to subscribe to the same Pulsar topic in the following ways:
- Single shared subscription
- To configure multiple Pulsar Consumer origins to subscribe to the same topic using the same shared subscription, configure the following origin properties:
- Subscription Name - On the Pulsar tab, configure each origin to use the same subscription name.
- Topic - On the Pulsar tab, configure each origin to use the same topic name.
- Subscription Type - On the Advanced tab, configure each origin to use the shared subscription type.
- Multiple exclusive subscriptions
- To configure multiple Pulsar Consumer origins to subscribe to the same topic using multiple exclusive subscriptions, configure the following origin properties:
- Subscription Name - On the Pulsar tab, configure each origin to use a unique subscription name.
- Topic - On the Pulsar tab, configure each origin to use the same topic name.
- Subscription Type - On the Advanced tab, configure each origin to use the exclusive subscription type.
- Multiple failover subscriptions
- To configure multiple Pulsar Consumer origins to subscribe to the same topic using multiple failover subscriptions, configure the following origin properties:
- Subscription Name - On the Pulsar tab, configure each pair of origins to use a unique subscription name.
For example, configure the origins in pipelineA and pipelineB to use subscription1. The origin in pipelineA serves as the master consumer, and the origin in pipelineB is the next in line to receive messages if the pipelineA origin disconnects. Then, configure the origins in pipelineC and pipelineD to use subscription2.
- Topic - On the Pulsar tab, configure each origin to use the same topic name.
- Subscription Type - On the Advanced tab, configure each origin to use the failover subscription type.
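The practical difference between these configurations is which pipelines receive which messages. The Python sketch below is a toy, in-memory model of one topic with several subscriptions. It is not a Pulsar client. It assumes that shared dispatch is strict round-robin and that the failover active consumer is the first consumer still attached, whereas a real broker applies its own selection rules and flow control. The model shows three behaviors:

- Consumers on a shared subscription split the messages.
- Each exclusive subscription receives its own copy of every message.
- A failover subscription delivers to a single consumer until that consumer disconnects.

The class `ToySubscription`, the function `publish`, and the pipeline names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List


class ToySubscription:
    """In-memory model of one subscription on a single topic (not a Pulsar API).

    Assumptions: shared dispatch is strict round-robin, and the failover
    active consumer is the first consumer that is still attached.
    """

    TYPES = ("exclusive", "shared", "failover")

    def __init__(self, name: str, sub_type: str) -> None:
        if sub_type not in self.TYPES:
            raise ValueError(f"unknown subscription type: {sub_type}")
        self.name = name
        self.sub_type = sub_type
        self.consumers: List[str] = []
        self._cursor = 0  # round-robin position for shared subscriptions

    def attach(self, consumer: str) -> None:
        # An exclusive subscription rejects a second consumer.
        if self.sub_type == "exclusive" and self.consumers:
            raise RuntimeError(f"{self.name} already has an exclusive consumer")
        self.consumers.append(consumer)

    def detach(self, consumer: str) -> None:
        self.consumers.remove(consumer)

    def deliver(self) -> str:
        """Return the consumer that receives the next message."""
        if not self.consumers:
            raise RuntimeError(f"{self.name} has no connected consumers")
        if self.sub_type == "shared":
            consumer = self.consumers[self._cursor % len(self.consumers)]
            self._cursor += 1
            return consumer
        # Exclusive: the only consumer. Failover: the active (first) consumer.
        return self.consumers[0]


def publish(subscriptions: List[ToySubscription], messages: List[str]) -> Dict[str, List[str]]:
    """Every subscription on the topic gets every message; map consumer -> messages."""
    received: Dict[str, List[str]] = defaultdict(list)
    for message in messages:
        for sub in subscriptions:
            received[sub.deliver()].append(message)
    return dict(received)


if __name__ == "__main__":
    shared = ToySubscription("shared-sub", "shared")
    shared.attach("pipelineA")
    shared.attach("pipelineB")
    print(publish([shared], ["m1", "m2", "m3", "m4"]))
    # {'pipelineA': ['m1', 'm3'], 'pipelineB': ['m2', 'm4']}

    failover = ToySubscription("subscription1", "failover")
    failover.attach("pipelineA")
    failover.attach("pipelineB")
    print(publish([failover], ["m1"]))  # {'pipelineA': ['m1']}
    failover.detach("pipelineA")        # the active consumer disconnects
    print(publish([failover], ["m2"]))  # {'pipelineB': ['m2']}
```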
Enabling Security
If the Pulsar cluster uses security features, you must configure the Pulsar Consumer origin to use the same security features to connect to Pulsar.
A Pulsar cluster can use the following security features:
- TLS transport encryption
- When configured for TLS transport encryption, the Pulsar cluster uses TLS to encrypt all traffic between the Pulsar server and clients. The Pulsar server uses a key and certificate that clients use to verify the server's identity.
- Mutual TLS authentication
- When the Pulsar cluster is configured for TLS transport encryption, it can additionally be configured to use mutual TLS authentication. With mutual authentication, clients also use their own keys and certificates, which the server uses to verify the client's identity.
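Python's standard `ssl` module can illustrate the client side of these two modes. The following sketch is generic TLS code, not the origin's configuration and not the Pulsar client API. With transport encryption alone, the client only verifies the server certificate against a trust store. With mutual TLS, the client also loads its own certificate and key so that the server can verify the client. The function name, parameter names, and file paths are hypothetical.

```python
import ssl
from typing import Optional


def build_client_context(
    trust_certs_path: Optional[str] = None,
    client_cert_path: Optional[str] = None,
    client_key_path: Optional[str] = None,
) -> ssl.SSLContext:
    """Build a client-side TLS context (hypothetical helper).

    TLS transport encryption: verify the server certificate against
    trust_certs_path, or against the system trust store when it is None.
    Mutual TLS: additionally present a client certificate and private key.
    """
    if (client_cert_path is None) != (client_key_path is None):
        raise ValueError("mutual TLS needs both a client certificate and a client key")
    # SERVER_AUTH contexts require and verify the server certificate and host name.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=trust_certs_path)
    if client_cert_path is not None:
        # Presenting a certificate lets the server verify the client's identity.
        context.load_cert_chain(certfile=client_cert_path, keyfile=client_key_path)
    return context
```

For example, `build_client_context("ca.cert.pem")` would model transport encryption only. `build_client_context("ca.cert.pem", "client.cert.pem", "client.key.pem")` would model mutual TLS. Each of those calls needs real certificate files at the given paths.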
Offset Management
The first time that a Pulsar Consumer origin receives messages from a topic, Pulsar creates an offset entry for that subscription and topic. Pulsar also maintains the offset entry.
- No stored offset
- When the subscription and topic combination does not have a previously stored offset, the Pulsar Consumer origin begins receiving messages based on the value of the initial offset defined on the Advanced tab of the origin.
- Previously stored offset
- When the subscription and topic combination has a previously stored offset, the Pulsar Consumer origin receives messages starting with the next unprocessed message after the stored offset. For example, when you stop and restart the pipeline, processing resumes from the last committed offset.
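The two cases above amount to a simple decision made per subscription and topic pair. The Python sketch below models that decision with an in-memory store. List indexes stand in for Pulsar message IDs. The initial offset values `earliest` and `latest` are assumed names for the Advanced tab options. The class and function names are hypothetical, and the sketch is not how Pulsar stores offsets.

```python
from typing import Dict, List, Tuple

OffsetKey = Tuple[str, str]  # (subscription name, topic name)


class ToyOffsetStore:
    """Stand-in for the offsets that Pulsar keeps per subscription and topic.

    Message positions are list indexes here; Pulsar itself tracks message IDs.
    """

    def __init__(self) -> None:
        self._committed: Dict[OffsetKey, int] = {}

    def start_position(self, key: OffsetKey, topic_log: List[str], initial_offset: str) -> int:
        if key in self._committed:
            # Previously stored offset: start with the next unprocessed message.
            return self._committed[key] + 1
        # No stored offset: use the initial offset setting (option names assumed).
        if initial_offset == "earliest":
            return 0
        if initial_offset == "latest":
            return len(topic_log)  # only messages published from now on
        raise ValueError(f"unsupported initial offset: {initial_offset}")

    def commit(self, key: OffsetKey, position: int) -> None:
        self._committed[key] = position


def read_batch(store: ToyOffsetStore, key: OffsetKey, topic_log: List[str],
               initial_offset: str, batch_size: int) -> List[str]:
    """Read up to batch_size messages, then commit the position of the last one read."""
    start = store.start_position(key, topic_log, initial_offset)
    batch = topic_log[start:start + batch_size]
    if batch:
        store.commit(key, start + len(batch) - 1)
    return batch
```

In this model, a restarted pipeline that reuses the same subscription and topic ignores the initial offset and continues after the stored position. A new subscription name starts from the initial offset again.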
Record Header Attributes
The Pulsar Consumer origin includes any information in the properties field of a message, which is separate from the payload field, in the generated records as record header attributes.
You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
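In a pipeline, you might read a property with an expression such as `${record:attribute('region')}`, where `region` is a hypothetical property name. The Python sketch below mimics that flow with plain data structures. Message properties are copied into header attributes, and the payload becomes the record body. Two helper functions approximate the two expression functions. This is not the expression language implementation. The assumption that record:attributeOrDefault falls back when an attribute is missing or empty should be checked against the function reference.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class ToyRecord:
    """Toy record: body fields plus string-valued header attributes."""
    fields: Dict[str, object]
    attributes: Dict[str, str] = field(default_factory=dict)


def to_record(payload_fields: Dict[str, object], properties: Dict[str, str]) -> ToyRecord:
    """Copy message properties into header attributes; the payload becomes the body."""
    return ToyRecord(fields=dict(payload_fields), attributes=dict(properties))


def attribute(record: ToyRecord, name: str) -> Optional[str]:
    """Rough analogue of record:attribute: the value, or None when absent."""
    return record.attributes.get(name)


def attribute_or_default(record: ToyRecord, name: str, default: str) -> str:
    """Rough analogue of record:attributeOrDefault (assumed: missing or empty -> default)."""
    value = record.attributes.get(name)
    return value if value else default
```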
Data Formats
The Pulsar Consumer origin processes data differently based on the data format. Pulsar Consumer can process the following types of data:
- Binary
- Generates a record with a single byte array field at the root of the record.
- Datagram
- Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and the following types of syslog messages:
- Delimited
- Generates a record for each delimited line. You can use the following delimited format types:
- Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
- RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
- MS Excel CSV - Microsoft Excel comma-separated file.
- MySQL CSV - MySQL comma-separated file.
- Tab-Separated Values - File that includes tab-separated values.
- PostgreSQL CSV - PostgreSQL comma-separated file.
- PostgreSQL Text - PostgreSQL text file.
- Custom - File that uses user-defined delimiter, escape, and quote characters.
- Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
- JSON
- Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
- Log
- Generates a record for every log line.
- Protobuf
- Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
- SDC Record
- Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
- Text
- Generates a record for each line of text or for each section of text based on a custom delimiter.
- XML
- Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
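The Python sketch below shows how one message payload can map to one record or to many records, depending on the data format. It is a simplified illustration, not the origin's parsers. It covers only binary, text, Default CSV delimited, and JSON data, and it represents records as plain Python values. The field name `text` for text records is an assumption.

```python
import csv
import io
import json
from typing import Any, List


def parse_json_values(text: str) -> List[Any]:
    """One value per JSON object: handles several whitespace-separated objects
    or a single JSON array (one value per array element)."""
    decoder = json.JSONDecoder()
    text = text.strip()
    values: List[Any] = []
    pos = 0
    while pos < len(text):
        value, pos = decoder.raw_decode(text, pos)
        values.append(value)
        while pos < len(text) and text[pos].isspace():
            pos += 1  # raw_decode does not skip leading whitespace itself
    if len(values) == 1 and isinstance(values[0], list):
        return values[0]
    return values


def to_records(payload: bytes, data_format: str) -> List[Any]:
    """Turn one message payload into a list of toy records."""
    if data_format == "binary":
        # One record whose root is the raw byte array.
        return [payload]
    if data_format == "text":
        # One record per line; the field name "text" is an assumption.
        return [{"text": line} for line in payload.decode("utf-8").splitlines()]
    if data_format == "delimited":
        # Default CSV behavior: one record per line, empty lines ignored.
        reader = csv.reader(io.StringIO(payload.decode("utf-8")))
        return [row for row in reader if row]
    if data_format == "json":
        return parse_json_values(payload.decode("utf-8"))
    raise ValueError(f"format not covered by this sketch: {data_format}")
```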
Configuring a Pulsar Consumer Origin
Configure a Pulsar Consumer origin to read messages from Apache Pulsar.