Kudu

Supported pipeline types:
  • Data Collector

The Kudu destination writes data to a Kudu cluster.

When you configure the Kudu destination, you specify the connection information for one or more Kudu masters, define the table to use, and optionally define field mappings. By default, the destination writes field data to columns with matching names.

The Kudu destination can use CRUD operations defined in the sdc.operation.type record header attribute to write data. You can define a default operation for records without the header attribute or value. You can also configure how to handle records with unsupported operations. For information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.

If the destination receives change data capture logs from certain origin systems, you must select the format of the change log so that the destination can process the log.

You can also configure the external consistency mode, operation timeouts, and the maximum number of worker threads to use.

Define the CRUD Operation

The Kudu destination can insert, update, delete, or upsert data. The destination writes the records based on the CRUD operation defined in a CRUD operation header attribute or in operation-related stage properties.

You define the CRUD operation in the following ways:

CRUD record header attribute
You can define the CRUD operation in a CRUD operation record header attribute. The destination looks for the CRUD operation to use in the sdc.operation.type record header attribute.
The attribute can contain one of the following numeric values:
  • 1 for INSERT
  • 2 for DELETE
  • 3 for UPDATE
  • 4 for UPSERT
If your pipeline includes a CRUD-enabled origin that processes changed data, the destination simply reads the operation type from the sdc.operation.type header attribute that the origin generates. If your pipeline uses a non-CDC origin, you can use the Expression Evaluator or a scripting processor to define the record header attribute. For more information about Data Collector changed data processing and a list of CDC-enabled origins, see Processing Changed Data.
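With a non-CDC origin, a scripting processor can set the attribute itself. The following is a plain-Python sketch of that mapping; the `tag_operation` helper and the dict standing in for a record's header attributes are illustrative, not part of the Data Collector API:

```python
# Numeric codes the Kudu destination reads from sdc.operation.type.
OPERATION_CODES = {
    "INSERT": "1",
    "DELETE": "2",
    "UPDATE": "3",
    "UPSERT": "4",
}

def tag_operation(header_attributes, operation):
    """Set the CRUD header attribute the way a scripting processor might.

    `header_attributes` is a stand-in for a record's header attributes;
    the dict-style access mirrors how scripting processors expose them.
    """
    header_attributes["sdc.operation.type"] = OPERATION_CODES[operation]
    return header_attributes
```

In an actual Jython Evaluator, the equivalent assignment for an upsert would be along the lines of `record.attributes['sdc.operation.type'] = '4'`.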
Operation stage properties
You define a default operation in the destination properties. The destination uses the default operation when the sdc.operation.type record header attribute is not set.
You can also define how to handle records with unsupported operations defined in the sdc.operation.type header attribute. The destination can discard them, send them to error, or use the default operation.

Kudu Data Types

The Kudu destination converts Data Collector data types to the following compatible Kudu data types:

Data Collector Data Type   Kudu Data Type
Boolean                    Bool
Byte                       Int8
Byte Array                 Binary
Decimal                    Decimal. Available in Kudu version 1.7 and later. If using an earlier version of Kudu, configure your pipeline to convert the Decimal data type to a different Kudu data type.
Double                     Double
Float                      Float
Integer                    Int32
Long                       Int64 or Unixtime_micros. The destination determines the data type to use based on the mapped Kudu column.
Short                      Int16
String                     String

The Data Collector Long data type stores millisecond values. The Kudu Unixtime_micros data type stores microsecond values. When converting to the Unixtime_micros data type, the destination multiplies the field value by 1,000 to convert milliseconds to microseconds.
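The Long-to-Unixtime_micros conversion described above is a straight multiplication. As a sketch:

```python
def millis_to_unixtime_micros(long_value_millis):
    """Convert a Data Collector Long (milliseconds) to a Kudu
    Unixtime_micros value (microseconds) by multiplying by 1,000,
    as the destination does."""
    return long_value_millis * 1000

# Example: 2021-01-01T00:00:00 UTC expressed in milliseconds.
millis = 1609459200000
micros = millis_to_unixtime_micros(millis)  # 1609459200000000
```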
The destination cannot convert the following Data Collector data types. Use a Field Type Converter processor earlier in the pipeline to convert these Data Collector data types to ones that are compatible with Kudu:
  • Character
  • Date
  • Datetime
  • List
  • List-Map
  • Map
  • Time
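A Field Type Converter performs this conversion inside the pipeline itself; the equivalent transformation for a Datetime field, converted to a Kudu-compatible Long of epoch milliseconds, looks roughly like this (the function name and sample value are illustrative, and UTC is assumed):

```python
from datetime import datetime, timezone

def datetime_to_long_millis(dt):
    """Convert a Datetime value to a Long of epoch milliseconds,
    a type the Kudu destination can map to Int64 or Unixtime_micros.
    Assumes the naive datetime represents UTC."""
    return int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)

record_field = datetime(2021, 1, 1)                # unsupported Datetime
converted = datetime_to_long_millis(record_field)  # Kudu-compatible Long
```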

Configuring a Kudu Destination

Configure a Kudu destination to write to a Kudu cluster.

  1. In the Properties panel, on the General tab, configure the following properties:

    Name - Stage name.

    Description - Optional description.

    Stage Library - Library version that you want to use.

    Required Fields - Fields that must include data for the record to be passed into the stage.
    Tip: You might include fields that the stage uses.
    Records that do not include all required fields are processed based on the error handling configured for the pipeline.

    Preconditions - Conditions that must evaluate to TRUE to allow a record to enter the stage for processing. Click Add to create additional preconditions.
    Records that do not meet all preconditions are processed based on the error handling configured for the stage.

    On Record Error - Error record handling for the stage:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Stop Pipeline - Stops the pipeline.
  2. On the Kudu tab, configure the following properties:

    Kudu Masters - Comma-separated list of connection information for the Kudu masters to use. Use the following format:
    <host>:<port>

    Table Name - Table to write to. Enter one of the following:
    • Name of an existing Kudu table. If the table doesn't exist, the pipeline fails to start.
    • Expression that evaluates to the name of an existing Kudu table. For example, if the table name is stored in the "tableName" record attribute, enter the following expression:
      ${record:attribute('tableName')}
      If the table doesn't exist, the records are treated as error records.
    Note: When using tables created by Impala, use the prefix impala:: followed by the database name and table name. For example:
    impala::<database name>.<table name>

    Field to Column Mapping - Mappings between record fields and Kudu columns. By default, the destination writes field data to columns with matching names.

    Default Operation - Default CRUD operation to perform if the sdc.operation.type record header attribute is not set.

    Change Log Format - If the incoming data is a change data capture log read from one of the following source systems, select the source system so that the destination can determine the format of the log:
    • Microsoft SQL Server
    • Oracle CDC Client
    • MySQL Binary Log
    • MongoDB Oplog
    For any other source data, set to None.
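Both the Kudu Masters string and Impala-created table names follow simple patterns. A sketch of assembling them, where the host names and database/table names are placeholders (7051 is Kudu's default master RPC port):

```python
def kudu_masters(hosts, port=7051):
    """Build the comma-separated <host>:<port> list for the
    Kudu Masters property."""
    return ",".join("{}:{}".format(host, port) for host in hosts)

def impala_table_name(database, table):
    """Apply the impala:: prefix required for tables created by Impala."""
    return "impala::{}.{}".format(database, table)

masters = kudu_masters(["master1.example.com", "master2.example.com"])
# "master1.example.com:7051,master2.example.com:7051"
table = impala_table_name("sales", "orders")  # "impala::sales.orders"
```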

  3. Optionally, click the Advanced tab and configure the following properties:

    External Consistency - External consistency mode used to write to Kudu:
    • Client Propagated - Ensures that writes from a single client are automatically externally consistent.
    • Commit Wait - An experimental external consistency model that tightly synchronizes the clocks on all machines in the cluster.
    For more information, see the Kudu documentation.

    Mutation Buffer Space - Size of the buffer, in records, that the destination uses to write a single batch of data. Should be equal to or greater than the number of records in a batch passed from the pipeline.
    Default is 1000 records.

    Maximum Number of Worker Threads - Maximum number of threads to use to perform processing for the stage. Use this property to limit the number of threads that can be used.
    Default is the Kudu default: twice the number of available cores on the Data Collector machine. To use the default, leave empty or enter 0.

    Operation Timeout (milliseconds) - Number of milliseconds to allow for operations such as writes.
    Default is 10000, or 10 seconds.

    Admin Operation Timeout (milliseconds) - Number of milliseconds to allow for admin-type operations, such as opening a table or getting a table schema.
    Default is 30000, or 30 seconds.

    Unsupported Operation Handling - Action to take when the CRUD operation type defined in the sdc.operation.type record header attribute is not supported:
    • Discard - Discards the record.
    • Send to Error - Sends the record to the pipeline for error handling.
    • Use Default Operation - Writes the record to the destination system using the default operation.
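Taken together, the default-operation and unsupported-operation rules amount to a small dispatch. The following sketch mirrors those rules; the function, the handling-mode strings, and the set of codes are illustrative stand-ins, not Data Collector internals:

```python
# Codes the destination supports: INSERT, DELETE, UPDATE, UPSERT.
SUPPORTED_OPERATIONS = {"1", "2", "3", "4"}

def resolve_operation(header_value, default_op, unsupported_handling):
    """Pick the CRUD operation for a record, mirroring the stage rules.

    header_value         - value of sdc.operation.type, or None if unset
    default_op           - the Default Operation stage property
    unsupported_handling - "DISCARD", "TO_ERROR", or "USE_DEFAULT"
    Returns the operation code to apply, or the action to take instead.
    """
    if header_value is None:
        return default_op            # attribute not set: use the default
    if header_value in SUPPORTED_OPERATIONS:
        return header_value          # valid CRUD code: use it as-is
    if unsupported_handling == "USE_DEFAULT":
        return default_op            # unsupported: fall back to default
    return unsupported_handling      # "DISCARD" or "TO_ERROR"
```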