Wait for Pipelines

Supported pipeline types:
  • Data Collector

The Wait for Pipelines processor waits for one or more pipelines to complete.

The Wait for Pipelines processor is an orchestration stage that you use in orchestration pipelines. Orchestration stages perform tasks, such as scheduling and starting pipelines and Control Hub jobs, that you can use to create an orchestrated workflow across the StreamSets platform.

Use this processor when you want to wait for pipelines that were started upstream to complete, before performing other orchestration tasks. The Wait for Pipelines processor can wait for pipelines that are running on the specified Data Collector, Data Collector Edge, or Transformer execution engine. To wait for pipelines on multiple engines, use additional processors.

For example, you might use a Wait for Pipelines processor to wait for SDC Edge pipelines that were started by a Start Pipelines origin to complete. If you also wanted to wait for Data Collector pipelines to complete, you would use another Wait for Pipelines processor to perform that task.

The Wait for Pipelines processor checks the status of all pipelines listed in incoming orchestration records that run on the specified execution engine. When the pipelines complete, the processor updates the pipeline status details in the record and passes a single orchestration record downstream.

When you configure the Wait for Pipelines processor, you define the URL of the execution engine that runs the pipelines. For an execution engine registered with Control Hub, you specify the Control Hub URL, so the processor accesses the pipelines through Control Hub. Then, you specify how long to wait between pipeline status checks.

You also configure the user name and password to run the pipeline and can optionally configure SSL/TLS properties.

Stage Processing and Pipeline Implementation

Use the Wait for Pipelines processor downstream from a Start Pipelines origin or Start Pipelines processor that starts pipelines that run in the background. When running pipelines in the background, a Start Pipelines stage passes its orchestration record downstream immediately after starting pipelines, rather than waiting for them to complete.

When a Wait for Pipelines processor receives an orchestration record, it checks the status for all of the pipelines that are running on the execution engine specified in the stage. After all of the pipelines complete, the processor updates the pipeline status information in the orchestration record and passes the record downstream.

If you pass orchestration records from multiple stages to the processor, the processor waits until all pipelines associated with those records are complete, then passes a single merged orchestration record downstream.
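The wait behavior described above can be sketched as a simple polling loop. This is a minimal illustration, not the stage's actual implementation: the `wait_for_pipelines` function, the `get_status` callback, and the set of terminal states are all hypothetical, and the complete list of pipeline states is documented in Understanding Pipeline States.

```python
import time
from typing import Callable, Dict, Iterable

# Assumption: an illustrative subset of states treated as "complete".
TERMINAL_STATES = {"FINISHED", "STOPPED", "RUN_ERROR"}


def wait_for_pipelines(
    pipeline_ids: Iterable[str],
    get_status: Callable[[str], str],
    interval_ms: int = 1000,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Poll every pipeline until all of them reach a terminal state.

    get_status is a hypothetical callback that returns the current
    state of one pipeline on the configured execution engine.
    """
    pending = set(pipeline_ids)
    final: Dict[str, str] = {}
    while pending:
        for pipeline_id in sorted(pending):
            status = get_status(pipeline_id)
            if status in TERMINAL_STATES:
                final[pipeline_id] = status
        # Stop tracking pipelines that have completed.
        pending.difference_update(final)
        if pending:
            # Corresponds to the status check interval, in milliseconds.
            sleep(interval_ms / 1000.0)
    return final
```

The function returns only after every listed pipeline has completed, which mirrors how the processor passes a single record downstream once all pipelines are done.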

Note: Using a Wait for Pipelines processor is similar to configuring a Start Pipelines stage to run pipelines in the foreground. In both cases, the orchestration record passes downstream after the pipelines complete. Use the Wait for Pipelines processor when running pipelines in the foreground is not the optimal solution.

For example, instead of using a Wait for Pipelines processor immediately after a Start Pipelines origin that starts a pipeline in the background, you can just configure the origin to run the pipeline in the foreground. Then, the Start Pipelines origin passes its orchestration record downstream after the pipeline completes, with no need for a Wait for Pipelines processor.

In contrast, say you want to start several SDC Edge and Transformer pipelines when you start your orchestration pipeline. You also want them all to complete before starting a Data Collector pipeline. To do this, you create the following pipeline:

You configure a Start Pipelines origin to start the SDC Edge pipelines in the background, which passes an orchestration record to the first Start Pipelines processor as soon as the pipelines start. This enables the pipelines started by both stages to run concurrently.

You configure the Start Pipelines processor to run the Transformer pipelines in the foreground, so the processor passes the updated orchestration record downstream only after all of the pipelines complete. That takes care of the Transformer pipelines, but the SDC Edge pipelines may still be running.

To ensure that the SDC Edge pipelines complete before starting the Data Collector pipeline, you add the Wait for Pipelines processor and configure it to wait for pipelines running on the SDC Edge engine.

When the Wait for Pipelines processor receives the orchestration record, it notes the IDs of all of the pipelines started upstream that are running on the specified SDC Edge engine, and waits for them to complete. After the pipelines complete, the Wait for Pipelines processor updates pipeline status information in the orchestration record and passes the record downstream, triggering the start of the Data Collector pipeline.
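The ordering in this example can be modeled with threads. The sketch below is only an analogy for the background, foreground, and wait steps. It does not call any StreamSets API, and the pipeline names and durations are made up for illustration.

```python
import threading
import time
from typing import List


def run_pipeline(name: str, seconds: float, log: List[str]) -> None:
    """Stand-in for a pipeline run: sleep, then record completion."""
    time.sleep(seconds)
    log.append(name)


def orchestrate(log: List[str]) -> List[str]:
    # Start Pipelines origin: start the SDC Edge pipelines in the
    # background and continue immediately.
    edge = [
        threading.Thread(target=run_pipeline, args=(f"edge-{i}", 0.05, log))
        for i in range(2)
    ]
    for thread in edge:
        thread.start()

    # Start Pipelines processor: run the Transformer pipeline in the
    # foreground, so this step blocks until it completes.
    transformer = threading.Thread(
        target=run_pipeline, args=("transformer-0", 0.01, log)
    )
    transformer.start()
    transformer.join()

    # Wait for Pipelines processor: block until the background
    # SDC Edge pipelines have also completed.
    for thread in edge:
        thread.join()

    # Only now start the Data Collector pipeline.
    run_pipeline("data-collector", 0.0, log)
    return log
```

Because the Data Collector step runs only after both joins, it always appears last in the log, regardless of the order in which the other pipelines finish.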

Generated Record

When the Wait for Pipelines processor completes its task, it updates the pipeline status and related information in the orchestration record before passing the record downstream.

The processor updates and adds the following fields:
Field Name - Description
<pipeline ID>/pipelineStatus - Status of the pipeline. For more information, see Understanding Pipeline States.

The processor updates this field.

<pipeline ID>/pipelineStatusMessage - Status message for the pipeline.

The processor updates this field as needed.

<pipeline ID>/finishedSuccessfully - Boolean field that indicates whether a pipeline completed successfully.

The processor adds this field.

<pipeline ID>/pipelineMetrics - Map field that contains metrics for the pipeline and individual pipeline stages.

The processor adds this field.

<unique task name>/success - Boolean field that indicates whether all pipelines completed successfully.

The processor adds this field.

For example, the following preview shows the fields that a Wait for Pipelines processor adds and updates in comparison to the incoming record:

Notice how the processor updated the pipelineStatus field, and added the finishedSuccessfully, pipelineMetrics, and success fields. All of the changes indicate that the pipeline completed successfully.
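The field changes can be illustrated with a small sketch that treats the record as a nested dictionary. The flat layout, the `apply_wait_results` helper, the `results` structure, and the rule that only a FINISHED status counts as success are assumptions made for illustration. The actual nesting of these fields in an orchestration record is not shown in this section.

```python
from typing import Any, Dict


def apply_wait_results(
    record: Dict[str, Any],
    task_name: str,
    results: Dict[str, Dict[str, Any]],
) -> Dict[str, Any]:
    """Return a copy of the record with status fields updated and added.

    results maps a pipeline ID to a hypothetical dict with a "status"
    key and optional "message" and "metrics" keys.
    """
    # Copy one level deep so the incoming record is left unchanged.
    updated = {
        key: dict(value) if isinstance(value, dict) else value
        for key, value in record.items()
    }
    all_ok = True
    for pipeline_id, result in results.items():
        entry = updated.setdefault(pipeline_id, {})
        entry["pipelineStatus"] = result["status"]  # updated field
        if result.get("message"):
            entry["pipelineStatusMessage"] = result["message"]  # updated as needed
        ok = result["status"] == "FINISHED"  # assumption: FINISHED means success
        entry["finishedSuccessfully"] = ok  # added field
        entry["pipelineMetrics"] = result.get("metrics", {})  # added field
        all_ok = all_ok and ok
    # Task-level flag: true only if every pipeline succeeded.
    updated.setdefault(task_name, {})["success"] = all_ok
    return updated
```

A downstream stage could then branch on the task-level success field rather than inspecting each pipeline individually.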

Configuring a Wait for Pipelines Processor

Configure a Wait for Pipelines processor to wait for Data Collector, Data Collector Edge, or Transformer pipelines to complete before passing an orchestration record downstream. The Wait for Pipelines processor is an orchestration stage that you use in orchestration pipelines.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    Required Fields - Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions - Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error - Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the Pipeline tab, configure the following properties:
    Pipeline Property - Description
    Execution Engine URL - URL of the execution engine that runs the pipelines. Execution engines include Data Collector, Data Collector Edge, and Transformer.
    Control Hub Enabled - Accesses pipelines through Control Hub. Select this property when the execution engine is registered with Control Hub.
    Control Hub URL - URL of Control Hub where the execution engine is registered:
    • For Control Hub cloud, enter https://cloud.streamsets.com.
    • For Control Hub on-premises, enter the URL provided by your system administrator. For example, https://<hostname>:18631.
    Status Check Interval - Milliseconds to wait before checking the specified execution engine URL for the completion status of the pipelines listed in the incoming orchestration record.
  3. On the Credentials tab, configure the following properties:
    Credentials Property - Description
    User Name - User that runs the pipeline. Enter a user name for the execution engine or enter a Control Hub user name if the engine is registered with Control Hub.
    Password - Password for the user.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
  4. To use SSL/TLS, click the TLS tab and configure the following properties:
    TLS Property - Description
    Use TLS - Enables the use of TLS.
    Keystore File - Path to the keystore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration in the Data Collector documentation.

    By default, no keystore is used.

    Keystore Type - Type of keystore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Keystore Password - Password to the keystore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
    Keystore Key Algorithm - Algorithm to manage the keystore.

    Default is SunX509.

    Truststore File - Path to the truststore file. Enter an absolute path to the file or a path relative to the Data Collector resources directory: $SDC_RESOURCES.

    For more information about environment variables, see Data Collector Environment Configuration in the Data Collector documentation.

    By default, no truststore is used.

    Truststore Type - Type of truststore to use. Use one of the following types:
    • Java Keystore File (JKS)
    • PKCS #12 (p12 file)

    Default is Java Keystore File (JKS).

    Truststore Password - Password to the truststore file. A password is optional, but recommended.
    Tip: To secure sensitive information such as passwords, you can use runtime resources or credential stores. For more information about credential stores, see Credential Stores in the Data Collector documentation.
    Truststore Trust Algorithm - Algorithm to manage the truststore.

    Default is SunX509.

    Use Default Protocols - Uses the default TLSv1.2 transport layer security (TLS) protocol. To use a different protocol, clear this option.
    Transport Protocols - TLS protocols to use. To use a protocol other than the default TLSv1.2, click the Add icon and enter the protocol name. You can use simple or bulk edit mode to add protocols.
    Note: Older protocols are not as secure as TLSv1.2.
    Use Default Cipher Suites - Uses a default cipher suite for the SSL/TLS handshake. To use a different cipher suite, clear this option.
    Cipher Suites - Cipher suites to use. To use a cipher suite that is not a part of the default set, click the Add icon and enter the name of the cipher suite. You can use simple or bulk edit mode to add cipher suites.

    Enter the Java Secure Socket Extension (JSSE) name for the additional cipher suites that you want to use.
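As a planning aid, the main properties from the steps above can be collected in one place and checked for obvious gaps before you configure the stage. The sketch below is hypothetical: the `WaitForPipelinesConfig` class, its attribute names, and its validation rules are illustrative assumptions and are not part of any StreamSets API.

```python
from dataclasses import dataclass
from typing import List, Optional
from urllib.parse import urlparse


@dataclass
class WaitForPipelinesConfig:
    """Hypothetical container for the stage properties described above."""

    engine_url: str                  # Execution Engine URL
    status_check_interval_ms: int    # Status Check Interval, in milliseconds
    user_name: str
    password: str
    control_hub_enabled: bool = False
    control_hub_url: Optional[str] = None
    use_tls: bool = False

    def validate(self) -> List[str]:
        """Return a list of problems; an empty list means no obvious gaps."""
        problems = []
        if urlparse(self.engine_url).scheme not in ("http", "https"):
            problems.append("engine_url must be an http or https URL")
        if self.control_hub_enabled and not self.control_hub_url:
            problems.append("control_hub_url is required when Control Hub is enabled")
        if self.status_check_interval_ms <= 0:
            problems.append("status_check_interval_ms must be positive")
        return problems
```

For example, a configuration with Control Hub enabled but no Control Hub URL reports one problem, which is the kind of omission that is easy to miss when filling in the Pipeline tab.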