MQTT Publisher

Supported pipeline types:
  • Data Collector

  • Data Collector Edge

The MQTT Publisher destination publishes messages to a topic on an MQTT broker. The destination functions as an MQTT client that publishes messages, writing each record as a message.

When you configure the destination, you specify the information needed to connect to the MQTT broker. You must define connection credentials when the MQTT broker requires a user name and password. You can also configure SSL/TLS properties, including default transport protocols and cipher suites.

You specify the topic on the MQTT broker that the destination delivers the message to.

You also configure the quality of service level and the persistence mechanism that the destination uses to enable reliable messaging.

Edge Pipeline Prerequisite

In Data Collector Edge pipelines, MQTT stages require using an intermediary MQTT broker.

For example, an edge sending pipeline uses an MQTT Publisher destination to write to an MQTT broker. The MQTT broker temporarily stores the data until the MQTT Subscriber origin in the Data Collector receiving pipeline reads the data.

Topic

The MQTT Publisher destination writes messages to a single topic on the MQTT broker. Any MQTT client subscribed to that topic receives the messages. A topic is a string that the broker uses to filter messages for each connected client.

When you configure the destination, you define the topic name. You can include multiple topic levels in a topic. For example, the following topic has three topic levels:
sales/US/NorthernRegion

You can use the StreamSets expression language to define the topic name. You cannot use MQTT wildcards in the topic name used by the MQTT Publisher destination.

For more information, see the HiveMQ documentation on MQTT topics.

Data Formats

The MQTT Publisher destination writes messages to an MQTT broker based on the data format that you select.

The MQTT Publisher destination processes data formats as follows:

Binary
The stage writes binary data to a single field in the record.
JSON
The destination writes records as JSON data. You can use one of the following formats:
  • Array - Each file includes a single array. In the array, each element is a JSON representation of each record.
  • Multiple objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
SDC Record
The destination writes records in the SDC Record data format.
Text
The destination writes data from a single text field to the destination system. When you configure the stage, you select the field to use.
You can configure the characters to use as record separators. By default, the destination uses a UNIX-style line ending (\n) to separate records.
When a record does not contain the selected text field, the destination can report the missing field as an error or to ignore the missing field. By default, the destination reports an error.
When configured to ignore a missing text field, the destination can discard the record or write the record separator characters to create an empty line for the record. By default, the destination discards the record.

Configuring an MQTT Publisher Destination

Configure an MQTT Publisher destination to write messages to an MQTT broker.

In Data Collector Edge pipelines, the MQTT Publisher destination requires an intermediary MQTT broker.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the MQTT tab, configure the following properties:
    MQTT Property Description
    Broker URL MQTT Broker URL. Enter in the following format:
    <tcp | ssl>://<hostname>:<port>

    Use ssl for secure connections to the broker.

    For example:
    tcp://localhost:1883
    Client ID MQTT Client ID. The ID must be unique across all clients connecting to the same broker.
    You can define an expression that evaluates to the client ID. For example, enter the following expression to use the unique pipeline ID as the client ID:
    ${pipeline:id()}

    If a pipeline includes multiple MQTT stages and you want to use the unique pipeline ID as the client ID for both stages, prefix the client ID with a string like this:

    sub-${pipeline:id()} and pub-${pipeline:id()} 
    Otherwise, all stages will use the same client ID. This can cause problems, such as messages disappearing.
    Topic Topic to publish to. Using simple or bulk edit mode, click the Add icon to read from additional topics.
    Quality of Service Determines the quality of service level used to guarantee message delivery:
    • At Most Once (0)
    • At Least Once (1)
    • Exactly Once (2)

    For more information, see the HiveMQ documentation on quality of service levels.

    Client Persistence Mechanism Determines the persistence mechanism that the destination uses to guarantee message delivery when the quality of service level is at least once or exactly once. Select one of the following options:
    • Memory - Store messages in memory on the Data Collector machine until the delivery of the message is complete.
    • File - Store messages in a local file on the Data Collector machine until the delivery of the message is complete.

    Not used when the quality of service level is at most once.

    For more information, see the HiveMQ documentation on client persistence.

    Client Persistence Data Directory Local directory on the Data Collector machine where the destination temporarily stores messages in a file when you configure file persistence.

    The user who starts Data Collector must have read and write access to this directory.

    Keep Alive Interval (secs) Maximum time in seconds to allow the connection to the MQTT broker to remain idle. After the destination publishes no messages for this amount of time, the connection is closed. The destination must reconnect to the MQTT broker.

    Default is 60 seconds.

    Use Credentials Enables entering credentials on the Credentials tab. Use when the MQTT broker requires a user name and password.
    Retain the Message Determines whether or not the MQTT broker retains the message last published by the destination when no MQTT client is subscribed to listen to the topic.

    When selected, the MQTT broker retains the last message published by the destination. Any messages published earlier are lost. When cleared, all messages published by the destination are lost.

    For more information about MQTT retained messages, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages.

  3. On the Credentials tab, enter the MQTT credentials to use if you enabled credentials.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
  4. To use SSL/TLS, on the TLS tab, configure the following properties:

    In Data Collector Edge pipelines, only the Use TLS and Truststore File properties are valid. After enabling TLS, enter an absolute path for the truststore file that uses the PEM format. In Data Collector Edge pipelines, the MQTT Publisher destination always uses the default protocol and cipher suites. It ignores all other TLS properties.

    TLS Property Description
    Use TLS

    Enables the use of TLS.

    Truststore File The path to the truststore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration.

    By default, no truststore is used.

    In Data Collector Edge pipelines, enter an absolute path to the file that uses the PEM format.

    Truststore Type Type of truststore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore Password Password to the truststore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Truststore Trust Algorithm The algorithm used to manage the truststore.

    Default is SunX509.

    Use Default Protocols Determines the transport layer security (TLS) protocol to use. The default protocol is TLSv1.2. To use a different protocol, clear this option.
    Transport Protocols The TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols.
    Note: Older protocols are not as secure as TLSv1.2.
    Use Default Cipher Suites Determines the default cipher suite to use when performing the SSL/TLS handshake. To use a different cipher suite, clear this option.
    Cipher Suites Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites.

    Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.

  5. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Data format for messages. Use one of the following data formats:
  6. For binary data, on the Data Format tab, configure the following property:
    Binary Property Description
    Binary Field Path Field that contains the binary data.
  7. For JSON data, on the Data Format tab, configure the following property:
    JSON Property Description
    JSON Content Method to write JSON data:
    • JSON Array of Objects - Each file includes a single array. In the array, each element is a JSON representation of each record.
    • Multiple JSON Objects - Each file includes multiple JSON objects. Each object is a JSON representation of a record.
    Charset Character set to use when writing data.
  8. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Text Field Path Field that contains the text data to be written. All data must be incorporated into the specified field.
    Record Separator Characters to use to separate records. Use any valid Java string literal. For example, when writing to Windows, you might use \r\n to separate records.

    By default, the destination uses \n.

    On Missing Field When a record does not include the text field, determines whether the destination reports the missing field as an error or ignores the missing field.
    Insert Record Separator if No Text When configured to ignore a missing text field, inserts the configured record separator string to create an empty line.

    When not selected, discards records without the text field.

    Charset Character set to use when writing data.