Record Header Attributes

Record header attributes are named values in record headers that you can use in pipeline logic, as needed.

Some stages create record header attributes for a particular purpose. For example, CDC-enabled origins include the CRUD operation type in the sdc.operation.type record header attribute. This enables CRUD-enabled destinations to determine the operation type to use when processing records.

Other stages include processing-related information in record header attributes for general use. For example, event-generating stages include the event type in record header attributes in case you want to process the event based on that information. And several origins include information such as the originating file name, location, or partition for each record.

You can use certain processors to create or update record header attributes. For example, you can use an Expression Evaluator processor to create attributes for record-based writes.

The inclusion of attributes in record headers does not require using them in the pipeline. You can, for example, use the CDC-enabled Salesforce origin in a non-CDC pipeline and ignore the CDC record header attributes that are automatically generated.

When writing data to destination systems, record header attributes are preserved with the record only when using the Google Pub/Sub Publisher destination or when using another destination with the SDC Record data format. To preserve the information when using other data formats, use the Expression Evaluator processor to copy information from record header attributes to record fields.

Working with Header Attributes

You can use the Expression Evaluator processor to create or update record header attributes. For example, if you configure the Snowflake destination to process CDC data but your pipeline uses a non-CDC origin, you can use the Expression Evaluator processor to define the sdc.operation.type record header attribute.
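For example, to mark every record as an insert, you might configure the Expression Evaluator processor as follows. This is a minimal sketch; the value 1 is the CRUD operation code that StreamSets uses for INSERT in the sdc.operation.type convention:

 Header Attribute: sdc.operation.type
 Header Attribute Expression: 1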

Record header attributes are string values. You can use record:attribute functions in any expression to include attribute values in calculations.

Important: Record header attributes do not have field paths. When using an attribute in an expression, use just the attribute name enclosed in quotation marks, because the attribute name is passed as a string, as follows:
 ${record:attribute('<attribute name>')}

For example, you can configure an Expression Evaluator processor to add the file and offset record header attributes created by the Azure Data Lake Storage Gen2 origin to the record as fields.
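A sketch of such a configuration on the Expressions tab of the processor, with illustrative output field names:

 Output Field: /sourceFile      Expression: ${record:attribute('file')}
 Output Field: /sourceOffset    Expression: ${record:attribute('offset')}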

Internal Attributes

StreamSets Cloud generates and updates some read-only internal record header attributes as records move from stage to stage. These attributes can be viewed for debugging issues, but can only be updated by StreamSets Cloud.

The record:attribute function does not allow access to internal record header attributes. The following table describes the internal record header attributes and the functions that you can use to access the data in the pipeline:
Internal Record Header Attribute Description Related Function
stageCreator The ID of the stage that created the record. record:creator()
sourceId Source of the record. Can include different information based on the origin type. record:id()
stagesPath List of stages that processed the record in order, by stage name. record:path()
trackingId The route the record has taken through the pipeline, starting with the sourceId, then listing the stages that processed the record. n/a
previousTrackingId The tracking ID of the record before it entered the current stage. n/a
errorStage The stage that generated the error. In error records only. record:errorStage()
errorStageLabel The user-defined name for the stage. In error records only. record:errorStageLabel()
errorCode The error code. In error records only. record:errorCode()
errorMessage The error message. In error records only. record:errorMessage()
errorTimestamp The time that the error occurred. In error records only. record:errorTime()
errorStackTrace The stack trace associated with the error. In error records only. n/a
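For example, in an error-handling pipeline you can use these functions in an Expression Evaluator processor to copy error details into record fields. The output field names below are illustrative:

 Output Field: /errorStage      Expression: ${record:errorStage()}
 Output Field: /errorMessage    Expression: ${record:errorMessage()}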

Header Attribute-Generating Stages

The following table lists the stages that generate record header attributes to enable special processing:
Stage Description
CDC-enabled origins Include the CRUD operation type in the sdc.operation.type header attribute and can include additional CRUD and CDC information in record header attributes. For more information, see CDC-Enabled Origins.
Origins that process Avro data Include the Avro schema in an avroSchema record header attribute.
Origins that process XML data Can include namespaces in an xmlns record header attribute when you enable field XPaths.
Stages that generate events Generate record header attributes for event records. For details about event record header attributes, see "Event Records" in the stage documentation.
Amazon S3 origin Can be configured to include system-defined and user-defined object metadata in record header attributes.
Azure Data Lake Storage Gen1 origin Includes information about the originating file for the record in record header attributes.
Azure Data Lake Storage Gen2 origin Includes information about the originating file for the record in record header attributes.
Google Pub/Sub Subscriber origin When available, includes user-defined message attributes in record header attributes.
MySQL Multitable Consumer origin Includes table and data type information in JDBC record header attributes.
MySQL Query Consumer origin Can be configured to include table and data type information in JDBC record header attributes.
Oracle Multitable Consumer origin Includes table and data type information in JDBC record header attributes.
Oracle Query Consumer origin Can be configured to include table and data type information in JDBC record header attributes.
PostgreSQL Multitable Consumer origin Includes table and data type information in JDBC record header attributes.
PostgreSQL Query Consumer origin Can be configured to include table and data type information in JDBC record header attributes.
Salesforce origin Includes Salesforce information about the origins of the record in Salesforce header attributes.
SQL Server Multitable Consumer origin Includes table and data type information in JDBC record header attributes.
SQL Server Query Consumer origin Can be configured to include table and data type information in JDBC record header attributes.
Expression Evaluator processor Can be configured to create or update record header attributes.

Record Header Attributes for Record-Based Writes

Destinations can use information in record header attributes to write data. Destinations that write Avro data can use Avro schemas in the record header. The Azure Data Lake Storage destinations can use record header attributes to determine the directory to write to.

To use a record header attribute, configure the destination to use the header attribute and ensure that the records include the header attribute. You can use the Expression Evaluator processor to add record header attributes.

You can use the following record header attributes in destinations:
targetDirectory attribute in all the Azure Data Lake Storage destinations
The targetDirectory record header attribute defines the directory where the record is written. If the directory does not exist, the destination creates the directory. The targetDirectory header attribute replaces the Directory Template property in the destination.
When you use targetDirectory to provide the directory, the time basis configured for the destination is used only to determine whether a record is late. The time basis is not used to determine which output directories to create or which directories to write records to.
To use the targetDirectory header attribute, on the Output tab, select Directory in Header.
avroSchema attribute in destinations that write Avro data
The avroSchema header attribute defines the Avro schema for the record. When you use this header attribute, you cannot define an Avro schema to use in the destination.
To use the avroSchema header attribute, on the Data Format tab, select the Avro data format, and then for the Avro Schema Location property, select In Record Header.
roll attribute in all the Azure Data Lake Storage destinations
The roll attribute, when present in the record header, triggers a roll of the file.
You can define the name of the roll header attribute. When you use an Expression Evaluator processor to generate the roll header attribute, use the name of the roll attribute that you defined in the processor.
To use a roll header attribute, on the Output tab, select Use Roll Attribute and define the name of the attribute.
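For example, assuming upstream logic sets a /rollFile field in the data, an Expression Evaluator processor might generate the roll attribute like this. The attribute and field names are illustrative:

 Header Attribute: roll
 Header Attribute Expression: ${record:value('/rollFile')}

Because the destination rolls files whenever the attribute is present in the header, route only the records that should trigger a roll through this processor, for example with a Stream Selector processor.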

Generating Attributes for Record-Based Writes

You can use the Expression Evaluator processor to generate record header attributes for record-based writes.

To use the Expression Evaluator processor, you must generate record header attributes as expected by the destination. Use the following guidelines to generate record header attributes:
Generating the target directory
When using the Expression Evaluator processor to generate the target directory, note the following details:
  • The destination expects the directory in a header attribute named "targetDirectory".
  • The destination uses the directory exactly as written in the targetDirectory header attribute.
  • When you define the expression that evaluates to a directory, you can use any valid component, including expressions that evaluate data in the record.
For example, you want to write records to different directories based on the pipeline ID and the region and store ID where the transaction took place. Use the following expression in the Expression Evaluator processor to define the targetDirectory attribute:

${pipeline:id()}/transactions/${record:value('/region')}/${record:value('/storeID')}
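With hypothetical values, a record with /region set to west and /storeID set to 2052, in a pipeline whose ID is salesops, would be written to:

 salesops/transactions/west/2052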
Generating the Avro schema
When using the Expression Evaluator processor to generate the Avro schema, note the following details:
  • The destination expects the Avro schema in a header attribute named "avroSchema".
  • Use the standard Avro schema format, for example:
    {"type":"record","name":"table_name","namespace":"database_name",
    "fields":[{"name":"int_val","type":["null","int"],"default":null},
    {"name":"str_val","type":["null","string"],"default":null}]}
  • The database name and table name must be included in the Avro schema.
Tip: You might use an Avro schema generator to help generate the Avro schema.
Generating the roll attribute
When using the Expression Evaluator processor to generate the roll attribute, note the following details:
  • Use any name for the attribute and specify the attribute name in the destination.
  • Configure an expression that defines when to roll files.
To define these record header attributes in the Expression Evaluator processor, perform the following steps:
  1. On the Expressions tab of the Expression Evaluator, specify the Header Attribute name.

    To generate a target directory, use targetDirectory.

    To generate an Avro schema, use avroSchema.

    You can use any name for a roll indicator header attribute.

  2. For the Header Attribute Expression, define the expression that evaluates to the information you want the destination to use.
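For example, the two steps above for a target directory might look like this in the Expression Evaluator processor. The directory expression is illustrative:

 Header Attribute: targetDirectory
 Header Attribute Expression: ${pipeline:id()}/output/${record:value('/region')}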