Pipeline Finisher

When the Pipeline Finisher executor receives an event, the executor stops the pipeline and transitions it to a Finished state. This allows the pipeline to complete all expected processing before stopping.

Use the Pipeline Finisher executor as part of an event stream. You can trigger the executor with any event that suits your pipeline logic, such as a no-more-data event from the PostgreSQL Query Consumer origin.

For example, you might use the executor to perform traditional "batch" processing: process data, then stop when all data is processed rather than wait indefinitely for more data.

When you configure a Pipeline Finisher executor, you can specify whether the executor should reset the origin after each pipeline run. When needed, you can use a precondition to limit the records that enter the stage to stop the pipeline. You might also configure the pipeline to notify you when the Pipeline Finisher executor stops the pipeline.

Before using the Pipeline Finisher executor, review the recommended implementation information.

For a solution that describes how to use the Pipeline Finisher executor, see Stopping a Pipeline After Processing All Available Data.

For more information about dataflow triggers and the event framework, see Overview.

Recommended Implementation

The Pipeline Finisher executor is designed to stop and transition a pipeline to a Finished state after processing available data in the origin system.

For example, you might use the executor to stop the pipeline after the PostgreSQL Query Consumer origin processes all available data specified in the query.

When an origin generates only the no-more-data event, you can simply connect the event output to the Pipeline Finisher executor. When an origin generates multiple event types, you need to ensure that the Pipeline Finisher executor stops the pipeline only after receiving the no-more-data event.

Here are some ways you can ensure the executor receives only the no-more-data event:

  • Configure a precondition for the Pipeline Finisher

    In the executor, add a precondition that allows only the no-more-data event into the stage. You can use the following expression:

    ${record:eventType() == 'no-more-data'}

    Use this method when pipeline logic allows you to discard other event types generated by the origin.

    Tip: Records dropped because of a precondition are handled based on the stage error handling configuration. To avoid accumulating error records, you might also configure the Pipeline Finisher executor to discard error records.

  • Add a Stream Selector processor before the Pipeline Finisher

    You can add a Stream Selector processor between the origin and the executor to route only the no-more-data event to the Pipeline Finisher. Use this option when you want to pass other event types to a different branch for processing.

    For example, say you're using the PostgreSQL Query Consumer origin, which generates no-more-data, query success, and query failure events, and you want to store the query success and query failure events. You can use a Stream Selector processor with the following condition to route the no-more-data event to the Pipeline Finisher executor:

    ${record:eventType() == 'no-more-data'}

    Then you can connect the default stream, which receives the query success and query failure events, to a destination for storage.
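
The two approaches differ only in what happens to the other event types. The following Python sketch models that difference with plain dictionaries. It does not use any StreamSets API: the function names and the `query-success` and `query-failure` event-type strings are placeholders chosen for illustration, and only `no-more-data` comes from the expression shown earlier.

```python
NO_MORE_DATA = "no-more-data"


def is_no_more_data(event):
    """Python analogue of ${record:eventType() == 'no-more-data'}."""
    return event.get("eventType") == NO_MORE_DATA


def precondition_filter(events):
    """Model a precondition with error records discarded.

    Events that fail the condition are dropped, so only matching
    events reach the modeled executor.
    """
    return [e for e in events if is_no_more_data(e)]


def stream_selector(events):
    """Model a Stream Selector with one condition and a default stream.

    Returns a tuple: (events routed to the finisher, default stream).
    """
    to_finisher, default_stream = [], []
    for e in events:
        if is_no_more_data(e):
            to_finisher.append(e)
        else:
            default_stream.append(e)
    return to_finisher, default_stream


# Placeholder event-type names for the two query events.
events = [
    {"eventType": "query-success"},
    {"eventType": "query-failure"},
    {"eventType": "no-more-data"},
]

finisher_only = precondition_filter(events)
to_finisher, default_stream = stream_selector(events)
```

In this model, both approaches deliver the same single event to the executor. The Stream Selector variant additionally keeps the two query events in the default stream so that they can be stored.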

Related Event Generating Stages

Best practice is to use the Pipeline Finisher executor only with origins that generate no-more-data events.

The following origins generate no-more-data events:
  • Amazon S3 origin
  • Azure Data Lake Storage Gen1 origin
  • Azure Data Lake Storage Gen2 origin
  • Google Cloud Storage origin
  • MySQL Multitable Consumer origin
  • MySQL Query Consumer origin
  • Oracle Multitable Consumer origin
  • Oracle Query Consumer origin
  • PostgreSQL Multitable Consumer origin
  • PostgreSQL Query Consumer origin
  • Salesforce origin
  • SQL Server Multitable Consumer origin
  • SQL Server Query Consumer origin

Origin Reset for Additional Pipeline Runs

When you want a pipeline to process all available data each time that the pipeline runs, configure the Pipeline Finisher executor to reset the origin after it stops a pipeline.

When the executor resets the origin, the restart behavior for a pipeline is the same for all origins: the origin processes all available data.

By default, the restart behavior depends on the origin used in the pipeline. When an origin does not save an offset, it processes all available data again each time you restart the pipeline. For example, when the MySQL Query Consumer origin runs in full mode, the origin processes the full query each time you restart the pipeline.

When an origin stores an offset, it begins at the last-saved offset when you restart the pipeline. For example, when the MySQL Query Consumer origin runs in incremental mode, the origin continues where it left off when you restart the pipeline.

When you want the origin to process all available data with each pipeline run, configure the Pipeline Finisher executor to reset the origin. This property has no effect on origins that do not save an offset, because those origins already process all available data with each pipeline run.
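
The restart rules in this section can be summarized as a small decision function. This is a minimal sketch of the described behavior, not of how the product implements it: the function name, its parameters, and the example offset string are hypothetical.

```python
from typing import Optional


def restart_offset(saves_offset: bool,
                   last_saved_offset: Optional[str],
                   reset_origin: bool) -> Optional[str]:
    """Return the offset that the next pipeline run starts from.

    None means "start from the beginning and process all available data".
    """
    if not saves_offset:
        # Origins without an offset always reprocess everything,
        # so the reset setting makes no difference.
        return None
    if reset_origin:
        # The executor reset the origin when it stopped the pipeline.
        return None
    # Default behavior: continue from where the previous run stopped.
    return last_saved_offset
```

For example, `restart_offset(True, "id=500", False)` returns the saved offset, while the same call with `reset_origin=True` returns `None`.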

Notification

StreamSets Cloud can notify you when the Pipeline Finisher executor stops a pipeline.

You can configure the pipeline to send a webhook notification when the pipeline transitions to the specified state.

To have the pipeline send a webhook when the Pipeline Finisher executor stops a pipeline, set the Notify Upon Pipeline State Changes property on the pipeline Notifications tab to Finished. Then configure the webhook to send. For more information, see Configuring a Pipeline.
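
On the receiving side, a service might check whether an incoming webhook reports the Finished state before starting follow-up work. The sketch below assumes that you configured the webhook to send a JSON object with a `pipeline_state` field. Both that field name and the function name are hypothetical, so adapt them to the payload you actually define.

```python
import json


def is_finished_notification(body: bytes) -> bool:
    """Return True if a webhook body reports the Finished state.

    Assumes a JSON object with a hypothetical "pipeline_state" field.
    """
    try:
        payload = json.loads(body)
    except ValueError:
        # Covers malformed JSON and undecodable bytes.
        return False
    if not isinstance(payload, dict):
        return False
    return str(payload.get("pipeline_state", "")).upper() == "FINISHED"
```

Returning False for malformed or unexpected bodies keeps the check safe to call on any request that reaches the endpoint.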

Configuring a Pipeline Finisher Executor

Configure a Pipeline Finisher executor to stop and transition the pipeline to a Finished state when the executor receives an event record.
  1. In the properties pane, on the General tab, configure the following properties:
    General Property - Description

    • Name - Stage name.
    • Description - Optional description.
    • Stage Library - Library version that you want to use.
    • Required Fields - Fields that must include data for the record to be passed into the stage.

      Tip: You might include fields that the stage uses.

      Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    • Preconditions - Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. All other records are handled based on the On Record Error property.

      Click Add to create additional preconditions.

      Tip: To allow only the no-more-data event to pass to the executor, use the following condition:
      ${record:eventType() == 'no-more-data'}

    • On Record Error - Error record handling for the stage:
        • Discard - Discards the record.
        • Send to Error - Sends the record to the pipeline for error handling.
        • Stop Pipeline - Stops the pipeline.

      Tip: When using preconditions to limit the event type that enters the executor, you might set this property to Discard to avoid processing other event types.
  2. On the Finisher tab, optionally configure the following property:
    Finisher Property - Description

    • Reset Origin - Resets the pipeline origin after the Pipeline Finisher executor stops the pipeline.

      Enable this option to process all available data each time the pipeline runs. When disabled, the pipeline restart behavior depends on the origin configuration.