Basic Tutorial

The basic tutorial creates a pipeline that reads a file from a directory, processes the data in two branches, and writes all data to a file system. You'll use data preview to help configure the pipeline, and you'll create a data alert and run the pipeline.

Here are the high-level steps for building and running the basic pipeline:
  1. Configure pipeline properties, primarily error handling.
  2. Add a Directory origin to represent the data to be processed.
  3. Preview source data to determine the field-level details needed for the pipeline.
  4. Use a Stream Selector to route credit card transactions to the primary branch and cash transactions to the secondary branch. Define a required field to discard records without a payment type.
  5. Configure a Jython Evaluator to perform custom processing that determines the credit card type based on the credit card number.
  6. Add a Field Masker to mask credit card numbers. Use a required field to discard records without credit card numbers.
  7. Connect both branches to a Local FS destination.
  8. In the secondary branch, use an Expression Evaluator to add fields to the cash records to match the credit card records. Use data preview to verify the fields to add.
  9. Add a data rule to raise an alert if too many credit card payments are missing credit card numbers.
  10. Start the pipeline and monitor the results.

Create a Pipeline and Define Pipeline Properties

When you configure a pipeline, you need to decide what to do with error records. You can discard them or, more productively, write them to a file, to another pipeline, or to Kafka.

Writing error records to one of these locations is a convenient way to handle them without stopping the pipeline.

This tutorial writes the records to a local file, but if you prefer to write error records to Kafka, feel free.

  1. If you aren't already logged in, log in to the Data Collector.
  2. From the Home page or Getting Started page, click Create New Pipeline.
    Tip: To get to the Home page, click the Home icon.
  3. In the New Pipeline window, enter a pipeline title and optional description, and choose to run the pipeline on Data Collector.
  4. Click Save.
    An empty canvas displays:

    Note a few helpful areas:
    Number Name Description
    1 Pipeline Creation Help Bar Indicates that the origin is missing and offers a list of origins to choose from.

    If the Pipeline Creation Help Bar does not display, you might have disabled it. To enable it, in the top right corner of the Data Collector window, click Help > Settings. Clear the Hide Pipeline Creation Help Bar option.

    2 Issues icon Click to display the list of pipeline issues found by implicit validation.
    3 Stage library panel Displays a list of available origins by default.
    4 Stage menu Changes the stages that display in the stage library. Displays all stages by default.
    5 Properties panel / Preview panel / Monitor panel When you configure a pipeline, the Properties panel displays the properties of the pipeline or selected stage. You can resize, minimize and maximize the panel.

    When you preview data, the Preview panel displays the input and output data for the selected stage or group of stages.

    When you monitor a running pipeline, the Monitor panel displays real-time metrics and statistics.

    Stage Library icon Toggles the display of the Stage Library panel.
    Note: Some icons and options might not display in the UI. The items that display are based on the task that you are performing and the roles assigned to your user account.
  5. In the Properties panel, click the Error Records tab, and for the Error Records property, select Write to File.
    This writes error records to a file so you can deal with them without having to stop the pipeline.
  6. Click the Error Records - Write to File tab and configure the following properties.

    Use the defaults for properties that aren't listed:

    Write to File Property Description
    Directory Directory for error record files. Enter the directory that you set up for the tutorial. We recommended:
    /<base directory>/tutorial/error
    Note: To prevent validation errors, the directory must already exist.
    Files Prefix This defines a prefix for error record files.

    By default, the files are prefixed with "SDC" and an expression that returns the Data Collector ID, but that's more than we need here.

    Delete the default and enter the following prefix: err_

    Max File Size For the tutorial, reduce the file size to something more manageable, such as 5 or 1 MB.

Now we'll start building the pipeline...

Configure the Origin

The origin represents the incoming data for the pipeline. When you configure the origin, you define how to connect to the origin system, the type of data to be processed, and other properties specific to the origin.

The Data Collector provides a wide range of origins. We'll use the Directory origin to process the sample CSV file that you downloaded.
  1. To add the stage to the canvas, from the Pipeline Creation Help Bar, click Select Origin > Directory. Or, in the Stage Library panel, click the Directory origin: .
    The origin displays in the canvas and the Properties panel displays the properties for the stage.
  2. In the Properties panel, click the Files tab and configure the following properties.

    Use the defaults for properties that aren't listed:

    Directory Property Value
    Files Directory Directory where you saved the sample file. Enter an absolute path.

    We recommended: /<base directory>/tutorial/origin.

    File Name Pattern The Directory origin processes only the files in the directory that match the file name pattern.

    The tutorial sample file name is nyc_taxi_data.csv. Since the file is the only file in the directory, you can use something generic, like the asterisk wild card (*) or *.csv.

    If you had other .csv files in the directory that you didn't want to process, you might be more specific, like this: nyc_taxi*.csv.

    Or, if you wanted to process similarly named files for other cities, you might use *taxi*.csv.

    Read Order This determines the read order when the directory includes multiple files. You can read based on the last-modified timestamp or file name. Because it's simpler, let's use Last Modified Timestamp.
  3. Click the Data Formats tab, and configure the following properties.

    Use the defaults for properties that aren't listed:

    Delimited Property Description
    Data Format The data in the sample file is delimited, so select Delimited.
    Delimiter Format Type Since the sample file is a standard CSV file, use the default: Default CSV (ignores empty lines).
    Header Line The sample file includes a header, so select With Header Line.
    Root Field Type Determines how the Data Collector processes delimited data. Use the default, List-Map.

    The List-Map root field type lets you use standard functions to process delimited data. With the List root field type, you need to use the delimited data functions instead. The sketch below illustrates the difference.
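
Here is a rough sketch in plain Python (not Data Collector code, and with made-up sample values) of how the same delimited row behaves under each root field type:

# Rough illustration only -- not Data Collector internals; the values are made up.
# With the List-Map root field type, a row acts like an ordered map keyed by the
# header, so you can reference fields by name, for example /credit_card:
list_map_record = {
    'payment_type': 'CRD',
    'credit_card': '4111111111111111',
}
print(list_map_record['credit_card'])    # field path: /credit_card

# With the List root field type, the same row is roughly a list of header/value
# pairs, so you work by position or use the delimited data functions:
list_record = [
    {'header': 'payment_type', 'value': 'CRD'},
    {'header': 'credit_card',  'value': '4111111111111111'},
]
print(list_record[1]['value'])           # field path: /[1]/value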

This is how the stage and pipeline should look at this point:

Notice the error icons in the canvas. When you hover over the Directory error icon or click the Issues icon, a validation message states that the origin has open streams, which means it isn't connected to anything yet. We'll take care of that next.

Preview Data

To become more familiar with the data set and gather some important details for pipeline configuration, let's preview the source data.

Here are some key details that we need to configure the pipeline:
  • The field that contains payment information - We'll use this to route data in the Stream Selector.
  • The field that contains credit card numbers - We'll use this to mask the data in the Field Masker.

When you access data in a field, you specify the field path for the field. The field path depends on the complexity of the record: /<fieldname> for simple records and <path to field>/<fieldname> for more complex records.

Because we're using the List-Map root field type, we can use /<fieldname>.

To start data preview, all stages must be connected and all required properties defined, though not necessarily correctly. Since the origin is configured and is the only stage in the pipeline, the pipeline should be ready to preview as is.

  1. Above the pipeline canvas, click the Preview icon: .
    If the icon is not enabled, make sure you completed all the previous steps for the tutorial. If more than one issue displays in the list, correct any issue except Validation_0011.

    Validation_0011 just indicates that the Directory origin isn't connected to anything yet and does not prevent data preview.

  2. In the Preview Configuration dialog box, configure the following properties.

    Use the defaults for properties that aren't listed:

    Data Preview Property Description
    Preview Source Use the default Configured Source to use the sample source data.
    Write to Destinations and Executors By default, this property is not selected. In general, you should keep this property clear to avoid writing data to destination systems or triggering tasks in destination systems.
    Show Field Type By default, this property is selected. Keep it selected to see the data type of fields in the record.
    Remember the Configuration Select to use these properties each time you run data preview.

    When you select this option, the UI enters data preview without showing this dialog box again.

  3. Click Run Preview.
    The Preview panel displays a list of 10 output records for Directory in list view. If you drill down a bit, you see that each record displays an ordered list of field names and values, along with the data type of each field.

    Because the data is read from a file, all fields are strings:

    Note that in the Preview panel, the preview records display on the Records tab: . To review or change properties, use the Stage Configuration tab: . To change preview properties, use the Preview Configuration tab: .

  4. For a full view of the data, click the Table View icon: .
    The Preview panel displays the first several columns of data.

  5. To view all columns, click Show All.
    If you scroll to the right, you can verify that the credit_card field contains credit card numbers for the first three records. When you scroll back to the payment_type field, notice that those are the records where the payment type is "CRD".
  6. Now that we have the information that we need, click the Close Preview icon: .
Now we know the following:
  • Payment type information is in the payment_type field and credit card information is in the credit_card field.
  • To use these fields in expressions, we'll use their field paths: /payment_type and /credit_card.
  • To route records paid by credit card, we'll look for records where the payment type is "CRD".

Route Data with the Stream Selector

To route data to different streams for processing, we use the Stream Selector processor.

The Stream Selector routes data to different streams based on user-defined conditions. Any data not captured by a user-defined condition routes to the default stream.

We'll route credit card transactions to a credit card stream for processing. All other transactions will go to the default stream.

We'll also define a required field to drop records with no payment type. When you define a required field, a record must include data for the specified field to enter the stage. Records that don't include data in required fields are sent to the pipeline for error handling. Because we configured the pipeline to write error records to a file, that's where these records will go.

To represent data in a field, we use the record:value function, which returns the value of the specified field.
To capture records with credit card payments, use the following condition:
${record:value('/payment_type') == 'CRD'}

Note that we enclose expressions in a dollar sign and curly brackets. You can use single or double quotation marks around strings. For more information about the expression language, see Expression Language.
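
As a rough sketch of the routing logic, here is the same idea in plain Python (made-up records, not Data Collector code):

# Rough illustration only -- the Stream Selector does this for you.
records = [
    {'payment_type': 'CRD', 'credit_card': '4111111111111111'},
    {'payment_type': 'CSH', 'credit_card': ''},
]

# Records that match the condition go to the first output stream...
credit_stream = [r for r in records if r['payment_type'] == 'CRD']
# ...and everything else falls through to the default stream.
default_stream = [r for r in records if r['payment_type'] != 'CRD']

print(len(credit_stream))     # 1 record routed to the credit card stream
print(len(default_stream))    # 1 record routed to the default stream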

  1. From the Pipeline Creation Help Bar, click Select Processor to Connect > Stream Selector. Or, in the Stage Library panel, select the Stream Selector processor and connect the Directory origin to it.
  2. On the General tab, click in the Required Fields text box.
    A list of available fields displays because you already performed data preview. It also displays when the pipeline is valid for data preview.
  3. To discard records with no payment type information, select the following field: /payment_type.
    If a list does not appear, you can manually enter the field path: /payment_type.
  4. To configure the Stream Selector condition, click the Conditions tab.
    A condition for the default stream displays. The default stream represents any records not captured by other conditions.
  5. Click the Add icon: .
    A condition text box displays in the Property panel, and the corresponding output location appears on the stage in the canvas.
  6. In the new condition text box, enter the following expression, which captures records where a credit card is used for payment. You can copy and paste it, but try typing it in to see how the expression completion feature helps you select the function and ensure valid syntax.
    ${record:value('/payment_type') == 'CRD'}
    All records that match this condition pass to the first output stream. All other records are captured by the default condition and passed through the second output stream.
The Stream Selector should look like this:

Use Jython for Card Typing

Next, we'll evaluate credit card numbers to determine the credit card type. You could do the same calculations with an Expression Evaluator, but the logic is easier to express as a short script in the Jython Evaluator.

You can use custom scripts with the JavaScript Evaluator and the Jython Evaluator to perform processing that is not easily performed using other Data Collector processors. When using scripts to handle list-map data, the script must treat the data as maps.

The Jython script that we provide creates an additional field, credit_card_type, and generates the credit card type by evaluating the first few digits of the credit card number. The script returns an error message if the record has a credit card payment type without a corresponding credit card number.

  1. Add a Jython Evaluator processor to the canvas.

    If the Jython Evaluator processor isn't listed in the stage library, you'll need to install the Jython stage library first. A full Data Collector installation includes the Jython stage library by default; the core installation does not.

    1. In the Data Collector top right toolbar, click the Package Manager icon:
    2. In the Package Manager search field, type "jy" so that the Package Manager displays the Jython stage library:
    3. Select the library, click the Install icon, and then click Install to confirm your choice.
      Data Collector installs the external library and displays a message offering to restart Data Collector.
    4. Click Restart Data Collector.
    5. After Data Collector restarts, open the tutorial pipeline in the canvas and then add a Jython Evaluator processor to the canvas.
  2. Connect the first output location of the Stream Selector to the Jython Evaluator.
    This routes records paid by credit card to the Jython Evaluator.
  3. With the Jython Evaluator selected, in the Properties panel, click the Jython tab.
  4. Use the default Batch by Batch record processing mode to process data in batches, instead of record by record.
  5. In the Script text box, review the information in the comments, then delete it. Paste in the following script:
    try: 
      for record in records:
        cc = record.value['credit_card']
        if cc == '':
          error.write(record, "Payment type was CRD, but credit card was null")
          continue
    
        cc_type = ''
        if cc.startswith('4'):
          cc_type = 'Visa'
        elif cc.startswith(('51','52','53','54','55')):
          cc_type = 'MasterCard'
        elif cc.startswith(('34','37')):
          cc_type = 'AMEX'
        elif cc.startswith(('300','301','302','303','304','305','36','38')):
          cc_type = 'Diners Club'
        elif cc.startswith(('6011','65')):
          cc_type = 'Discover'
        elif cc.startswith(('2131','1800','35')):
          cc_type = 'JCB'
        else:
          cc_type = 'Other'
    
        record.value['credit_card_type'] = cc_type
    
        output.write(record)
    except Exception as e:
      error.write(record, e.message)
    Note: Incorrect indentation can cause Jython validation errors. For best results, copy the script from the online help. Copying the script from the PDF can result in incorrect indentation.

    To launch context-sensitive help, click the Help icon in the Properties panel. Then in the table of contents, scroll down to find the Data Collector Tutorial chapter > Basic Tutorial > Use Jython for Card Typing.

In the Jython Evaluator, the script should look like this:

Mask Credit Card Numbers

Now let's prevent sensitive information from reaching internal databases by using a Field Masker to mask the credit card numbers.

The Field Masker provides fixed and variable-length masks to mask all data in a field. To reveal specified positions in the data, you can use a custom mask. To reveal a group of positions within the data, you can use a regular expression mask to define the structure of the data and then reveal one or more groups.

For the credit card numbers, we'll use the following regular expression to mask all but the last four digits:
(.*)([0-9]{4})

The regular expression defines two groups so we can reveal the second group.
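
If you'd like to see the idea outside of Data Collector, here is a quick sketch in plain Python (the Field Masker does this for you; the card number and the "x" mask character are just for illustration):

# Rough illustration only -- not the Field Masker implementation.
import re

def mask_all_but_last_four(value):
    match = re.match(r'(.*)([0-9]{4})', value)    # the same regular expression
    if not match:
        return value                              # leave non-matching values alone
    # Mask group 1 and reveal group 2, the last four digits.
    return 'x' * len(match.group(1)) + match.group(2)

print(mask_all_but_last_four('4111111111111111'))   # xxxxxxxxxxxx1111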

  1. Add a Field Masker processor to the canvas and connect the Jython Evaluator to it.
  2. In the Properties panel, click the Mask tab.
  3. Click in the Fields to Mask field. Scroll through the list of fields and select the field that represents credit card data: /credit_card.
    A list of fields displays when the pipeline is valid for data preview. If the list does not display, you can enter the field path manually.
  4. To use a regular expression as a mask and display the last 4 digits of the credit card number, configure the rest of the properties as follows:
    Field Masker Property Configuration
    Mask Type Regular Expression
    Regular Expression (.*)([0-9]{4})
    Groups to Show 2
Here's how the Field Masker should look in the pipeline:

Write to the Destination

The Data Collector can write data to many destinations. The Local FS destination writes to files in a local file system.

When you configure the Local FS destination, you define the directory template. This determines the naming convention for the output directories that are created.

  1. Add a Local FS destination to the canvas and connect the Field Masker to it.
  2. Click the Output Files tab and configure the following properties.

    Use the defaults for properties that aren't listed:

    Local FS Property Configuration
    Files Prefix Defines a prefix for output file names.

    By default, the files are prefixed with "SDC" and an expression that returns the Data Collector ID, but that's more than we need here.

    Let's simplify and use "out_" instead.
    Directory Template By default, the directory template includes datetime variables to create a directory structure for output files. This is intended for writing large volumes of data.

    Since we only have the sample file to process, we don't need the datetime variables. Go ahead and delete the default and enter the directory where you want the files to be written.

    We suggested: /<base directory>/tutorial/destination.

    Max File Size (MB) For the tutorial, let's lower the file size to something manageable, like 5 or 1.
  3. Click the Data Format tab, and configure the following properties.

    Use the defaults for properties that aren't listed:

    Delimited Property Configuration
    Data Format Delimited
    Header Line With Header Line
This completes the primary branch:

Now, we'll go back to the Stream Selector and complete the secondary branch.

Add a Corresponding Field with the Expression Evaluator

The Jython Evaluator script added a new field to the credit payments branch. To ensure all records have the same structure, we'll use the Expression Evaluator to add the same field to the non-credit branch.

This ensures that all records have the same format when written to the destination.

To confirm the name of the field to add, let's use data preview to see how the Jython Evaluator adds the credit card type to records.

  1. Click the Preview icon.
  2. In the pipeline, click the Jython Evaluator to view the output of that processor.
  3. Expand the first output record, scroll down, and notice the new field highlighted in green: /credit_card_type.
  4. Click Close Preview.
  5. Add an Expression Evaluator processor to the canvas and connect the second, default stream of the Stream Selector to it.
  6. Click the Expressions tab.
  7. Configure the following Field Expression properties:
    Output Field Expression
    /credit_card_type n/a
    This creates a credit_card_type field that indicates the information is not applicable.
    Since we're using constants, we don't need to use the dollar sign and brackets for the expression. But if we wanted to use them, we could define them as ${'credit_card_type'} and ${'n/a'}.
  8. Link the Expression Evaluator to the Local FS destination.
    This streams the data from this branch to the destination, merging data from both branches:

Create a Data Rule and Alert

Now before we run the basic pipeline, let's add a data rule and alert. Data rules are user-defined rules used to inspect data moving between two stages. They are a powerful way to look for outliers and anomalous data.

Data rules and alerts require a detailed understanding of the data passing through the pipeline. For more general pipeline monitoring information, you can use metric rules and alerts.

The script in the Jython Evaluator creates error records for credit card transactions without credit card numbers. We can create a data rule and alert to let us know when the record count reaches a specified threshold.

We'll use an expression with the record:value() function to identify when the credit card number field, /credit_card, is null. The function returns the data in the specified field.

  1. Between the Stream Selector and the Jython Evaluator, select the link or Data Inspection icon: .
    The Data Rules tab displays in the Preview panel.
  2. Click Add.
  3. In the Data Rule dialog box, configure the following properties.

    Use the defaults for properties that aren't listed:

    Data Rule Property Description
    Label Missing Card Numbers
    Condition ${record:value("/credit_card") == ""}
    Sampling Percentage 35
    Alert Text At least 10 missing credit card numbers!
    Threshold Value 10

    This creates an alert that goes off after finding ten records that don't include credit card numbers in the credit card payment stream.

    Note: With a larger data set, a smaller sampling percentage and higher threshold value might be appropriate, but we'll use these numbers for the purposes of the tutorial.
  4. Click Save.
    The rule displays in the data rule list, and the Data Inspection icon darkens to show that a data rule is configured for the stream.
  5. To enable the data rule and alert, click Active.
    Notice the Data Inspection icon becomes a darker grey to show that a data rule is active on the stream.

Run the Basic Pipeline

Now that the basic pipeline is complete, you can start it by clicking the Start icon: .

The UI enters Monitor mode and shows summary statistics in the Monitor panel. At some point as the pipeline runs, the data alert triggers and the location of the triggered data alert turns red:

Click the data alert icon to view the data alert notification. Then, close the notification and explore the information available in the Monitor panel.

Note that when you select an unused part of the canvas, the Monitor panel displays monitoring information for the entire pipeline. When you select a stage, it displays information for the stage.

The Jython Evaluator shows 40 error records. Click the error records number to see the list of cached error records and related error messages.

You can also select the red Data Inspection icon to view information about the data alert and view the error records associated with the data alert.

To continue with the extended tutorial, stop the pipeline.

If you like, you can use data preview to step through the pipeline and review how each stage processes the data. Or hang in there, and you can do it with us in the extended tutorial.