Google Pub/Sub Subscriber

The Google Pub/Sub Subscriber origin consumes messages from a Google Pub/Sub subscription.

When you configure the origin, you define the Google Pub/Sub subscription ID to receive messages from. You also define the project and credentials provider to use to connect to Google Pub/Sub. The origin can retrieve credentials from the Google Application Default Credentials or from a Google Cloud service account credentials file.

The Google Pub/Sub Subscriber origin can use multiple threads to enable parallel processing of data from a Google Pub/Sub subscription.

When available, the Google Pub/Sub Subscriber origin includes user-defined message attributes in record header attributes.

Credentials

When the Google Pub/Sub Subscriber origin consumes messages from a Google Pub/Sub subscription, it must pass credentials to Google Pub/Sub. Configure the origin to retrieve the credentials from the Google Application Default Credentials or from a Google Cloud service account credentials file.

Default Credentials Provider

When configured to use the Google Application Default Credentials, the origin checks for the credentials file defined in the GOOGLE_APPLICATION_CREDENTIALS environment variable. If the environment variable doesn't exist and Data Collector is running on a virtual machine (VM) in Google Cloud, the origin uses the built-in service account associated with the virtual machine instance.

For more information about the default credentials, see Google Application Default Credentials in the Google Developer documentation.
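
The lookup order described above can be sketched in a few lines of Python. This is only an illustration of the documented order, not the origin's actual code: the function name `resolve_credentials_source`, the `on_google_cloud_vm` flag, and the returned labels are hypothetical. Returning `None` when neither source is available is also an assumption, since that case is not described here.

```python
import os
from typing import Mapping, Optional


def resolve_credentials_source(env: Mapping[str, str],
                               on_google_cloud_vm: bool) -> Optional[str]:
    """Describe where default credentials would come from (illustrative only)."""
    # 1. A credentials file named in the environment variable is checked first.
    key_file = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if key_file:
        return f"file:{key_file}"
    # 2. Otherwise, on a Google Cloud VM, the built-in service account is used.
    if on_google_cloud_vm:
        return "vm-service-account"
    # 3. Assumption: nothing is available, so report that no source was found.
    return None


if __name__ == "__main__":
    # The on_google_cloud_vm value is supplied by hand in this sketch.
    print(resolve_credentials_source(os.environ, on_google_cloud_vm=False))
```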

Complete the following steps to define the credentials file in the environment variable:

  1. Use the Google Cloud Platform Console or the gcloud command-line tool to create a Google service account and have your application use it for API access.
    For example, to use the command-line tool, run the following commands:
    gcloud iam service-accounts create my-account
    gcloud iam service-accounts keys create key.json --iam-account=my-account@my-project.iam.gserviceaccount.com
  2. Store the generated credentials file on the Data Collector machine.
  3. In the Data Collector environment configuration file, add the GOOGLE_APPLICATION_CREDENTIALS environment variable and point it to the credentials file.

    If you start Data Collector as a service, set the environment variable in the $SDC_DIST/libexec/sdcd-env.sh file. If you start Data Collector manually, set the variable in the $SDC_DIST/libexec/sdc-env.sh file.

    Set the environment variable as follows:

    export GOOGLE_APPLICATION_CREDENTIALS="/var/lib/sdc-resources/keyfile.json"
  4. Restart Data Collector to enable the changes.
  5. On the Credentials tab for the stage, select Default Credentials Provider for the credentials provider.
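
Before restarting Data Collector, it can help to confirm that the file referenced by GOOGLE_APPLICATION_CREDENTIALS is a readable service account key. The following is a minimal sketch of such a check, not part of Data Collector. The helper name `check_key_file` is hypothetical, and the sketch assumes that a service account key file is a JSON object containing at least the `type`, `client_email`, and `private_key` entries, with `type` set to `service_account`.

```python
import json
from pathlib import Path
from typing import List

# Entries that this sketch expects in a service account key file.
REQUIRED_KEYS = ("type", "client_email", "private_key")


def check_key_file(path: str) -> List[str]:
    """Return a list of problems; an empty list means the file looks usable."""
    key_file = Path(path)
    if not key_file.is_file():
        return [f"{path}: file not found"]
    try:
        data = json.loads(key_file.read_text(encoding="utf-8"))
    except ValueError as exc:  # covers invalid JSON and undecodable bytes
        return [f"{path}: not valid JSON ({exc})"]
    if not isinstance(data, dict):
        return [f"{path}: expected a JSON object at the top level"]
    problems = [f"{path}: missing key '{key}'"
                for key in REQUIRED_KEYS if key not in data]
    if "type" in data and data["type"] != "service_account":
        problems.append(f"{path}: type is '{data['type']}', expected 'service_account'")
    return problems


if __name__ == "__main__":
    # Path taken from the export example above.
    for problem in check_key_file("/var/lib/sdc-resources/keyfile.json"):
        print(problem)
```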

Service Account Credentials File (JSON)

When configured to use the Google Cloud service account credentials file, the origin checks for the file defined in the origin properties.

Complete the following steps to use the service account credentials file:

  1. Generate a service account credentials file in JSON format.

    Use the Google Cloud Platform Console or the gcloud command-line tool to generate and download the credentials file. For more information, see generating a service account credential in the Google Cloud Platform documentation.

  2. Store the generated credentials file on the Data Collector machine.

    As a best practice, store the file in the Data Collector resources directory, $SDC_RESOURCES.

  3. On the Credentials tab for the stage, select Service Account Credentials File for the credentials provider and enter the path to the credentials file.

Multithreaded Processing

The Google Pub/Sub Subscriber origin can perform parallel processing and enables the creation of a multithreaded pipeline. The origin uses multiple concurrent threads based on the Num Pipeline Runners property.

When you start the pipeline, each thread connects to the origin system, creates a batch of data, and passes the batch to an available pipeline runner. A pipeline runner is a sourceless pipeline instance: an instance of the pipeline that includes all of the processors and destinations in the pipeline and performs all pipeline processing after the origin.

Each pipeline runner processes one batch at a time, just like a pipeline that runs on a single thread. When the flow of data slows, the pipeline runners wait idly until they are needed.

Multithreaded pipelines preserve the order of records within each batch, just like a single-threaded pipeline. But since batches are processed by different pipeline runners, the order in which batches are written to destinations is not guaranteed.

For example, say you set the Num Pipeline Runners property to 5. When you start the pipeline, the origin creates five threads, and by default Data Collector creates a matching number of pipeline runners. Upon receiving data, the origin passes a batch to each of the pipeline runners for processing.

Each pipeline runner performs the processing associated with the rest of the pipeline. After a batch is written to pipeline destinations, the pipeline runner becomes available for another batch of data. Each batch is processed and written as quickly as possible, independent of the batches processed by other pipeline runners, so batches might be written in a different order than they were read.

At any given moment, the five pipeline runners can each process a batch, so this multithreaded pipeline processes up to five batches at a time. When incoming data slows, the pipeline runners sit idle, available for use as soon as the data flow increases.
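
The ordering behavior in this example can be illustrated with a small Python sketch. It is a simplified model rather than Data Collector's implementation: a thread pool of five workers stands in for the five pipeline runners, and the names `run_batch` and `process` are hypothetical. Records keep their order inside each batch, while batches complete in whatever order the workers finish.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple

NUM_PIPELINE_RUNNERS = 5  # mirrors the Num Pipeline Runners value in the example


def run_batch(batch_id: int, records: List[str]) -> Tuple[int, List[str]]:
    """Stand-in for the processors and destinations that follow the origin."""
    # Records are handled sequentially, so order within the batch is preserved.
    return batch_id, [record.upper() for record in records]


def process(batches: List[List[str]],
            runners: int = NUM_PIPELINE_RUNNERS) -> List[Tuple[int, List[str]]]:
    """Process batches on up to `runners` workers, in completion order."""
    completed = []
    with ThreadPoolExecutor(max_workers=runners) as pool:
        futures = [pool.submit(run_batch, i, batch) for i, batch in enumerate(batches)]
        # as_completed yields batches as they finish, not as they were read.
        for future in as_completed(futures):
            completed.append(future.result())
    return completed


if __name__ == "__main__":
    sample = [[f"batch{i}-rec{j}" for j in range(3)] for i in range(8)]
    for batch_id, records in process(sample):
        print(batch_id, records)
```

Running the sketch several times can print the batch IDs in different orders, while the records inside each batch always appear in their original sequence.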

For more information about multithreaded pipelines, see Multithreaded Pipeline Overview.

Record Header Attributes

The Google Pub/Sub Subscriber origin includes user-defined message attributes in record header attributes when they are available. When the origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute.

A Google Pub/Sub message contains a payload and optional attributes that describe the payload content. If the Google Pub/Sub Subscriber origin consumes a message with optional attributes, the origin includes the message attributes in record header attributes.

You can use the record:attribute or record:attributeOrDefault functions to access the information in the attributes. For more information about working with record header attributes, see Working with Header Attributes.
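
As a rough model of this behavior, the following Python sketch copies the optional message attributes into a record header and looks them up with a fallback value. The record layout and the helper names `to_record` and `attribute_or_default` are hypothetical. The sketch only approximates what record:attributeOrDefault does in the expression language, under the assumption that the default is returned when the attribute is absent.

```python
from typing import Dict, Optional


def to_record(payload: bytes, message_attributes: Optional[Dict[str, str]]) -> dict:
    """Build a simplified record: attributes become header attributes."""
    # Messages without attributes produce an empty header in this sketch.
    return {"header": dict(message_attributes or {}), "value": payload}


def attribute_or_default(record: dict, name: str, default: str) -> str:
    """Approximate record:attributeOrDefault: fall back when the attribute is absent."""
    return record["header"].get(name, default)


if __name__ == "__main__":
    record = to_record(b'{"temp": 21}', {"source": "sensor-7"})
    print(attribute_or_default(record, "source", "unknown"))
    print(attribute_or_default(record, "region", "unknown"))
```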

Data Formats

The Google Pub/Sub Subscriber origin processes data differently based on the data format. Google Pub/Sub Subscriber can process the following types of data:

Avro
Generates a record for every message. Includes a "precision" and "scale" field attribute for each Decimal field. For more information about field attributes, see Field Attributes.
The origin writes the Avro schema to an avroSchema record header attribute. For more information about record header attributes, see Record Header Attributes.
You can use one of the following methods to specify the location of the Avro schema definition:
  • Message/Data Includes Schema - Use the schema in the message.
  • In Pipeline Configuration - Use the schema that you provide in the stage configuration.
  • Confluent Schema Registry - Retrieve the schema from Confluent Schema Registry. The Confluent Schema Registry is a distributed storage layer for Avro schemas. You can configure the origin to look up the schema in the Confluent Schema Registry by the schema ID embedded in the message or by the schema ID or subject specified in the stage configuration.
Using a schema in the stage configuration or retrieving a schema from the Confluent Schema Registry overrides any schema that might be included in the message and can improve performance.
Binary
Generates a record with a single byte array field at the root of the record.
When the data exceeds the user-defined maximum data size, the origin cannot process the data. Because the record is not created, the origin cannot pass the record to the pipeline to be written as an error record. Instead, the origin generates a stage error.
Delimited
Generates a record for each delimited line. You can use the following delimited format types:
  • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
  • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
  • MS Excel CSV - Microsoft Excel comma-separated file.
  • MySQL CSV - MySQL comma-separated file.
  • Tab-Separated Values - File that includes tab-separated values.
  • Custom - File that uses user-defined delimiter, escape, and quote characters.
You can use a list or list-map root field type for delimited data, optionally including the header information when available. For more information about the root field types, see Delimited Data Root Field Type.
When using a header line, you can allow processing records with additional columns. The additional columns are named using a custom prefix and integers in sequential increasing order, such as _extra_1, _extra_2. When you disallow additional columns when using a header line, records that include additional columns are sent to error.
You can also replace a string constant with null values.
When a record exceeds the maximum record length defined for the origin, the origin processes the record based on the error handling configured for the stage.
JSON
Generates a record for each JSON object. You can process JSON files that include multiple JSON objects or a single JSON array.
When an object exceeds the maximum object length defined for the origin, the origin processes the object based on the error handling configured for the stage.
Protobuf
Generates a record for every protobuf message. By default, the origin assumes messages contain multiple protobuf messages.
Protobuf messages must match the specified message type and be described in the descriptor file.
When the data for a record exceeds 1 MB, the origin cannot continue processing data in the message. The origin handles the message based on the stage error handling property and continues reading the next message.
For information about generating the descriptor file, see Protobuf Data Format Prerequisites.
SDC Record
Generates a record for every record. Use to process records generated by a Data Collector pipeline using the SDC Record data format.
For error records, the origin provides the original record as read from the origin in the original pipeline, as well as error information that you can use to correct the record.
When processing error records, the origin expects the error file names and contents as generated by the original pipeline.
Text
Generates a record for each line of text or for each section of text based on a custom delimiter.
When a line or section exceeds the maximum line length defined for the origin, the origin truncates it. The origin adds a boolean field named Truncated to indicate if the line was truncated.
For more information about processing text with a custom delimiter, see Text Data Format with Custom Delimiters.
XML
Generates records based on a user-defined delimiter element. Use an XML element directly under the root element or define a simplified XPath expression. If you do not define a delimiter element, the origin treats the XML file as a single record.
Generated records include XML attributes and namespace declarations as fields in the record by default. You can configure the stage to include them in the record as field attributes.
You can include XPath information for each parsed XML element and XML attribute in field attributes. This also places each namespace in an xmlns record header attribute.
Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
When a record exceeds the user-defined maximum record length, the origin skips the record and continues processing with the next record. It sends the skipped record to the pipeline for error handling.
Use the XML data format to process valid XML documents. For more information about XML processing, see Reading and Processing XML Data.
Tip: If you want to process invalid XML documents, you can try using the text data format with custom delimiters. For more information, see Processing XML Data with Custom Delimiters.
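
For delimited data with a header line, the handling of additional columns described above can be sketched as follows. This is an illustration built on Python's csv module, not the origin's parser. The function name `parse_with_header` and its parameters are hypothetical, and rows with fewer columns than the header are not addressed here.

```python
import csv
import io
from typing import Dict, List, Tuple


def parse_with_header(text: str,
                      allow_extra_columns: bool = True,
                      extra_prefix: str = "_extra_") -> Tuple[List[Dict[str, str]], List[List[str]]]:
    """Return (records, error_rows) for comma-separated text with a header line."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    records, errors = [], []
    for row in reader:
        if not row:
            continue  # skip empty lines
        if len(row) > len(header) and not allow_extra_columns:
            errors.append(row)  # extra columns disallowed: send the row to error
            continue
        record = dict(zip(header, row))
        # Name additional columns <prefix><integer>, counting up from 1.
        for n, value in enumerate(row[len(header):], start=1):
            record[f"{extra_prefix}{n}"] = value
        records.append(record)
    return records, errors


if __name__ == "__main__":
    data = "id,name\n1,a\n2,b,x,y\n"
    print(parse_with_header(data))
    print(parse_with_header(data, allow_extra_columns=False))
```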

Configuring a Google Pub/Sub Subscriber Origin

Configure a Google Pub/Sub Subscriber origin to consume messages from a Google Pub/Sub subscription.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Pub/Sub tab, configure the following properties:
    Pub/Sub Property Description
    Subscription ID Google Pub/Sub subscription ID to receive messages from.
    Num Pipeline Runners The number of threads that the origin generates and uses for multithreaded processing. Enter a positive integer or an expression that evaluates to a positive integer.
    By default, the origin uses the following expression to configure the property based on the number of available CPU cores on the Data Collector machine:
    ${runtime:availableProcessors()}

    This expression is evaluated when you start the pipeline.

    Max Batch Size (records) Maximum number of records to include in a batch. Honors values up to the Data Collector maximum batch size.

    Default is 1000. The Data Collector maximum batch size also defaults to 1000.

    Max Batch Wait Time (ms) Amount of time, in milliseconds, that the origin waits to fill a batch before sending an empty batch.
  3. On the Credentials tab, configure the following properties:
    Credentials Property Description
    Project ID Google Pub/Sub project ID to connect to.
    Credentials Provider Credentials provider to use to connect to Google Pub/Sub:
    • Default credentials provider
    • Service account credentials file (JSON)
    Credentials File Path (JSON) When using a Google Cloud service account credentials file, path to the file that the origin uses to connect to Google Pub/Sub. The credentials file must be a JSON file.

    Enter a path relative to the Data Collector resources directory, $SDC_RESOURCES, or enter an absolute path.

  4. Optionally, click the Advanced tab to tune the performance of the origin.

    The defaults for these properties should work in most cases:

    Advanced Property Description
    Number of Subscribers Number of subscribers to spawn.

    Default is 1.

    Subscriber Thread Pool Size Size of the thread pool for each subscriber. Enter a positive integer or an expression that evaluates to a positive integer.
    By default, the origin uses the following expression to configure the property based on the number of available CPU cores on the Data Collector machine:
    ${5 * 10 * runtime:availableProcessors()}
    Custom Endpoint Optional endpoint to receive messages, entered in the following format:
    <host name>:<port number>

    Use to test with a Cloud SDK emulator for Google Pub/Sub.

  5. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Type of data to be read. Use one of the following options:
    • Avro
    • Binary
    • Delimited
    • JSON
    • Protobuf
    • SDC Record
    • Text
    • XML
  6. For Avro data, on the Data Format tab, configure the following properties:
    Avro Property Description
    Avro Schema Location Location of the Avro schema definition to use when processing data:
    • Message/Data Includes Schema - Use the schema in the message.
    • In Pipeline Configuration - Use the schema provided in the stage configuration.
    • Confluent Schema Registry - Retrieve the schema from the Confluent Schema Registry.

    Using a schema in the stage configuration or in the Confluent Schema Registry can improve performance.

    Avro Schema Avro schema definition used to process the data. Overrides any existing schema definitions associated with the data.

    You can optionally use the runtime:loadResource function to use a schema definition stored in a runtime resource file.

    Schema Registry URLs Confluent Schema Registry URLs used to look up the schema. To add a URL, click Add. Use the following format to enter the URL:
    http://<host name>:<port number>
    Lookup Schema By Method used to look up the schema in the Confluent Schema Registry:
    • Subject - Look up the specified Avro schema subject.
    • Schema ID - Look up the specified Avro schema ID.
    • Embedded Schema ID - Look up the Avro schema ID embedded in each message.
    Overrides any existing schema definitions associated with the message.
    Schema Subject Avro schema subject to look up in the Confluent Schema Registry.

    If the specified subject has multiple schema versions, the origin uses the latest schema version for that subject. To use an older version, find the corresponding schema ID, and then set the Lookup Schema By property to Schema ID.

    Schema ID Avro schema ID to look up in the Confluent Schema Registry.
  7. For binary data, on the Data Format tab, configure the following properties:
    Binary Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory File name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Data Size (bytes) Maximum number of bytes in the message. Larger messages cannot be processed or written to error.
  8. For delimited data, on the Data Format tab, configure the following properties:
    Delimited Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory File name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Delimiter Format Type Delimiter format type. Use one of the following options:
    • Default CSV - File that includes comma-separated values. Ignores empty lines in the file.
    • RFC4180 CSV - Comma-separated file that strictly follows RFC4180 guidelines.
    • MS Excel CSV - Microsoft Excel comma-separated file.
    • MySQL CSV - MySQL comma-separated file.
    • Tab-Separated Values - File that includes tab-separated values.
    • Custom - File that uses user-defined delimiter, escape, and quote characters.
    Header Line Indicates whether a file contains a header line, and whether to use the header line.
    Allow Extra Columns When processing data with a header line, allows processing records with more columns than exist in the header line.
    Extra Column Prefix Prefix to use for any additional columns. Extra columns are named using the prefix and sequential increasing integers as follows: <prefix><integer>.

    For example, _extra_1. Default is _extra_.

    Max Record Length (chars) Maximum length of a record in characters. Longer records are not read.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Delimiter Character Delimiter character for a custom delimiter format. Select one of the available options or use Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Default is the pipe character ( | ).

    Escape Character Escape character for a custom file type.
    Quote Character Quote character for a custom file type.
    Root Field Type Root field type to use:
    • List-Map - Generates an indexed list of data. Enables you to use standard functions to process data. Use for new pipelines.
    • List - Generates a record with an indexed list with a map for header and value. Requires the use of delimited data functions to process data. Use only to maintain pipelines created before 1.1.0.
    Lines to Skip Lines to skip before reading data.
    Parse NULLs Replaces the specified string constant with null values.
    NULL Constant String constant to replace with null values.
    Charset Character encoding of the files to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  9. For JSON data, on the Data Format tab, configure the following properties:
    JSON Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory File name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    JSON Content Type of JSON content. Use one of the following options:
    • Array of Objects
    • Multiple Objects
    Maximum Object Length (chars) Maximum number of characters in a JSON object.

    Longer objects are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  10. For protobuf data, on the Data Format tab, configure the following properties:
    Protobuf Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory File name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Protobuf Descriptor File Descriptor file (.desc) to use. The descriptor file must be in the Data Collector resources directory, $SDC_RESOURCES.

    For information about generating the descriptor file, see Protobuf Data Format Prerequisites. For more information about environment variables, see Data Collector Environment Configuration.

    Message Type The fully qualified name for the message type to use when reading data.

    Use the following format: <package name>.<message type>.

    Use a message type defined in the descriptor file.
    Delimited Messages Indicates if a message might include more than one protobuf message.
  11. For SDC Record data, on the Data Format tab, configure the following properties:
    SDC Record Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory File name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

  12. For text data, on the Data Format tab, configure the following properties:
    Text Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    File Name Pattern within Compressed Directory File name pattern that represents the files to process within the compressed directory. You can use UNIX-style wildcards, such as an asterisk or question mark. For example, *.json.

    Default is *, which processes all files.

    Max Line Length Maximum number of characters allowed for a line. Longer lines are truncated.

    Adds a boolean field to the record to indicate if it was truncated. The field name is Truncated.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Use Custom Delimiter Uses custom delimiters to define records instead of line breaks.
    Custom Delimiter One or more characters to use to define records.
    Include Custom Delimiter Includes delimiter characters in the record.
    Charset Character encoding of the files to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
  13. For XML data, on the XML tab, configure the following properties:
    XML Property Description
    Compression Format The compression format of the files:
    • None - Processes only uncompressed files.
    • Compressed File - Processes files compressed by the supported compression formats.
    • Archive - Processes files archived by the supported archive formats.
    • Compressed Archive - Processes files archived and compressed by the supported archive and compression formats.
    Delimiter Element
    Delimiter to use to generate records. Omit a delimiter to treat the entire XML document as one record. Use one of the following:
    • An XML element directly under the root element.

      Use the XML element name without surrounding angle brackets ( < > ). For example, msg instead of <msg>.

    • A simplified XPath expression that specifies the data to use.

      Use a simplified XPath expression to access data deeper in the XML document or data that requires a more complex access method.

      For more information about valid syntax, see Simplified XPath Syntax.

    Include Field XPaths Includes the XPath to each parsed XML element and XML attribute in field attributes. Also includes each namespace in an xmlns record header attribute.

    When not selected, this information is not included in the record. By default, the property is not selected.

    Note: Field attributes and record header attributes are written to destination systems automatically only when you use the SDC RPC data format in destinations. For more information about working with field attributes and record header attributes, and how to include them in records, see Field Attributes and Record Header Attributes.
    Namespaces Namespace prefix and URI to use when parsing the XML document. Define namespaces when the XML element being used includes a namespace prefix or when the XPath expression includes namespaces.

    For information about using namespaces with an XML element, see Using XML Elements with Namespaces.

    For information about using namespaces with XPath expressions, see Using XPath Expressions with Namespaces.

    Using simple or bulk edit mode, click the Add icon to add additional namespaces.

    Output Field Attributes Includes XML attributes and namespace declarations in the record as field attributes. When not selected, XML attributes and namespace declarations are included in the record as fields.
    Note: Field attributes are automatically included in records written to destination systems only when you use the SDC RPC data format in the destination. For more information about working with field attributes, see Field Attributes.

    By default, the property is not selected.

    Max Record Length (chars)

    The maximum number of characters in a record. Longer records are diverted to the pipeline for error handling.

    This property can be limited by the Data Collector parser buffer size. For more information, see Maximum Record Size.

    Charset Character encoding of the files to be processed.
    Ignore Ctrl Characters Removes all ASCII control characters except for the tab, line feed, and carriage return characters.
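
For the text data format properties in the steps above, the interaction between Max Line Length, Use Custom Delimiter, and the Truncated field can be sketched in Python. This is an approximation, not the origin's parser. The function name `text_records` is hypothetical, the `text` field name is an assumption, and setting Truncated on every record (rather than only on truncated ones) is also an assumption.

```python
from typing import Dict, List, Optional, Union


def text_records(data: str,
                 max_line_length: int,
                 custom_delimiter: Optional[str] = None) -> List[Dict[str, Union[str, bool]]]:
    """Split text into records and truncate any that exceed max_line_length."""
    # Without a custom delimiter, records are defined by line breaks.
    delimiter = custom_delimiter if custom_delimiter else "\n"
    parts = data.split(delimiter)
    if parts and parts[-1] == "":
        parts.pop()  # a trailing delimiter does not start a new record
    records = []
    for part in parts:
        records.append({
            "text": part[:max_line_length],             # longer content is cut off
            "Truncated": len(part) > max_line_length,   # flag whether it was cut
        })
    return records


if __name__ == "__main__":
    for record in text_records("short\nthis line is long\n", max_line_length=10):
        print(record)
```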