Pipeline Configuration

Data Collector Console - Edit Mode

The following image shows the Data Collector console when you configure a pipeline:

Area / Icon Name Description
1 Pipeline canvas Displays the pipeline. Use to configure the pipeline data flow.
2 Pipeline Creation Help Bar Offers lists of stages to help complete the pipeline.

You can use the help bar to connect a stage to an open node. You can also add a stage between linked stages by clicking the link.

3 Stage list Lists the stages in the pipeline. Use to select a stage to configure. You can also select the stage in the canvas.
4 Properties panel Displays the properties of the pipeline or selected stage when you configure a pipeline.
5 Stage library List of available stages. Use to add stages to the pipeline. You can drag a stage to a location on the canvas or click a stage to add it to the end of the pipeline.

You can view all stages, stages by type, or stages by library. You can also search for a stage by name.

More icon Provides additional actions for the pipeline. Use to reset the origin for the pipeline or to view or delete snapshot data.
Issues icon Displays the number of implicit validation issues in the pipeline. Click to view a detailed list of issues found during implicit validation.
Undo icon Reverts recent changes. On Mac, you can also use Command+Z. On Windows, you can use Ctrl+Z.
Redo icon Restores changes that were reverted. On Mac, you can also use Command+Shift+Z. On Windows, you can use Ctrl+Y.
Delete icon Deletes the selected stage or link.
Duplicate Stage icon Duplicates the selected stage. You can duplicate processors and destinations. Because a pipeline can include only one origin, you cannot duplicate an origin stage.
Auto Arrange icon Arranges the stages in the pipeline.
Preview icon Starts a data preview.
Validate icon Validates the pipeline. Performs explicit validation.
Share icon Shares the pipeline with users and groups. Use to configure pipeline permissions.
Start icon Starts the pipeline.
Stage Library icon Toggles the display of the stage library.
Stream Link icon Indicates the flow of data through the pipeline. Select to configure data rules and alerts.
Error icon Indicates that one or more required properties are not defined. Can display on a stage for stage properties or in the canvas for pipeline properties.

Related error messages display when you hover over the icon. You can also view the messages in the Issues list.

The icon can also display on tabs in the properties panel to indicate the location of missing properties.

Help icon Provides context-sensitive help based on the information in the panel.
Maximize Pane icon Maximizes the configuration panel.
Note: Some icons and options might not display in the console. The items that display are based on the task that you are performing and roles assigned to your user account.

For example, the Duplicate Stage icon displays only when you select a stage that can be duplicated; it does not display when you select a link or an origin. Similarly, if you log in with the Manager role, most configuration-related icons are not available.

Retrying the Pipeline

By default, when Data Collector encounters a stage-level error that might cause a standalone pipeline to fail, it retries the pipeline. That is, it waits a period of time, and then tries again to run the pipeline.

A stage-level error might include a stage not being able to connect to an external system because the system or network is down.

You can define the maximum number of pipeline retries that Data Collector attempts.

With the default value, -1, Data Collector retries the pipeline an infinite number of times. This allows the pipeline to continue running as soon as any temporary connection or system failures resolve.

If you want the pipeline to stop after a given number of retries, you can define the maximum number of retries to perform.

The wait time between retries begins at 15 seconds and doubles in length until reaching a maximum of 5 minutes.
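
For example, the waits before the first several retries are 15, 30, 60, 120, and 240 seconds; every retry after that waits the maximum of 5 minutes.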

Pipeline Memory

Data Collector uses memory when it runs a pipeline. To avoid causing out-of-memory errors on the host machine, you can configure the maximum amount of memory that can be used for the pipeline.

Best practice is to configure the pipeline to use up to 65% of the Data Collector Java heap size. For example, with the default Data Collector Java heap size of 1024 MB, the highest setting for the pipeline memory should be 665 MB.

Use the Max Pipeline Memory pipeline property to define the maximum amount of memory to use. When you configure the pipeline, you can also configure the action to take when pipeline memory usage exceeds the configured amount.
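
For example, the default Max Pipeline Memory expression uses 65% of the Data Collector Java heap size:
${jvm:maxMemoryMB() * 0.65}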

Tip: To avoid errors, you might create an alert to indicate when the memory usage approaches the configured limit.

Rate Limit

You can limit the rate at which a pipeline processes records by defining the maximum number of records that the pipeline can read in a second.

By default, a pipeline has no rate limit. You might want to limit the rate for the following reasons:

The pipeline reads records faster than it can write them to the destination system
Because a pipeline processes one batch at a time, pipeline performance slows when a pipeline reads records faster than it can process them or write them to the destination system. The pipeline must wait until a batch is committed to the destination system before reading the next batch, causing a delay before the next batch is read and preventing the pipeline from reading at a steady rate. Reading data at a steady rate provides better performance than reading sporadically.

You can limit the rate at which the pipeline reads records to decrease the delay between reads from the origin system.

The origin system requires that the data be read at a slower rate

If the origin system is being used for other purposes, it might not be able to handle the rate at which the pipeline reads records. You can limit the rate to meet the origin system requirements.

Use the Rate Limit pipeline property to define the maximum number of records that the pipeline can read in a second.
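
For example, to keep the pipeline from reading more than 5,000 records per second, you might set the property as follows - the value shown is only an illustration:
Rate Limit (records / sec): 5000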

Runtime Values

Runtime values are values that you define outside of the pipeline and use for stage and pipeline properties. You can change the values for each pipeline run without having to edit the pipeline.

You can use runtime values for any pipeline property that allows the use of the expression language. You can, for example, use runtime values to represent batch sizes, timeouts, directories, and URIs. You cannot use runtime values to represent fields.

You can use the following methods of passing runtime values to pipelines:

Runtime parameters
Use runtime parameters when you want to specify the values for pipeline properties when you start the pipeline.
You define runtime parameters when you configure the pipeline, and then you call the parameters from within that pipeline. When you start the pipeline, you specify the parameter values to use.
Runtime parameters are defined for a single pipeline - only that pipeline can call them.
Note: In Data Collector versions earlier than 2.5.0.0, pipeline runtime parameters were named pipeline constants.
Runtime properties
Use runtime properties when you want to define values for multiple pipeline properties in a file.
You define runtime properties in a file local to the Data Collector, and then you call the properties from within a pipeline. At runtime, Data Collector loads the property values from the file. A runtime properties file can contain multiple properties.
Runtime properties are defined for the entire Data Collector - any pipeline can call them.
Runtime resources
Use runtime resources when you want to secure sensitive information in files with restricted permissions.
You define runtime resources in a file local to the Data Collector, and then you call the resources from within a pipeline. You can restrict the permissions for the resource files to secure sensitive information. A resource file must contain one piece of information.
Runtime resources are defined for the entire Data Collector - any pipeline can call them.

Using Runtime Parameters

Runtime parameters are parameters that you define in a pipeline and then call from within that same pipeline. When you start the pipeline, you specify the parameter values to use for that run.

Use runtime parameters to define values for stage and pipeline properties. For example, you can define an error directory parameter that points to different directories on a test machine and a production machine. Or you can define a connection parameter that points to different database connections for an origin in the test and production environments.

When you define a runtime parameter, you enter the default value to use. When you start the pipeline, you can specify another value to override the default. When the pipeline runs, the value replaces the name of the runtime parameter.

Note: If you shut down and then restart the Data Collector without stopping the pipeline, the pipeline continues running with the last set parameter values.
To implement runtime parameters, perform the following steps:
  1. Define runtime parameters.
  2. Use an expression in the pipeline to call a runtime parameter.
  3. Start the pipeline with parameter values.

When you monitor a pipeline started with parameters, you can view the parameter values that the pipeline is currently using.

Step 1. Define Runtime Parameters

Define runtime parameters when you configure the pipeline.

  1. In the pipeline properties, click the Parameters tab.
  2. Click the Add icon and define the name and the default value for each parameter.
    For example, define a parameter named JDBCConnectionString with the default value of jdbc:mysql://localhost:3306/sample.

Step 2. Call the Runtime Parameter

Use an expression to call a runtime parameter. You can use runtime parameters to represent any stage or pipeline property that allows the use of the expression language.

To call a runtime parameter in a stage or pipeline property, use the following syntax:
${<parameter name>}
For example, to use the JDBCConnectionString runtime parameter for the JDBC Multitable Consumer origin, enter the following syntax for the JDBC Connection String property:
${JDBCConnectionString}
You can call a runtime parameter from within an expression language function by simply entering the parameter name. For example, the following expression returns the value of the JDBCConnectionString runtime parameter:
 ${record:value(JDBCConnectionString)}
You can use a runtime parameter to represent a part of a property. For example, you could use a RootDir runtime parameter and append the rest of the directory in the property as follows:
${RootDir}/logfiles

You can also call a runtime parameter in the code developed for a scripting processor. The method you use to call the runtime parameter depends on the following scripting processor types:

JavaScript Evaluator or Jython Evaluator processor
Use the following syntax in any of the processor scripts: ${<parameter name>}. For example, the following line of JavaScript code assigns the value of the NewFieldValue parameter to a map field:
records[i].value.V = ${NewFieldValue}
Groovy Evaluator processor
Use the sdcFunctions.pipelineParameters() method in any of the processor scripts to return a map of all runtime parameters defined for the pipeline. For example, the following line of Groovy code assigns the value of the CompanyParam parameter to the Company Name field:
record.value['Company Name'] = sdcFunctions.pipelineParameters()['CompanyParam']

Step 3. Start the Pipeline with Parameters

When you start the pipeline, specify the values to use for the parameters.
Note: If you want to use the default parameter values, you can simply click the Start icon to start the pipeline.
  1. From the pipeline canvas, click the More icon, and then click Start with Parameters.
    If Start with Parameters is not enabled, the pipeline is not valid.

    The Start with Parameters dialog box lists all parameters defined for the pipeline and their default values.

  2. Override any default values with the values you want to use for this pipeline run.
  3. Click Start.

Viewing Runtime Parameters

When you monitor a pipeline started with parameters, you can view the parameter values that the pipeline is currently using.

Before you can view the current runtime parameter values, verify that the monitoring settings include the Runtime Parameters chart.

  1. In the Summary tab of the Monitor panel, click the More icon, and then click Settings.
  2. In the Monitoring Settings dialog box, click in the Charts field and then select Runtime Parameters.
  3. Click Save.
    When you monitor a pipeline that you started with parameter values, the Runtime Parameters chart displays as follows:

    The Runtime Parameters chart does not display when you start a pipeline without parameters.

Using Runtime Properties

Runtime properties are properties that you define in a file local to the Data Collector and call from within a pipeline. With runtime properties, you can define different sets of values for different Data Collectors.

Use runtime properties to define values for stage and pipeline properties. For example, you can define an error directory runtime property that points to different directories on a test machine and a production machine. Similarly, you might create test and production runtime properties for the origin and destination stages.

To implement runtime properties, perform the following steps:
  1. Define runtime properties.
  2. Use an expression in the pipeline to call a runtime property.

Step 1. Define Runtime Properties

You can define runtime properties in the Data Collector configuration file, sdc.properties, or in a separate runtime properties file.

If you define the properties in a separate runtime properties file, use the required procedure for your installation type.

Data Collector configuration file
Use the following steps to define runtime properties in the Data Collector configuration file:
  1. In the $SDC_CONF/sdc.properties file, configure the runtime.conf.location property as follows:
    runtime.conf.location=embedded
  2. Define the runtime properties in the $SDC_CONF/sdc.properties file using the following format:
    runtime.conf_<property name>=<value>
    For example, the following runtime property defines a Hadoop FS directory template location:
    runtime.conf_HDFSDirTemplate=/HDFS/DirectoryTemplate
  3. Restart Data Collector to enable the changes.
Separate runtime properties file for RPM and tarball
Use the following steps to define runtime properties in a separate runtime properties file for an RPM or tarball installation:
  1. Create a text file and save it in a directory relative to the $SDC_CONF directory.
  2. Define the runtime properties in the text file using the following format:
    <property name>=<value>
    For example, the following runtime property defines the same Hadoop FS directory template in a separate runtime properties file:
    HDFSDirTemplate=/HDFS/DirectoryTemplate
  3. In the Data Collector configuration file, $SDC_CONF/sdc.properties, configure the runtime.conf.location property to point to the relative location of the separate runtime properties file.

    For example, the following separate runtime properties file is located in a runtime directory that is relative to the $SDC_CONF directory:

    runtime.conf.location=runtime/test-runtime.properties
  4. Restart Data Collector to enable the changes.
Separate runtime properties file for Cloudera Manager
Use the following steps to define runtime properties in a separate runtime properties file for a Cloudera Manager installation:
  1. Create a text file and define the runtime properties in the text file using the following format:
    <property name>=<value>
    For example, the following runtime property defines the same Hadoop FS directory template in a separate runtime properties file:
    HDFSDirTemplate=/HDFS/DirectoryTemplate
  2. Save the text file in the same directory on every node that runs Data Collector.
  3. In Cloudera Manager, select the StreamSets service and click Configuration.
  4. On the Configuration page, in the Data Collector Advanced Configuration Snippet (Safety Valve) for sdc-env.sh field, add the following line to define the runtime properties file directory:
    ln -sf /<directory>/runtime.properties "${CONF_DIR}/runtime.properties"
    For example:
    ln -sf /opt/sdc-runtime/runtime.properties "${CONF_DIR}/runtime.properties"
  5. In the Data Collector Advanced Configuration Snippet (Safety Valve) for sdc.properties field, configure the runtime.conf.location property to point to the separate runtime properties file by adding the following line:
    runtime.conf.location=runtime.properties 
  6. Restart Data Collector to enable the changes.
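
For example, a minimal embedded configuration in the $SDC_CONF/sdc.properties file might define two runtime properties as follows - the JDBC connection string shown here is only an illustration:
runtime.conf.location=embedded
runtime.conf_HDFSDirTemplate=/HDFS/DirectoryTemplate
runtime.conf_JDBCConnectionString=jdbc:mysql://localhost:3306/sample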

Step 2. Call the Runtime Property

Use the runtime:conf function to call a runtime property. You can use runtime properties to represent any stage or pipeline property that allows the use of the expression language.

To call a runtime property, use the following syntax:
${runtime:conf(<property name>)}
Note: If you defined the runtime properties in the Data Collector configuration file, enter just <property name> and not runtime.conf_<property name>.
For example, to use the HDFSDirTemplate runtime property for the Hadoop FS destination, enter the following syntax for the Directory Template property:
${runtime:conf('HDFSDirTemplate')}
You can use a runtime property to represent a part of a property. For example, you could use a RootDir runtime property and append the rest of the directory in the property as follows:
${runtime:conf('RootDir')}/logfiles

Using Runtime Resources

Similar to runtime properties, runtime resources are values that you define in a file local to the Data Collector and call from within a pipeline. But with runtime resources, you can restrict the permissions for the files to secure sensitive information. Use runtime resources to load sensitive information from files at runtime.

Use runtime resources to define sensitive values for stage and pipeline properties. You can, for example, use runtime resources to represent user names and passwords, or OAuth authentication information.

To implement runtime resources, perform the following steps:
  1. Define each runtime resource.
  2. Use an expression in the pipeline to call a runtime resource.

Step 1. Define Runtime Resources

Use the following steps to define runtime resources:
  1. For each resource, create a text file and save it in the $SDC_RESOURCES directory.

    A file must contain one piece of information to be used when the resource is called.

    Note: When you call a runtime resource, the Data Collector replaces the expression with the entire contents of the file. Avoid including unnecessary characters such as spaces or carriage returns. Many text editors add additional characters by default.
  2. Optionally, restrict the permissions for the file.

    Generally, anyone can read a file. To restrict permissions, configure the file so that only the owner has read or write permissions - in octals, that's 600 or 400 - and make sure that the owner is the system user that runs Data Collector.

    When you use the resource in the pipeline, you specify whether the file is restricted.
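
    For example, you might create and restrict the JDBCpassword.txt resource file used in the next step with commands like the following - a sketch that assumes a Linux shell and that the file should hold only the password, with no trailing newline:
    printf '%s' 'MyJdbcPassword' > $SDC_RESOURCES/JDBCpassword.txt
    chmod 600 $SDC_RESOURCES/JDBCpassword.txt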

Step 2. Call the Runtime Resource

Use the runtime:loadResource function to call a runtime resource. You can use runtime resources to represent sensitive information in any stage or pipeline property that allows the use of the expression language.

To call a runtime resource, use the following syntax:

${runtime:loadResource(<file name>, <restricted: true | false>)}
Tip: To strip any leading or trailing white space characters from the file, add the str:trim function as follows:
${str:trim(runtime:loadResource(<file name>, <restricted: true | false>))}
For example, the following expression returns the contents of the JDBCpassword.txt file, stripping any leading or trailing white space characters. The file contains a password and is restricted so only the owner can read the file:
${str:trim(runtime:loadResource("JDBCpassword.txt", true))}

Webhooks

You can configure a pipeline to use webhooks. A webhook is a user-defined HTTP callback - an HTTP request that the pipeline sends automatically when certain actions occur. You can use webhooks to automatically trigger external tasks based on an HTTP request. Tasks can be as simple as sending a message through an application API or as powerful as passing commands to the Data Collector command line interface.

You can configure the following types of webhooks:
Trigger Description Configuration Location
Alert The pipeline sends all alert webhooks each time an alert is triggered.

For example, if your text message application has a webhook API, you can have the pipeline send texts when alerts are triggered.

For details on how to configure an alert webhook, see Configuring an Alert Webhook.

Pipeline Rules tab > Webhook tab
State notification The pipeline sends all state notification webhooks when the pipeline transitions to the selected pipeline states.

For example, you can send an HTTP request to the Data Collector REST API to start a different pipeline when the current pipeline transitions to a Finished state.

For details on how to configure a state notification webhook, see the Notifications properties in Configuring a Pipeline.

Pipeline Configuration tab > Notifications tab

The flexibility of webhooks enables you to automatically trigger a wide range of tasks, depending on the ability of external systems to support webhooks or process HTTP requests.

Important: You must configure webhooks as expected by the receiving system. For details on how to configure incoming webhooks, check the receiving system's documentation. You might also need to enable webhook usage within that system.

When you configure a webhook, you specify the URL to send the request and the HTTP method to use. Some HTTP methods allow you to include a request body or payload. In the payload, you can use parameters to include information about the cause of the trigger, such as the text of the alert or the latest pipeline state. You can also include request headers, content type, authentication type, username and password as needed.

Request Method

You can use the following methods in webhooks:
  • GET
  • PUT
  • POST
  • DELETE
  • HEAD

Payload and Parameters

You can include a request body or payload for PUT, POST, and DELETE request webhooks. Depending on the receiving system, the payload might not be used. For example, when using the Data Collector REST API, you simply include all required information in the URL.

When you configure a payload, you can use any valid content type, then specify the content type in the webhook properties. When defining a message, be sure to consider when the pipeline sends the webhook and include the information that the recipient needs.

You can use parameters in the payload to include information about the action that triggered the webhook. For example, when configuring a webhook that sends a text message when a pipeline stops, you might include the pipeline name, pipeline state, and the time parameters in the message.

Enclose parameters in double curly brackets, as follows:
{{<parameter name>}}
You can use the following parameters in webhooks:
state notification parameters
When configuring a state notification webhook, you can use the following parameters:
  • PIPELINE_TITLE - The pipeline title or name.
  • PIPELINE_URL - The direct URL to the pipeline.
  • PIPELINE_STATE - The current state of the pipeline.
  • TIME - The time of the triggered request.
For example, say you configure the pipeline to send a webhook only when the pipeline transitions to the Stopped state - that is, when someone stops the pipeline. You might use the following message in a JSON request body:
{  
   "text":"At {{TIME}}, a user stopped the {{PIPELINE_TITLE}} pipeline. \n <To see the pipeline, click here: {{PIPELINE_URL}}"
}
However, if the pipeline is configured to send webhooks when the pipeline changes to several different states, you might use a more generic message and include the pipeline state in the message. For example:
{  
   "text":"The '{{PIPELINE_TITLE}}' pipeline changed state to {{PIPELINE_STATE}} at {{TIME}}. \n <{{PIPELINE_URL}}|Click here for details.>"
}
alert parameters
When configuring an alert webhook, you can use the following parameters:
  • ALERT_CONDITION - The condition of the rule associated with the alert.
  • ALERT_NAME - The alert label or name.
  • ALERT_TEXT - The text configured for the alert.
  • ALERT_VALUE - The value that triggered the condition. For example, if the alert is configured to trigger upon reaching 1000 error records, the alert value will be 1000.
  • PIPELINE_TITLE - The pipeline title.
  • PIPELINE_URL - The direct URL to the pipeline.
  • TIME - Time of the triggered request.
For example, say you configure a pipeline to send a webhook alert each time an alert triggers. To include some key information in the JSON request body, you might try this:
{  
   "text":"{{ALERT_TEXT}}: At {{TIME}}, {{ALERT_NAME}} was triggered by {{ALERT_VALUE}} for the following condition: {{ALERT_CONDITION}}. \n This is for the {{PIPELINE_TITLE}} pipeline, at {{PIPELINE_URL}}"
}

Examples

Here are some examples of how you might use webhooks:

Send webhooks to a Slack channel
You can configure a pipeline to send webhooks to a Slack channel. For example, you could have all alerts sent to an Operations Slack channel so someone on your operations team can respond to the alert.
To do this, you perform the following steps:
  1. Configure Slack for incoming webhooks for the channel you want to use.

    At the time of writing, you can enable webhooks in Slack starting from this page. When you enable webhooks, Slack generates a URL for you to use. If you have already enabled webhooks, check your account information for the Slack URL.

  2. Copy the URL that Slack generates and use this to configure the pipeline.
    The URL looks something like this:
    https://hooks.slack.com/services/<random string>/<random string>/<random string>

    Slack also offers instructions on how to configure the payload. At this time, they suggest a text field with the contents of the message, like the Data Collector default payload.

  3. Configure the webhook in the pipeline.

    For Slack, you can just enter the URL and accept the defaults for everything else.

    Here's how a Slack alert webhook might look:

Start another pipeline
You can start a pipeline after the first pipeline completes all processing using dataflow triggers, the Pipeline Finisher executor, and a state notification webhook.
For example, say you have a JDBC Query Consumer origin that performs a full query to process all legacy data in some database tables.
You configure the origin to generate an event when it completes processing all available data, and you connect the event stream to the Pipeline Finisher executor. When the Pipeline Finisher executor receives the event from the JDBC Query Consumer, it transitions the pipeline to a Finished state. For more information on using the Pipeline Finisher executor with dataflow triggers, see Case Study: Stop the Pipeline.
To use this state change to start a second pipeline, configure a webhook that triggers when the pipeline state changes to Finished:
  1. In the pipeline properties, click the Notifications tab and click Add to add a webhook.
  2. For the URL, enter the command to start the pipeline.

    The command looks something like this:

    <http|https>://<system ip>:<http port>/rest/v1/pipeline/<pipeline id>/start

    For example: http://localhost:18630/rest/v1/pipeline/MyPipelinefbb6894c-08ec-421e-9e3c-4e6bbb2f5baf/start

  3. Click the Add icon to add a request header. Add the X-Requested-By header and set the value to sdc.

    You can use the default method and authentication type, and keep or delete the payload, since the REST API does not use it.

    This is how the webhook might look:
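
    As a rough equivalent, the HTTP request that this webhook sends resembles the following curl command - a sketch that assumes the POST method, no authentication, and the example pipeline ID from step 2:
    curl -X POST -H "X-Requested-By: sdc" http://localhost:18630/rest/v1/pipeline/MyPipelinefbb6894c-08ec-421e-9e3c-4e6bbb2f5baf/start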

Notifications

You can configure a pipeline to send an email or webhook when the pipeline changes to specified states.

For example, you might send an email when someone manually stops the pipeline, causing it to transition to a Stopped state. Or you might send a Slack or text message when the pipeline changes to a Start_Error or Run_Error state.

You can send notifications when the pipeline changes to the following states:
  • Running
  • Start_Error
  • Run_Error
  • Stopped
  • Finished
  • Disconnected
  • Connecting

You can specify multiple states to trigger notifications, but you cannot configure the pipeline to send different notifications based on different pipeline state changes at this time. For example, if you configure notifications for the Running and Finished states, the pipeline sends the same notifications when it changes to either state.

However, when configuring a webhook, you can use webhook parameters in the payload to indicate the state change that triggered the notification. For more information about webhooks, see Webhooks.

To send email notification, Data Collector must be configured to send email. To enable sending email, configure the email alert Data Collector properties. For more information, see Configuring Data Collector.

For more information about pipeline states, see Understanding Pipeline States.

SSL/TLS Configuration

Some stages allow you to use SSL/TLS to connect to the external system.

When you enable TLS, you can generally configure the following properties:
  • Keystore or Truststore properties
  • TLS protocols
  • Cipher suites

The properties that are available can depend on the stage that you are configuring.

You can enable SSL/TLS properties in the following stages and locations:
  • HTTP Client origin, processor, and destination
  • HTTP to Kafka origin
  • HTTP Server origin
  • MQTT Subscriber origin and MQTT Producer destination
  • SDC RPC origin and SDC RPC destination
  • SDC RPC to Kafka origin
  • Websocket destination
  • Pipeline error handling, when writing error records to another pipeline

Keystore and Truststore Configuration

When SSL/TLS is enabled in a stage, you can also enable the use of a keystore and a truststore.

Keystores and truststores are similar in many ways, but they serve different purposes. A keystore contains a private key and public certificates that are used to verify the identity of the client upon a request from an SSL/TLS server. In contrast, a truststore generally contains certificates from trusted certificate authorities that an SSL/TLS client uses to verify the identity of an SSL/TLS server.

When you configure a keystore or truststore, you can configure the following properties:
keystore/truststore type
You can use the following types of keystores and truststores:
  • Java Keystore File (JKS)
  • PKCS-12 (p12 file)
file and location
When specifying the file and location of the keystore or truststore file, you can either use an absolute path to the file or a path relative to the Data Collector resources directory.
password
A password is optional for keystore and truststore files, but highly recommended.
algorithm
Data Collector uses the SunX509 key exchange algorithm by default. You can use any algorithm compatible with your keystore/truststore file that is supported by your JVM.
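
If you do not already have keystore or truststore files, you can typically create them with the JDK keytool utility. The following commands are only a sketch - the file names, aliases, and certificate file are hypothetical:
keytool -genkeypair -alias sdc -keyalg RSA -keystore keystore.jks -storetype JKS
keytool -importcert -alias trusted-ca -file ca.crt -keystore truststore.jks -storetype JKS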

Transport Protocols

When SSL/TLS is enabled in a stage, you can configure the transport protocol to use.

Data Collector uses TLSv1.2 by default. You can specify one or more other protocols, but versions prior to TLSv1.2 are not as secure.

Cipher Suites

When SSL/TLS is enabled in a stage, you can configure the cipher suites to use to perform the SSL/TLS handshake.

By default, Data Collector can use any of the following cipher suites:
Supported Cipher Suite Java Secure Socket Extension (JSSE) Name
ECDHE-ECDSA-AES256-GCM-SHA384 TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384
ECDHE-RSA-AES256-GCM-SHA384 TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
ECDHE-ECDSA-AES128-GCM-SHA256 TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256
ECDHE-RSA-AES128-GCM-SHA256 TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
ECDHE-ECDSA-AES256-SHA384 TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384
ECDHE-RSA-AES256-SHA384 TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384
ECDHE-ECDSA-AES128-SHA256 TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256
ECDHE-RSA-AES128-SHA256 TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256

Implicit and Explicit Validation

Data Collector performs two types of validation:
Implicit validation
Implicit validation occurs by default as the Data Collector console saves your changes. Implicit validation lists missing or incomplete configuration, such as an unconnected stage or a required property that has not been configured.
Errors found by implicit validation display in the Issues list. Error icons display on stages with undefined required properties and on the canvas for pipeline issues.
Explicit validation
Explicit validation occurs when you click the Validate icon, request data preview, or start the pipeline. Explicit validation becomes available when implicit validation passes.
Explicit validation is a semantic validation that checks all configured values for validity and verifies whether the pipeline can run as configured.
For example, while implicit validation verifies that you entered a value for a URI, explicit validation tests the validity of the URI by connecting to the system.
Errors found by explicit validation display in a list from the validation error message.

Expression Configuration

Use the expression language to configure expressions and conditions in processors, such as the Expression Evaluator or Stream Selector. Some destination properties also require the expression language, such as the directory template for Hadoop FS or Local FS.

Optionally, you can use the expression language to define any stage or pipeline property that represents a numeric or string value. This allows you to use runtime parameters or runtime properties throughout the pipeline. You can use expression completion to determine where you can use an expression and the expression elements that you can use in that location.

You can use the following elements in an expression:
  • Constants
  • Datetime variables
  • Field names
  • Functions
  • Literals
  • Operators
  • Runtime parameters
  • Runtime properties
  • Runtime resources

Basic Syntax

Precede all expressions with a dollar sign and enclose them with curly brackets, as follows: ${<expression>}.

For example, to add 2 + 2, use the following syntax: ${2 + 2}.

Using Field Names in Expressions

When a pipeline is valid for preview, expression completion provides available field names in a list. When a list is not available, use the appropriate format for the field name.

When you use a field name in an expression, use the following syntax:
${record:value("/<field name>")}
Note: You can use single or double quotation marks to surround a field name.

For example, the following expressions both concatenate the values from the DATE field with values from the TIME field:

${record:value('/DATE')} ${record:value('/TIME')}
${record:value("/DATE")} ${record:value("/TIME")}

Field Names with Special Characters

You can use quotation marks and the backslash character to handle special characters in field names.

Expression completion provides the correct syntax for field names with special characters. But when you need to enter the field names manually, be sure to use the following guidelines:
Use quotation marks around field names with special characters
When a field name includes special characters, surround the field name with single or double quotation marks as follows:
/"<field w/specialcharacter>"
Some examples:
/"Stream$ets"
/'city&state'
/"product names"
When using multiple sets of quotation marks, alternate between types as you go
Throughout the expression language, when using quotation marks, you can use single or double quotation marks. But make sure to alternate between the types when nesting quotation marks.
For example:
${record:value('/"Stream$ets"'}
${record:value("/'city&state'"}
Use a backslash as an escape character
To use a quotation mark or backslash in a field name, use a backslash ( \ ).
Add additional backslashes as necessary to escape quotation marks.
For example, to use a field named "ID's" as a required field, you would use a single backslash:
/ID\'s
To use the same field in an expression, you might need additional backslashes as follows:
${record:value('/ID\\'s')}

Referencing Field Names and Field Paths

When a pipeline is valid for preview, you can generally select fields from a list. When a list is not available or when you are defining a new field name, you need to use the appropriate format for the field name.

To reference a field, you specify the path of the field. A field path describes a data element in a record using a syntax similar to files in directories. The complexity of a field path differs based on the type of data in the record:

Simple maps or JSON objects
With simple maps or JSON objects, the fields are one level removed from the root. Reference the field as follows:
/<field name>
So, to reference a CITY field in a simple JSON object, enter /CITY. A simple expression that calls the field might look like this:
${record:value('/CITY')}
Complex maps or JSON objects
To reference a field in a complex map or JSON object, include the path to the field, as follows:
/<path to field>/<field name>
For example, the following field path describes an employeeName field several levels deep in a JSON object: /region/division/group/employeeName. An expression that calls the field might look like this:
${record:value("/region/division/group/employeeName")}
Arrays or lists
To reference a field in an array or list, include the index and path to the field, as follows:
[<index value>]/<path to field>/<field name>
For example, the following field path describes the same employeeName field in the third region index in an array: [2]/east/HR/employeeName.
An expression that calls the field might look like this:
${record:value('[2]/east/HR/employeeName')}
Delimited records can be structured as lists. For more information, see Delimited Data Root Field Type.
Text
To reference text when a record is a line of text, use the following field name:
/text
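An expression that calls the field might look like this:
${record:value('/text')}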

Wildcard Use for Arrays and Maps

In some processors, you can use the asterisk wildcard (*) in place of indexes in an array or keys in a map. Use a wildcard to help define the field paths for maps and arrays.

You can use the asterisk wildcard as follows:
[*]
Matches every index in an array. For example, the following field path represents the social security number of every employee in every division:
/Division[*]/Employee[*]/SSN
/*
Matches every key in a map. For example, the following field path represents all employee information in the first division:
/Division[0]/Employee[*]/*

Expression Completion in Properties

Expression completion provides a list of data types, runtime parameters, fields, and functions that you can use. The list includes runtime parameters when defined, and available fields when the pipeline is valid for data preview.

When an element does not display in the list, it is not a valid element at the specified location.

Tips for Expression Completion

Use the following information and tips when you invoke expression completion:

  1. To invoke expression completion, place the cursor where you want to create an expression and press Ctrl + Space Bar.

    A list of valid expression elements displays. Scroll to view the entire list.

    You can invoke expression completion anywhere that you can use an expression.

  2. Field names display at the top of the list when data preview is available. When defined, runtime parameters display in the list with a purple Parameters icon.

    In the following example, DirectoryRoot is a runtime parameter:

  3. To view more information about an element, click the element name:

  4. To add an element to an expression, double-click the element name or press Enter.
  5. You can filter the element list by typing the first few letters of the element name.
  6. To view the syntax of a function, after you add the function, click within the parentheses.

Data Type Coercion

When an expression requires it, the expression language attempts implicit data type conversion - also known as data type coercion. When coercion is not possible, Data Collector passes the records to the stage for error handling.

For example, you have an Expression Evaluator stage configured to send error records to the pipeline for error handling, and the pipeline writes error records to a file. The Expression Evaluator includes an expression that treats string data as integers. When the field includes integer or valid numeric data, the expression language coerces the data type. If the field includes a date, that record is written to the error records file.
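
Such an expression might look like the following sketch, which assumes a string field named /count and adds 1 to its value - the addition succeeds when /count holds numeric text, and otherwise the record is sent to error handling:
${record:value('/count') + 1}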

To avoid coercion errors, you can use the Field Type Converter earlier in the pipeline to convert data to the appropriate data type.

Configuring a Pipeline

Configure a pipeline to define the stream of data. After you configure the pipeline, you can start the pipeline.

A pipeline can include the following stages:
  • A single origin stage
  • Multiple processor stages
  • Multiple destination stages
  1. From the Home page or Getting Started page, click Create New Pipeline.
    Tip: To get to the Home page, click the Home icon.
  2. In the New Pipeline window, enter a pipeline title and optional description, and click Save.
    The pipeline canvas displays the pipeline title, the generated pipeline ID, and an error icon. The error icon indicates that you need to configure error handling for the pipeline. The Properties panel displays the pipeline properties.
  3. In the Properties panel, on the General tab, configure the following properties:
    Pipeline Property Description
    Title Title of the pipeline.

    Data Collector uses the alphanumeric characters entered for the pipeline title as a prefix for the generated pipeline ID. For example, if you enter “My Pipeline *&%&^^ 123” as the pipeline title, then the pipeline ID has the following value: MyPipeline123tad9f592-5f02-4695-bb10-127b2e41561c.

    You can edit the pipeline title. However, because the pipeline ID is used to identify the pipeline, any changes to the pipeline title are not reflected in the pipeline ID.

    Description Optional description of the pipeline.
    Labels Optional labels to assign to the pipeline.

    Use labels to group similar pipelines. For example, you might want to group pipelines by database schema or by the test or production environment.

    You can use nested labels to create a hierarchy of pipeline groupings. Enter nested labels using the following format:
    <label1>/<label2>/<label3>
    For example, you might want to group pipelines in the test environment by the origin system. You add the labels Test/HDFS and Test/Elasticsearch to the appropriate pipelines.
    Execution Mode Execution mode of the pipeline:
    • Standalone - A single Data Collector process runs the pipeline.
    • Cluster Batch - Data Collector spawns additional workers as needed to process data in HDFS or MapR. Processes all available data and then stops the pipeline.
    • Cluster Yarn Streaming - Data Collector spawns additional workers as needed to process data. Use to stream data from a Kafka or MapR cluster that uses Spark Streaming on YARN.
    • Cluster Mesos Streaming - Data Collector spawns additional workers as needed to process data. Use to stream data from a Kafka cluster that uses Spark Streaming on Mesos.
    Delivery Guarantee Determines how Data Collector handles data after an unexpected event causes the pipeline to stop running:
    • At Least Once - Ensures all data is processed and written to the destination. Might result in duplicate rows.
    • At Most Once - Ensures that data is not reprocessed to prevent writing duplicate data to the destination. Might result in missing rows.

    Default is At Least Once.

    Retry Pipeline on Error Retries the pipeline upon error.
    Retry Attempts Number of retries attempted. Use -1 to retry indefinitely.

    The wait time between retries starts at 15 seconds and doubles until reaching five minutes.

    Max Pipeline Memory Maximum amount of memory for the pipeline to use. You can enter a numeric value or edit the default expression to use a percentage of the Data Collector Java heap size.
    Default is 65% of the Data Collector Java heap size:
     ${jvm:maxMemoryMB() * 0.65}
    On Memory Exceeded Action to take when the pipeline memory reaches the memory limit:
    • Log - Logs a message in the pipeline history.
    • Log and Alert - Logs a message and triggers an alert that displays in monitor mode and sends an alert email to any provided email addresses.
    • Log, Alert and Stop Pipeline - Logs a message, triggers an alert that displays in monitor mode and sends an alert email to any provided email addresses. Stops the pipeline. This option is not supported for cluster mode pipelines at this time.
    Rate Limit (records / sec) Maximum number of records that the pipeline can read in a second. Use 0 or no value to set no rate limit.

    Default is 0.

    Max Runners The maximum number of pipeline runners to use in a multithreaded pipeline.

    Use 0 for no limit. When set to 0, Data Collector generates up to the maximum number of threads or concurrency configured in the origin.

    You can use this property to help tune pipeline performance. For more information, see Tuning Threads and Runners.

    Default is 0.

  4. To define runtime parameters, on the Parameters tab, click the Add icon and define the name and the default value for each parameter.
    For more information, see Using Runtime Parameters.
  5. To configure notifications based on changes in pipeline state, on the Notifications tab, configure the following properties:
    Notifications Property Description
    Notify on Pipeline State Changes Sends notifications when the pipeline encounters the listed pipeline states.
    Email IDs Email addresses to receive notification when the pipeline state changes to one of the specified states. Click the Add icon to add additional addresses.
    Webhooks Webhook to send when the pipeline state changes to one of the specified states. Click the Add icon to add additional webhooks.
    Webhook URL URL to send the HTTP request.
    Headers Optional HTTP request headers.
    HTTP Method HTTP method. Use one of the following methods:
    • GET
    • PUT
    • POST
    • DELETE
    • HEAD
    Payload Optional payload to use. Available for PUT, POST, and DELETE methods.

    Use any valid content type.

    You can use webhook parameters in the payload to include information about the triggering event, such as the pipeline name or state. Enclose webhook parameters in double curly brackets as follows: {{PIPELINE_STATE}}.

    Content Type Optional content type of the payload. Configure this property when the content type is not declared in the request headers.
    Authentication Type Optional authentication type to include in the request. Use None, Basic, Digest, or Universal.

    Use Basic for Form authentication.

    User Name User name to include when using authentication.
    Password Password to include when using authentication.
  6. Click the Error Records tab and configure the following error handling option:
    Error Records Property Description
    Error Records Determines how to handle records that cannot be processed as expected. Use one of the following options:
    • Discard - Discards error records.
    • Write to Another Pipeline - Writes error records to another pipeline. To use this option, you need an SDC RPC destination pipeline to process the error records.
    • Write to Elasticsearch - Writes error records to the specified Elasticsearch cluster.
    • Write to File - Writes error records to a file in the specified directory.
    • Write to Kafka - Writes error records to the specified Kafka cluster.
    • Write to MapR Streams - Writes error records to the specified MapR cluster.

    For cluster mode, Write to File is not supported at this time.

  7. When writing error records to an SDC RPC pipeline, click the Error Records - Write to Another Pipeline tab and configure the following properties:
    Write to Pipeline Property Description
    SDC RPC Connection Connection information for the destination pipeline to continue processing data. Use the following format: <host>:<port>.

    Use a single RPC connection for each destination pipeline. Add additional connections as needed.

    Use the port number when you configure the SDC RPC origin that receives the data.

    SDC RPC ID User-defined ID to allow the destination to pass data to an SDC RPC origin. Use this ID in all SDC RPC origins to process data from the destination.
    Retries Per Batch Number of times the destination tries to write a batch to the SDC RPC origin.

    When the destination cannot write the batch within the configured number of retries, it fails the batch.

    Default is 3.

    Back Off Period Milliseconds to wait before retrying writing a batch to the SDC RPC origin.

    The value that you enter increases exponentially after each retry, until it reaches the maximum wait time of 5 minutes. For example, if you set the back off period to 10, the destination attempts the first retry after waiting 10 milliseconds, attempts the second retry after waiting 100 milliseconds, and attempts the third retry after waiting 1,000 milliseconds.

    Set to 0 to retry immediately.

    Default is 0.

    Connection Timeout (ms) Milliseconds to establish a connection to the SDC RPC origin.

    The destination retries the connection based on the Retries Per Batch property.

    Default is 5000 milliseconds.

    Read Timeout (ms) Milliseconds to wait for the SDC RPC origin to read data from a batch.

    The destination retries the write based on the Retries Per Batch property.

    Default is 2000 milliseconds.

    Use Compression Enables the destination to use compression to pass data to the SDC RPC origin. Enabled by default.
    Enable TLS

    Enables the use of TLS.

    Truststore File The path to the truststore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration.

    By default, no truststore is used.

    Truststore Type Type of truststore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS-12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore Password Password to the truststore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or Hashicorp Vault secrets. For more information, see Using Runtime Resources or Accessing Hashicorp Vault Secrets.
    Truststore Trust Algorithm The algorithm used to manage the truststore.

    Default is SunX509.

    Use Default Protocols Determines the transport layer security (TLS) protocol to use. The default protocol is TLSv1.2. To use a different protocol, clear this option.
    Transport Protocols The TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name.
    Note: Older protocols are not as secure as TLSv1.2.
    Use Default Cipher Suites Determines the cipher suite to use when performing the SSL/TLS handshake.

    Data Collector provides a set of cipher suites that it can use by default. For a full list, see Cipher Suites.

    Cipher Suites Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite.

    Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.

  8. When writing error records to Elasticsearch, click the Error Records - Write to Elasticsearch tab and configure the following properties:
    Elasticsearch Property Description
    Cluster HTTP URI HTTP URI used to connect to the cluster. Use the following format:
    <host>:<port>
    Additional HTTP Params Additional HTTP parameters that you want to send as query string parameters to Elasticsearch. Enter the exact parameter name and value expected by Elasticsearch.
    Detect Additional Nodes in Cluster

    Detects additional nodes in the cluster based on the configured Cluster URI.

    Selecting this property is equivalent to setting the client.transport.sniff Elasticsearch property to true.

    Use only when the Data Collector shares the same network as the Elasticsearch cluster. Do not use for Elastic Cloud or Docker clusters.

    Use Security Specifies whether security is enabled on the Elasticsearch cluster.
    Time Basis
    Time basis to use for writing to time-based indexes. Use one of the following expressions:
    • ${time:now()} - Uses the processing time as the time basis. The processing time is the time associated with the Data Collector running the pipeline.
    • An expression that calls a field and resolves to a datetime value, such as ${record:value(<date field path>)}. Uses the datetime result as the time basis.

    When the Index property does not include datetime variables, you can ignore this property.

    Default is ${time:now()}.

    Data Time Zone Time zone for the destination system. Used to resolve datetimes in time-based indexes.
    Index Index for the generated documents. Enter an index name, an expression that evaluates to the index name, or a field that includes the index name.

    For example, if you enter customer as the index, the destination writes the document within the customer index.

    If you use datetime variables in the expression, make sure to configure the time basis appropriately. For details about datetime variables, see Datetime Variables.

    Mapping Mapping type for the generated documents. Enter the mapping type, an expression that evaluates to the mapping type, or a field that includes the mapping type.

    For example, if you enter user as the mapping type, the destination writes the document with a user mapping type.

    Document ID ID for the record. Use to specify the ID for the generated documents. When you do not specify an ID, Elasticsearch creates an ID for each document.

    By default, the destination allows Elasticsearch to create the ID.

    Data Charset

    Character encoding of the data to be processed.

    If you enabled security, configure the following security property:
    Security Property Description
    Security Username/Password Elasticsearch username and password.
    Enter the username and password using the following syntax:
    <username>:<password>
    Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or Hashicorp Vault secrets. For more information, see Using Runtime Resources or Accessing Hashicorp Vault Secrets.
    SSL Truststore Path

    Location of the truststore file.

    Configuring this property is equivalent to configuring the shield.ssl.truststore.path Elasticsearch property.

    Not necessary for Elastic Cloud clusters.

    SSL Truststore Password

    Password for the truststore file.

    Configuring this property is equivalent to configuring the shield.ssl.truststore.password Elasticsearch property.

    Not necessary for Elastic Cloud clusters.

  9. When writing error records to file, click the Error Records - Write to File tab and configure the following properties:
    Write to File Property Description
    Directory Local directory for error record files.
    File Prefix Prefix used for error record files. Use to differentiate error record files from other files in the directory.

    Uses the prefix sdc-${sdc:id()} by default. The prefix evaluates to sdc-<Data Collector ID>. This provides default differentiation in case several Data Collectors write to the same directory.

    The Data Collector ID is stored in the following file: $SDC_DATA/sdc.id. For more information about environment variables, see Data Collector Environment Configuration.

    File Wait Time (secs) Number of seconds Data Collector waits for error records. After that time, it creates a new error record file.

    You can enter a number of seconds or use the default expression to enter the time in minutes.

    Max File Size (MB) Maximum size for error files. Exceeding this size creates a new error file.

    Use 0 to avoid using this property.

  10. When writing error records to Kafka, click the Error Records - Write to Kafka tab and configure the following properties:
    Write to Kafka Property Description
    Broker URI Connection string for the Kafka broker. Use the following format: <host>:<port>.

    To ensure a connection, enter a comma-separated list of additional broker URIs.

    Runtime Topic Resolution Evaluates an expression at runtime to determine the topic to use for each record.
    Topic Expression Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name.
    Topic White List List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling.

    Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid.

    Topic Topic to use.

    Not available when using runtime topic resolution.

    Partition Strategy Strategy to use to write to partitions:
    • Round Robin - Takes turns writing to different partitions.
    • Random - Writes to partitions randomly.
    • Expression - Uses an expression to write data to different partitions.
    • Default - Uses the default partition strategy that Kafka provides.
    Partition Expression Expression to use when using the expression partition strategy.

    Define the expression to evaluate to the partition where you want each record written. Partition numbers start with 0.

    Optionally, press Ctrl + Space Bar for help with creating the expression.

    One Message per Batch For each batch, writes the records to each partition as a single message.
    Kafka Configuration Additional Kafka properties to use. Click the Add icon and define the Kafka property name and value.

    Use the property names and values as expected by Kafka. Do not use the broker.list property.

    For information about enabling secure connections to Kafka, see Enabling Security.

  11. When writing data to a MapR Streams cluster, click the Error Records - Write to MapR Streams tab and configure the following properties:
    MapR Streams Producer Property Description
    Runtime Topic Resolution Evaluates an expression at runtime to determine the topic to use for each record.
    Topic Topic to use.

    Not available when using runtime topic resolution.

    Topic Expression Expression used to determine where each record is written when using runtime topic resolution. Use an expression that evaluates to a topic name.
    Topic White List List of valid topic names to write to when using runtime topic resolution. Use to avoid writing to invalid topics. Records that resolve to invalid topic names are passed to the stage for error handling.

    Use an asterisk (*) to allow writing to any topic name. By default, all topic names are valid.

    Partition Strategy Strategy to use to write to partitions:
    • Round Robin - Takes turns writing to different partitions.
    • Random - Writes to partitions randomly.
    • Expression - Uses an expression to write data to different partitions.
    • Default - Uses the default partition strategy that Kafka provides.
    Partition Expression Expression to use when using the expression partition strategy.

    Define the expression to evaluate to the partition where you want each record written. Partition numbers start with 0.

    Optionally, press Ctrl + Space Bar for help with creating the expression.

    One Message per Batch For each batch, writes the records to each partition as a single message.
    MapR Streams Configuration Additional configuration properties to use. To add properties, click Add and define the property name and value.

    Use the property names and values as expected by MapR.

    You can use MapR Streams properties and the set of Kafka properties supported by MapR Streams.

  12. When using the cluster execution mode, click the Cluster tab and configure the following properties:
    For Spark Streaming on Mesos, configure the following properties:
    Mesos Cluster Property Description
    Dispatcher URL Master URL of the Mesos dispatcher. For example, mesos://dispatcher:7077.
    Checkpoint Configuration Directory Location of the HDFS configuration files that specify whether to write checkpoint metadata to HDFS or Amazon S3.

    Use a directory or symlink within the Data Collector resources directory.

    The directory should include the following files:
    • core-site.xml
    • hdfs-site.xml
    For Spark Streaming or MapReduce on YARN, configure the following properties:
    Yarn Cluster Property Description
    Worker Memory (MB) Maximum amount of memory allocated to each Data Collector worker in the cluster.

    Default is 1024 MB.

    Worker Java Options Additional Java properties for the pipeline. Separate properties with a space.

    The following properties are set by default.

    • -XX:+UseConcMarkSweepGC and -XX:+UseParNewGC enable the Concurrent Mark Sweep (CMS) garbage collector.
    • -Dlog4j.debug enables debug logging for log4j.

    Changing the default properties is not recommended.

    You can add any valid Java property.

    Launcher Env Configuration

    Additional configuration properties for the cluster launcher. Click the Add icon and define the property name and value.

  13. If you have registered the Data Collector to work with Dataflow Performance Manager (DPM), you can configure the pipeline to aggregate statistics on the Statistics tab.

    For information on configuring a pipeline to aggregate statistics for DPM, see Pipeline Statistics.

  14. Use the Stage Library to add an origin stage. In the Properties panel, configure the stage properties.
    For configuration details about origin stages, see Origins.
  15. Use the Stage Library to add the next stage that you want to use, connect the origin to the new stage, and configure the new stage.
    For configuration details about processors, see Processors.

    For configuration details about destinations, see Destinations.

  16. Add additional stages as necessary.
  17. At any point, you can use the Preview icon to preview data to help configure the pipeline. For more information, see Data Preview Overview.
  18. Optionally, you can create metric or data alerts to track details about a pipeline run and create threshold alerts. For more information, see Rules and Alerts.
  19. When the pipeline is complete, use the Start icon to run the pipeline.
When Data Collector starts the pipeline, the Monitor console displays real-time statistics for the pipeline. For more information about monitoring, see Pipeline Monitoring Overview.