Elasticsearch

The Elasticsearch destination writes data to an Elasticsearch cluster, including Elastic Cloud clusters (formerly Found clusters). The destination uses the Elasticsearch HTTP API to write each record to Elasticsearch as a document.

When you configure the Elasticsearch destination, you configure the cluster name, the HTTP URI, and document-related information.

When the Data Collector shares the same network as the Elasticsearch cluster, you can enter one or more node URI and automatically detect additional Elasticsearch nodes on the cluster.

The Elasticsearch destination can use CRUD operations defined in the sdc.operation.type record header attribute to write data. You can define a default operation for records without the header attribute or value. You can also configure whether to use multi-row operations for inserts and deletes, and how to handle records with unsupported operations.For information about Data Collector change data processing and a list of CDC-enabled origins, see Processing Changed Data.

You can also add advanced Elasticsearch properties as needed.

Time Basis and Time-Based Indexes

The time basis is the time used by the Elasticsearch destination to write records to time-based indexes. When indexes have no time component, you can ignore the time basis property.

You can use the time of processing or the time associated with the data as the time basis.

For example, say you define the Index property using the following datetime variables:
logs-${YYYY()}-${MM()}-${DD()}

If you use the time of processing as the time basis, the destination write records to indexes based on when it processes each record. If you use the time associated with the data, such as a transaction timestamp, then the destination writes records to the indexes based on that timestamp.

You can use the following times as the time basis:
Processing Time
When you use processing time as the time basis, the destination writes to indexes based on the processing time and the index. To use the processing time as the time basis, use the following expression:
${time:now()}
This is the default time basis.
Record Time
When you use the time associated with a record as the time basis, you specify a date field in the record. The destination writes data to indexes based on the datetimes associated with the records.
To use a time associated with the record, use an expression that calls a field and resolves to a datetime value, such as ${record:value("/Timestamp")}.

Document IDs

When appropriate, you can specify an expression that defines the document ID. When you do not specify an expression, Elasticsearch generates IDs for each document.

When you configure the destination to perform create, update, or delete operations, you must define the document ID.

For example, to perform updates for documents with IDs based on the EmployeeID field, define the write operation as update and define the Document ID as follows: ${record:value('/EmployeeID')}.

Define the CRUD Operation

The Elasticsearch destination can create, update, delete, or index data. The destination writes the records based on the CRUD operation defined in a CRUD operation header attribute or in operation-related stage properties.

You define the CRUD operation in the following ways:

CRUD record header attribute
You can define the CRUD operation in a CRUD operation record header attribute. The destination looks for the CRUD operation to use in the sdc.operation.type record header attribute.
The attribute can contain one of the following numeric values:
  • 1 for CREATE, the equivalent of INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 4 for INDEX, the equivalent of UPSERT
If your pipeline includes a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline uses a non-CDC origin, you can use the Expression Evaluator or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
Operation stage properties
You define a default operation in the destination properties. The destination uses the default operation when the sdc.operation.type record header attribute is not set.
You can also define how to handle records with unsupported operations defined in the sdc.operation.type header attribute. The destination can discard them, send them to error, or use the default operation.

Configuring an Elasticsearch Destination

Configure an Elasticsearch destination to write data to an Elasticsearch cluster.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property Description
    Name Stage name.
    Description Optional description.
    Required Fields Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.

    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.

    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline. Not valid for cluster pipelines.
  2. On the Elasticsearch tab, configure the following properties:
    Elasticsearch Property Description
    Cluster HTTP URI HTTP URI used to connect to the cluster. Use the following format:
    <host>:<port>
    Additional HTTP Params Additional HTTP parameters that you want to send as query string parameters to Elasticsearch. Enter the exact parameter name and value expected by Elasticsearch.
    Detect Additional Nodes in Cluster

    Detects additional nodes in the cluster based on the configured Cluster URI.

    Selecting this property is the equivalent to setting the client.transport.sniff Elasticsearch property to true.

    Use only when the Data Collector shares the same network as the Elasticsearch cluster. Do not use for Elastic Cloud or Docker clusters.

    Use Security Specifies whether security is enabled on the Elasticsearch cluster.
    Time Basis
    Time basis to use for writing to time-based indexes. Use one of the following expressions:
    • ${time:now()} - Uses the processing time as the time basis. The processing time is the time associated with the Data Collector running the pipeline.
    • An expression that calls a field and resolves to a datetime value, such as ${record:value(<date field path>)}. Uses the datetime result as the time basis.

    When the Index property does not include datetime variables, you can ignore this property.

    Default is ${time:now()}.

    Data Time Zone Time zone for the destination system. Used to resolve datetimes in time-based indexes.
    Index Index for the generated documents. Enter an index name, an expression that evaluates to the index name, or a field that includes the index name.

    For example, if you enter customer as the index, the destination writes the document within the customer index.

    If you use datetime variables in the expression, make sure to configure the time basis appropriately. For details about datetime variables, see Datetime Variables.

    Mapping Mapping type for the generated documents. Enter the mapping type, an expression that evaluates to the mapping type, or a field that includes the mapping type.

    For example, if you enter user as the mapping type, the destination writes the document with a user mapping type.

    Document ID ID for the record. Use to specify the ID for the generated documents. When you do not specify an ID, Elasticsearch creates an ID for each document.

    By default, the destination allows Elasticsearch to create the ID.

    Data Charset

    Character encoding of the data to be processed.

    Default Operation Default CRUD operation to perform if the sdc.operation.type record header attribute is not set.
    Unsupported Operation Handling Action to take when the CRUD operation type defined in the sdc.operation.type record header attribute is not supported:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Use Default Operation - Writes the record to the destination system using the default operation.
  3. If you enabled security, on the Security tab, configure the following properties:
    Security Property Description
    Security Username/Password Elasticsearch username and password.
    Enter the username and password using the following syntax:
    <username>:<password>
    Tip: To secure sensitive information such as usernames and passwords, you can use runtime resources or Hashicorp Vault secrets. For more information, see Using Runtime Resources or Accessing Hashicorp Vault Secrets.
    SSL Truststore Path

    Location of the truststore file.

    Configuring this property is the equivalent to configuring the shield.ssl.truststore.path Elasticsearch property.

    Not necessary for Elastic Cloud clusters.

    SSL Truststore Password

    Password for the truststore file.

    Configuring this property is the equivalent to configuring the shield.ssl.truststore.password Elasticsearch property.

    Not necessary for Elastic Cloud clusters.