Amazon SQS Consumer

Supported pipeline types:
  • Data Collector

Use the Amazon SQS Consumer origin to read data from queues in Amazon Simple Queue Service (SQS). The origin can use multiple threads to enable parallel processing of data. To read data from Amazon S3, use the Amazon S3 origin.

When you configure the Amazon SQS Consumer origin, you define the region and the set of queue name prefixes to use. These properties determine the queues that the origin processes. You can use IAM roles or AWS access key pairs to access the data.

You can optionally include Amazon SQS message attributes and sender attributes in records as record header attributes.

AWS Credentials

When Data Collector reads data from Amazon SQS using an Amazon SQS Consumer origin, it must pass credentials to Amazon Simple Queue Service.

Use one of the following methods to pass AWS credentials:

IAM role
When Data Collector runs on an Amazon EC2 instance, you can use the AWS Management Console to configure an IAM role for the EC2 instance. Data Collector uses the IAM instance profile credentials to automatically connect to AWS.
To use an IAM role, do not specify the Access Key ID and Secret Access Key properties in the origin.
For more information about assigning an IAM role to an EC2 instance, see the Amazon EC2 documentation.
AWS access key pair
When Data Collector does not run on an Amazon EC2 instance or when the EC2 instance doesn’t have an IAM role, you must specify the Access Key ID and Secret Access Key properties in the origin.
Tip: To secure sensitive information such as access key pairs, you can use runtime resources or credential stores.
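
The choice between the two methods can be pictured with a small Python sketch. It is not Data Collector code: the sqs_client_kwargs helper is hypothetical, and the keyword names are the ones accepted by boto3.client, used here only as a familiar analogy. When no key pair is supplied, the helper passes no credentials at all, which in boto3 lets the default credential chain (including an EC2 instance profile) supply them.

```python
from typing import Optional


def sqs_client_kwargs(region: str,
                      access_key_id: Optional[str] = None,
                      secret_access_key: Optional[str] = None) -> dict:
    """Build keyword arguments for boto3.client("sqs", **kwargs).

    Hypothetical helper for illustration; not part of Data Collector.
    """
    kwargs = {"region_name": region}
    if access_key_id and secret_access_key:
        # AWS access key pair: pass both values explicitly.
        kwargs["aws_access_key_id"] = access_key_id
        kwargs["aws_secret_access_key"] = secret_access_key
    elif access_key_id or secret_access_key:
        # Only half of the pair was supplied, which cannot work.
        raise ValueError("Specify both Access Key ID and Secret Access Key, or neither")
    # IAM role: both keys are left out so instance profile credentials apply.
    return kwargs


print(sqs_client_kwargs("us-west-2"))  # {'region_name': 'us-west-2'}
```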

Queue Name Prefix

The Amazon SQS Consumer origin uses the queue name prefix to determine the queues to process. You can define multiple queue name prefixes.

When you specify the queue name prefix, enter a string that represents the beginning of the queue names that you want to use. The origin processes data from every queue with a matching prefix. You cannot use wildcards within the queue name prefix.

For example, say you have the following queues:
sales-eu-france
sales-eu-germany
sales-us
sales-egypt

If you use "sales" as the prefix, the origin processes messages from all of the queues.

If you use "sales-eu" as the prefix, the origin processes only sales-eu-france and sales-eu-germany.

If you use "sales-e" as the prefix, the origin processes all queues except for sales-us.
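
The matching rule in these examples amounts to a literal "starts with" test. The following Python sketch reproduces the three cases above; the matching_queues function is a hypothetical name used for illustration and does not reflect how the origin is implemented.

```python
def matching_queues(queue_names, prefixes):
    """Return the queues whose names start with any configured prefix."""
    # str.startswith accepts a tuple and compares literally, so characters
    # such as * are not treated as wildcards.
    return [name for name in queue_names if name.startswith(tuple(prefixes))]


queues = ["sales-eu-france", "sales-eu-germany", "sales-us", "sales-egypt"]

print(matching_queues(queues, ["sales"]))     # all four queues
print(matching_queues(queues, ["sales-eu"]))  # ['sales-eu-france', 'sales-eu-germany']
print(matching_queues(queues, ["sales-e"]))   # every queue except sales-us
```

Because several prefixes can be configured, the function takes a list and keeps a queue as soon as any one prefix matches.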

Multithreaded Processing

The Amazon SQS Consumer origin performs parallel processing and enables the creation of a multithreaded pipeline. The Amazon SQS Consumer origin uses multiple concurrent threads based on the Max Threads property.

When performing multithreaded processing, the Amazon SQS Consumer origin determines the number of queues to process and creates the specified number of threads. When there are more queues than threads, the queues are divided up and assigned to different threads. Each thread processes data from a specific set of queues and cycles round-robin through the set of queues.

When a thread requests data from a queue, the queue returns messages based on the configured Number of Messages per Request property. The thread creates a batch of data and passes the batch to an available pipeline runner. After processing the batch, the thread continues to the next assigned queue.

A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors, executors, and destinations in the pipeline and handles all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. However, the order in which batches are written to destinations is not guaranteed.

For example, say you set the Max Threads property to 5 and the origin is configured to process 20 queues. When you start the pipeline, the origin creates five threads, and Data Collector creates a matching number of pipeline runners. Each thread is assigned 4 queues to process. Each thread cycles through the queues, creating one batch of data at a time and passing it to a pipeline runner for processing.

At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
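
The arithmetic in this example can be checked with a short Python sketch. The exact way the origin divides queues among threads is not described here, so the policy below (dealing queues out to threads in turn) is an assumption, and the assign_queues function and the queue names are hypothetical.

```python
from itertools import cycle, islice


def assign_queues(queue_names, max_threads):
    """Divide queues among threads (assumed policy: deal them out in turn)."""
    assignments = [[] for _ in range(max_threads)]
    for index, name in enumerate(queue_names):
        assignments[index % max_threads].append(name)
    return assignments


queues = [f"queue-{n:02d}" for n in range(20)]  # 20 placeholder queue names
assignments = assign_queues(queues, max_threads=5)

for thread_id, assigned in enumerate(assignments):
    print(thread_id, len(assigned))  # each of the 5 threads gets 4 queues

# A thread visits its own queues in round-robin order, one batch per visit.
first_six_polls = list(islice(cycle(assignments[0]), 6))
print(first_six_polls)
```

After the fourth poll the first thread wraps around to its first queue again, which is the round-robin cycle described above.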

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.

Including SQS Message Attributes

The Amazon SQS Consumer origin can include SQS message attributes in records as record header attributes. You can add different groups of SQS attributes to records based on the SQS Message Attribute Level that you select:
  • Basic - Includes some basic SQS attributes.
  • All Attributes - Includes all standard SQS attributes.

SQS attributes are added to record header attributes using the following naming convention: sqs.<SQS attribute name>.

The following table lists the SQS attributes included in each attribute level:
SQS Attribute Level Description
Basic Includes the following standard SQS attributes:
  • sqs.messageId - The SQS message ID.
  • sqs.queueNamePrefix - The queue name prefix that you configured for the stage.
  • sqs.region - The AWS region for the message.
All Attributes Includes the standard SQS attributes listed above and the following additional SQS attributes:
  • sqs.body - The full raw message body.
  • sqs.bodyMd5 - The MD5 hash of the raw message body.
  • sqs.queueUrl - The full URL for the originating queue.
  • sqs.attrsMd5 - The MD5 hash of all the SQS attributes included in the message.

For more information about SQS message attributes, see the Amazon SQS documentation.
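
To make the naming convention concrete, the following Python sketch builds the header attributes for each level from a message. It is an illustration only: the sqs_header_attributes function is hypothetical, the message is assumed to be a dictionary shaped like one entry of the Messages list that boto3's receive_message returns, and the queue URL and message ID are placeholders.

```python
import hashlib


def sqs_header_attributes(message, level, queue_url, queue_name_prefix, region):
    """Map an SQS message to record header attributes for the given level."""
    if level == "No Attributes":
        return {}
    if level not in ("Basic", "All Attributes"):
        raise ValueError(f"Unknown attribute level: {level}")
    headers = {
        "sqs.messageId": message["MessageId"],
        "sqs.queueNamePrefix": queue_name_prefix,
        "sqs.region": region,
    }
    if level == "All Attributes":
        headers.update({
            "sqs.body": message["Body"],
            "sqs.bodyMd5": message["MD5OfBody"],
            "sqs.queueUrl": queue_url,
            # Assumption: empty string when the message carries no attributes.
            "sqs.attrsMd5": message.get("MD5OfMessageAttributes", ""),
        })
    return headers


QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/sales-us"  # placeholder
body = '{"order": 1}'
message = {
    "MessageId": "msg-0001",  # placeholder ID
    "Body": body,
    "MD5OfBody": hashlib.md5(body.encode("utf-8")).hexdigest(),
}

basic = sqs_header_attributes(message, "Basic", QUEUE_URL, "sales", "us-east-1")
print(sorted(basic))  # ['sqs.messageId', 'sqs.queueNamePrefix', 'sqs.region']

everything = sqs_header_attributes(message, "All Attributes", QUEUE_URL, "sales", "us-east-1")
print(len(everything))  # 7
```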

Including Sender Attributes

In addition to SQS message attributes, you can include sender attributes in record headers. Sender attributes are custom attributes included in messages by the message sender.

To include message sender attributes, you perform the following steps:
  1. Set the SQS Message Attribute Level property to All Attributes.
  2. Configure the SQS Sender Attribute property, adding the name of each attribute that you want to include in the record.

Sender attributes are added to record header attributes using the following naming convention: sqs.messageAttr.<sender attribute name>.

For example, a senderId attribute appears in the record header as sqs.messageAttr.senderId.
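
A minimal Python sketch of this mapping follows. The sender_header_attributes function is hypothetical, the MessageAttributes layout is the one boto3 uses for received messages, and two behaviors are assumptions: only string values are handled, and a configured attribute that is missing from the message is skipped.

```python
def sender_header_attributes(message, sender_attribute_names):
    """Copy the named sender attributes into sqs.messageAttr.* header attributes."""
    available = message.get("MessageAttributes", {})
    headers = {}
    for name in sender_attribute_names:
        if name in available:  # exact, case-sensitive name match
            headers[f"sqs.messageAttr.{name}"] = available[name].get("StringValue")
    return headers


message = {
    "MessageAttributes": {
        "senderId": {"DataType": "String", "StringValue": "store-42"},
        "priority": {"DataType": "String", "StringValue": "high"},
    }
}

# Only the attributes named in the configuration are copied.
print(sender_header_attributes(message, ["senderId", "missingAttr"]))
# {'sqs.messageAttr.senderId': 'store-42'}
```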

Data Formats

The Amazon SQS Consumer origin processes data differently based on the data format. The Amazon SQS Consumer can process the following types of data:

Avro
Generates a record for every message. Includes a precision and scale field attribute for each Decimal field.
The origin includes the Avro schema in an avroSchema record header attribute. You can use one of the following methods to specify the location of the Avro schema definition:
  • Message/Data Includes Schema - Use the schema in the message.
  • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. Confluent Schema Registry is a distributed storage layer for Avro schemas. You can configure the origin to look up the schema in Confluent Schema Registry by the schema ID embedded in the message or by the schema ID or subject specified in the stage configuration.

    You must specify the method that the origin uses to deserialize the message. If the Avro schema ID is embedded in each message, set the key and value deserializers to Confluent on the Kafka tab.

Using a schema in the stage configuration or retrieving a schema from Confluent Schema Registry overrides any schema that might be included in the message and can improve performance.
Binary
Generates a record with a single byte array field at the root of the record.
When the data exceeds the user-defined maximum data size, the origin cannot process the data. Because the record is not created, the origin cannot pass the record to the pipeline to be written as an error record. Instead, the origin generates a stage error.
Delimited
Generates a record for each delimited line. You can use the following delimited format types:
  • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
  • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
  • MS Excel CSV - Microsoft Excel comma-separated file.
  • MySQL CSV - MySQL comma-separated file.
  • Tab-Separated Values - File that includes tab-separated values.
  • PostgreSQL CSV - PostgreSQL comma-separated file.
  • PostgreSQL Text - PostgreSQL text file.
  • Custom - File that uses user-defined delimiter, escape, and quote characters.
  • Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
You can use a list or list-map root field type for delimited data, and optionally include field names from a header line, when available. For more information about the root field types, see Delimited Data Root Field Type.
When using a header line, you can enable handling records with additional columns. The additional columns are named using a custom prefix and integers in sequential increasing order, such as _extra_1, _extra_2. When you disallow additional columns, records that include additional columns are sent to error.
You can also replace a string constant with null values.
When a record exceeds the maximum record length defined for the stage, the stage processes the object based on the error handling configured for the stage.
JSON
Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
When an object exceeds the maximum object length defined for the origin, the origin processes the object based on the error handling configured for the stage.
Log
Generates a record for every log line.
When a line exceeds the user-defined maximum line length, the origin truncates the line.
You can include the processed log line as a field in the record. If the log line is truncated, and you request the log line in the record, the origin includes the truncated line.
You can define the log format or type to be read.
Protobuf
Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
Protobuf messages must match the specified message type and be described in the descriptor file.
When the data for a record exceeds 1 MB, the origin cannot continue processing data in the message. The origin handles the message based on the stage error handling property and continues reading the next message.
For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
SDC Record
Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
For error records, the origin provides the original record as read from the origin in the original pipeline, as well as error information that you can use to correct the record.
When processing error records, the origin expects the error file names and contents as generated by the original pipeline.
Text
Generates a record for each line of text or for each section of text based on a custom delimiter.
When a line or section exceeds the maximum line length defined for the origin, the origin truncates it. The origin adds a boolean field named Truncated to indicate if the line was truncated.
For more information about processing text with a custom delimiter, see Text Data Format with Custom Delimiters.
XML
Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
Generated records include XML attributes and namespace declarations as fields in the record by default. You can configure the stage to include them in the record as field attributes.
You can include XPath information for each parsed XML element and XML attribute in field attributes. This also places each namespace in an xmlns record header attribute.
Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
When a record exceeds the user-defined maximum record length, the origin skips the record and continues processing with the next record. It sends the skipped record to the pipeline for error handling.
Use the XML data format to process valid XML documents. For more information about XML processing, see Reading and Processing XML Data.
Tip: If you want to process invalid XML documents, you can try using the text data format with custom delimiters. For more information, see Processing XML Data with Custom Delimiters.
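
Returning to the delimited data format, the handling of additional columns described above can be sketched in Python. This is an illustration under stated assumptions rather than the origin's parser: parse_delimited is a hypothetical function, it uses Python's csv module with comma-separated input, and the errors list simply stands in for records that are sent to error.

```python
import csv
import io


def parse_delimited(text, allow_extra_columns=True, extra_prefix="_extra_"):
    """Parse CSV text with a header line into (records, error_rows)."""
    rows = [row for row in csv.reader(io.StringIO(text)) if row]  # skip empty lines
    if not rows:
        return [], []
    header, data_rows = rows[0], rows[1:]
    records, errors = [], []
    for row in data_rows:
        extra = row[len(header):]
        if extra and not allow_extra_columns:
            errors.append(row)  # stands in for "sent to error"
            continue
        record = dict(zip(header, row))
        # Name additional columns <prefix>1, <prefix>2, and so on.
        for number, value in enumerate(extra, start=1):
            record[f"{extra_prefix}{number}"] = value
        records.append(record)
    return records, errors


text = "id,name\n1,Ana\n2,Ben,x,y\n"

records, errors = parse_delimited(text)
print(records[1])  # {'id': '2', 'name': 'Ben', '_extra_1': 'x', '_extra_2': 'y'}

records, errors = parse_delimited(text, allow_extra_columns=False)
print(errors)  # [['2', 'Ben', 'x', 'y']]
```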

Configuring an Amazon SQS Consumer

Configure an Amazon SQS Consumer origin to read messages from Amazon SQS.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the SQS tab, configure the following properties:
    SQS Property Description
    Access Key ID

    AWS access key ID.

    Required when not using IAM roles with IAM instance profile credentials.

    Secret Access Key

    AWS secret access key.

    Required when not using IAM roles with IAM instance profile credentials.

    Region Amazon SQS region.
    Queue Name Prefix The common prefix for the queues to process.

    Wildcards are not allowed.

    Number of Messages per Request The number of messages to retrieve in each request. The maximum allowed by Amazon Simple Queue Service is 10 messages.

    Default is 10.

    Max Batch Size (records) Maximum number of records processed at one time. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector default is 1000.

    Batch Wait Time (ms) Number of milliseconds to wait before sending a partial or empty batch.
    Max Threads The number of threads to use to process messages.

    Default is ${runtime:availableProcessors()}, which returns the number of Data Collector processors that are available when the pipeline starts.

    Poll Wait Time (Seconds) Number of seconds to wait for a request response. Specify a wait time to use Amazon SQS long polling to minimize the effects of empty responses. When configured, the origin waits the specified number of seconds for messages before continuing to another queue.

    Use -1 to opt out of this property. When you opt out and a queue has no data, the origin continues immediately to the next queue.

    SQS Message Attribute Level Determines the message attributes that are included in the record as record header attributes. Select one of the following options:
    • No Attributes - The origin does not include any message attributes in the record.
    • Basic Attributes - The origin includes a small set of basic SQS attributes in the record.
    • All Attributes - The origin includes all SQS-generated attribute information, such as the message body and queue URL. When specified, can also include SQS attributes generated by the message sender.

      For more information about the attributes included in the message, see Including SQS Message Attributes.

    Include Sender SQS Attributes When including all SQS message attributes in the record header attribute, you can also include attributes generated by the sender of the messages. Specify the message sender attributes that you want to include.

    Enter the exact attribute name.

  3. On the Advanced tab, optionally configure connection and proxy properties:
    Advanced Property Description
    Connection Timeout Seconds to wait for a response before closing the connection.

    Default is 10 seconds.

    Socket Timeout Seconds to wait for a response to a query.
    Retry Count Maximum number of times to retry requests.
    Use Proxy Specifies whether to use a proxy to connect.
    Proxy Host Proxy host.
    Proxy Port Proxy port.
    Proxy User User name for proxy credentials.
    Proxy Password Password for proxy credentials.
    Tip: To secure sensitive information such as user names and passwords, you can use runtime resources or credential stores.
  4. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Type of data to be read. Use one of the following options:
    • Avro
    • Binary
    • Delimited
    • JSON
    • Log
    • Text
    • Protobuf
    • SDC Record
    • XML
  5. For Avro data, on the Data Format tab, configure the following properties:
    Avro Property Description
    Avro Schema Location Location of the Avro schema definition to use when processing data:
    • Message/Data Includes Schema - Use the schema in the message.
    • In Pipeline Configuration - Use the schema provided in the stage configuration.
    • Confluent Schema Registry - Retrieve the schema from the Confluent Schema Registry.

    Using a schema in the stage configuration or in the Confluent Schema Registry can improve performance.

    Avro Schema Avro schema definition used to process the data. Overrides any existing schema definitions associated with the data.

    You can optionally use the runtime:loadResource function to use a schema definition stored in a runtime resource file.

    Schema Registry URLs Confluent Schema Registry URLs used to look up the schema. To add a URL, click Add. Use the following format to enter the URL:
    http://<host name>:<port number>
    Lookup Schema By Method used to look up the schema in the Confluent Schema Registry:
    • Subject - Look up the specified Avro schema subject.
    • Schema ID - Look up the specified Avro schema ID.
    • Embedded Schema ID - Look up the Avro schema ID embedded in each message.
    Overrides any existing schema definitions associated with the message.
    Schema Subject Avro schema subject to look up in the Confluent Schema Registry.

    If the specified subject has multiple schema versions, the origin uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Lookup Schema By property to Schema ID.

    Schema ID Avro schema ID to look up in the Confluent Schema Registry.
  6. For binary data, on the Data Format tab, configure the following property:
    Binary Property Description
    Max Data Size (bytes) Maximum number of bytes in the message. Larger messages cannot be processed or written to error.
  7. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property Description
    Delimiter Format Type Delimiter format type. Use one of the following options:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma-separated file.
    • Tab-Separated Values - File that includes tab-separated values.
    • PostgreSQL CSV - PostgreSQL comma-separated file.
    • PostgreSQL Text - PostgreSQL text file.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.
    • Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
    Header Line Indicates whether a file contains a header line, and whether to use the header line.
    Allow Extra Columns When processing data with a header line, allows processing records with more columns than exist in the header line.
    Extra Column Prefix Prefix to use for any additional columns. Extra columns are named using the prefix and sequential increasing integers as follows: <prefix><integer>.

    For example, _extra_1. Default is _extra_.

    Max Record Length (chars) Maximum length of a record in characters. Longer records are not read.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Delimiter Character Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Multi Character Field Delimiter Characters that delimit fields for multi-character delimiter format.

    Default is two pipe characters (||).

    Escape Character Escape character for a custom or multi-character delimiter format.
    Quote Character Quote character for a custom or multi-character delimiter format.
    Enable Comments Allows commented data to be ignored for custom delimiter format.
    Comment Marker Character that marks a comment when comments are enabled for custom delimiter format.
    Ignore Empty Lines Allows empty lines to be ignored for custom delimiter format.
    Root Field Type Root field type to use:
    • List-Map - Generates an indexed list of data. Enables you to use standard functions to process data. Use for new pipelines.
    • List - Generates a record with an indexed list with a map for header and value. Requires the use of delimited data functions to process data. Use only to maintain pipelines created before 1.1.0.
    Lines to Skip Number of lines to skip before reading data.
    Parse NULLs Replaces the specified string constant with null values.
    NULL Constant String constant to replace with null values.
    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  8. For JSON data, on the Data Format tab, configure the following properties:
    JSON Property Description
    JSON Content Type of JSON content. Use one of the following options:
    • Array of Objects
    • Multiple Objects
    Max Object Length (chars) Maximum number of characters in a JSON object.

    Longer objects are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  9. For log data, on the Data Format tab, configure the following properties:
    Log Property Description
    Log Format Format of the log files. Use one of the following options:
    • Common Log Format
    • Combined Log Format
    • Apache Error Log Format
    • Apache Access Log Custom Format
    • Regular Expression
    • Grok Pattern
    • Log4j
    • Common Event Format (CEF)
    • Log Event Extended Format (LEEF)
    Max Line Length Maximum length of a log line. The origin truncates longer lines.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Retain Original Line Determines how to treat the original log line. Select to include the original log line as a field in the resulting record.

    By default, the original line is discarded.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
    • When you select Apache Access Log Custom Format, use Apache log format strings to define the Custom Log Format.
    • When you select Regular Expression, enter the regular expression that describes the log format, and then map the fields that you want to include to each regular expression group.
    • When you select Grok Pattern, you can use the Grok Pattern Definition field to define custom grok patterns. You can define a pattern on each line.

      In the Grok Pattern field, enter the pattern to use to parse the log. You can use a predefined grok pattern or create a custom grok pattern using patterns defined in Grok Pattern Definition.

      For more information about defining grok patterns and supported grok patterns, see Defining Grok Patterns.

    • When you select Log4j, define the following properties:
      Log4j Property Description
      On Parse Error Determines how to handle information that cannot be parsed:
      • Skip and Log Error - Skips reading the line and logs a stage error.
      • Skip, No Error - Skips reading the line and does not log an error.
      • Include as Stack Trace - Includes information that cannot be parsed as a stack trace to the previously-read log line. The information is added to the message field for the last valid log line.
      Use Custom Log Format Allows you to define a custom log format.
      Custom Log4J Format Use log4j variables to define a custom log format.
  10. For protobuf data, on the Data Format tab, configure the following properties:
    Protobuf Property Description
    Protobuf Descriptor File Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.

    For information about generating the descriptor file, see Protobuf Data Format Prerequisites. For more information about environment variables, see Data Collector Environment Configuration.

    Message Type The fully-qualified name for the message type to use when reading data.

    Use the following format: <package name>.<message type>.

    Use a message type defined in the descriptor file.
    Delimited Messages Indicates if a message might include more than one protobuf message.
  11. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Max Line Length Maximum number of characters allowed for a line. Longer lines are truncated.

    Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Use Custom Delimiter Uses custom delimiters to define records instead of line breaks.
    Custom Delimiter One or more characters to use to define records.
    Include Custom Delimiter Includes delimiter characters in the record.
    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  12. For XML data, on the Data Format tab, configure the following properties:
    XML Property Description
    Delimiter Element
    Delimiter to use to generate records. Omit a delimiter to treat the entire XML document as one record. Use one of the following:
    • An XML element directly under the root element.

      Use the XML element name without surrounding angle brackets ( < > ). For example, msg instead of <msg>.

    • A simplified XPath expression that specifies the data to use.

      Use a simplified XPath expression to access data deeper in the XML document or data that requires a more complex access method.

      For more information about valid syntax, see Simplified XPath Syntax.

    Include Field XPaths Includes the XPath to each parsed XML element and XML attribute in field attributes. Also includes each namespace in an xmlns record header attribute.

    When not selected, this information is not included in the record. By default, the property is not selected.

    Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
    Namespaces Namespace prefix and URI to use when parsing the XML document. Define namespaces when the XML element being used includes a namespace prefix or when the XPath expression includes namespaces.

    For information about using namespaces with an XML element, see Using XML Elements with Namespaces.

    For information about using namespaces with XPath expressions, see Using XPath Expressions with Namespaces.

    Using simple or bulk edit mode, click the Add icon to add additional namespaces.

    Output Field Attributes Includes XML attributes and namespace declarations in the record as field attributes. When not selected, XML attributes and namespace declarations are included in the record as fields.
    Note: Field attributes are automatically included in records written to destination systems only when you use the SDC RPC data format in the destination. For more information about working with field attributes, see Field Attributes.

    By default, the property is not selected.

    Max Record Length (chars)

    The maximum number of characters in a record. Longer records are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.