Amazon S3

The Amazon S3 origin reads objects stored in Amazon Simple Storage Service, also known as Amazon S3. The objects must be fully written, use the same supported data format, and share the same schema.

When reading multiple objects in a batch, the origin reads the oldest object first. After successfully reading an object, the origin can delete the object, move it to an archive directory, or leave it in place.

When the pipeline stops, the origin notes the last-modified timestamp of the last object that it processed and stores it as an offset. When the pipeline starts again, the origin continues processing from the last-saved offset by default. You can reset pipeline offsets to process all available objects.

The Amazon S3 origin reads from Amazon S3 using connection information stored in a Hadoop configuration file. Complete the prerequisite tasks before using the origin in a local pipeline.

When you configure the origin, you specify the connection security to use and related properties. You define the location of the objects and the name pattern for the objects to read. You can optionally configure another name pattern to exclude objects from processing and define post-processing actions for successfully read objects.
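
For example, with a hypothetical bucket and path, you might read all CSV objects under a prefix while excluding temporary objects. The bucket name, path, and patterns below are illustrative only:
  Bucket: s3a://sales-data/invoices/2023/
  Object Name Pattern: *.csv
  Exclusion Pattern: *_tmp.csv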

You select the data format of the data and configure related properties. When processing delimited or JSON data, you can define a custom schema for reading the data and configure related properties. You can also configure advanced properties such as performance-related properties and proxy server properties.

Note: The user that Transformer uses to access HDFS depends on the pipeline configuration. For more information, see Hadoop YARN.

You can configure the origin to load data only once and cache the data for reuse throughout the pipeline run. Or, you can configure the origin to cache each batch of data so the data can be passed to multiple downstream batches efficiently. You can also configure the origin to skip tracking offsets, which enables reading the entire data set each time you start the pipeline.

Schema Requirement

All objects processed by the Amazon S3 origin must have the same schema.

When objects have different schemas, the resulting behavior depends on the data format and the version of Spark that you use. For example, the origin might skip processing delimited data with a different schema, but add null values to Parquet data with a different schema.

Connection Security

You can specify how securely the origin connects to Amazon S3. The origin can connect in the following ways:
IAM role
When Transformer runs on an Amazon EC2 instance, you can use the AWS Management Console to configure an IAM role for the Transformer EC2 instance. Then, Transformer uses the IAM instance profile credentials to automatically connect to Amazon.
For more information about assigning an IAM role to an EC2 instance, see the Amazon EC2 documentation.
AWS keys
When Transformer does not run on an Amazon EC2 instance or when the EC2 instance doesn’t have an IAM role, you can connect using an AWS access key pair. When using an AWS access key pair, you specify the access key ID and secret access key to use.
Tip: To secure sensitive information, you can use credential stores or runtime resources. For more information about runtime resources, see the Data Collector documentation.
None
When accessing a public bucket, you can connect anonymously using no security.

Partitioning

Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel. Spark determines how to split pipeline data into initial partitions based on the origins in the pipeline.

For an Amazon S3 origin, Spark determines the partitioning based on the data format of the data being read:
Delimited, JSON, text, or XML
When reading text-based data, Spark can split the object into multiple partitions for processing, depending on the underlying file system. Multiline JSON files cannot be split.
Avro, ORC, or Parquet
When reading Avro, ORC, or Parquet data, Spark can split the object into multiple partitions for processing.

Spark uses these partitions throughout the pipeline unless a processor causes Spark to shuffle the data. When you need to change the partitioning in the pipeline, use the Repartition processor.

Data Formats

The Amazon S3 origin generates records based on the specified data format.

The origin can read the following data formats:
Avro
The origin generates a record for every Avro record in the object. Each object must contain the Avro schema. The origin uses the Avro schema to generate records.
When you configure the origin, you must select the Avro option that matches the version of Spark that runs the pipeline: Spark 2.3, or Spark 2.4 or later.
When using Spark 2.4 or later, you can define an Avro schema to use. Specify the schema in JSON format, as shown in the example at the end of this section. You can also configure the origin to process all objects in the specified locations. By default, the origin processes only objects with the .avro extension.
Delimited
The origin generates a record for each delimited line in the object. You can specify a custom delimiter, quote, and escape character used in the data.
By default, the origin uses the values in the first row for field names and creates records starting with the second row in the object. The origin infers data types from the data by default.
You can clear the Includes Header property to indicate that objects do not contain a header row. When objects do not include a header row, the origin names the first field _c0, the second field _c1, and so on. You can rename the fields downstream with a Field Renamer processor, or you can specify a custom schema in the origin. For a sample of both cases, see the example at the end of this section.
When you specify a custom schema, the origin uses the field names and data types defined in the schema, applying the first field in the schema to the first field in the record, and so on.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
Objects must use \n as the newline character. The origin skips empty lines.
JSON
By default, the origin generates a record for each line in the object. Each line in the object must contain a complete, valid JSON object. For details, see the JSON Lines website.
If the JSON Lines data contains objects that span multiple lines, you must configure the origin to process multiline JSON objects. When processing multiline JSON objects, the origin generates a record for each JSON object, even if the object spans multiple lines.
A standard single-line JSON Lines object can be split into partitions and processed in parallel. A multiline JSON object cannot be split, so it must be processed in a single partition, which can slow pipeline performance. For a comparison of the two formats, see the examples at the end of this section.
By default, the origin uses the field names, field order, and data types in the data.
When you specify a custom schema, the origin matches the field names in the schema to those in the data, then applies the data types and field order defined in the schema.
By default, when the origin encounters parsing errors, it stops the pipeline. When processing data with a custom schema, the origin handles parsing errors based on the configured error handling.
ORC
The origin generates a record for each Optimized Row Columnar (ORC) row in the object.
Parquet
The origin generates a record for every Parquet record in the object. The object must contain the Parquet schema. The origin uses the Parquet schema to generate records.
Text
The origin generates a record for each text line in the object. The object must use \n as the newline character.
The generated record consists of a single String field named Value that contains the data.
XML
The origin generates a record for every row in the object. You specify the root tag used in the objects and the row tag used to define records, as shown in the sample at the end of this section.
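
For example, when using Spark 2.4 or later with Avro data, you might define a schema like the following to override the schema embedded in the objects. The record and field names are illustrative only:
  {
    "type": "record",
    "name": "Transaction",
    "fields": [
      {"name": "id", "type": "long"},
      {"name": "amount", "type": "double"},
      {"name": "description", "type": ["null", "string"], "default": null}
    ]
  }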
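
For example, given the following illustrative delimited object, the origin creates records with the fields id, name, and amount when Includes Header is selected. When Includes Header is cleared, the origin treats the first line as data and names the fields _c0, _c1, and _c2:
  id,name,amount
  1001,Acme Corp,250.00
  1002,Birch Ltd,125.50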
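
For example, the following illustrative object contains standard JSON Lines data, with one complete JSON object per line, which Spark can split across partitions:
  {"id": 1001, "name": "Acme Corp"}
  {"id": 1002, "name": "Birch Ltd"}
In contrast, the following object spans multiple lines for a single JSON object. Reading it requires the Multiline property and a single partition:
  {
    "id": 1001,
    "name": "Acme Corp"
  }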
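
For example, with the default ROWS root tag and ROW row tag, the origin generates one record for each <ROW> element in an illustrative object like this:
  <ROWS>
    <ROW><id>1001</id><name>Acme Corp</name></ROW>
    <ROW><id>1002</id><name>Birch Ltd</name></ROW>
  </ROWS>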

Configuring an Amazon S3 Origin

Configure an Amazon S3 origin to read data in Amazon S3. Complete the prerequisite tasks before using the origin in a local pipeline.

Note: All processed objects must share the same schema.
  1. On the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Stage Library Stage library to use to connect to Amazon S3:
    • AWS cluster-provided libraries - The cluster where the pipeline runs has Apache Hadoop Amazon Web Services libraries installed, and therefore has all of the necessary libraries to run the pipeline.
    • AWS Transformer-provided libraries for Hadoop 2.7.7 - Transformer passes the necessary libraries with the pipeline to enable running the pipeline.

      Use when running the pipeline locally or when the cluster where the pipeline runs does not include the Amazon Web Services libraries required for Hadoop 2.7.7.

    • AWS Transformer-provided libraries for Hadoop 3.2.0 - Transformer passes the necessary libraries with the pipeline to enable running the pipeline.

      Use when running the pipeline locally or when the cluster where the pipeline runs does not include the Amazon Web Services libraries required for Hadoop 3.2.0.

    Note: When using additional Amazon stages in the pipeline, ensure that they use the same stage library.
    Load Data Only Once Reads data in a single batch and caches the results for reuse. Use to perform lookups in streaming execution mode pipelines.

    When using the origin to perform lookups, do not limit the batch size. All lookup data should be read in a single batch.

    This property is ignored in batch execution mode.

    Cache Data Caches processed data so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

    Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

    Available when Load Data Only Once is not enabled. When the origin loads data once, it also caches data.

    Skip Offset Tracking Skips tracking offsets.

    In a streaming pipeline, the origin reads all available data with each batch.

    In a batch pipeline, the origin reads all available data each time the pipeline starts.

  2. On the Amazon S3 tab, configure the following properties:
    Amazon S3 Property Description
    Security Mode to use to connect to Amazon S3:
    • AWS keys - Connects using an AWS access key pair.
    • IAM Role - Connects using an IAM role assigned to the Transformer EC2 instance.
    • None - Connects to a public bucket using no security.
    Access Key ID AWS Access Key ID. Required when using AWS keys to connect to Amazon.
    Tip: To secure sensitive information, you can use credential stores or runtime resources. For more information about runtime resources, see the Data Collector documentation.
    Secret Access Key AWS Secret Access Key. Required when using AWS keys to connect to Amazon.
    Tip: To secure sensitive information, you can use credential stores or runtime resources. For more information about runtime resources, see the Data Collector documentation.
    Bucket Location of the objects to read. Use the following format:
    s3a://<bucket name>/<path to objects>/
    Object Name Pattern Glob pattern that defines the names of the objects to process. For example, to read all objects in the specified bucket, use an asterisk ( * ).
    Exclusion Pattern Glob pattern that defines the names of the objects to exclude from processing.
    Max Objects Per Batch Maximum number of objects to include in a batch.
       
  3. On the Post-Processing tab, optionally configure the following property:
    Post-Processing Property Description
    Post Processing Action to take after successfully processing an object:
    • None - Keeps the object in place.
    • Archive - Copies or moves the object to another location.
    • Delete - Deletes the object.
    Archive Directory Location to store successfully processed objects. Use the following format:
    s3a://<bucket name>/<path to objects>/
  4. On the Advanced tab, optionally configure the following properties:
    Advanced Property Description
    Additional Configuration Additional HDFS properties to pass to an HDFS-compatible file system. Specified properties override those in Hadoop configuration files.

    To add properties, click the Add icon and define the HDFS property name and value. Use the property names and values as expected by your version of Hadoop. For sample properties, see the examples after this procedure.

    Block Size Block size to use when reading data, in bytes.

    Default is 33554432.

    Buffer Hint TCP socket buffer size hint, in bytes.

    Default is 8192.

    Maximum Connections Maximum number of connections to Amazon.

    Default is 1.

    Connection Timeout Milliseconds to wait for a response before closing the connection.

    Default is 200000 milliseconds, or 3.33 minutes.

    Socket Timeout Milliseconds to wait for a response to a query.

    Default is 5000 milliseconds, or 5 seconds.

    Retry Count Maximum number of times to retry requests.

    Default is 20.

    Use Proxy Enables connecting to Amazon using a proxy server.
    Proxy Host Host name of the proxy server. Required when using a proxy server.
    Proxy Port Optional port number for the proxy server.
    Proxy User User name for the proxy server credentials. Using credentials is optional.
    Proxy Password Password for the proxy server credentials. Using credentials is optional.
    Tip: To secure sensitive information, you can use credential stores or runtime resources. For more information about runtime resources, see the Data Collector documentation.
    Proxy Domain Optional domain name for the proxy server.
    Proxy Workstation Optional workstation for the proxy server.
  5. On the Data Format tab, configure the following property:
    Data Format Property Description
    Data Format Format of the data. Select one of the following formats:
    • Avro (Spark 2.4 or later) - For Avro data processed by Spark 2.4 or later.
    • Avro (Spark 2.3) - For Avro data processed by Spark 2.3.
    • Delimited
    • JSON
    • ORC
    • Parquet
    • Text
    • XML
  6. For Avro data processed by Spark 2.4 or later, optionally configure the following properties:
    Avro/Spark 2.4 Property Description
    Avro Schema Optional Avro schema to use to process data. The specified Avro schema overrides any schema included in the objects.

    Specify the Avro schema in JSON format.

    Ignore Extension Processes all objects in the specified locations.

    When not enabled, the origin only processes objects with the .avro extension.

  7. For delimited data, on the Data Format tab, optionally configure the following properties:
    Delimited Property Description
    Delimiter Character Delimiter character used in the data. Select one of the available options or select Other to enter a custom character.

    You can enter a Unicode control character using the format \uNNNN, where N is a hexadecimal digit from the numbers 0-9 or the letters A-F. For example, enter \u0000 to use the null character as the delimiter or \u2028 to use a line separator as the delimiter.

    Quote Character Quote character used in the data.
    Escape Character Escape character used in the data.
    Includes Header Indicates that the data includes a header line. When selected, the origin uses the first line to create field names and begins reading with the second line.
  8. For JSON data, on the Data Format tab, configure the following property:
    JSON Property Description
    Multiline Enables processing multiline JSON Lines data.

    By default, the origin expects a single JSON object on each line of the file. Use this option to process JSON objects that span multiple lines.

  9. For XML data, on the Data Format tab, configure the following properties:
    XML Property Description
    Root Tag Tag used as the root element.

    Default is ROWS, which represents a <ROWS> root element.

    Row Tag Tag used as a record delineator.

    Default is ROW, which represents a <ROW> record delineator element.

  10. To use a custom schema for delimited or JSON data, click the Schema tab and configure the following properties:
    Schema Property Description
    Schema Mode Mode that determines the schema to use when processing data:
    • Infer from Data

      The origin infers the field names and data types from the data.

    • Use Custom Schema - DDL Format

      The origin uses a custom schema defined in the DDL format.

    • Use Custom Schema - JSON Format

      The origin uses a custom schema defined in the JSON format.

    Note that the schema is applied differently depending on the data format being processed.

    Schema Custom schema to use to process the data.

    Enter the schema in DDL or JSON format, depending on the selected schema mode. For sample schemas, see the examples after this procedure.

    Error Handling Determines how the origin handles parsing errors:
    • Permissive - When the origin encounters a problem parsing any field in the record, it creates a record with the field names defined in the schema, but with null values in every field.
    • Drop Malformed - When the origin encounters a problem parsing any field in the record, it drops the entire record from the pipeline.
    • Fail Fast - When the origin encounters a problem parsing any field in the record, it stops the pipeline.
    Original Data Field Field where the data from the original record is written when the origin cannot parse the record.

    When writing the original record to a field, you must add the field to the custom schema as a String field.

    Available when using permissive error handling.
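
For example, in the Additional Configuration property on the Advanced tab, you might pass Hadoop S3A properties such as a custom endpoint or connection limit. The values shown are illustrative; use the property names and values expected by your version of Hadoop:
  fs.s3a.endpoint: s3.us-west-2.amazonaws.com
  fs.s3a.connection.maximum: 50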
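
For example, a custom schema for records with an id, a name, and an amount field might look like the following. The field names and types are illustrative only.
In DDL format:
  id INT, name STRING, amount DOUBLE
The equivalent schema in JSON format:
  {
    "type": "struct",
    "fields": [
      {"name": "id", "type": "integer", "nullable": true, "metadata": {}},
      {"name": "name", "type": "string", "nullable": true, "metadata": {}},
      {"name": "amount", "type": "double", "nullable": true, "metadata": {}}
    ]
  }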