The DataOps Blog

Where Change Is Welcome

Send Kafka Messages To Amazon S3

By Posted in Engineering December 9, 2020

In this post, we will look at best practices for integrating StreamSets Data Collector (SDC), a fast data ingestion engine, with Kafka, and take a deep dive into the details of sending Kafka messages to Amazon S3.

Apache Kafka is a distributed event streaming platform used by thousands of companies for streaming analytics and other mission-critical applications. It has become one of the most common components of the big data ecosystem within organizations.

Kafka Message Consumer

In some scenarios, an organization may already have a data pipeline bringing messages into Kafka. In this case, SDC can act as a consumer and handle all of the logic for moving data from Kafka to wherever it needs to go. For example, you could deliver data from Kafka to S3, HDFS, Elasticsearch, or any other destination you choose without writing any code. With the StreamSets Kafka origins, you can consume Kafka messages, group them into appropriately sized batches, and push them to the desired destination.

StreamSets Data Collector provides two Kafka consumer origins:

  1. Kafka Consumer
  2. Kafka Multitopic Consumer

Why are there two Kafka origins?

Suppose you have an application that needs to read messages from a Kafka topic, run some validations against them, and write the results to another data store. In this case, you create a consumer pipeline that subscribes to the appropriate topic, receives messages, validates them, and writes the results.

This may work well for a while, but what if producers write messages to the topic faster than your application can validate them? If you are limited to a single consumer reading and processing the data, your application may fall further and further behind, unable to keep up with the rate of incoming messages. Clearly, consumption from topics needs to scale. Just as multiple producers can write to the same topic, we need to allow multiple consumers to read from the same topic, and from multiple topics, splitting the data between them. That's where the Kafka Multitopic Consumer origin comes in handy: it uses multiple threads to enable parallel processing of data.

The Kafka Multitopic Consumer origin can take advantage of additional processors and memory to run several consumer threads in parallel. Kafka distributes messages across a topic's partitions, and those partitions are shared among the consumer threads, so the load is split between them.
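To make the partition-per-thread idea concrete, here is a toy model in plain Python. It does not talk to Kafka: the in-memory dictionary standing in for a topic and the placeholder validate function are assumptions for illustration only. Each partition's messages are handled by a single task, which keeps them in offset order, while different partitions are processed concurrently.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for one topic: partition number -> messages
# in offset order. Real partitions live on the brokers; this only models the idea.
TOPIC = {
    0: ["m0", "m3", "m6"],
    1: ["m1", "m4"],
    2: ["m2", "m5"],
}


def validate(message: str) -> str:
    """Placeholder for the per-message work (validation, transformation, ...)."""
    return message.upper()


def consume_partition(partition: int, messages: list[str]) -> tuple[int, list[str]]:
    """Work done for one partition: its messages are processed in offset order."""
    return partition, [validate(m) for m in messages]


def consume_in_parallel(topic: dict[int, list[str]], threads: int) -> dict[int, list[str]]:
    # Each partition is submitted as a single task, so ordering within a
    # partition is preserved while different partitions run concurrently.
    with ThreadPoolExecutor(max_workers=threads) as pool:
        futures = [pool.submit(consume_partition, p, msgs) for p, msgs in topic.items()]
        return dict(f.result() for f in futures)


results = consume_in_parallel(TOPIC, threads=3)
print(results)
# {0: ['M0', 'M3', 'M6'], 1: ['M1', 'M4'], 2: ['M2', 'M5']}
```

The design point this mirrors is that Kafka only guarantees ordering within a partition, so partitions, not individual messages, are the natural unit to hand to a thread.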

Which one is the right choice for me?

If your Data Collector runs on a powerful machine with plenty of CPU and memory available, use the Kafka Multitopic Consumer origin to scale vertically.

However, if you plan to scale horizontally, which is often the more cost-effective option, use the Kafka Consumer origin: again partition the Kafka topic, but this time run the pipeline on multiple Data Collector instances. Manually running multiple instances of the pipeline across several Data Collectors adds some maintenance overhead, but StreamSets Control Hub makes this easy by starting multiple pipeline instances through the same job.

Advanced Features

Let's look at some advanced features you can use in both Kafka origins.

Kafka Security 

You can configure both Kafka stages (Kafka Consumer and Kafka Multitopic Consumer) to connect securely through the following methods:

  • Kerberos
  • SSL/TLS
  • Both SSL/TLS and Kerberos

In addition to completing the prerequisite tasks, enabling security requires setting additional Kafka configuration properties in the stage.
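As a rough guide to what those additional properties look like, the sketch below maps each method to standard Apache Kafka client property names (security.protocol, sasl.mechanism, sasl.kerberos.service.name, ssl.truststore.*). It is an assumption-laden illustration, not the stage's own configuration logic: the truststore path and password are hypothetical, mutual TLS would also need keystore properties, and Kerberos additionally needs a JAAS/keytab setup on the Data Collector side, which is not shown.

```python
from __future__ import annotations


def kafka_security_properties(
    method: str,
    truststore: str | None = None,
    truststore_password: str | None = None,
    service_name: str = "kafka",
) -> dict[str, str]:
    """Return standard Kafka client properties for 'ssl', 'kerberos' or 'ssl+kerberos'."""
    use_ssl = method in ("ssl", "ssl+kerberos")
    use_kerberos = method in ("kerberos", "ssl+kerberos")
    if not (use_ssl or use_kerberos):
        raise ValueError(f"unknown security method: {method!r}")

    # security.protocol combines the transport (plaintext or SSL) with SASL or not.
    if use_ssl and use_kerberos:
        props = {"security.protocol": "SASL_SSL"}
    elif use_ssl:
        props = {"security.protocol": "SSL"}
    else:
        props = {"security.protocol": "SASL_PLAINTEXT"}

    if use_kerberos:
        # Kerberos is carried over SASL using the GSSAPI mechanism.
        props["sasl.mechanism"] = "GSSAPI"
        props["sasl.kerberos.service.name"] = service_name

    if use_ssl:
        if truststore is None or truststore_password is None:
            raise ValueError("SSL/TLS needs a truststore location and password")
        props["ssl.truststore.location"] = truststore
        props["ssl.truststore.password"] = truststore_password

    return props


# Hypothetical truststore path and password, for illustration only.
for key, value in kafka_security_properties(
    "ssl+kerberos", "/etc/sdc/kafka.truststore.jks", "changeit"
).items():
    print(f"{key}={value}")
```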

Record Header Attributes 

Both Kafka origins create record header attributes that describe where each record originated. In addition, when an origin processes Avro data, it includes the Avro schema in an avroSchema record header attribute.

  • avroSchema: When processing Avro data, provides the Avro schema
  • offset: The offset where the record originated
  • partition: The partition where the record originated
  • topic: The topic where the record originated
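One practical use of these attributes: in Kafka, a topic, partition, and offset together identify exactly one message, so they can serve as a lineage or deduplication key. The sketch below assumes a hypothetical plain-dictionary view of a record's header attributes; inside a pipeline, such values would typically be read with an expression such as ${record:attribute('topic')} instead.

```python
def message_id(headers: dict) -> str:
    """Build an identifier from the topic, partition and offset header attributes.

    A (topic, partition, offset) triple identifies exactly one Kafka message,
    so the resulting string can act as a lineage or deduplication key.
    """
    return f"{headers['topic']}/{headers['partition']}/{headers['offset']}"


# Hypothetical header attribute values for one record.
record_headers = {"topic": "demo-topic", "partition": "2", "offset": "1057"}
print(message_id(record_headers))  # demo-topic/2/1057
```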

Kafka Message Key 

You can configure both origins to capture the message key included in each Kafka message and store it in the generated record. Kafka message keys can be string values or Avro messages, depending on how your Kafka system is configured. You can store message key values in a record header attribute, a record field, or both, and then use the values in pipeline processing logic or write them to destination systems. If you have no need for message keys, you can discard them. The Kafka Consumer and Kafka Multitopic Consumer origins discard message keys by default.
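The four choices described above can be modeled with a small function. This is a hypothetical sketch of the behavior, not the origins' implementation: the KeyMode names, the simplified record shape, and the "message_key" attribute/field name are all assumptions made for illustration.

```python
from __future__ import annotations

from enum import Enum


class KeyMode(Enum):
    """Hypothetical names for the four key-handling choices described above."""
    DISCARD = "discard"  # the origins' default
    HEADER = "header"
    FIELD = "field"
    HEADER_AND_FIELD = "header_and_field"


def build_record(key: str, value: dict, mode: KeyMode = KeyMode.DISCARD,
                 key_name: str = "message_key") -> dict:
    """Return a simplified record with 'header' and 'value' parts."""
    header: dict = {}
    fields = dict(value)  # copy so the input message is left untouched
    if mode in (KeyMode.HEADER, KeyMode.HEADER_AND_FIELD):
        header[key_name] = key
    if mode in (KeyMode.FIELD, KeyMode.HEADER_AND_FIELD):
        fields[key_name] = key
    return {"header": header, "value": fields}


message_value = {"card_number": "0000-0000-0000-0000", "purchase_amount": "$00.00"}
print(build_record("customer-42", message_value))
print(build_record("customer-42", message_value, KeyMode.HEADER_AND_FIELD))
```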

Data Pipeline Overview And Implementation

The goal here is to read Avro files from a file system directory and write them to a Kafka topic using the StreamSets Kafka Producer. We'll then use a second pipeline configured with a Kafka Consumer to read all the messages from that topic, perform a set of transformations to mask the data and determine the credit card type, and finally write the data to Amazon S3, partitioned by credit card type and encrypted. In the second part of this post, we will redesign the data pipeline to scale for large volumes of data.

Here’s what the data in JSON format looks like:

{
  "transaction_date":"dd/mm/YYYY",
  "card_number":"0000-0000-0000-0000",
  "card_expiry_date":"mm/YYYY",
  "card_security_code":"0000",
  "purchase_amount":"$00.00",
  "description":"transaction description of the purchase"
}

Prerequisites 

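  • A working instance of StreamSets Data Collector (3.18.1+)
  • A working Kafka instance (see the Quickstart for easy local setup). Last tested on version 1.1.0, but older and newer versions should work too. Note that the --bootstrap-server option of kafka-topics.sh, used in the commands in this post, was added in Kafka 2.2; older versions take --zookeeper instead.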

Publish data to Kafka topic using Kafka producer

Let's create a Kafka topic named demo-topic by running this command:

bin/kafka-topics.sh --create --topic demo-topic --bootstrap-server localhost:9092 --partitions 3

Now let's push some sample data to this Kafka topic using a simple data pipeline that reads Avro files from a file system directory and writes them to the topic using the StreamSets Kafka Producer in SDC Record data format. We then use another data pipeline to read the SDC Record data from Kafka, write it to Elasticsearch, and convert the data to Avro for S3.

Consume Kafka messages and store them in AWS S3

Kafka Consumer

  • Configure the Kafka Consumer origin to consume messages from your local Kafka setup, and on the Data Format tab select SDC Record.

Field Converter

  • The card number field happens to be defined as an integer in Avro, and we want it as a string value. Enter '/card_number' in the 'Fields to Convert' text box and set 'Convert to Type' to String. Leave the remaining settings at their default values.
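In plain Python, the Field Converter step amounts to the conversion below. It is a sketch of the effect on a single record, not of the stage itself, and the test card number is a hypothetical value. One caveat worth knowing: if an identifier is ever stored as an integer, any leading zeros are already gone before the conversion, so converting back to a string cannot restore them.

```python
def convert_card_number(record: dict) -> dict:
    """Return a copy of the record with /card_number converted to a string."""
    converted = dict(record)
    converted["card_number"] = str(record["card_number"])
    return converted


# Hypothetical test value (a well-known Visa test number), stored as an integer.
print(convert_card_number({"card_number": 4111111111111111}))
# {'card_number': '4111111111111111'}
```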

Jython Evaluator

  • In this stage, we'll use a small piece of Python code to look at the first few digits of the card number and determine the card type. We'll add the card type to a new field called 'credit_card_type'.
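The post does not show its Jython code, so the following is only a sketch of the kind of prefix check this step performs, using a deliberately simplified issuer table (real issuer ranges are wider). It avoids Python 3-only syntax so the logic could be adapted to Jython, which implements Python 2.7; how the function is wired into the Jython Evaluator's record loop is not shown here, because that depends on the script bindings of your Data Collector version.

```python
import re

# Simplified, illustrative issuer prefix table; real issuer ranges are wider
# (e.g. Mastercard also issues 2221-2720, Discover also 644-649).
PREFIX_RULES = [
    ("Visa", ("4",)),
    ("Mastercard", ("51", "52", "53", "54", "55")),
    ("American Express", ("34", "37")),
    ("Discover", ("6011", "65")),
]


def credit_card_type(card_number):
    """Guess the card type from the leading digits of the card number."""
    digits = re.sub(r"\D", "", str(card_number))  # drop dashes and spaces
    for card_type, prefixes in PREFIX_RULES:
        if digits.startswith(prefixes):
            return card_type
    return "Other"


def add_card_type(record):
    """Return a copy of the record with a new 'credit_card_type' field."""
    enriched = dict(record)
    enriched["credit_card_type"] = credit_card_type(record["card_number"])
    return enriched


print(add_card_type({"card_number": "4111-1111-1111-1111"})["credit_card_type"])  # Visa
```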

Field Masker

  • The last transformation step is to mask the card number so that only its last 4 digits make it to the data stores.
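The masking rule itself (hide everything except the last four digits) can be sketched in Python as below. This shows the effect rather than how the Field Masker stage is configured; the mask character 'x' and the choice to keep separators such as dashes are assumptions.

```python
def mask_card_number(card_number, mask_char="x", visible=4):
    """Mask all digits except the last `visible` ones, keeping separators."""
    text = str(card_number)
    digit_count = sum(ch.isdigit() for ch in text)
    to_mask = max(digit_count - visible, 0)  # how many leading digits to hide
    out = []
    for ch in text:
        if ch.isdigit() and to_mask > 0:
            out.append(mask_char)
            to_mask -= 1
        else:
            out.append(ch)  # separators and the trailing digits pass through
    return "".join(out)


print(mask_card_number("4111-1111-1111-1234"))  # xxxx-xxxx-xxxx-1234
```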

Writing to Amazon S3

  • We'll convert the data back to Avro format and store it in an S3 bucket.
  • On the 'Data Format' tab, select 'Avro' and set 'Avro Schema Location' to 'In Pipeline Configuration'. Then specify the following schema for 'Avro Schema':
{"namespace" : "cctest.avro",
 "type": "record",
 "name": "CCTest",
 "doc": "Test Credit Card Transactions",
 "fields": [
            {"name": "transaction_date", "type": "string"},
            {"name": "card_number", "type": "string"},
            {"name": "card_expiry_date", "type": "string"},
            {"name": "card_security_code", "type": "string"},
            {"name": "purchase_amount", "type": "string"},
            {"name": "description", "type": "string"}
           ]
}
  • To save storage space on the S3 bucket let’s compress the data as it’s written. Select BZip2 as the Avro Compression Codec.
  • To write to partitions based on the data in the `credit_card_type` field, we use the ${record:value('/credit_card_type')} expression as the Partition Prefix. With this expression, the destination creates partitions based on the credit_card_type value of each record and writes each record to the matching partition.
  • Protecting sensitive data: to protect sensitive data such as, in our case, credit card numbers, you can use any of the following options for server-side encryption on Amazon S3:
    • Amazon S3-Managed Encryption Keys (SSE-S3)
    • AWS KMS-Managed Encryption Keys (SSE-KMS)
    • Customer-Provided Encryption Keys (SSE-C)
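For readers who want to see the partitioning and encryption settings outside of StreamSets, here is a sketch that groups records the way the ${record:value('/credit_card_type')} prefix does and builds keyword arguments for boto3's S3 put_object call with each server-side encryption option. The bucket name, file name, and placeholder body are hypothetical, the object naming used by the S3 destination itself will differ, and the upload call is left commented out so the sketch runs without AWS credentials.

```python
from __future__ import annotations

from collections import defaultdict


def group_by_partition(records: list[dict], field: str = "credit_card_type") -> dict[str, list[dict]]:
    """Group records by a field value, like a ${record:value('/credit_card_type')} prefix."""
    groups = defaultdict(list)
    for record in records:
        groups[str(record[field])].append(record)
    return dict(groups)


def put_object_kwargs(bucket: str, prefix: str, file_name: str, body: bytes,
                      sse: str = "SSE-S3", kms_key_id: str | None = None,
                      customer_key: bytes | None = None) -> dict:
    """Build keyword arguments for boto3's S3 put_object with server-side encryption."""
    kwargs = {"Bucket": bucket, "Key": f"{prefix}/{file_name}", "Body": body}
    if sse == "SSE-S3":
        kwargs["ServerSideEncryption"] = "AES256"  # S3-managed keys
    elif sse == "SSE-KMS":
        kwargs["ServerSideEncryption"] = "aws:kms"  # KMS-managed keys
        if kms_key_id is not None:
            kwargs["SSEKMSKeyId"] = kms_key_id
    elif sse == "SSE-C":
        if customer_key is None:
            raise ValueError("SSE-C needs a customer-provided key")
        kwargs["SSECustomerAlgorithm"] = "AES256"  # customer-provided key
        kwargs["SSECustomerKey"] = customer_key
    else:
        raise ValueError(f"unknown encryption option: {sse!r}")
    return kwargs


records = [
    {"card_number": "xxxx-xxxx-xxxx-1111", "credit_card_type": "Visa"},
    {"card_number": "xxxx-xxxx-xxxx-0004", "credit_card_type": "Mastercard"},
    {"card_number": "xxxx-xxxx-xxxx-4242", "credit_card_type": "Visa"},
]
for prefix, batch in group_by_partition(records).items():
    # Hypothetical bucket and file names; the body would be the Avro-encoded batch.
    kwargs = put_object_kwargs("example-bucket", prefix, "part-0.avro", b"...")
    print(kwargs["Key"], len(batch), kwargs["ServerSideEncryption"])
    # boto3.client("s3").put_object(**kwargs) would perform the actual upload.
```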

Output on Amazon S3

Note that the output on S3 is partitioned by 'credit_card_type'.

Data Pipeline Redesign For Large Workloads

Now assume you need to scale the solution above because you are dealing with large amounts of data and multiple upstream applications are writing to multiple Kafka topics, so producers now write messages faster than this pipeline can consume them.

Also assume there is one more upstream pipeline or application that generates similar credit card data and stores it in the Kafka topic `demo-topic-2`:

bin/kafka-topics.sh --create --topic demo-topic-2 --bootstrap-server localhost:9092 --partitions 2

Instead of recreating the entire data pipeline from scratch, we can redesign the existing pipeline by swapping its origin for the Kafka Multitopic Consumer origin. This lets us scale the pipeline vertically by increasing the number of threads and, if required, read from multiple topics at the same time.

Note that the number of threads should be less than or equal to the total number of partitions being read, because a thread that is not assigned a partition has nothing to consume. Here, demo-topic has 3 partitions and demo-topic-2 has 2 partitions, 5 in total, so we can set the number of threads to 5 to achieve maximum parallelism.
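The thread-count arithmetic can be checked with a small model. This sketch assumes a round-robin style assignment that spreads every subscribed (topic, partition) pair across the threads in turn; the actual distribution is decided by the Kafka consumer group's partition assignment strategy, and other strategies (for example, ones that assign each topic separately) can leave some threads without a partition even when the totals match.

```python
from __future__ import annotations


def round_robin_assignment(partition_counts: dict[str, int], threads: int) -> dict[int, list[tuple[str, int]]]:
    """Spread every (topic, partition) pair across consumer threads in turn."""
    all_partitions = [
        (topic, p) for topic in sorted(partition_counts) for p in range(partition_counts[topic])
    ]
    assignment: dict[int, list[tuple[str, int]]] = {t: [] for t in range(threads)}
    for i, topic_partition in enumerate(all_partitions):
        assignment[i % threads].append(topic_partition)
    return assignment


def idle_threads(partition_counts: dict[str, int], threads: int) -> int:
    """Threads beyond the total partition count have nothing to consume."""
    return max(threads - sum(partition_counts.values()), 0)


topics = {"demo-topic": 3, "demo-topic-2": 2}
for n in (3, 5, 8):
    print(n, "threads ->", idle_threads(topics, n), "idle")
# 3 threads -> 0 idle
# 5 threads -> 0 idle
# 8 threads -> 3 idle
```

With 3 threads nothing is idle, but the load is uneven (two threads get two partitions each); 5 threads give each thread exactly one partition, matching the choice above.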

Note: Sample data and pipelines can be found on GitHub. 

Conclusion

In this blog, we learned how to use StreamSets Data Collector as a Kafka consumer and when to choose the Kafka Consumer origin versus the Kafka Multitopic Consumer origin to process large volumes of Kafka messages and take advantage of parallelism. We also explored Amazon S3 destination capabilities such as partitioning and server-side encryption.

Here are some resources that will help jump start your journey with designing and deploying data pipelines:
