Azure IoT/Event Hub Consumer

The Azure IoT/Event Hub Consumer origin reads data from Microsoft Azure Event Hub.

The origin can use multiple threads to enable parallel processing of data from a single Azure event hub.

Before you use the Azure IoT/Event Hub Consumer origin, make sure you have the required Microsoft Azure storage account and container.

When you configure the Azure IoT/Event Hub Consumer, you specify the Microsoft Azure namespace and event hub names. You also define the shared access policy name and connection string key. You specify the consumer group to use and an event processor prefix that the origin uses when communicating with Azure Event Hub.

You also configure the storage account details, such as the storage account name and key, and specify the number of threads to use during processing.

Storage Account and Container Prerequisite

Before you use the Azure IoT/Event Hub Consumer origin, you need a Microsoft Azure storage account and at least one container.

The origin stores offsets in a storage account container, so to ensure the integrity of offset information, you must use a different container for each pipeline that includes an Azure IoT/Event Hub Consumer origin.

For example, say you use the Azure IoT/Event Hub Consumer as the origin for an IoT pipeline and a Transactions pipeline. To keep the offset data for these pipelines separate, you need to use two different storage account containers. They can be in the same storage account or in different storage accounts. When you configure the origins, you specify the storage account and container to use.
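
The one-container-per-pipeline rule can be checked before pipelines are started. The following is a minimal Python sketch, not part of the product: the dictionary layout, the key names `storage_account` and `container`, and the function `find_shared_containers` are hypothetical and only illustrate the check.

```python
from collections import defaultdict


def find_shared_containers(pipelines):
    """Return containers that more than one pipeline is configured to use.

    `pipelines` maps a pipeline name to a dict with the hypothetical keys
    "storage_account" and "container".
    """
    usage = defaultdict(list)
    for name, cfg in pipelines.items():
        # A container is identified by its storage account plus its name.
        usage[(cfg["storage_account"], cfg["container"])].append(name)
    return {key: names for key, names in usage.items() if len(names) > 1}


# Two pipelines in the same storage account, each with its own container.
pipelines = {
    "IoT": {"storage_account": "acct1", "container": "iot-offsets"},
    "Transactions": {"storage_account": "acct1", "container": "transactions-offsets"},
}
conflicts = find_shared_containers(pipelines)

# A misconfiguration: both pipelines point at the same container.
bad_pipelines = {
    "IoT": {"storage_account": "acct1", "container": "offsets"},
    "Transactions": {"storage_account": "acct1", "container": "offsets"},
}
bad_conflicts = find_shared_containers(bad_pipelines)
```

An empty result means that every pipeline has its own container. Any entry in the result names the pipelines whose offset data would be mixed.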

To create a new container for the pipeline:
  1. Log in to the Microsoft Azure portal: https://portal.azure.com
  2. In the Navigation panel, click Storage Accounts.
  3. Select the storage account to use.

    If you need to create a storage account, click the Add icon. Enter a name for the storage account, and enter or select a resource group name. You can use the defaults for all other properties.

  4. In the storage account view, click + Container to create a container.
  5. Enter a container name, and click OK.
    Tip: Use a name that can be easily identified as the container for the pipeline that you want to use it in.

If these steps are no longer accurate, see the Microsoft Azure Event Hub documentation.

Multithreaded Processing

The Azure IoT/Event Hub Consumer origin performs parallel processing and enables the creation of a multithreaded pipeline.

The Azure IoT/Event Hub Consumer origin uses multiple concurrent threads to read from an event hub based on the Max Threads property. When you start the pipeline, the origin creates the number of threads specified in the Max Threads property. Each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner.

A pipeline runner is a sourceless pipeline instance - an instance of the pipeline that includes all of the processors and destinations in the pipeline and represents all pipeline processing after the origin. Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed, generating an empty batch at regular intervals. You can configure the Runner Idle Time pipeline property to specify the interval or to opt out of empty batch generation.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order in which batches are written to destinations is not guaranteed.

For example, say you set the Max Threads property to 5. When you start the pipeline, the origin creates five threads, and the pipeline creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.

Each pipeline runner performs the processing associated with the rest of the pipeline. After a batch is written to pipeline destinations, the pipeline runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independently of the batches processed by other pipeline runners, so batches may be written in a different order than they were read.

At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
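
The behavior described above can be imitated with plain Python threads. This is a simplified simulation under stated assumptions, not the origin's implementation: `origin_thread`, `pipeline_runner`, and `run` are hypothetical names, `MAX_THREADS` stands in for the Max Threads property, and a list stands in for the destination.

```python
import queue
import threading

MAX_THREADS = 5  # stands in for the Max Threads property


def origin_thread(thread_id, batches_per_thread, inbox):
    """Simulated origin thread: builds batches and hands them to the runners."""
    for batch_no in range(batches_per_thread):
        # Each record is (origin thread, batch number, position in batch).
        batch = [(thread_id, batch_no, record_no) for record_no in range(3)]
        inbox.put(batch)


def pipeline_runner(inbox, written, lock):
    """Simulated pipeline runner: processes one batch at a time."""
    while True:
        batch = inbox.get()
        if batch is None:  # sentinel: no more data for this runner
            break
        with lock:
            written.append(batch)  # "write" the whole batch to the destination


def run(max_threads=MAX_THREADS, batches_per_thread=4):
    inbox = queue.Queue()
    written, lock = [], threading.Lock()
    runners = [
        threading.Thread(target=pipeline_runner, args=(inbox, written, lock))
        for _ in range(max_threads)
    ]
    origins = [
        threading.Thread(target=origin_thread, args=(i, batches_per_thread, inbox))
        for i in range(max_threads)
    ]
    for t in runners + origins:
        t.start()
    for t in origins:
        t.join()
    for _ in runners:
        inbox.put(None)  # one sentinel per runner, queued after all batches
    for t in runners:
        t.join()
    return written


written = run()
```

In this sketch every batch arrives intact with its records in their original order, but the order of the batches in `written` can differ from run to run, which mirrors the ordering guarantee described in this section.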

Offset Management in Event Hub

When the pipeline stops, the Azure IoT/Event Hub Consumer origin notes where it stops reading by saving the last-read offset in the storage account container configured for the pipeline. When the pipeline starts again, the origin continues processing from where it stopped by default.

When you want the origin to process all available data instead of processing data from the last-saved offset, you must delete the offset information stored in the Microsoft Azure storage account container.

  1. In the Microsoft Azure portal, navigate to the storage account.
  2. To delete the offset information stored for the pipeline, delete the container that the pipeline uses.

    This can take some time. Allow the portal to complete the removal of the container before continuing.

  3. To enable the pipeline to store new offset information when you restart the pipeline, create a new container with the same name. Or, use a different name and update the Container Name property in the pipeline.
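
The effect of deleting and recreating the container can be pictured with a small in-memory model. This sketch is illustrative only: `OffsetStore` and `read_events` are hypothetical, a Python list stands in for the events retained by the event hub, and a single partition is assumed.

```python
class OffsetStore:
    """In-memory stand-in for the storage account: container -> {partition: offset}."""

    def __init__(self):
        self.containers = {}

    def create_container(self, name):
        self.containers.setdefault(name, {})

    def delete_container(self, name):
        self.containers.pop(name, None)

    def save(self, container, partition, offset):
        self.containers[container][partition] = offset

    def last_saved(self, container, partition):
        return self.containers[container].get(partition)


def read_events(events, store, container, partition="0"):
    """Read everything after the last-saved offset, then save the new offset."""
    last = store.last_saved(container, partition)
    start = 0 if last is None else last + 1
    processed = events[start:]
    if processed:
        store.save(container, partition, len(events) - 1)
    return processed


store = OffsetStore()
store.create_container("iot-offsets")
events = ["e0", "e1", "e2"]

first_run = read_events(events, store, "iot-offsets")    # reads all three events
events.append("e3")
second_run = read_events(events, store, "iot-offsets")   # resumes after the saved offset

# Deleting and recreating the container discards the saved offset.
store.delete_container("iot-offsets")
store.create_container("iot-offsets")
after_reset = read_events(events, store, "iot-offsets")  # reads all available events again
```

The model assumes that earlier events are still retained by the event hub. Events that have aged out of retention cannot be read again even after the offsets are removed.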

Data Formats

The Azure IoT/Event Hub Consumer origin reads data from Microsoft Azure Event Hub based on the data format that you select. You can use the following data formats:
Binary
Generates a record with a single byte array field at the root of the record.
When the data exceeds the user-defined maximum data size, the origin cannot process the data. Because the record is not created, the origin cannot pass the record to the pipeline to be written as an error record. Instead, the origin generates a stage error.
Datagram
Generates a record for every message. The origin can process collectd messages, NetFlow 5 and NetFlow 9 messages, and the following types of syslog messages:
  • RFC 5424
  • RFC 3164
  • Non-standard common messages, such as RFC 3339 dates with no version digit
When processing NetFlow messages, the stage generates different records based on the NetFlow version. When processing NetFlow 9, the records are generated based on the NetFlow 9 configuration properties. For more information, see NetFlow Data Processing.
Delimited
Generates a record for each delimited line. You can use the following delimited format types:
  • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
  • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
  • MS Excel CSV - Microsoft Excel comma-separated file.
  • MySQL CSV - MySQL comma-separated file.
  • Tab-Separated Values - File that includes tab-separated values.
  • PostgreSQL CSV - PostgreSQL comma-separated file.
  • PostgreSQL Text - PostgreSQL text file.
  • Custom - File that uses user-defined delimiter, escape, and quote characters.
  • Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
You can use a list or list_map root field type for delimited data, and optionally include field names from a header line, when available. For more information about the root field types, see Delimited Data Root Field Type.
When using a header line, you can enable handling records with additional columns. The additional columns are named using a custom prefix and integers in sequential increasing order, such as _extra_1, _extra_2. When you disallow additional columns, records that include additional columns are sent to error.
You can also replace a string constant with null values.
When a record exceeds the maximum record length defined for the stage, the stage processes the record based on the error handling configured for the stage.
JSON
Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
When an object exceeds the maximum object length defined for the origin, the origin processes the object based on the error handling configured for the stage.
Log
Generates a record for every log line.
When a line exceeds the user-defined maximum line length, the origin truncates the line.
You can include the processed log line as a field in the record. If the log line is truncated, and you request the log line in the record, the origin includes the truncated line.
You can define the log format or type to be read.
SDC Record
Generates a record for every SDC record that it reads. Use to process records generated by a pipeline using the SDC Record data format.
For error records, the origin provides the original record as read from the origin in the original pipeline, as well as error information that you can use to correct the record.
When processing error records, the origin expects the error file names and contents as generated by the original pipeline.
Text
Generates a record for each line of text or for each section of text based on a custom delimiter.
When a line or section exceeds the maximum line length defined for the origin, the origin truncates it. The origin adds a boolean field named Truncated to indicate if the line was truncated.
For more information about processing text with a custom delimiter, see Text Data Format with Custom Delimiters.
XML
Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
Generated records include XML attributes and namespace declarations as fields in the record by default. You can configure the stage to include them in the record as field attributes.
You can include XPath information for each parsed XML element and XML attribute in field attributes. This also places each namespace in an xmlns record header attribute.
Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
When a record exceeds the user-defined maximum record length, the origin skips the record and continues processing with the next record. It sends the skipped record to the pipeline for error handling.
Use the XML data format to process valid XML documents. For more information about XML processing, see Reading and Processing XML Data.
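
The extra-column handling described for the Delimited data format can be sketched in a few lines of Python. This is an approximation for illustration, not the origin's parser: `parse_delimited` and its parameters are hypothetical names, and the standard `csv` module stands in for the Default CSV format type.

```python
import csv
import io


def parse_delimited(text, allow_extra_columns=True, extra_prefix="_extra_"):
    """Parse delimited text that has a header line.

    Returns (records, errors). Rows with more columns than the header either
    get extra fields named <prefix><integer>, or are treated as error rows
    when extra columns are not allowed.
    """
    rows = list(csv.reader(io.StringIO(text)))
    header, data = rows[0], rows[1:]
    records, errors = [], []
    for row in data:
        if len(row) > len(header) and not allow_extra_columns:
            errors.append(row)
            continue
        record = dict(zip(header, row))
        # Name additional columns with the prefix and a sequential integer.
        for i, value in enumerate(row[len(header):], start=1):
            record[f"{extra_prefix}{i}"] = value
        records.append(record)
    return records, errors


sample = "id,name\n1,alpha\n2,beta,x,y\n"
records, errors = parse_delimited(sample)
strict_records, strict_errors = parse_delimited(sample, allow_extra_columns=False)
```

With extra columns allowed, the second data row gains the fields `_extra_1` and `_extra_2`. With extra columns disallowed, the same row is routed to the error list instead.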

Configuring an Azure IoT/Event Hub Consumer

Configure an Azure IoT/Event Hub Consumer origin to read data from Microsoft Azure Event Hub. Be sure to complete the necessary prerequisites before you configure the origin.
  1. In the properties pane, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Event Hub tab, configure the following properties:
    Event Hub Property Description
    Namespace Name The name of the namespace that contains the event hub that you want to use.
    Event Hub Name The event hub name.
    Shared Access Policy Name The policy name associated with the namespace.

    To retrieve the policy name, when logged into the Azure portal, navigate to your namespace and event hub, and then click Shared Access Policies for a list of policies.

    When appropriate, you can use the default shared access key policy, RootManageSharedAccessKey.

    Connection String Key One of the connection string keys associated with the specified shared access policy.

    To retrieve a connection string key, after accessing the list of shared access policies, click the policy name, and then copy the Connection String - Primary Key value.

    The value typically begins with "Endpoint".

    Consumer Group Consumer group to use. Enter a consumer group associated with the specified event hub.

    You can use the default consumer group, $Default.

    To view a list of available consumer groups, when viewing the event hub in the Azure portal, click Consumer Groups.

    Event Processor Prefix A prefix to identify the pipeline. Use a different prefix for each pipeline that includes the origin.

    Used to communicate with Azure Event Hub.

    Storage Account Name Name of the storage account to use.

    For information about creating a storage account, see Storage Account and Container Prerequisite.

    Storage Account Key One of the keys for the storage account.

    To retrieve the storage account key, when viewing the storage account details in the Azure portal, click Access Keys. Then copy one of the default key values.

    Container Name The name of the container to use for the pipeline.

    For information about creating a container, see Storage Account and Container Prerequisite.

    Max Threads Number of threads the origin generates and uses for multithreaded processing.
  3. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Format of data to be read. Use one of the following options:
    • Binary
    • Datagram
    • Delimited
    • JSON
    • Log
    • SDC Record
    • Text
    • XML
  4. For binary data, on the Data Format tab, configure the following properties:
    Binary Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Data Size (bytes) Maximum number of bytes in the message. Larger messages cannot be processed or written to error.
  5. For datagram data, on the Data Format tab, configure the following properties:
    Datagram Property Description
    Datagram Packet Format Packet format of the data:
    • collectd
    • NetFlow
    • syslog
    • Raw/separated data
    TypesDB File Path Path to a user-provided types.db file. Overrides the default types.db file.

    For collectd data only.

    Convert Hi-Res Time & Interval Converts the collectd high resolution time format interval and timestamp to UNIX time, in milliseconds.

    For collectd data only.

    Exclude Interval Excludes the interval field from output record.

    For collectd data only.

    Auth File Path to an optional authentication file. Use an authentication file to accept signed and encrypted data.

    For collectd data only.

    Record Generation Mode Determines the type of values to include in the record. Select one of the following options:
    • Raw Only
    • Interpreted Only
    • Both Raw and Interpreted

    For NetFlow 9 data only.

    Max Templates in Cache The maximum number of templates to store in the template cache. For more information about templates, see Caching NetFlow 9 Templates.

    Default is -1 for an unlimited cache size.

    For NetFlow 9 data only.

    Template Cache Timeout (ms) The maximum number of milliseconds to cache an idle template. Templates unused for more than the specified time are evicted from the cache. For more information about templates, see Caching NetFlow 9 Templates.

    Default is -1 for caching templates indefinitely.

    For NetFlow 9 data only.

    Charset Character encoding of the messages to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  6. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Delimiter Format Type Delimiter format type. Use one of the following options:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma-separated file.
    • Tab-Separated Values - File that includes tab-separated values.
    • PostgreSQL CSV - PostgreSQL comma-separated file.
    • PostgreSQL Text - PostgreSQL text file.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.
    • Multi Character Delimited - File that uses multiple user-defined characters to delimit fields and lines, and single user-defined escape and quote characters.
    Header Line Indicates whether a file contains a header line, and whether to use the header line.
    Allow Extra Columns When processing data with a header line, allows processing records with more columns than exist in the header line.
    Extra Column Prefix Prefix to use for any additional columns. Extra columns are named using the prefix and sequential increasing integers as follows: <prefix><integer>.

    For example, _extra_1. Default is _extra_.

    Max Record Length (chars) Maximum length of a record in characters. Longer records are not read.

    This property can be limited by the parser buffer size. For more information, see Maximum Record Size.

    Delimiter Character Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Multi Character Field Delimiter Characters that delimit fields for multi-character delimiter format.

    Default is two pipe characters (||).

    Multi Character Line Delimiter Characters that delimit lines or records for multi-character delimiter format.

    Default is the newline character (\n).

    Escape Character Escape character for a custom or multi-character delimiter format.
    Quote Character Quote character for a custom or multi-character delimiter format.
    Enable Comments Allows commented data to be ignored for custom delimiter format.
    Comment Marker Character that marks a comment when comments are enabled for custom delimiter format.
    Ignore Empty Lines Allows empty lines to be ignored for custom delimiter format.
    Root Field Type Root field type to use:
    • List_Map - Generates an indexed list of data. Enables you to use standard functions to process data. Use for new pipelines.
    • List - Generates a record with an indexed list with a map for header and value. Requires the use of delimited data functions to process data. Use only to maintain pipelines created before 1.1.0.
    Lines to Skip Number of lines to skip before reading data.
    Parse NULLs Replaces the specified string constant with null values.
    NULL Constant String constant to replace with null values.
    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  7. For JSON data, on the Data Format tab, configure the following properties:
    JSON Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    JSON Content Type of JSON content. Use one of the following options:
    • Array of Objects
    • Multiple Objects
    Max Object Length (chars) Maximum number of characters in a JSON object.

    Longer objects are diverted to the pipeline for error handling.

    This property can be limited by the parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  8. For log data, on the Data Format tab, configure the following properties:
    Log Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Log Format Format of the log files. Use one of the following options:
    • Common Log Format
    • Combined Log Format
    • Apache Error Log Format
    • Apache Access Log Custom Format
    • Regular Expression
    • Grok Pattern
    • Log4j
    • Common Event Format (CEF)
    • Log Event Extended Format (LEEF)
    Max Line Length Maximum length of a log line. The origin truncates longer lines.

    This property can be limited by the parser buffer size. For more information, see Maximum Record Size.

    Retain Original Line Determines how to treat the original log line. Select to include the original log line as a field in the resulting record.

    By default, the original line is discarded.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
    • When you select Apache Access Log Custom Format, use Apache log format strings to define the Custom Log Format.
    • When you select Regular Expression, enter the regular expression that describes the log format, and then map the fields that you want to include to each regular expression group.
    • When you select Grok Pattern, you can use the Grok Pattern Definition field to define custom grok patterns. You can define a pattern on each line.

      In the Grok Pattern field, enter the pattern to use to parse the log. You can use a predefined grok pattern or create a custom grok pattern using patterns defined in Grok Pattern Definition.

    • When you select Log4j, define the following properties:
      Log4j Property Description
      On Parse Error Determines how to handle information that cannot be parsed:
      • Skip and Log Error - Skips reading the line and logs a stage error.
      • Skip, No Error - Skips reading the line and does not log an error.
      • Include as Stack Trace - Includes information that cannot be parsed as a stack trace to the previously-read log line. The information is added to the message field for the last valid log line.
      Use Custom Log Format Allows you to define a custom log format.
      Custom Log4J Format Use log4j variables to define a custom log format.
  9. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Line Length Maximum number of characters allowed for a line. Longer lines are truncated.

    Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated.

    This property can be limited by the parser buffer size. For more information, see Maximum Record Size.

    Use Custom Delimiter Uses custom delimiters to define records instead of line breaks.
    Custom Delimiter One or more characters to use to define records.
    Include Custom Delimiter Includes delimiter characters in the record.
    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  10. For XML data, on the Data Format tab, configure the following properties:
    XML Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory For archive and compressed archive files, file name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Delimiter Element Delimiter to use to generate records. Omit a delimiter to treat the entire XML document as one record. Use one of the following:
    • An XML element directly under the root element.

      Use the XML element name without surrounding angle brackets ( < > ). For example, msg instead of <msg>.

    • A simplified XPath expression that specifies the data to use.

      Use a simplified XPath expression to access data deeper in the XML document or data that requires a more complex access method.

      For more information about valid syntax, see Simplified XPath Syntax.

    Include Field XPaths Includes the XPath to each parsed XML element and XML attribute in field attributes. Also includes each namespace in an xmlns record header attribute.

    When not selected, this information is not included in the record. By default, the property is not selected.

    Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
    Namespaces Namespace prefix and URI to use when parsing the XML document. Define namespaces when the XML element being used includes a namespace prefix or when the XPath expression includes namespaces.

    For information about using namespaces with an XML element, see Using XML Elements with Namespaces.

    For information about using namespaces with XPath expressions, see Using XPath Expressions with Namespaces.

    Using simple or bulk edit mode, click the Add icon to add additional namespaces.

    Output Field Attributes Includes XML attributes and namespace declarations in the record as field attributes. When not selected, XML attributes and namespace declarations are included in the record as fields.
    Note: Field attributes are automatically included in records written to destination systems only when you use the SDC RPC data format in the destination. For more information about working with field attributes, see Field Attributes.

    By default, the property is not selected.

    Max Record Length (chars) The maximum number of characters in a record. Longer records are diverted to the pipeline for error handling.

    This property can be limited by the parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Control Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
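
Before entering the Event Hub tab properties, it can help to sanity-check the copied connection string. The sketch below assumes the usual Azure layout of semicolon-separated name=value segments (Endpoint, SharedAccessKeyName, SharedAccessKey). The functions `parse_connection_string` and `check_event_hub_settings` are hypothetical helpers, not part of the origin, and the sample value is a placeholder rather than a real key.

```python
def parse_connection_string(value):
    """Split a connection string into a dict of its name=value segments."""
    parts = {}
    for segment in value.strip().split(";"):
        if not segment:
            continue
        # Split on the first "=" only, because keys may end with "=" padding.
        key, sep, val = segment.partition("=")
        if not sep:
            raise ValueError(f"segment without '=': {segment!r}")
        parts[key] = val
    return parts


def check_event_hub_settings(connection_string, policy_name):
    """Return a list of human-readable problems; empty when none are found."""
    parts = parse_connection_string(connection_string)
    problems = []
    if "Endpoint" not in parts:
        problems.append("connection string has no Endpoint segment")
    if parts.get("SharedAccessKeyName") != policy_name:
        problems.append("policy name does not match SharedAccessKeyName")
    if not parts.get("SharedAccessKey"):
        problems.append("connection string has no SharedAccessKey segment")
    return problems


# Placeholder value for illustration only.
sample = (
    "Endpoint=sb://example-ns.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=PLACEHOLDERKEY="
)
parsed = parse_connection_string(sample)
problems = check_event_hub_settings(sample, "RootManageSharedAccessKey")
mismatch = check_event_hub_settings(sample, "OtherPolicy")
```

An empty `problems` list suggests that the Shared Access Policy Name and Connection String Key values belong together. A mismatch usually means the connection string was copied from a different policy.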