Elasticsearch

Supported pipeline types:
  • Data Collector

The Elasticsearch destination writes data to an Elasticsearch cluster, including Elastic Cloud clusters (formerly Found clusters) and Amazon Elasticsearch Service clusters. The destination uses the Elasticsearch HTTP module to access the Bulk API and write each record to Elasticsearch as a document.
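
As a rough illustration of what writing each record as a document through the Bulk API involves, the sketch below builds the newline-delimited JSON body that the Bulk API accepts. The function name `build_bulk_body` and the sample records are hypothetical, and this is not the destination's actual implementation.

```python
import json

def build_bulk_body(records, index):
    """Serialize records into the newline-delimited JSON body the Bulk API expects."""
    lines = []
    for record in records:
        # Action line: tells Elasticsearch what to do with the document that follows.
        lines.append(json.dumps({"index": {"_index": index}}))
        # Source line: the record itself, written as one document.
        lines.append(json.dumps(record))
    # The Bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"

body = build_bulk_body([{"name": "Ada"}, {"name": "Grace"}], "customer")
print(body, end="")
```

Each record contributes two lines: an action line and the document source.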

When you configure the Elasticsearch destination, you configure the cluster name, the HTTP URI, and document-related information.

When Data Collector shares the same network as the Elasticsearch cluster, you can enter one or more node URIs and have the destination automatically detect additional Elasticsearch nodes in the cluster.

The Elasticsearch destination can use CRUD operations defined in the sdc.operation.type record header attribute to write data. You can define a default operation for records without the header attribute or value. You can also configure whether to use multi-row operations for inserts and deletes, and how to handle records with unsupported operations. For information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.

You can also add advanced Elasticsearch properties as needed.

Security

When security is enabled for the Elasticsearch cluster, you must specify the authentication method:
Basic
Use Basic authentication for Elasticsearch clusters outside of Amazon Elasticsearch Service. With Basic authentication, the destination passes the Elasticsearch user name and password.
AWS Signature V4
Use AWS Signature V4 authentication for Elasticsearch clusters within Amazon Elasticsearch Service. The destination must sign HTTP requests with Amazon Web Services credentials. For details, see the Amazon Elasticsearch Service documentation. Use one of the following methods to sign with AWS credentials:
IAM roles
When Data Collector runs on an Amazon EC2 instance, you can use the AWS Management Console to configure an IAM role for the EC2 instance. Data Collector uses the IAM instance profile credentials to automatically connect to AWS.
When you use IAM roles, you do not need to specify the Access Key ID and Secret Access Key properties in the destination.
For more information about assigning an IAM role to an EC2 instance, see the Amazon EC2 documentation.
AWS access key pairs
When Data Collector does not run on an Amazon EC2 instance or when the EC2 instance does not have an IAM role, you must specify the Access Key ID and Secret Access Key properties in the destination.

Tip: To secure sensitive information such as user names and passwords or access key pairs, you can use runtime resources or credential stores.
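
To make the two authentication methods more concrete, here is a minimal sketch using only the Python standard library. It shows how a user name and password pair becomes an HTTP Basic Authorization header, and how the AWS Signature V4 signing key is derived from a secret access key. The function names are hypothetical, the default service name "es" is an assumption about the Amazon Elasticsearch Service signing name, and the sketch stops short of signing a full request.

```python
import base64
import hashlib
import hmac

def basic_auth_header(user_password):
    """Build an HTTP Basic Authorization header from a '<username>:<password>' string."""
    token = base64.b64encode(user_password.encode("utf-8")).decode("ascii")
    return {"Authorization": "Basic " + token}

def _hmac_sha256(key, message):
    """Return the raw HMAC-SHA256 digest of message under key."""
    return hmac.new(key, message.encode("utf-8"), hashlib.sha256).digest()

def sigv4_signing_key(secret_access_key, date_stamp, region, service="es"):
    """Derive the Signature V4 signing key for one date (YYYYMMDD), region, and service."""
    k_date = _hmac_sha256(("AWS4" + secret_access_key).encode("utf-8"), date_stamp)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")

print(basic_auth_header("user:pass"))
# Placeholder secret for illustration only; never hard-code real credentials.
key = sigv4_signing_key("EXAMPLE-SECRET", "20170314", "us-east-1")
print(len(key))  # 32 (bytes in an HMAC-SHA256 digest)
```

The derived key is then used to sign a canonical form of each HTTP request, which is the part of the process that the destination handles for you.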

Time Basis and Time-Based Indexes

The time basis is the time used by the Elasticsearch destination to write records to time-based indexes. When indexes have no time component, you can ignore the time basis property.

You can use the time of processing or the time associated with the data as the time basis.

For example, say you define the Index property using the following datetime variables:
logs-${YYYY()}-${MM()}-${DD()}

If you use the time of processing as the time basis, the destination writes records to indexes based on when it processes each record. If you use the time associated with the data, such as a transaction timestamp, then the destination writes records to the indexes based on that timestamp.

You can use the following times as the time basis:
Processing Time
When you use processing time as the time basis, the destination resolves the datetime variables in the Index property using the time at which it processes each record. To use the processing time as the time basis, use the following expression:
${time:now()}
This is the default time basis.
Record Time
When you use the time associated with a record as the time basis, you specify a date field in the record. The destination writes data to indexes based on the datetimes associated with the records.
To use a time associated with the record, use an expression that calls a field and resolves to a datetime value, such as ${record:value("/Timestamp")}.
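
The difference between the two time bases can be sketched in a few lines of Python. The helper `resolve_index` is a hypothetical stand-in for how an index such as logs-${YYYY()}-${MM()}-${DD()} might resolve, and the sample record is invented for illustration.

```python
from datetime import datetime, timezone

def resolve_index(time_basis):
    """Mimic logs-${YYYY()}-${MM()}-${DD()} for a given time basis."""
    return time_basis.strftime("logs-%Y-%m-%d")

record = {
    "Timestamp": datetime(2017, 3, 14, 23, 59, tzinfo=timezone.utc),
    "msg": "late event",
}

# Processing time: the index depends on when the record is handled.
processing_index = resolve_index(datetime.now(timezone.utc))

# Record time: the index depends on the /Timestamp field of the record.
record_index = resolve_index(record["Timestamp"])

print(processing_index)
print(record_index)  # logs-2017-03-14
```

With record time, a late-arriving record still lands in the index for the day the event occurred rather than the day it was processed.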

Document IDs

When appropriate, you can specify an expression that defines the document ID. When you do not specify an expression, Elasticsearch generates an ID for each document.

When you configure the destination to perform create, update, or delete operations, you must define the document ID.

For example, to perform updates for documents with IDs based on the EmployeeID field, define the write operation as update and define the Document ID as follows: ${record:value('/EmployeeID')}.

You can also optionally define a parent ID for each document to define a parent/child relationship between documents in the same index.
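
As an illustration of the EmployeeID example above, the following sketch resolves a field path against a record and places the result in the `_id` of a Bulk API update action. The helper `record_value` is a hypothetical stand-in for the ${record:value('<path>')} expression, and the record contents and index name are invented.

```python
import json

def record_value(record, field_path):
    """Hypothetical stand-in for ${record:value('<path>')}: walk a /-separated path."""
    value = record
    for part in field_path.strip("/").split("/"):
        value = value[part]
    return value

def update_action(record, index, id_path):
    """Build the action line and document line for an update keyed by a record field."""
    doc_id = str(record_value(record, id_path))
    action = {"update": {"_index": index, "_id": doc_id}}
    # A Bulk API update wraps the partial document in a "doc" object.
    return json.dumps(action) + "\n" + json.dumps({"doc": record}) + "\n"

pair = update_action({"EmployeeID": 1042, "dept": "ops"}, "employees", "/EmployeeID")
print(pair, end="")
```

Because update and delete actions target an existing document, they cannot work without an ID, which is why the destination requires one for those operations.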

Define the CRUD Operation

The Elasticsearch destination can create, update, delete, or index data. The destination writes the records based on the CRUD operation defined in a CRUD operation header attribute or in operation-related stage properties.

You define the CRUD operation in the following ways:

CRUD record header attribute
You can define the CRUD operation in a CRUD operation record header attribute. The destination looks for the CRUD operation to use in the sdc.operation.type record header attribute.
The attribute can contain one of the following numeric values:
  • 1 for CREATE, the equivalent of INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 4 for INDEX, the equivalent of UPSERT
  • 8 for UPDATE with doc_as_upsert, the equivalent of MERGE
If your pipeline includes a CDC-enabled origin that processes changed data, the destination reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline uses a non-CDC origin, you can use the Expression Evaluator or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
Operation stage properties
You define a default operation in the destination properties. The destination uses the default operation when the sdc.operation.type record header attribute is not set.
You can also define how to handle records with unsupported operations defined in the sdc.operation.type header attribute. The destination can discard them, send them to error, or use the default operation.
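
The resolution order described above (header attribute first, then the default operation, then the unsupported-operation policy) can be sketched as follows. The function, the policy names, and the exception class are hypothetical; only the numeric codes come from the list above.

```python
# Numeric sdc.operation.type codes, as listed above.
OPERATION_CODES = {1: "CREATE", 2: "DELETE", 3: "UPDATE", 4: "INDEX", 8: "MERGE"}

class UnsupportedOperation(Exception):
    """Raised when a record should be sent to error handling."""

def resolve_operation(header, default="INDEX", on_unsupported="send_to_error"):
    """Pick the operation for a record; return None when the record is discarded."""
    raw = header.get("sdc.operation.type")
    if raw is None or raw == "":
        # Header attribute not set: fall back to the default operation.
        return default
    try:
        code = int(raw)
    except ValueError:
        code = None
    operation = OPERATION_CODES.get(code)
    if operation is not None:
        return operation
    # The header holds a value that the destination does not support.
    if on_unsupported == "use_default":
        return default
    if on_unsupported == "discard":
        return None
    raise UnsupportedOperation("unsupported operation type: %r" % (raw,))

print(resolve_operation({"sdc.operation.type": "2"}))  # DELETE
print(resolve_operation({}))                           # INDEX
```

In this sketch, the default operation applies only when the attribute is missing or empty; an unrecognized value is always routed through the unsupported-operation policy.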

Configuring an Elasticsearch Destination

Configure an Elasticsearch destination to write data to an Elasticsearch cluster.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the Elasticsearch tab, configure the following properties:
    Elasticsearch Property Description
    Cluster HTTP URI HTTP URI used to connect to the cluster. Use the following format:
    <host>:<port>
    Additional HTTP Params Additional HTTP parameters that you want to send as query string parameters to Elasticsearch. Enter the exact parameter name and value expected by Elasticsearch.
    Detect Additional Nodes in Cluster Detects additional nodes in the cluster based on the configured Cluster HTTP URI.

    Selecting this property is equivalent to setting the client.transport.sniff Elasticsearch property to true.

    Use only when Data Collector shares the same network as the Elasticsearch cluster. Do not use for Elastic Cloud or Docker clusters.

    Use Security Specifies whether security is enabled on the Elasticsearch cluster.
    Time Basis Time basis to use for writing to time-based indexes. Use one of the following expressions:
    • ${time:now()} - Uses the processing time as the time basis. The processing time is the time associated with the Data Collector running the pipeline.
    • An expression that calls a field and resolves to a datetime value, such as ${record:value(<date field path>)}. Uses the datetime result as the time basis.

    When the Index property does not include datetime variables, you can ignore this property.

    Default is ${time:now()}.

    Data Time Zone Time zone for the destination system. Used to resolve datetimes in time-based indexes.
    Index Index for the generated documents. Enter an index name or an expression that evaluates to the index name.

    For example, if you enter customer as the index, the destination writes the document within the customer index.

    If you use datetime variables in the expression, make sure to configure the time basis appropriately. For details about datetime variables, see Datetime Variables.

    Mapping Mapping type for the generated documents. Enter the mapping type, an expression that evaluates to the mapping type, or a field that includes the mapping type.

    For example, if you enter user as the mapping type, the destination writes the document with a user mapping type.

    Document ID Expression that evaluates to the ID for the generated documents. When you do not specify an ID, Elasticsearch creates an ID for each document.

    By default, the destination allows Elasticsearch to create the ID.

    Parent ID Optional parent ID for the generated documents. Enter a parent ID or an expression that evaluates to the parent ID.

    Use to establish a parent-child relationship between documents in the same index.

    Routing Optional custom routing value for the generated documents. Enter a routing value or an expression that evaluates to the routing value.

    Elasticsearch routes a document to a particular shard in an index based on the routing value defined for the document. You can define a custom value for each document. If you don’t define a custom routing value, Elasticsearch uses the parent ID (if defined) or the document ID as the routing value.

    Data Charset Character encoding of the data to be processed.

    Default Operation Default CRUD operation to perform if the sdc.operation.type record header attribute is not set.
    Unsupported Operation Handling Action to take when the CRUD operation type defined in the sdc.operation.type record header attribute is not supported:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Use Default Operation - Writes the record to the destination system using the default operation.
    Additional Properties Extra fields to include in the action statement. Specify in JSON format.

    For example, you can use the _retry_on_conflict field to specify how many times an update is retried when there is a version conflict. To specify three retries, include the following:

    "_retry_on_conflict" : 3

    For more information, see the Elasticsearch documentation.

  3. If you enabled security, on the Security tab, configure the following properties:
    Security Property Description
    Mode Authentication method to use:
    • Basic - Authenticate with Elasticsearch user name and password. Select this option for Elasticsearch clusters outside of Amazon Elasticsearch Service.
    • AWS Signature V4 - Authenticate with AWS. Select this option for Elasticsearch clusters within Amazon Elasticsearch Service.
    Security Username/Password Elasticsearch user name and password.
    Enter the user name and password using the following syntax:
    <username>:<password>

    Available when using Basic authentication.

    Region Amazon Web Services region that hosts the Elasticsearch domain.

    Available when using AWS Signature V4 authentication.

    Access Key ID AWS access key ID. Required when not using IAM roles with IAM instance profile credentials.

    Available when using AWS Signature V4 authentication.

    Secret Access Key AWS secret access key. Required when not using IAM roles with IAM instance profile credentials.

    Available when using AWS Signature V4 authentication.

    SSL Truststore Path Location of the truststore file.

    Configuring this property is equivalent to configuring the shield.ssl.truststore.path Elasticsearch property.

    Not necessary for Elastic Cloud clusters.

    SSL Truststore Password Password for the truststore file.

    Configuring this property is equivalent to configuring the shield.ssl.truststore.password Elasticsearch property.

    Not necessary for Elastic Cloud clusters.
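
Several of the properties in the steps above map onto parts of a Bulk API request. The sketch below shows, under assumptions, how a Cluster HTTP URI in <host>:<port> format and Additional HTTP Params might combine into a request URL, and how an Additional Properties entry such as "_retry_on_conflict" : 3 might appear in an update action. The function names, the http:// scheme, and the sample values are hypothetical and do not reflect the destination's internals.

```python
import json
from urllib.parse import urlencode

def bulk_url(cluster_uri, extra_params=None):
    """Build a Bulk API URL from a '<host>:<port>' value plus optional query parameters."""
    url = "http://" + cluster_uri + "/_bulk"  # the scheme is an assumption
    if extra_params:
        url += "?" + urlencode(extra_params)
    return url

def update_action_line(index, doc_id, additional_properties=None):
    """Build an update action line, merging extra fields into the action metadata."""
    meta = {"_index": index, "_id": doc_id}
    meta.update(additional_properties or {})
    return json.dumps({"update": meta})

url = bulk_url("localhost:9200", {"refresh": "true"})
line = update_action_line("customer", "42", {"_retry_on_conflict": 3})
print(url)   # http://localhost:9200/_bulk?refresh=true
print(line)
```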