Pulsar Producer

The Pulsar Producer destination writes data to topics in an Apache Pulsar cluster.

The Pulsar Producer destination attaches to a topic and publishes messages to a Pulsar broker for processing.

When you configure a Pulsar Producer destination, you define the URL to connect to Pulsar. You also define the topics to publish messages to. You can configure the destination to publish messages to a single topic or to multiple topics if you include an expression in the configured topic name.

You can configure the destination to use Pulsar security features. You can also configure advanced properties as needed, such as the partition or compression type to use when publishing messages or whether the destination publishes messages asynchronously or synchronously.

For more information about Pulsar topics and producers, see the Apache Pulsar documentation.

Enabling Security

If the Pulsar cluster uses security features, you must configure the Pulsar Producer destination to use the same security features to connect to Pulsar.

A Pulsar cluster can use the following security features:

TLS transport encryption
When configured for TLS transport encryption, the Pulsar cluster uses TLS to encrypt all traffic between the Pulsar server and clients. The Pulsar server uses a key and certificate which clients use to verify the server's identity.
Mutual TLS authentication
When configured for TLS transport encryption, the Pulsar cluster can additionally be configured to use mutual TLS authentication. With mutual authentication, clients also use keys and certificates which the server uses to verify the client's identity.

To configure the destination to use security features when connecting to Pulsar, perform the following steps:
  1. On the Pulsar tab of the stage, set the Pulsar URL property to the secure URL for the broker service.
    Use the following format for the URL:
    pulsar+ssl://<host name>:<broker service TLS port>/
    For example:
    pulsar+ssl://pulsar.us-west.example.com:6651/
  2. On the Security tab of the stage, select Enable TLS.
  3. Store the PEM file that contains the certificate authority (CA) that signed the Pulsar cluster certificate in the Data Collector resources directory, $SDC_RESOURCES.
    For information about creating certificates for the Pulsar cluster, see the Pulsar documentation.
  4. On the Security tab of the stage, enter the name of the CA certificate PEM file in the CA Certificate PEM property.
  5. If the Pulsar cluster is also configured for mutual TLS authentication, select Enable Mutual Authentication on the Security tab of the stage.
  6. If you enabled mutual authentication, create the client certificate and client private key PEM files for the stage to use.
    For information about creating client certificates for Pulsar, see the Pulsar documentation.
  7. Store the client certificate and client private key PEM files created for the stage in the Data Collector resources directory, $SDC_RESOURCES.
  8. On the Security tab of the stage, enter the names of the client certificate and client private key files in the Client Certificate PEM and Client Key PEM properties.
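
The steps above amount to a few consistency rules between the URL, the TLS options, and the PEM files. The following Python sketch checks a settings dictionary against those rules. The function name and the dictionary keys (`pulsar_url`, `enable_tls`, `enable_mutual_auth`, `ca_cert_pem`, `client_cert_pem`, `client_key_pem`) are hypothetical names chosen for illustration; they are not Data Collector property identifiers.

```python
from urllib.parse import urlparse


def check_security_settings(settings):
    """Return a list of problems found in a hypothetical stage configuration."""
    problems = []
    url = urlparse(settings.get("pulsar_url", ""))
    tls = settings.get("enable_tls", False)
    mutual = settings.get("enable_mutual_auth", False)

    # Step 1: with TLS enabled, the URL should be the secure broker service URL.
    if tls and url.scheme != "pulsar+ssl":
        problems.append("TLS is enabled but the URL does not start with pulsar+ssl://")
    # Steps 3 and 4: TLS needs the CA certificate PEM file.
    if tls and not settings.get("ca_cert_pem"):
        problems.append("TLS is enabled but no CA certificate PEM file is set")
    # Step 5: mutual authentication is an addition to TLS, not a replacement.
    if mutual and not tls:
        problems.append("Mutual authentication requires TLS to be enabled")
    # Steps 6 to 8: mutual authentication needs both client PEM files.
    if mutual:
        for key in ("client_cert_pem", "client_key_pem"):
            if not settings.get(key):
                problems.append(f"Mutual authentication is enabled but {key} is not set")
    return problems
```

An empty list means the settings are consistent with the steps; it does not prove that the certificates themselves are valid.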

Data Formats

The Pulsar Producer destination writes data to Pulsar based on the data format that you select. You can use the following data formats:

Binary
The destination writes binary data from a single field in the record.
Delimited
The destination writes records as delimited data. When you use this data format, the root field must be a list or list-map.
You can use the following delimited format types:
  • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
  • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
  • MS Excel CSV - Microsoft Excel comma-separated file.
  • MySQL CSV - MySQL comma-separated file.
  • PostgreSQL CSV - PostgreSQL comma-separated file.
  • PostgreSQL Text - PostgreSQL text file.
  • Tab-Separated Values - File that includes tab-separated values.
  • Custom - File that uses user-defined delimiter, escape, and quote characters.
JSON
The destination writes records as JSON data. You can use one of the following formats:
  • Array - Each file includes a single array. In the array, each element is a JSON representation of each record.
  • Multiple objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
Protobuf
The destination writes one record in each message. It uses the user-defined message type and the definition of that message type in the descriptor file to generate the message.
For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
SDC Record
The destination writes records in the SDC Record data format.
Text
The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use. When necessary, merge record data into the field earlier in the pipeline.
You can configure the characters to use as record separators. By default, the destination uses a UNIX-style line ending (\n) to separate records.
When a record does not contain the selected text field, you can configure the destination to report the missing field as an error or to ignore the missing field. By default, the destination reports an error.
When configured to ignore a missing text field, you can configure the destination to discard the record or to write the record separator characters to create an empty line for the record. By default, the destination discards the record.
XML
The destination creates a valid XML document for each record. The destination requires the record to have a single root field that contains the rest of the record data. For details and suggestions for how to accomplish this, see Record Structure Requirement.

The destination can include indentation to produce human-readable documents. It can also validate that the generated XML conforms to the specified schema definition. Records with invalid schemas are handled based on the error handling configured for the destination.
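
To make the difference between the two JSON formats concrete, the following Python sketch serializes the same records both ways. It only illustrates the two output shapes. The sample records, the function names, and the newline used between objects are assumptions; the sketch does not show how the destination groups records into Pulsar messages.

```python
import json

# Hypothetical sample records, used only for illustration.
records = [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}]


def to_json_array(records):
    """Array format: one array whose elements are the records."""
    return json.dumps(records)


def to_multiple_objects(records):
    """Multiple objects format: one JSON object per record.

    Separating the objects with a newline is an assumption of this sketch.
    """
    return "\n".join(json.dumps(record) for record in records)


print(to_json_array(records))
print(to_multiple_objects(records))
```

A consumer of the array format parses the payload once, while a consumer of the multiple objects format parses each object separately.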

Configuring a Pulsar Producer

Configure a Pulsar Producer destination to write data to Pulsar topics.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    Required Fields - Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions - Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error - Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Pulsar tab, configure the following properties:
    Pulsar Property - Description
    Pulsar URL - URL to the Pulsar web service or broker service.
    If the Pulsar cluster is not enabled for TLS, enter either the web service or broker service URL in the following format:
    • Web service URL - http://<host name>:<web service port>. For example: http://pulsar.us-west.example.com:8080.
    • Broker service URL - pulsar://<host name>:<broker service port>. For example: pulsar://pulsar.us-west.example.com:6650
    If the Pulsar cluster is enabled for TLS, enter the secure broker service URL in the following format:
    pulsar+ssl://<host name>:<broker service TLS port>

    For example: pulsar+ssl://pulsar.us-west.example.com:6651

    Topic - Name of the topic to publish messages to. Enter the topic name in the following format:
    {persistent|non-persistent}://<tenant>/<namespace>/<topic name>
    For example, to publish to a persistent topic named my-sdc-topic in the my-namespace namespace within the my-tenant tenant, enter the following as the topic name:
    persistent://my-tenant/my-namespace/my-sdc-topic
    If you enter a topic name only, then Pulsar uses the default persistent://public/default/ location. For example, to publish to a persistent topic belonging to the public tenant in the default namespace, simply enter the topic name as follows:
    my-sdc-topic

    If the specified topic does not exist, Pulsar creates the topic when the pipeline starts.

    You can use expressions to define the topic name. For example, if the my-topic field in the record contains the topic name, enter the following as the topic name:
    persistent://my-tenant/my-namespace/${record:value("/my-topic")}
    Keep Alive Interval (ms) - Number of milliseconds that the connection to Pulsar can remain idle. If the destination publishes no messages for this amount of time, the connection is closed and the destination must reconnect to Pulsar before publishing again.

    Default is 30,000 milliseconds.

    Operation Timeout (ms) - Number of milliseconds to allow the operation that creates the Pulsar producer to complete before marking the operation as failed.

    Default is 30,000 milliseconds.
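
The two ways of entering the Topic property described above can be sketched as a small normalization function. This is a Python illustration of the naming rule only, not the logic that Pulsar or Data Collector actually runs. The function name is hypothetical, and the sketch deliberately rejects any other shorthand because this page describes only these two forms.

```python
import re

# Full form: {persistent|non-persistent}://<tenant>/<namespace>/<topic name>
FULL_TOPIC = re.compile(r"^(persistent|non-persistent)://[^/]+/[^/]+/.+$")


def full_topic_name(topic):
    """Expand a topic-name-only entry to the default persistent location."""
    if "://" in topic:
        if not FULL_TOPIC.match(topic):
            raise ValueError(f"not a valid full topic name: {topic!r}")
        return topic
    if "/" in topic:
        # Other shorthand forms are outside the scope of this sketch.
        raise ValueError(f"unsupported shorthand in this sketch: {topic!r}")
    return f"persistent://public/default/{topic}"


print(full_topic_name("my-sdc-topic"))
```

With this rule, entering `my-sdc-topic` and entering `persistent://public/default/my-sdc-topic` refer to the same topic.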

  3. To enable security, click the Security tab and configure the following properties:
    Security Property - Description
    Enable TLS - Enables the stage to connect securely to Pulsar through TLS encryption.
    Enable Mutual Authentication - Enables the stage to use mutual TLS authentication to connect securely to Pulsar.
    CA Certificate PEM - Path to the PEM file containing the certificate authority (CA) that signed the Pulsar cluster certificate.

    Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Client Certificate PEM - If mutual authentication is enabled, path to the PEM file containing the client certificate created for Data Collector.

    Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    Client Key PEM - If mutual authentication is enabled, path to the PEM file containing the client private key created for Data Collector.

    Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.
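
The path rule shared by the three PEM properties above (absolute path, or path relative to the resources directory) can be sketched as follows. This is a Python illustration under stated assumptions: the function name is hypothetical, `/opt/sdc/resources` is a made-up value standing in for $SDC_RESOURCES, and POSIX-style paths are assumed.

```python
import posixpath


def resolve_pem_path(path, resources_dir):
    """Return the path as is if absolute, else resolve it under resources_dir."""
    if posixpath.isabs(path):
        return path
    return posixpath.join(resources_dir, path)


# "/opt/sdc/resources" is a hypothetical location for $SDC_RESOURCES.
print(resolve_pem_path("certs/ca.cert.pem", "/opt/sdc/resources"))
print(resolve_pem_path("/etc/pulsar/ca.cert.pem", "/opt/sdc/resources"))
```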

  4. On the Advanced tab, optionally configure advanced properties.

    The defaults for these properties should work in most cases:

    Advanced Property - Description
    Partition Type - Partition type to use when publishing messages to a topic:
    • Single
    • Round Robin

    Default is Single.

    Hashing Scheme - Hashing scheme to use when selecting which partition to write messages to.
    Message Key - Message key used to compute the hash for partitioning. Enter the key or enter an expression that evaluates to the key.
    Compression Type - Type of compression to apply to the published messages:
    • None
    • LZ4
    • ZLIB

    Default is None.

    Async Send - Enables the destination to publish messages asynchronously. Clear to publish messages synchronously.

    For more information about the available send modes, see the Apache Pulsar documentation.

    Default is enabled.

    Max Pending Messages - When sending messages asynchronously, the maximum number of messages that can wait in the queue for an acknowledgement from the Pulsar broker.

    Default is 1,000.

    Enable Batching - When sending messages asynchronously, enables sending a batch of messages in a single request. Clear to send a single message in each request.

    Default is enabled.

    Max Batch Size (messages) - When sending messages asynchronously and batching is enabled, the maximum number of messages to include in a batch.

    Default is 2,000.

    Batch Max Publish Latency (ms) - When sending messages asynchronously and batching is enabled, the maximum number of milliseconds to wait before sending the next batch.

    Default is 1,000 milliseconds.

    Pulsar Configuration Properties - Additional Pulsar configuration properties to use. Using simple or bulk edit mode, click the Add icon to add properties. Define the Pulsar property name and value.

    Use the property names and values as expected by Pulsar.
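
To show how Max Batch Size and Batch Max Publish Latency interact, the following Python sketch models a buffer that releases a batch when either limit is reached. It is a simplified illustration, not the Pulsar client implementation: the class and method names are hypothetical, and the caller passes in the time explicitly so the behavior is easy to follow.

```python
class BatchBuffer:
    """Toy model of batching with a message-count limit and a latency limit."""

    def __init__(self, max_messages=2000, max_latency_ms=1000):
        self.max_messages = max_messages
        self.max_latency_ms = max_latency_ms
        self.pending = []
        self.first_added_ms = None

    def add(self, message, now_ms):
        """Queue a message; return a batch if a limit was reached, else None."""
        if not self.pending:
            self.first_added_ms = now_ms  # latency is measured from the first message
        self.pending.append(message)
        return self._flush_if_due(now_ms)

    def tick(self, now_ms):
        """Check the latency limit without adding a message."""
        return self._flush_if_due(now_ms)

    def _flush_if_due(self, now_ms):
        if not self.pending:
            return None
        full = len(self.pending) >= self.max_messages
        waited = now_ms - self.first_added_ms >= self.max_latency_ms
        if full or waited:
            batch, self.pending = self.pending, []
            self.first_added_ms = None
            return batch
        return None
```

In this model, a larger batch size with a short latency behaves like a latency-bound sender under light load and a size-bound sender under heavy load.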

  5. On the Data Format tab, configure the following property:
    Data Format Property - Description
    Data Format - Type of data to be written. Use one of the following options:
    • Binary
    • Delimited
    • JSON
    • Protobuf
    • SDC Record
    • Text
    • XML
  6. For binary data, on the Data Format tab, configure the following property:
    Binary Property - Description
    Binary Field Path - Field that contains the binary data.
  7. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property - Description
    Delimiter Format - Format for delimited data:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma-separated file.
    • PostgreSQL CSV - PostgreSQL comma-separated file.
    • PostgreSQL Text - PostgreSQL text file.
    • Tab-Separated Values - File that includes tab-separated values.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.
    Header Line - Indicates whether to create a header line.
    Replace New Line Characters - Replaces new line characters with the configured string.

    Recommended when writing data as a single line of text.

    New Line Character Replacement - String to replace each new line character. For example, enter a space to replace each new line character with a space.

    Leave empty to remove the new line characters.

    Delimiter Character - Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Escape Character - Escape character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    Default is the backslash character ( \ ).

    Quote Character - Quote character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    Default is the quotation mark character ( " ).

    Charset - Character set to use when writing data.
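
As an illustration of how the custom delimiter, quote, and escape characters and the new line replacement work together, the following Python sketch writes rows with the default custom characters listed above. Python's `csv` module stands in for the destination's writer here, so the exact quoting behavior may differ from Data Collector. The function name and sample rows are assumptions.

```python
import csv
import io


def write_delimited(rows, header=None, newline_replacement=None):
    """Write rows using | as delimiter, " as quote, and \\ as escape character."""
    buffer = io.StringIO()
    writer = csv.writer(
        buffer,
        delimiter="|",
        quotechar='"',
        escapechar="\\",
        doublequote=False,
        lineterminator="\n",
    )
    if header:
        writer.writerow(header)
    for row in rows:
        if newline_replacement is not None:
            # Replace new line characters so each record stays on one line.
            row = [
                str(value).replace("\r\n", newline_replacement).replace("\n", newline_replacement)
                for value in row
            ]
        writer.writerow(row)
    return buffer.getvalue()


# Hypothetical sample rows: one value contains the delimiter, one a new line.
print(write_delimited([["1", "a|b"], ["2", "line1\nline2"]],
                      header=["id", "note"], newline_replacement=" "))
```

Passing an empty string as the replacement removes the new line characters, which mirrors leaving the New Line Character Replacement property empty.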
  8. For JSON data, on the Data Format tab, configure the following property:
    JSON Property - Description
    JSON Content - Determines how JSON data is written:
    • JSON Array of Objects - Each file includes a single array. In the array, each element is a JSON representation of each record.
    • Multiple JSON Objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
    Charset - Character set to use when writing data.
  9. For protobuf data, on the Data Format tab, configure the following properties:
    Protobuf Property - Description
    Protobuf Descriptor File - Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration. For information about generating the descriptor file, see Protobuf Data Format Prerequisites.

    Message Type - The fully-qualified name for the message type to use when writing data.

    Use the following format: <package name>.<message type>.

    Use a message type defined in the descriptor file.
  10. For text data, on the Data Format tab, configure the following properties:
    Text Property - Description
    Text Field Path - Field that contains the text data to be written. All data must be incorporated into the specified field.
    Record Separator - Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records.

    By default, the destination uses \n.

    On Missing Field - When a record does not include the text field, determines whether the destination reports the missing field as an error or ignores the missing field.
    Insert Record Separator if No Text - When configured to ignore a missing text field, inserts the configured record separator string to create an empty line.

    When not selected, discards records without the text field.

    Charset - Character set to use when writing data.
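
The interaction between Record Separator, On Missing Field, and Insert Record Separator if No Text can be sketched as a small function. This is a Python illustration of the documented choices, not Data Collector code. The function and parameter names are hypothetical, and appending the separator after every record is an assumption of the sketch.

```python
def write_text(records, field, separator="\n", on_missing="error",
               insert_separator_if_no_text=False):
    """Return (output text, error records) for a list of dict records."""
    output = []
    errors = []
    for record in records:
        if field in record:
            output.append(str(record[field]) + separator)
        elif on_missing == "error":
            errors.append(record)      # reported as an error (the default)
        elif insert_separator_if_no_text:
            output.append(separator)   # empty line for the record
        # Otherwise the record is ignored and discarded.
    return "".join(output), errors


# Hypothetical sample records; the second one lacks the text field.
sample = [{"text": "a"}, {"other": 1}, {"text": "b"}]
print(write_text(sample, "text"))
print(write_text(sample, "text", separator="\r\n", on_missing="ignore",
                 insert_separator_if_no_text=True))
```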
  11. For XML data, on the Data Format tab, configure the following properties:
    XML Property - Description
    Pretty Format - Adds indentation to make the resulting XML document easier to read. Increases the record size accordingly.
    Validate Schema - Validates that the generated XML conforms to the specified schema definition. Records with invalid schemas are handled based on the error handling configured for the destination.
    Important: Regardless of whether you validate the XML schema, the destination requires the record in a specific format. For more information, see Record Structure Requirement.
    XML Schema - The XML schema to use to validate records.
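
The single-root requirement and the effect of Pretty Format can be illustrated with a short Python sketch that turns a nested dictionary into an XML document. The mapping from fields to elements shown here is an assumption made for illustration; see Record Structure Requirement for the rules that the destination actually applies. The function names and the sample record are hypothetical, and `ET.indent` requires Python 3.9 or later.

```python
import xml.etree.ElementTree as ET


def record_to_xml(record, pretty=False):
    """Build an XML document from a dict that has exactly one root field."""
    if not isinstance(record, dict) or len(record) != 1:
        raise ValueError("the record must have a single root field")
    (root_name, content), = record.items()
    root = _build(root_name, content)
    if pretty:
        ET.indent(root)  # adds indentation, which increases the output size
    return ET.tostring(root, encoding="unicode")


def _build(name, value):
    """Create an element; nested dicts become child elements."""
    element = ET.Element(name)
    if isinstance(value, dict):
        for child_name, child_value in value.items():
            element.append(_build(child_name, child_value))
    else:
        element.text = str(value)
    return element


# Hypothetical sample record with a single root field named "order".
print(record_to_xml({"order": {"id": 1, "item": "pen"}}, pretty=True))
```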