Pipeline Configuration

Pipeline Configuration Overview

After you create a pipeline, you configure specific properties for the pipeline.

You configure the delivery guarantee for the pipeline, which determines how data is handled after an unexpected event causes the pipeline to stop running. You can optionally configure a test origin for the pipeline so that you can use test data during pipeline preview.

You can also configure how the pipeline handles start and stop events, the maximum number of pipeline retries, and the rate limit at which the pipeline processes records.

You can access pipeline properties at any time during pipeline design. Simply click the pipeline canvas to display pipeline properties in the properties pane.

Delivery Guarantee

When you configure a pipeline, you define whether you want to prevent the loss of data or the duplication of data.

The Delivery Guarantee pipeline property offers the following choices:
At least once
Ensures that the pipeline processes all data.
If a failure causes the pipeline to stop while processing a batch of data, when the pipeline restarts, it reprocesses the batch. This option ensures that no data is lost.
With this option, the pipeline commits the offset after receiving write confirmation from destination systems. If a failure occurs after the pipeline passes data to destination systems but before receiving confirmation and committing the offset, up to one batch of data might be duplicated in destination systems.
At most once
Ensures that data is not processed more than once.
If a failure causes the pipeline to stop while processing a batch of data, when the pipeline restarts, it begins processing with the next batch of data. This option avoids the duplication of data in destinations due to reprocessing.
With this option, the pipeline commits the offset after a write without waiting for confirmation from destination systems. If a failure occurs after the pipeline passes data to destinations and commits the offset, up to one batch of data might not get written to the destination systems.
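
To make the difference concrete, the following sketch shows the two offset-commit orderings in simplified Python. The origin and destination helper methods (read_batch, write_batch, write_batch_async, commit_offset) are illustrative placeholders, not StreamSets Cloud APIs.

  # At least once: commit the offset only after the destination confirms the write.
  # A failure between the write and the commit means the batch is reprocessed on
  # restart, so no data is lost, but up to one batch might be duplicated.
  def process_at_least_once(origin, destination):
      batch, offset = origin.read_batch()
      destination.write_batch(batch)        # wait for write confirmation
      origin.commit_offset(offset)          # commit last

  # At most once: commit the offset after issuing the write, without waiting for
  # confirmation. A failure after the commit means the batch is not reprocessed,
  # so no data is duplicated, but up to one batch might not be written.
  def process_at_most_once(origin, destination):
      batch, offset = origin.read_batch()
      destination.write_batch_async(batch)  # do not wait for confirmation
      origin.commit_offset(offset)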

Test Origin for Preview

To aid in pipeline development, you can preview a pipeline and use a test origin that provides test data. Test origins are not used when running a pipeline.

A test origin is a virtual stage that you configure in the pipeline properties. Test origins do not display in the canvas. This allows you to configure the real origin - the one that you want to use in production - in the canvas, while configuring a test origin in the pipeline properties.

You can use any available origin as the test origin. The test data can be of any data format allowed by the test origin. The test data does not have to match the data format of the production data.

To use a test origin in preview, you must select the test origin in the preview configuration dialog box as the preview source. By default, preview uses the origin configured in the canvas as the preview source.

For example, say your IT department has provided a separate bucket for test data in Amazon S3. In the canvas, you configure an Amazon S3 origin to use the real bucket for production. In the pipeline properties, you configure a virtual Amazon S3 test origin to use the test bucket.

During pipeline development and testing, you configure preview to use the test origin, leaving production data untouched. Then when the pipeline is ready, you start the pipeline. StreamSets Cloud uses the real Amazon S3 origin to process production data, ignoring the test origin.

You can use the test origin as needed, but here are some general scenarios:
  • Use a test origin to access test data in the same origin system.

    Use the same type of origin in the canvas and as the test origin, configuring them to access the production data and test data for preview, respectively.

  • Use a test origin to access test data in a different origin system.

    When needed, you can use any origin to access test data for preview. The original location and data format of the test data become irrelevant after the data is passed to the pipeline.

  • Use a development origin to provide test data.

    If you have some test data readily available, you can use the Raw Data Source origin as the test origin. Or, use other development stages to provide test data.

Configuring a Test Origin

Configure a test origin in the pipeline properties.

  1. On the General tab of the pipeline properties, select the origin type that you want to use.
    You can select any available origin type.
  2. On the Test Origin tab, configure the origin properties.
    Origin properties for the test origin are the same as for real origins, with all properties displaying on a single tab.
    For details about origin configuration, see "Configuring an <origin type> Origin" in the Origins chapter.

Using a Test Origin in Preview

To use a configured test origin in preview, select it in the preview configuration options.

  1. With a pipeline open in the canvas, click the Preview icon to start preview.
  2. In the Preview Configuration dialog box, set the Preview Source property to Test Origin, then configure the rest of the preview properties as needed.

Event Generation

The event framework generates pipeline events when the pipeline starts and stops.

To pass pipeline events to an executor for additional processing, specify the executor that handles the event, and then configure the related event consumer properties on the Start Event or Stop Event tab of the pipeline. For example, you might configure a pipeline that writes to Amazon S3 to pass the pipeline stop event to the Amazon S3 executor, so that the executor copies the written objects to another location in the same bucket after pipeline processing is complete.

By default, pipeline start and stop events are discarded. For more information about pipeline events, see Pipeline Event Generation.

For general information about the event framework, see Dataflow Triggers Overview.

Pipeline Event Records

Pipeline event records have the following event-related record header attributes. Record header attributes are stored as String values:

sdc.event.type - Event type. Uses one of the following types:
  • pipeline-start - Generated as the pipeline starts.
  • pipeline-stop - Generated as the pipeline stops.
sdc.event.version - Integer that indicates the version of the event record type.
sdc.event.creation_timestamp - Epoch timestamp when the event was created.
The event framework generates the following types of pipeline events:
pipeline-start
The event framework generates start events as the pipeline initializes, immediately after it starts and before individual stages are initialized.
The start event record has sdc.event.type set to pipeline-start, and the following fields:
pipelineId - The ID of the pipeline that started.
pipelineTitle - The user-defined name of the pipeline that started.
user - The user who started the pipeline.
pipeline-stop
The event framework generates stop events as the pipeline stops, either manually, programmatically, or due to a failure. The stop event is generated after all stages have completed processing and cleaning up temporary resources, such as removing temporary files.
The stop event record has sdc.event.type set to pipeline-stop, and the following fields:
pipelineId - The ID of the pipeline that stopped.
pipelineTitle - The user-defined name of the pipeline that stopped.
reason - The reason why the pipeline stopped. Can be set to the following reasons:
  • Error - An error occurred as the pipeline was running.
  • Finished - The pipeline finished all expected processing.
  • User - A user stopped the pipeline.
user - The user who stopped the pipeline, when relevant.
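
For illustration only, a pipeline stop event record might look like the following Python dictionary. The attribute and field names come from the lists above; the values are invented for the example.

  stop_event = {
      "header_attributes": {                       # stored as String values
          "sdc.event.type": "pipeline-stop",
          "sdc.event.version": "1",
          "sdc.event.creation_timestamp": "1559347200000",
      },
      "fields": {
          "pipelineId": "example-pipeline-id",     # illustrative values
          "pipelineTitle": "Orders to Cloud Storage",
          "reason": "Finished",
          "user": "jsmith@example.com",
      },
  }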

Retrying the Pipeline

By default, when StreamSets Cloud encounters a stage-level error that might cause a pipeline to fail, it retries the pipeline. That is, it waits a period of time, and then tries again to run the pipeline.

For example, a stage-level error might occur when a stage cannot connect to an external system because the system or network is down.

You can define the maximum number of pipeline retries that StreamSets Cloud attempts.

With the default value, -1, StreamSets Cloud retries the pipeline an infinite number of times. This allows the pipeline to continue running as soon as any temporary connection or system failures resolve.

If you want the pipeline to stop after a given number of retries, you can define the maximum number of retries to perform.

The wait time between retries begins at 15 seconds and doubles in length until reaching a maximum of 5 minutes.
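
The retry backoff can be sketched as follows. This is an illustration of the documented wait times, not StreamSets Cloud code.

  # Wait times between pipeline retries: 15 s, 30 s, 60 s, ..., capped at 5 minutes.
  def retry_wait_seconds(attempt):
      """Return the wait time, in seconds, before retry number `attempt` (1-based)."""
      return min(15 * 2 ** (attempt - 1), 300)

  print([retry_wait_seconds(n) for n in range(1, 8)])
  # [15, 30, 60, 120, 240, 300, 300]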

Rate Limit

You can limit the rate at which a pipeline processes records by defining the maximum number of records that the pipeline can read in a second.

By default, a pipeline has no rate limit. You might want to limit the rate for the following reasons:

The pipeline reads records faster than it can write them to the destination system
Because a pipeline processes one batch at a time, pipeline performance slows when a pipeline reads records faster than it can process them or write them to the destination system. The pipeline must wait until a batch is committed to the destination system before reading the next batch, causing a delay before the next batch is read and preventing the pipeline from reading at a steady rate. Reading data at a steady rate provides better performance than reading sporadically.

You can limit the rate at which the pipeline reads records to decrease the delay between reads from the origin system.

The origin system requires that the data be read at a slower rate
If the origin system is being used for other purposes, it might not be able to handle the rate at which the pipeline reads records. You can limit the rate to meet the origin system requirements.

Use the Rate Limit pipeline property to define the maximum number of records that the pipeline can read in a second.
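
The effect of a rate limit can be approximated with a sketch like the one below, which throttles reads so that, on average, no more than the configured number of records are read per second. The origin helper method is a hypothetical placeholder, not a product API.

  import time

  def read_with_rate_limit(origin, max_records_per_second, batch_size=1000):
      """Yield batches from the origin, sleeping as needed to stay under the limit."""
      while True:
          start = time.monotonic()
          batch = origin.read_batch(batch_size)    # hypothetical origin helper
          if not batch:
              break
          yield batch
          # Minimum time this batch should take at the configured rate.
          min_duration = len(batch) / max_records_per_second
          elapsed = time.monotonic() - start
          if elapsed < min_duration:
              time.sleep(min_duration - elapsed)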

Configuring a Pipeline

Configure a pipeline to define the stream of data. After you configure the pipeline, you can run the pipeline.

A pipeline can include the following stages:
  • A single origin stage
  • Multiple processor stages
  • Multiple destination stages
  • Multiple executor stages
  1. In the left navigation pane, click the Pipelines icon.
  2. Click Create.
  3. Enter a name for the pipeline, and then click Create.

    The pipeline canvas opens.

  4. In the properties pane, on the General tab, configure the following properties for the pipeline:
    Name - Name of the pipeline.
    Description - Optional description of the pipeline.
    Labels - Optional labels to assign to the pipeline.

    Use labels to group similar pipelines. For example, you might want to group pipelines by database schema or by the test or production environment.

    You can use nested labels to create a hierarchy of pipeline groupings. Enter nested labels using the following format:
    <label1>/<label2>/<label3>
    For example, to group pipelines in the test environment by the origin system, you might add the labels Test/S3 and Test/MySQL to the appropriate pipelines.
    Delivery Guarantee - Determines how to handle data after an unexpected event causes the pipeline to stop running:
    • At Least Once - Ensures all data is processed and written to the destination. Might result in duplicate rows.
    • At Most Once - Ensures that data is not reprocessed to prevent writing duplicate data to the destination. Might result in missing rows.

    Default is At Least Once.

    Test Origin - This feature is not supported at this time.

    Start Event - Determines how the start event is handled. Select one of the following options:
    • Discard - Use when you don't want to use the event.
    • An executor - To use the event to trigger a task, select the executor that you want to use.

    For more information about pipeline events, see Pipeline Event Generation.

    Stop Event - Determines how the stop event is handled. Select one of the following options:
    • Discard - Use when you don't want to use the event.
    • An executor - To use the event to trigger a task, select the executor that you want to use.

    For more information about pipeline events, see Pipeline Event Generation.

    Retry Pipeline on Error - Retries the pipeline upon error.
    Retry Attempts - Number of retries attempted. Use -1 to retry indefinitely.

    The wait time between retries starts at 15 seconds and doubles until reaching five minutes.

    Rate Limit (records / sec) - Maximum number of records that the pipeline can read in a second. Use 0 or no value to set no rate limit.

    Default is 0.

    Max Runners - The maximum number of pipeline runners to use in a multithreaded pipeline.

    Use 0 for no limit. When set to 0, StreamSets Cloud creates pipeline runners up to the number of threads or the concurrency configured in the origin.

    You can use this property to help tune pipeline performance.

    Default is 0.

    Create Failure Snapshot - Automatically creates a snapshot if the pipeline fails because of data-related errors. Can be used to troubleshoot the pipeline.
    Runner Idle Time (sec) - Minimum number of seconds a pipeline runner waits when idle before generating an empty batch. The number of empty batches that are generated by pipeline runners displays as the Idle Batch Count in the monitor mode runtime statistics.

    Use to ensure that batches are generated periodically, even when no data needs to be processed.

    Use -1 to allow pipeline runners to wait indefinitely when idle without generating empty batches.

  5. Do not configure the Parameters tab.

    This feature is not supported at this time.

  6. To configure notifications based on changes in pipeline state, on the Notifications tab, configure the following properties:
    Notify on Pipeline State Changes - Sends webhook notifications when the pipeline encounters the listed pipeline states.
    Email IDs - This feature is not supported at this time.
    Webhooks - Webhook to send when the pipeline state changes to one of the specified states. Using simple or bulk edit mode, click the Add icon to add additional webhooks.
    Payload - Optional payload to use. Available for PUT, POST, and DELETE methods.

    Use any valid content type.

    You can use webhook parameters in the payload to include information about the triggering event, such as the pipeline name or state. Enclose webhook parameters in double curly brackets as follows: {{PIPELINE_STATE}}. A sample payload appears after this list of properties.

    Webhook URL - URL to send the HTTP request.
    Headers - Optional HTTP request headers.
    HTTP Method - HTTP method. Use one of the following methods:
    • GET
    • PUT
    • POST
    • DELETE
    • HEAD
    Content Type - Optional content type of the payload. Configure this property when the content type is not declared in the request headers.
    Authentication Type - Optional authentication type to include in the request. Use None, Basic, Digest, or Universal.

    Use Basic for Form authentication.

    User Name - User name to include when using authentication.
    Password - Password to include when using authentication.
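
    For example, for a webhook endpoint that accepts a simple JSON message, you might enter a payload like the following. The "text" field name is whatever the receiving system expects and is shown only for illustration; the {{PIPELINE_STATE}} parameter format comes from the Payload property description above.

      {
        "text": "Pipeline state changed to {{PIPELINE_STATE}}."
      }
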
  7. Click the Error Records tab and configure the following error handling options:
    Error Records - Determines how to handle records that cannot be processed as expected.

    Use one of the following options:

    • Discard - Discards error records.
    • Write to Google Cloud Storage - Writes error records to Google Cloud Storage.
    • Write to Google Pub/Sub - Writes error records to Google Pub/Sub.
    Error Record Policy - Determines the version of the record to use as a basis for an error record.
  8. When writing error records to another system, configure the properties for that system on the second Error Records tab.

    For configuration details, see the topic on configuring the destination for that system in the Destinations chapter.

  9. If you are using the pipeline start or stop events, configure the related event consumer properties on the Start Event or Stop Event tab.
    All properties for the event consumer appear on the tab.

    For configuration details, see the topic on configuring the specific executor in the Executors chapter.

    To use a different event consumer, select the consumer to use in the Start Event or Stop Event properties on the General tab.

  10. From the stage type list, select Origins to filter the stages by origins.
  11. Add an origin stage, and then configure the stage properties.
    For configuration details about origin stages, see Origins.
  12. Use the stage type list to add the next stage that you want to use, connect the origin to the new stage, and configure the new stage.
    For configuration details about processors, see Processors.

    For configuration details about destinations, see Destinations.

    For configuration details about executors, see Executors.

  13. Add additional stages as necessary.
  14. At any point, you can click the Preview icon to preview data to help configure the pipeline.
  15. Optionally, click the Rules tab in the properties pane to create metric or data alerts to track details about a pipeline run and create threshold alerts.

    For more information, see Rules and Alerts Overview.

  16. When the pipeline is validated and complete, click the Run icon to run the pipeline.