Stopping a Pipeline After Processing All Available Data

This solution describes how to design a pipeline that stops automatically after it finishes processing all available data.

Let's say that your dataflow topology updates a database table daily at 4 AM. Rather than have the pipeline process the data in a few minutes and then sit idle for the rest of the day, you want to kick off the pipeline, have it process all data, and then stop, just like old-school batch processing.

To do this, simply configure the origin to generate events and then route the no-more-data event record to the Pipeline Finisher executor.
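The mechanism can be modeled in a few lines of Python. This is a minimal sketch, not StreamSets code: `origin()` and `run()` are hypothetical stand-ins for the origin and the rest of the pipeline, and event records are modeled as plain dicts of header attributes. The origin yields its data batches and then a single no-more-data event, and the loop stops only when that event arrives instead of waiting for new data indefinitely.

```python
from typing import Dict, Iterator, List, Tuple

Record = Dict[str, object]  # hypothetical: a data record as a plain dict
Event = Dict[str, str]      # hypothetical: an event record's header attributes


def origin(rows: List[Record], batch_size: int) -> Iterator[Tuple[List[Record], List[Event]]]:
    """Hypothetical origin: yields each data batch, then one no-more-data event."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size], []
    # All available data has been read, so emit the no-more-data event.
    yield [], [{"sdc.event.type": "no-more-data"}]


def run(rows: List[Record], batch_size: int = 2) -> Tuple[str, List[Record]]:
    written: List[Record] = []
    for batch, events in origin(rows, batch_size):
        written.extend(batch)  # stand-in for processing and writing to the destination
        if any(e.get("sdc.event.type") == "no-more-data" for e in events):
            return "FINISHED", written  # the Pipeline Finisher's role
    return "RUNNING", written  # only reached if no no-more-data event arrives


state, written = run([{"id": n} for n in range(5)])
print(state, len(written))  # FINISHED 5
```

Every batch is written before the stop, because the event is emitted only after the last batch.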

The following origins generate no-more-data events:
  • Amazon S3 origin
  • Azure Data Lake Storage Gen1 origin
  • Azure Data Lake Storage Gen2 origin
  • Google Cloud Storage origin
  • MySQL Multitable Consumer origin
  • MySQL Query Consumer origin
  • Oracle Multitable Consumer origin
  • Oracle Query Consumer origin
  • PostgreSQL Multitable Consumer origin
  • PostgreSQL Query Consumer origin
  • Salesforce origin
  • SQL Server Multitable Consumer origin
  • SQL Server Query Consumer origin

We'll use the PostgreSQL Query Consumer origin to show a more complex scenario, because it generates more than one type of event.

The basic pipeline reads from a database, performs some processing, and writes to Snowflake.

To configure the pipeline to stop after processing all available queried data:
  1. Configure the origin to generate events:

    On the General tab of the PostgreSQL Query Consumer origin, select Produce Events.

    The origin's event output stream becomes available.

    The PostgreSQL Query Consumer generates several types of events: query success, query failure, and no-more-data. You can confirm this in the Event Record section of the PostgreSQL Query Consumer documentation. Every event-generating stage describes its events in a similar section.

    The query success and failure events can be useful, so you might use a Stream Selector processor to route those records to a separate event stream. But let's say we don't care about those events; we just want the no-more-data event to reach the Pipeline Finisher executor.

  2. Connect the event output stream to the Pipeline Finisher executor.

    At this point, every event that the origin generates reaches the executor. Because the PostgreSQL Query Consumer origin generates multiple event types, this setup could cause the executor to stop the pipeline too soon, for example on a query success event.

  3. To ensure that only the no-more-data event enters the executor, configure a precondition.

    With a precondition, only records that meet the specified condition can enter the stage.

    Each event record stores its event type in the sdc.event.type record header attribute, and the record:eventType() function returns that value. So to ensure that only no-more-data events enter the stage, we can use the following expression in the precondition:

    ${record:eventType() == 'no-more-data'}

  4. Set the On Record Error property to Discard.

    Records that don't meet the precondition are passed to the stage's error handling. Discarding them avoids storing error records we don't care about (the query success and failure events).

    The Pipeline Finisher executor is now configured with the precondition and with On Record Error set to Discard.
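The effect of steps 2 through 4 can be simulated in Python. This is a hypothetical model, not the executor's implementation: `passes_precondition()` and `run_finisher()` are illustrative names, and the `query-success` type string is a placeholder; the actual event type values are listed in the origin's Event Record documentation. Without the precondition, the simulated finisher stops on the first event it receives. With it, the pipeline stops only on no-more-data, and the other events are either dropped or kept as error records depending on the On Record Error setting.

```python
from typing import Dict, List, Optional, Tuple

Header = Dict[str, str]  # hypothetical: an event record's header attributes


def passes_precondition(headers: Header) -> bool:
    # Mirrors ${record:eventType() == 'no-more-data'}; record:eventType()
    # reads the sdc.event.type record header attribute.
    return headers.get("sdc.event.type") == "no-more-data"


def run_finisher(events: List[Header], use_precondition: bool = True,
                 on_record_error: str = "Discard") -> Tuple[Optional[int], List[Header]]:
    """Feed event records to a simulated Pipeline Finisher.

    Returns (index of the event that stopped the pipeline, or None;
             failing records kept for error handling).
    """
    kept_errors: List[Header] = []
    for i, headers in enumerate(events):
        if not use_precondition or passes_precondition(headers):
            return i, kept_errors  # the finisher stops the pipeline here
        if on_record_error == "Send to Error":
            kept_errors.append(headers)  # would be stored as an error record
        # With "Discard", the failing record is simply dropped.
    return None, kept_errors


# Hypothetical event sequence with placeholder type strings.
events = [
    {"sdc.event.type": "query-success"},
    {"sdc.event.type": "query-success"},
    {"sdc.event.type": "no-more-data"},
]

print(run_finisher(events, use_precondition=False))  # (0, []): stops too soon
print(run_finisher(events))                          # (2, []): stops on no-more-data
print(run_finisher(events, on_record_error="Send to Error")[1])  # the two success events
```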

That's it!

With this setup, the PostgreSQL Query Consumer origin passes a no-more-data event when it completes processing all data returned by the query, and the Pipeline Finisher executor stops the pipeline and transitions it to a Finished state. All other events generated by the origin are discarded. The next time you want to process more data, just start the pipeline again.
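The pipeline does not restart itself, so the daily 4 AM run from the opening scenario needs something outside the pipeline to start it again. A minimal sketch of the scheduling arithmetic, assuming a hypothetical external scheduler that calls `next_start()` and then starts the pipeline; how the start is actually triggered is outside the scope of this sketch.

```python
from datetime import datetime, time, timedelta

RUN_AT = time(hour=4)  # the daily 4 AM run from the scenario


def next_start(now: datetime, run_at: time = RUN_AT) -> datetime:
    """Hypothetical helper: next datetime at which to start the pipeline."""
    # Keep now's tzinfo so naive and aware datetimes are never compared.
    candidate = datetime.combine(now.date(), run_at, tzinfo=now.tzinfo)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's run time has already passed
    return candidate


print(next_start(datetime(2024, 5, 1, 3, 30)))  # 2024-05-01 04:00:00
print(next_start(datetime(2024, 5, 1, 9, 0)))   # 2024-05-02 04:00:00
```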