Configuring a Pipeline

Configure a pipeline to define the stream of data. After you configure the pipeline, you can start it.

A pipeline can include the following stages:
  • A single origin stage
  • Multiple processor stages
  • Multiple destination stages
  • Multiple executor stages
  1. From the Home page or Getting Started page, click Create New Pipeline.
    Tip: To get to the Home page, click the Home icon.
  2. In the New Pipeline window, enter a pipeline title and optional description, and select the type of pipeline to create:
    • Data Collector Pipeline - Select to design a standalone or cluster execution mode pipeline that runs on Data Collector.
    • Data Collector Edge Pipeline - Select to design an edge execution mode pipeline that runs on Data Collector Edge.
    • Microservice Pipeline - Select to design a microservice pipeline based on the sample pipeline or to review the sample pipeline.
  3. Click Save.
    The pipeline canvas displays the pipeline title, the generated pipeline ID, and an error icon. The error icon indicates that you need to configure error handling for the pipeline. The Properties panel displays the pipeline properties.
  4. In the Properties panel, on the General tab, configure the following properties:
    Pipeline Property Description
    Title Title of the pipeline.

    Data Collector uses the alphanumeric characters entered for the pipeline title as a prefix for the generated pipeline ID. For example, if you enter “My Pipeline *&%&^^ 123” as the pipeline title, then the pipeline ID has the following value: MyPipeline123tad9f592-5f02-4695-bb10-127b2e41561c.

    You can edit the pipeline title. However, because the pipeline ID is used to identify the pipeline, any changes to the pipeline title are not reflected in the pipeline ID.

    Description Optional description of the pipeline.
    Labels Optional labels to assign to the pipeline.

    Use labels to group similar pipelines. For example, you might want to group pipelines by database schema or by the test or production environment.

    You can use nested labels to create a hierarchy of pipeline groupings. Enter nested labels using the following format:
    <label1>/<label2>/<label3>
    For example, you might want to group pipelines in the test environment by the origin system. You add the labels Test/HDFS and Test/Elasticsearch to the appropriate pipelines.
    Execution Mode Execution mode of the pipeline:
    • Standalone - A single Data Collector process runs the pipeline.
    • Cluster Batch - Data Collector spawns additional workers as needed to process data in HDFS or MapR. Processes all available data and then stops the pipeline.
    • Cluster Yarn Streaming - By default, Data Collector spawns additional workers as needed to process data. You can limit the number of workers with the Worker Count cluster property, and you can use the Extra Spark Configuration property to pass Spark configurations to the spark-submit script.

      Use to stream data from a Kafka or MapR cluster that uses Spark Streaming on YARN.

    • Cluster Mesos Streaming - Data Collector spawns additional workers as needed to process data. Use to stream data from a Kafka cluster that uses Spark Streaming on Mesos.
    • Edge - A single Data Collector Edge (SDC Edge) process runs the pipeline on an edge device.
    Data Collector Edge URL For edge pipelines, the URL to the SDC Edge installation. Used when you directly publish and manage the pipeline on an SDC Edge that is not registered with Control Hub. You can leave the default value when you publish the pipeline to Control Hub, add the pipeline to a job, and start the job on a registered SDC Edge. For more information, see Publishing a Single Pipeline and Manage Pipelines on SDC Edge.

    Not available in Data Collector pipelines.

    Delivery Guarantee Determines how Data Collector handles data after an unexpected event causes the pipeline to stop running:
    • At Least Once - Ensures all data is processed and written to the destination. Might result in duplicate rows.
    • At Most Once - Ensures that data is not reprocessed to prevent writing duplicate data to the destination. Might result in missing rows.

    Default is At Least Once.

    Test Origin A virtual origin to provide test data for data preview. Only used when the Test Origin option is selected in the Preview Configuration dialog box.

    To enable the use of a test origin, select the origin to access the test data, then configure the origin properties on the Test Origin tab. You can use any available origin.

    Default is the Dev Raw Data Source origin.

    Start Event Determines how the start event is handled. Select one of the following options:
    • Discard - Use when you don't want to use the event.
    • An executor - To use the event to trigger a task, select the executor that you want to use. For more information about the executors, see Executors.
    • Write to Another Pipeline - Use to pass the event to another pipeline for more complex processing.

    Use in standalone Data Collector pipelines only.

    For more information about pipeline events, see Pipeline Event Generation.

    Not available in Data Collector Edge pipelines.

    Stop Event Determines how the stop event is handled. Select one of the following options:
    • Discard - Use when you don't want to use the event.
    • An executor - To use the event to trigger a task, select the executor that you want to use. For more information about the executors, see Executors.
    • Write to Another Pipeline - Use to pass the event to another pipeline for more complex processing.

    Use in standalone Data Collector pipelines only.

    For more information about pipeline events, see Pipeline Event Generation.

    Not available in Data Collector Edge pipelines.

    Retry Pipeline on Error Retries the pipeline upon error.

    Not available in Data Collector Edge pipelines.

    Retry Attempts Number of retries attempted. Use -1 to retry indefinitely.

    The wait time between retries starts at 15 seconds and doubles until reaching five minutes.

    Not available in Data Collector Edge pipelines.

    Rate Limit (records / sec) Maximum number of records that the pipeline can read in a second. Use 0 or no value to set no rate limit.

    Default is 0.

    Not available in Data Collector Edge pipelines.

    Max Runners The maximum number of pipeline runners to use in a multithreaded pipeline.

    Use 0 for no limit. When set to 0, Data Collector generates up to the maximum number of threads or concurrency configured in the origin.

    You can use this property to help tune pipeline performance. For more information, see Tuning Threads and Runners.

    Default is 0.

    Not available in Data Collector Edge pipelines.

    Runner Idle Time (sec) Minimum number of seconds a pipeline runner waits when idle before generating an empty batch. The number of empty batches that are generated by pipeline runners displays as the Idle Batch Count in the monitor mode runtime statistics.

    Use to ensure that batches are generated periodically, even when no data needs to be processed.

    Use -1 to allow pipeline runners to wait indefinitely when idle without generating empty batches.

    For standalone pipelines only.

    Not available in Data Collector Edge pipelines.

    Create Failure Snapshot Automatically creates a snapshot if the pipeline fails because of data-related errors. Can be used to troubleshoot the pipeline.

    Not available in Data Collector Edge pipelines.
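
The Title property in this step explains that the alphanumeric characters of the title become the prefix of the generated pipeline ID. As a rough illustration only, the following Python sketch derives that prefix. The function name pipeline_id_prefix is hypothetical, the sketch assumes that "alphanumeric" means ASCII letters and digits, and it does not reproduce how Data Collector generates the rest of the ID.

```python
import re


def pipeline_id_prefix(title: str) -> str:
    """Keep only ASCII letters and digits from the pipeline title.

    Hypothetical helper: it mirrors the documented prefix behavior only,
    not the generated remainder of the pipeline ID.
    """
    return re.sub(r"[^A-Za-z0-9]", "", title)


if __name__ == "__main__":
    # Title taken from the example in the Title property description.
    print(pipeline_id_prefix("My Pipeline *&%&^^ 123"))  # prints MyPipeline123
```

Because the prefix is fixed when the ID is generated, editing the title later leaves the pipeline ID unchanged.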

  5. To define runtime parameters, on the Parameters tab, click the Add icon and define the name and the default value for each parameter. You can use simple or bulk edit mode to add the parameters.
    For more information, see Using Runtime Parameters.
  6. To configure notifications based on changes in pipeline state, on the Notifications tab, configure the following properties:
    Not available in Data Collector Edge pipelines.
    Notifications Property Description
    Notify on Pipeline State Changes Sends notifications when the pipeline encounters the listed pipeline states.
    Email IDs Email addresses to receive notification when the pipeline state changes to one of the specified states. Using simple or bulk edit mode, click the Add icon to add additional addresses.
    Webhooks Webhook to send when the pipeline state changes to one of the specified states. Using simple or bulk edit mode, click the Add icon to add additional webhooks.
    Webhook URL URL to send the HTTP request.
    Headers Optional HTTP request headers.
    HTTP Method HTTP method. Use one of the following methods:
    • GET
    • PUT
    • POST
    • DELETE
    • HEAD
    Payload Optional payload to use. Available for PUT, POST, and DELETE methods.

    Use any valid content type.

    You can use webhook parameters in the payload to include information about the triggering event, such as the pipeline name or state. Enclose webhook parameters in double curly brackets as follows: {{PIPELINE_STATE}}.

    Content Type Optional content type of the payload. Configure this property when the content type is not declared in the request headers.
    Authentication Type Optional authentication type to include in the request. Use None, Basic, Digest, or Universal.

    Use Basic for Form authentication.

    User Name User name to include when using authentication.
    Password Password to include when using authentication.
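
The Payload property in this step describes webhook parameters enclosed in double curly brackets. The following Python sketch shows, conceptually, how such a payload template could be filled in before the request is sent. It is not Data Collector code: the function render_payload is hypothetical, and the state value RUN_ERROR is used only as a sample value.

```python
import json
import re

# Matches parameters written as {{NAME}}, for example {{PIPELINE_STATE}}.
_PARAM = re.compile(r"\{\{([A-Z_]+)\}\}")


def render_payload(template: str, values: dict) -> str:
    """Replace each {{NAME}} with its value; leave unknown names untouched."""
    def substitute(match):
        name = match.group(1)
        return str(values.get(name, match.group(0)))
    return _PARAM.sub(substitute, template)


if __name__ == "__main__":
    # A JSON payload template such as you might enter in the Payload property.
    template = '{"text": "Pipeline state changed to {{PIPELINE_STATE}}"}'
    body = render_payload(template, {"PIPELINE_STATE": "RUN_ERROR"})
    print(json.loads(body)["text"])
```

Leaving unknown parameter names untouched makes a mistyped parameter easy to spot in the received payload.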
  7. Click the Error Records tab and configure the following error handling options:
    Error Records Property Description
    Error Records Determines how to handle records that cannot be processed as expected.

    Use one of the following options:

    • Discard - Discards error records.
    • Send Response to Origin - Passes error records back to the microservice origin to be included in a response to the originating REST API client. Use in microservice pipelines only.
    • Write to Another Pipeline - Writes error records to another pipeline. To use this option, you need an SDC RPC destination pipeline to process the error records.
    • Write to Azure Event Hub - Writes error records to the specified Microsoft Azure Event Hub.
    • Write to Elasticsearch - Writes error records to the specified Elasticsearch cluster.
    • Write to File - Writes error records to a file in the specified directory.

      Write to File is not supported for cluster mode at this time.

    • Write to Google Cloud Storage - Writes error records to Google Cloud Storage.
    • Write to Google Pub/Sub - Writes error records to Google Pub/Sub.
    • Write to Kafka - Writes error records to the specified Kafka cluster.
    • Write to Kinesis - Writes error records to the specified Kinesis stream.
    • Write to MapR Streams - Writes error records to the specified MapR Streams cluster.
    • Write to MQTT - Writes error records to the specified MQTT broker.

    In Data Collector Edge pipelines, you can use Discard, Write to File, or Write to MQTT.

    Error Record Policy Determines the version of the record to use as a basis for an error record. For more information, see Error Records and Version.
  8. When writing errors to Send Response to Origin, optionally click the Error Records - Send Response to Origin tab and configure the following property:
    Send Response to Origin Property Description
    Status Code HTTP status code for the error records. Default is 500, representing an internal server error.

    All error records are included in the response as error records.

  9. When writing error records to an SDC RPC pipeline, click the Error Records - Write to Another Pipeline tab and configure the following properties:
    Write to Pipeline Property Description
    SDC RPC Connection Connection information for the destination pipeline to continue processing data. Use the following format: <host>:<port>.

    Use a single RPC connection for each destination pipeline. Using simple or bulk edit mode, add additional connections as needed.

    Use the port number when you configure the SDC RPC origin that receives the data.

    SDC RPC ID User-defined ID to allow the destination to pass data to an SDC RPC origin. Use this ID in all SDC RPC origins to process data from the destination.
    Retries Per Batch Number of times the destination tries to write a batch to the SDC RPC origin.

    When the destination cannot write the batch within the configured number of retries, it fails the batch.

    Default is 3.

    Back Off Period Milliseconds to wait before retrying writing a batch to the SDC RPC origin.

    The value that you enter increases exponentially after each retry, until it reaches the maximum wait time of 5 minutes. For example, if you set the back off period to 10, the destination attempts the first retry after waiting 10 milliseconds, attempts the second retry after waiting 100 milliseconds, and attempts the third retry after waiting 1,000 milliseconds.

    Set to 0 to retry immediately.

    Default is 0.

    Connection Timeout (ms) Milliseconds to establish a connection to the SDC RPC origin.

    The destination retries the connection based on the Retries Per Batch property.

    Default is 5000 milliseconds.

    Read Timeout (ms) Milliseconds to wait for the SDC RPC origin to read data from a batch.

    The destination retries the write based on the Retries Per Batch property.

    Default is 2000 milliseconds.

    Use Compression Enables the destination to use compression to pass data to the SDC RPC origin. Enabled by default.
    Use TLS Enables the use of TLS.

    Truststore File The path to the truststore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration.

    By default, no truststore is used.

    Truststore Type Type of truststore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore Password Password to the truststore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Truststore Trust Algorithm The algorithm used to manage the truststore.

    Default is SunX509.

    Use Default Protocols Determines the transport layer security (TLS) protocol to use. The default protocol is TLSv1.2. To use a different protocol, clear this option.
    Transport Protocols The TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols.
    Note: Older protocols are not as secure as TLSv1.2.
    Use Default Cipher Suites Determines the default cipher suite to use when performing the SSL/TLS handshake. To use a different cipher suite, clear this option.
    Cipher Suites Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites.

    Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.
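
The Back Off Period property in this step describes a wait time that grows exponentially up to a maximum of 5 minutes. The following Python sketch reproduces the documented example of 10, 100, and 1,000 milliseconds. It assumes that each wait is ten times the previous one. The documented example does not rule out other growth rules, so treat this as an illustration and not as the destination's actual implementation. The function name back_off_waits is hypothetical.

```python
MAX_WAIT_MS = 5 * 60 * 1000  # documented maximum wait time of 5 minutes


def back_off_waits(back_off_ms: int, retries: int) -> list:
    """Return the wait before each retry, in milliseconds.

    Assumption: each wait is ten times the previous one, which matches the
    documented 10 -> 100 -> 1,000 example. A back off period of 0 means
    that every retry happens immediately.
    """
    waits = []
    wait = back_off_ms
    for _ in range(retries):
        waits.append(min(wait, MAX_WAIT_MS))  # never exceed the 5 minute cap
        wait *= 10
    return waits


if __name__ == "__main__":
    # Back off period of 10 with the default of 3 retries per batch.
    print(back_off_waits(10, 3))  # prints [10, 100, 1000]
```

With a larger starting value, the cap takes effect quickly. Under the same assumption, a back off period of 1000 reaches the 5 minute maximum on the fourth retry.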

  10. When writing error records to Microsoft Azure Event Hub, click the Error Records - Write to Event Hub tab and configure the following properties:
    Event Hub Property Description
    Namespace Name The name of the namespace that contains the event hub that you want to use.
    Event Hub Name The event hub name.
    Shared Access Policy Name The policy name associated with the namespace.

    To retrieve the policy name, when logged into the Azure portal, navigate to your namespace and event hub, and then click Shared Access Policies for a list of policies.

    When appropriate, you can use the default shared access key policy, RootManageSharedAccessKey.

    Connection String Key One of the connection string keys associated with the specified shared access policy.

    To retrieve a connection string key, after accessing the list of shared access policies, click the policy name, and then copy the Connection String - Primary Key value.

    The value typically begins with "Endpoint".
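
Before pasting a Connection String Key value, it can help to confirm that you copied the whole string. The following Python sketch splits a connection string into its named parts and checks that it begins with "Endpoint". It assumes the usual Azure layout of semicolon-separated Name=value pairs. The function names are hypothetical and the sample namespace and key are placeholders.

```python
def parse_connection_string(value: str) -> dict:
    """Split a semicolon-separated Name=value connection string into a dict."""
    parts = {}
    for item in value.strip().split(";"):
        if not item:
            continue  # tolerate a trailing semicolon
        # Split on the first '=' only, so '=' padding in a key is preserved.
        name, _, val = item.partition("=")
        parts[name] = val
    return parts


def begins_with_endpoint(value: str) -> bool:
    """Check that the string begins with "Endpoint"."""
    return value.strip().startswith("Endpoint")


if __name__ == "__main__":
    # Placeholder values only; copy the real string from the Azure portal.
    sample = (
        "Endpoint=sb://example-namespace.servicebus.windows.net/;"
        "SharedAccessKeyName=RootManageSharedAccessKey;"
        "SharedAccessKey=PLACEHOLDER_KEY="
    )
    print(begins_with_endpoint(sample))
    print(parse_connection_string(sample)["SharedAccessKeyName"])
```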

  11. When writing error records to Elasticsearch, click the Error Records - Write to Elasticsearch tab and configure the following properties:
    Elasticsearch Property Description
    Cluster HTTP URI HTTP URI used to connect to the cluster. Use the following format:
    <host>:<port>
    Additional HTTP Params Additional HTTP parameters that you want to send as query string parameters to Elasticsearch. Enter the exact parameter name and value expected by Elasticsearch.
    Detect Additional Nodes in Cluster Detects additional nodes in the cluster based on the configured Cluster HTTP URI.

    Selecting this property is equivalent to setting the client.transport.sniff Elasticsearch property to true.

    Use only when Data Collector shares the same network as the Elasticsearch cluster. Do not use for Elastic Cloud or Docker clusters.

    Use Security Specifies whether security is enabled on the Elasticsearch cluster.
    Time Basis Time basis to use for writing to time-based indexes. Use one of the following expressions:
    • ${time:now()} - Uses the processing time as the time basis. The processing time is the time associated with the Data Collector running the pipeline.
    • An expression that calls a field and resolves to a datetime value, such as ${record:value(<date field path>)}. Uses the datetime result as the time basis.

    When the Index property does not include datetime variables, you can ignore this property.

    Default is ${time:now()}.

    Data Time Zone Time zone for the destination system. Used to resolve datetimes in time-based indexes.
    Index Index for the generated documents. Enter an index name or an expression that evaluates to the index name.

    For example, if you enter customer as the index, the destination writes the document within the customer index.

    If you use datetime variables in the expression, make sure to configure the time basis appropriately. For details about datetime variables, see Datetime Variables.

    Mapping Mapping type for the generated documents. Enter the mapping type, an expression that evaluates to the mapping type, or a field that includes the mapping type.

    For example, if you enter user as the mapping type, the destination writes the document with a user mapping type.

    Document ID Expression that evaluates to the ID for the generated documents. When you do not specify an ID, Elasticsearch creates an ID for each document.

    By default, the destination allows Elasticsearch to create the ID.

    Parent ID Optional parent ID for the generated documents. Enter a parent ID or an expression that evaluates to the parent ID.

    Use to establish a parent-child relationship between documents in the same index.

    Routing Optional custom routing value for the generated documents. Enter a routing value or an expression that evaluates to the routing value.

    Elasticsearch routes a document to a particular shard in an index based on the routing value defined for the document. You can define a custom value for each document. If you don’t define a custom routing value, Elasticsearch uses the parent ID (if defined) or the document ID as the routing value.

    Data Charset Character encoding of the data to be processed.

    If you enabled security, configure the following security properties:
    Security Property Description
    Mode Authentication method to use:
    • Basic - Authenticate with Elasticsearch user name and password. Select this option for Elasticsearch clusters outside of Amazon Elasticsearch Service.
    • AWS Signature V4 - Authenticate with AWS. Select this option for Elasticsearch clusters within Amazon Elasticsearch Service.
    Security Username/Password Elasticsearch user name and password.
    Enter the user name and password using the following syntax:
    <username>:<password>

    Available when using Basic authentication.

    Region Amazon Web Services region that hosts the Elasticsearch domain.

    Available when using AWS Signature V4 authentication.

    Access Key ID AWS access key ID. Required when not using IAM roles with IAM instance profile credentials.

    Available when using AWS Signature V4 authentication.

    Secret Access Key AWS secret access key. Required when not using IAM roles with IAM instance profile credentials.

    Available when using AWS Signature V4 authentication.

    SSL Truststore Path Location of the truststore file.

    Configuring this property is equivalent to configuring the shield.ssl.truststore.path Elasticsearch property.

    Not necessary for Elastic Cloud clusters.

    SSL Truststore Password Password for the truststore file.

    Configuring this property is equivalent to configuring the shield.ssl.truststore.password Elasticsearch property.

    Not necessary for Elastic Cloud clusters.
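
The Time Basis, Data Time Zone, and Index properties in this step work together when you write to time-based indexes. The following Python sketch illustrates the idea: a time basis is converted to the destination time zone and then used to build the index name. It uses Python strftime codes as a stand-in for Data Collector datetime variables and a fixed UTC offset as a stand-in for the Data Time Zone property. The function resolve_index and the index pattern are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def resolve_index(pattern: str, time_basis: datetime, utc_offset_hours: int) -> str:
    """Format a time-based index name in the destination time zone.

    pattern uses Python strftime codes, not Data Collector datetime variables.
    time_basis must be a timezone-aware datetime.
    """
    zone = timezone(timedelta(hours=utc_offset_hours))
    return time_basis.astimezone(zone).strftime(pattern)


if __name__ == "__main__":
    # Processing time as the time basis, similar in spirit to ${time:now()}.
    print(resolve_index("logs-%Y-%m-%d", datetime.now(timezone.utc), 0))

    # A datetime taken from a record field as the time basis.
    event_time = datetime(2024, 1, 1, 1, 30, tzinfo=timezone.utc)
    print(resolve_index("logs-%Y-%m-%d", event_time, -8))  # prints logs-2023-12-31
```

The second call shows why the time zone matters: the same instant falls on a different calendar day, and therefore in a different index, depending on the zone used to resolve it.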

  12. When writing error records to file, click the Error Records - Write to File tab and configure the following properties:
    Write to File Property Description
    Directory Local directory for error record files.
    File Prefix Prefix used for error record files. Use to differentiate error record files from other files in the directory.

    Uses the prefix sdc-${sdc:id()} by default. The prefix evaluates to sdc-<Data Collector ID>. This provides default differentiation in case several Data Collectors write to the same directory.

    The Data Collector ID is stored in the following file: $SDC_DATA/sdc.id.

    For more information about environment variables, see Data Collector Environment Configuration.

    File Wait Time (secs) Number of seconds Data Collector waits for error records. After that time, it creates a new error record file.

    You can enter a number of seconds or use the default expression to enter the time in minutes.

    Max File Size (MB) Maximum size for error files. Exceeding this size creates a new error file.

    Use 0 to avoid using this property.

  13. When writing error records to Google Cloud Storage, click the Error Records - Write to Google Cloud Storage tab and configure the following properties:
    Google Cloud Storage Property Description
    Project ID Project ID to connect to.
    Bucket Bucket to use when writing records.
    Note: The bucket name must be DNS compliant. For more information about bucket naming conventions, see the Google Cloud Storage documentation.
    Credentials Provider Credentials provider to use to connect:
    • Default credentials provider
    • Service account credentials file (JSON)
    Credentials File Path (JSON) When using a Google Cloud service account credentials file, path to the file that the destination uses to connect to Google Cloud Storage. The credentials file must be a JSON file.

    Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path.

    Common Prefix Common prefix that determines where objects are written.
    Partition Prefix Optional partition prefix to specify the partition to use.

    Use a specific partition prefix or define an expression that evaluates to a partition prefix.

    When using datetime variables in the expression, be sure to configure the time basis for the stage.

    Data Time Zone Time zone for the destination system. Used to resolve datetimes in a time-based partition prefix.

    Time Basis Time basis to use for writing to a time-based bucket or partition prefix. Use one of the following expressions:
    • ${time:now()} - Uses the processing time as the time basis in conjunction with the specified Data Time Zone.
    • An expression that calls a field and resolves to a datetime value, such as ${record:value(<date field path>)}. Uses the time associated with the record as the time basis, adjusted for the specified Data Time Zone.

    When the Partition Prefix property has no time component, you can ignore this property.

    Default is ${time:now()}.

    Object Name Prefix Defines a prefix for object names written by the destination. By default, object names start with "sdc" as follows: sdc-<UUID>.

    Not required for the whole file data format.

  14. When writing error records to Google Pub/Sub, click the Error Records - Write to Google Pub/Sub tab and configure the following properties:
    Google Pub/Sub Property Description
    Topic ID Google Pub/Sub topic ID to write messages to.
    Project ID Google Pub/Sub project ID to connect to.
    Credentials Provider Credentials provider to use to connect to Google Pub/Sub:
    • Default credentials provider
    • Service account credentials file (JSON)
    Credentials File Path (JSON) When using a Google Cloud service account credentials file, path to the file that the destination uses to connect to Google Pub/Sub. The credentials file must be a JSON file.

    Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path.

  15. When writing error records to Kafka, click the Error Records - Write to Kafka tab and configure the following properties:
    Write to Kafka Property Description
    Broker URI Connection string for the Kafka broker. Use the following format: <host>:<port>.

    To ensure a connection, enter a comma-separated list of additional broker URIs.

    Runtime Topic Resolution Evaluates an expression at runtime to determine the topic to use for each record.
    Topic Expression Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name.
    Topic White List List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling.

    Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid.

    Topic Topic to use.

    Not available when using runtime topic resolution.

    Partition Strategy Strategy to use to write to partitions:
    • Round Robin - Takes turns writing to different partitions.
    • Random - Writes to partitions randomly.
    • Expression - Uses an expression to write data to different partitions. Writes records to the partitions specified by the results of the expression.
    • Default - Uses an expression to extract a partition key from the record. Writes records to partitions based on a hash of the partition key.
    Partition Expression Expression to use with the default or expression partition strategy.

    When using the default partition strategy, specify an expression that returns the partition key from the record. The expression must evaluate to a string value.

    When using the expression partition strategy, specify an expression that evaluates to the partition where you want each record written. Partition numbers start with 0. The expression must evaluate to a numeric value.

    Optionally, press Ctrl + Space Bar for help with creating the expression.

    One Message per Batch For each batch, writes the records to each partition as a single message.
    Kafka Configuration Additional Kafka properties to use. Using simple or bulk edit mode, click the Add icon and define the Kafka property name and value.

    Use the property names and values as expected by Kafka. Do not use the broker.list property.

    For information about enabling secure connections to Kafka, see Enabling Security.
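
The four partition strategies in this step differ mainly in how a partition number is chosen for each record. The following Python sketch models that choice. It is a simplified illustration and not the Kafka producer's implementation: the function make_partitioner is hypothetical, the expression argument is a Python callable standing in for the Partition Expression property, and zlib.crc32 is an assumed stand-in for the hash that Kafka applies to the partition key.

```python
import itertools
import random
import zlib


def make_partitioner(strategy, num_partitions, expression=None):
    """Return a function that maps a record to a partition number.

    expression is a callable standing in for the Partition Expression.
    """
    if strategy == "round_robin":
        counter = itertools.count()
        return lambda record: next(counter) % num_partitions
    if strategy == "random":
        return lambda record: random.randrange(num_partitions)
    if strategy == "expression":
        # The expression result is used directly as the partition number.
        return lambda record: int(expression(record))
    if strategy == "default":
        # The expression result is a string key that is hashed.
        # crc32 is an illustrative stand-in, not the hash that Kafka uses.
        def by_key(record):
            key = str(expression(record)).encode("utf-8")
            return zlib.crc32(key) % num_partitions
        return by_key
    raise ValueError(f"unknown strategy: {strategy}")


if __name__ == "__main__":
    records = [{"id": 7, "region": "eu"}, {"id": 8, "region": "us"}]
    round_robin = make_partitioner("round_robin", 2)
    print([round_robin(r) for r in records])  # prints [0, 1]
    by_region = make_partitioner("default", 4, lambda r: r["region"])
    print(by_region(records[0]) == by_region({"id": 9, "region": "eu"}))  # prints True
```

The last line shows the practical effect of the default strategy: records that share a partition key always land in the same partition, which the round robin and random strategies do not guarantee.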

  16. When writing error records to Kinesis, click the Error Records - Write to Kinesis tab and configure the following properties:
    Kinesis Property Description
    Access Key ID AWS access key ID.

    Required when not using IAM roles with IAM instance profile credentials.

    Secret Access Key AWS secret access key.

    Required when not using IAM roles with IAM instance profile credentials.

    Region Amazon Web Services region that hosts the Kinesis cluster.
    Endpoint Endpoint to connect to when you select Other for the region. Enter the endpoint name.
    Stream Name Kinesis stream name.
    Partitioning Strategy Strategy to write data to Kinesis shards:
    • Random - Generates a random partition key.
    • Expression - Uses the result of an expression as the partition key.

    Partition Expression Expression to generate the partition key used to pass data to different shards.

    Use for the expression partition strategy.

    Kinesis Producer Configuration Additional Kinesis properties.

    When you add a configuration property, enter the exact property name and the value. The Kinesis Producer does not validate the property names or values.

    Preserve Record Order Select to preserve the order of records. Enabling this option can reduce pipeline performance.
  17. When writing error records to a MapR Streams cluster, click the Error Records - Write to MapR Streams tab and configure the following properties:
    MapR Streams Producer Property Description
    Runtime Topic Resolution Evaluates an expression at runtime to determine the topic to use for each record.
    Topic Topic to use.

    Not available when using runtime topic resolution.

    Topic Expression Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name.
    Topic White List List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling.

    Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid.

    Partition Strategy Strategy to use to write to partitions:
    • Round Robin - Takes turns writing to different partitions.
    • Random - Writes to partitions randomly.
    • Expression - Uses an expression to write data to different partitions. Writes records to the partitions specified by the results of the expression.
    • Default - Uses an expression to extract a partition key from the record. Writes records to partitions based on a hash of the partition key.
    Partition Expression Expression to use with the default or expression partition strategy.

    When using the default partition strategy, specify an expression that returns the partition key from the record. The expression must evaluate to a string value.

    When using the expression partition strategy, specify an expression that evaluates to the partition where you want each record written. Partition numbers start with 0. The expression must evaluate to a numeric value.

    Optionally, press Ctrl + Space Bar for help with creating the expression.

    One Message per Batch For each batch, writes the records to each partition as a single message.
    MapR Streams Configuration Additional configuration properties to use. Using simple or bulk edit mode, click the Add icon and define the MapR Streams property name and value.

    Use the property names and values as expected by MapR.

    You can use MapR Streams properties and the set of Kafka properties supported by MapR Streams.

  18. When writing error records to an MQTT broker, click the Error Records - Write to MQTT tab and configure the following properties:
    MQTT Property Description
    Broker URL MQTT Broker URL. Enter in the following format:
    <tcp | ssl>://<hostname>:<port>

    Use ssl for secure connections to the broker.

    For example:
    tcp://localhost:1883
    Client ID MQTT Client ID. The ID must be unique across all clients connecting to the same broker.
    You can define an expression that evaluates to the client ID. For example, enter the following expression to use the unique pipeline ID as the client ID:
    ${pipeline:id()}

    If a pipeline includes multiple MQTT stages and you want to use the unique pipeline ID as the client ID for each stage, add a distinct prefix to each client ID, like this:

    sub-${pipeline:id()} and pub-${pipeline:id()} 
    Otherwise, all stages will use the same client ID. This can cause problems, such as messages disappearing.
    Topic Topic to publish to. Using simple or bulk edit mode, click the Add icon to publish to additional topics.
    Quality of Service Determines the quality of service level used to guarantee message delivery:
    • At Most Once (0)
    • At Least Once (1)
    • Exactly Once (2)

    For more information, see the HiveMQ documentation on quality of service levels.

    Client Persistence Mechanism Determines the persistence mechanism that the destination uses to guarantee message delivery when the quality of service level is at least once or exactly once. Select one of the following options:
    • Memory - Store messages in memory on the Data Collector machine until the delivery of the message is complete.
    • File - Store messages in a local file on the Data Collector machine until the delivery of the message is complete.

    Not used when the quality of service level is at most once.

    For more information, see the HiveMQ documentation on client persistence.

    Client Persistence Data Directory Local directory on the Data Collector machine where the destination temporarily stores messages in a file when you configure file persistence.

    The user who starts Data Collector must have read and write access to this directory.

    Keep Alive Interval (secs) Maximum time in seconds to allow the connection to the MQTT broker to remain idle. If the destination publishes no messages for this amount of time, the connection is closed and the destination must reconnect to the MQTT broker.

    Default is 60 seconds.

    Use Credentials Enables entering MQTT credentials.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
    Username MQTT user name.
    Password MQTT password.
    Retain the Message Determines whether or not the MQTT broker retains the message last published by the destination when no MQTT client is subscribed to listen to the topic.

    When selected, the MQTT broker retains the last message published by the destination. Any messages published earlier are lost. When cleared, all messages published by the destination are lost.

    For more information about MQTT retained messages, see http://www.hivemq.com/blog/mqtt-essentials-part-8-retained-messages.

    Use TLS Enables the use of TLS.

    Truststore File The path to the truststore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration.

    By default, no truststore is used.

    Truststore Type Type of truststore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore Password Password to the truststore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores.
    Truststore Trust Algorithm The algorithm used to manage the truststore.

    Default is SunX509.

    Use Default Protocols Determines the transport layer security (TLS) protocol to use. The default protocol is TLSv1.2. To use a different protocol, clear this option.
    Transport Protocols The TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols.
    Note: Older protocols are not as secure as TLSv1.2.
    Use Default Cipher Suites Determines the default cipher suite to use when performing the SSL/TLS handshake. To use a different cipher suite, clear this option.
    Cipher Suites Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites.

    Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.
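
Several of the MQTT rules above (the broker URL format, unique client IDs per stage, and when client persistence applies) can be checked with a small Python sketch. The helper names are hypothetical and are not part of Data Collector. The sketch only mirrors the rules stated in the table, using the standard library.

```python
from urllib.parse import urlparse


def parse_broker_url(url):
    """Split a broker URL of the form <tcp | ssl>://<hostname>:<port>."""
    parts = urlparse(url)
    if parts.scheme not in ("tcp", "ssl"):
        raise ValueError(f"scheme must be tcp or ssl, got {parts.scheme!r}")
    if not parts.hostname or parts.port is None:
        raise ValueError("broker URL must include a hostname and a port")
    return parts.scheme, parts.hostname, parts.port


def client_ids(pipeline_id, prefixes):
    """Build one client ID per MQTT stage by prefixing the pipeline ID."""
    ids = [f"{prefix}-{pipeline_id}" for prefix in prefixes]
    # Every client that connects to the same broker needs its own ID.
    if len(set(ids)) != len(ids):
        raise ValueError("client IDs must be unique across stages")
    return ids


def uses_client_persistence(qos):
    """Persistence applies at QoS 1 (at least once) and 2 (exactly once)."""
    if qos not in (0, 1, 2):
        raise ValueError("quality of service must be 0, 1, or 2")
    return qos >= 1
```

With these helpers, `parse_broker_url("tcp://localhost:1883")` returns the scheme, host, and port as separate values, and `client_ids(pipeline_id, ["sub", "pub"])` yields the `sub-` and `pub-` prefixed IDs shown in the Client ID property.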

  19. When using a cluster batch or streaming execution mode, click the Cluster tab and configure the cluster properties.
    For information about configuring cluster mode pipelines, see Cluster Batch and Streaming Execution Modes.
  20. When using the cluster EMR batch execution mode, click the EMR tab and configure the properties required to run pipelines on an Amazon EMR cluster.
    For information about configuring cluster EMR batch mode pipelines to process data from Amazon S3, see Amazon S3 Requirements.
  21. If you have registered the Data Collector to work with StreamSets Control Hub, you can configure the pipeline to aggregate statistics on the Statistics tab.
    For information about Control Hub aggregated statistics, see Pipeline Statistics.
  22. To configure a test origin, on the Test Origin tab, configure the origin properties.
    All origin properties appear on the Test Origin tab.
    For configuration details for a specific origin, see “Configuring an <origin type> Origin” in the Origins chapter.
    To use a different test origin, select the origin to use in the Test Origin property on the General tab.
  23. If you are using the pipeline start or stop events, configure the related event consumer properties on the <event type> - <event consumer> tab.
    All properties for the event consumer appear on the tab.
    For configuration details for a specific executor, see "Configuring an <executor type> Executor" in the Executors chapter.

    For details on writing to another pipeline, see Configuring an SDC RPC Destination.

    To use a different event consumer, select the consumer to use in the Start Event or Stop Event properties on the General tab.

  24. Use the Stage Library panel to add an origin stage. In the Properties panel, configure the stage properties.
    For configuration details about origin stages, see Origins.
  25. Use the Stage Library panel to add the next stage that you want to use, connect the origin to the new stage, and configure the new stage.
    For configuration details about processors, see Processors.

    For configuration details about destinations, see Destinations.

    For configuration details about executors, see Executors.

  26. Add additional stages as necessary.
  27. At any point, you can use the Preview icon to preview data to help configure the pipeline. For more information, see Data Preview Overview.
  28. Optionally, you can create metric or data alerts to track details about a pipeline run and create threshold alerts. For more information, see Rules and Alerts.
    Not valid in Data Collector Edge pipelines, which do not support rules or alerts.
  29. When the pipeline is validated and complete, use the Start icon to run the pipeline.
When Data Collector starts the pipeline, Monitor mode displays real-time statistics for the pipeline. For more information about monitoring, see Pipeline Monitoring Overview.