
The DataOps Blog

Where Change Is Welcome

MySQL Database Change Capture with MapR Streams, Apache Drill, and StreamSets

Posted in Engineering, September 26, 2016

Today’s post is from Raphaël Velfre, a senior data engineer at MapR. Raphaël has spent some time working with StreamSets Data Collector (SDC) and MapR’s Converged Data Platform. In this blog entry, originally published on the MapR Converge blog, Raphaël explains how to use SDC to extract data from MySQL and write it to MapR Streams, and then move data from MapR Streams to MapR-FS via SDC, where it can be queried with Apache Drill.

Overview

A very common use case for the MapR Converged Data Platform is collecting and analyzing data from a variety of sources, including traditional relational databases. Until recently, data engineers would build an ETL pipeline that periodically walks the relational database and loads the data into files on the MapR cluster, then perform batch analytics on that data.

This model breaks down when use cases demand more instant access to that same data, in order to make a decision, raise an alert, or make an offer, since these batch pipelines are often scheduled to run hourly or even daily. To get to real-time data processing one must build a real-time data pipeline that is continuously collecting the latest data. Building a real-time data pipeline doesn’t mean you have to give up batch analytics, since you can always write data from the streaming pipeline to files or tables, but you can’t do real-time analytics using a batch pipeline.

To build a real-time data pipeline, you should start with MapR Streams, the publish/subscribe event streaming service of the MapR Converged Data Platform, as it is the most critical component to handle distribution of real-time data between applications.

Next, a tool is needed to extract the data out of the database and publish it into MapR Streams, as well as take data out of MapR Streams and write it to files or database tables. StreamSets Data Collector (SDC) is an open source, easy to use, GUI-based tool that runs directly on the MapR cluster and allows anyone to build robust data pipelines.

In this blog, we’ll walk through an example of building a real-time data pipeline from a MySQL database into MapR Streams, and even show how this data can be written to MapR-FS for batch or interactive analytics.

Prerequisites

For this example, I assume you have a MapR 5.1 cluster running and SDC properly installed and configured. Specific instructions for setting up SDC with MapR are provided in the StreamSets documentation.

Architecture of Our Use Case

Producer

The source will be a MySQL database that is running on my MapR cluster.

Note: MySQL could also run outside of the cluster; I just wanted to remove complexity in this architecture.

We will stream data from the clients table in that database and publish data to MapR Streams:

Consumer

Then we will stream data from a stream/topic to MapR-FS.

Set up the Environment

MySQL Database

First of all, we need to add the MySQL JDBC driver to StreamSets. Follow the instructions for installing additional drivers into SDC.

Now, we need to make sure that the user that runs StreamSets exists in the MySQL user table and has sufficient privileges.

Log into MySQL using root from your node and enter the following queries:

>CREATE USER '<User_Running_Streamsets>'@'<Host_Running_Streamsets>' IDENTIFIED BY 'password';

>GRANT ALL PRIVILEGES ON *.* TO '<User_Running_Streamsets>'@'<Host_Running_Streamsets>'  WITH GRANT OPTION;

Then create a database named crm, switch to it, and create a table named clients:

>CREATE DATABASE crm;

>USE crm;

>CREATE TABLE clients (ID INT, Name VARCHAR(10), Surname VARCHAR(10), City VARCHAR(10), Timestamp VARCHAR(10));

Now let’s add some clients into this table:

>INSERT INTO clients VALUES (1,'Velfre','Raphael','Paris','20160701');
>INSERT INTO clients VALUES (2,'Dupont','Jean','Paris','20160701');

Streams and Topics

We will create a stream named clients and two topics:

>maprcli stream create -path /clients
>maprcli stream edit -path /clients -produceperm p -consumeperm p -topicperm p
>maprcli stream topic create -path /clients -topic clients_from_paris
>maprcli stream topic create -path /clients -topic clients_from_everywhere_else

StreamSets Runtime Properties

Runtime properties can be set up in a file locally and used in a pipeline. This is really useful in a production environment. Here we will set up three properties related to MySQL. Open $SDC_HOME/etc/sdc.properties and add the following, below the existing runtime.conf.location properties:

runtime.conf_MYSQL_HOST=jdbc:mysql://<Mysql_Host_IP>:3306/crm
runtime.conf_MYSQL_USER=root
runtime.conf_MYSQL_PWD=password

By doing this, you will be able to use the data pipeline that I developed.
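To make the mechanism more concrete, here is a minimal Python sketch (not SDC code) of how entries carrying the runtime.conf_ prefix can be looked up by their short name. Inside a pipeline, SDC exposes such a property through an expression like ${runtime:conf('MYSQL_HOST')}. The load_runtime_conf helper and the 10.0.0.1 address are assumptions made only for this illustration.

```python
# Illustrative only: mimics how "runtime.conf_" entries can be resolved by name.
PREFIX = "runtime.conf_"

SAMPLE_PROPERTIES = """\
# excerpt of sdc.properties (values are placeholders)
runtime.conf_MYSQL_HOST=jdbc:mysql://10.0.0.1:3306/crm
runtime.conf_MYSQL_USER=root
runtime.conf_MYSQL_PWD=password
"""


def load_runtime_conf(text):
    """Return {name: value} for every line that starts with the runtime prefix."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and anything that is not key=value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        if key.startswith(PREFIX):
            conf[key[len(PREFIX):]] = value
    return conf


runtime_conf = load_runtime_conf(SAMPLE_PROPERTIES)
print(runtime_conf["MYSQL_HOST"])
```

Keeping the connection details outside the pipeline definition is what lets the same exported pipeline run unchanged on another cluster.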

Build StreamSets Pipelines

MySQL to MapR Streams

Log into StreamSets (port 18630), click Import Pipeline, and import Mysql_to_Streams.json:

Now you can see the pipeline. Click on any component to see its configuration.

JDBC Consumer

This origin stage is used to query the MySQL database and to retrieve data.

JDBC Connection String
The Connection String is required to be able to connect to MySQL. Here we will be using the runtime property called “MYSQL_HOST” that we set up in $SDC_HOME/etc/sdc.properties.

Incremental Mode
Here we want our data pipeline to read new rows from our MySQL table as they are written. By checking this property, our pipeline will maintain the last value of the specified offset column to use in the next query.

SQL Query
We want to retrieve all data from the clients table. The WHERE clause is mandatory because we are running a continuous pipeline, not a one-off batch job (as in classic ETL). The pipeline therefore runs many small queries, each bounded by the WHERE clause and the offset column. Here we chose ID as the offset, since it is unique and increments each time a client is created.

Initial Offset
This is the initial value of the offset column.

Offset Column
As mentioned before, ID will be our offset column for this pipeline.

Query Interval
This stage runs the query repeatedly. Query Interval is the time to wait between two consecutive queries.
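The exact query stored in the imported pipeline is not reproduced in this post; a typical form for the JDBC Consumer is SELECT * FROM clients WHERE ID > ${OFFSET} ORDER BY ID. The Python sketch below simulates that incremental behavior, under the assumptions that the initial offset is 0 and that an in-memory SQLite database can stand in for MySQL. The poll function is a hypothetical helper, not part of SDC.

```python
import sqlite3

# In-memory SQLite stands in for MySQL so that the sketch runs anywhere.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE clients (ID INT, Name TEXT, Surname TEXT, City TEXT, Timestamp TEXT)"
)
conn.executemany(
    "INSERT INTO clients VALUES (?, ?, ?, ?, ?)",
    [
        (1, "Velfre", "Raphael", "Paris", "20160701"),
        (2, "Dupont", "Jean", "Paris", "20160701"),
    ],
)


def poll(conn, offset):
    """Run one incremental query; return the new rows and the updated offset."""
    rows = conn.execute(
        "SELECT * FROM clients WHERE ID > ? ORDER BY ID", (offset,)
    ).fetchall()
    if rows:
        offset = rows[-1][0]  # remember the highest ID seen so far
    return rows, offset


offset = 0  # assumed initial offset
first_batch, offset = poll(conn, offset)   # picks up IDs 1 and 2

conn.execute("INSERT INTO clients VALUES (3, 'Lee', 'Camille', 'Paris', '20160702')")
second_batch, offset = poll(conn, offset)  # picks up only ID 3
third_batch, offset = poll(conn, offset)   # nothing new, offset unchanged
```

In the real pipeline the stage waits for the query interval between two polls; the sketch calls poll back to back to keep it short.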

Stream Selector

The Stream Selector processor dispatches records into multiple streams depending on one or more conditions. Here I want to separate the clients from “Paris” from all the others, using the “City” field from the clients table.

Records that satisfy condition 1 will be sent to the first output. Records that do not pass condition 1 will be streamed into the second (default) output.
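The routing logic can be sketched in a few lines of Python. In SDC the condition is written in the expression language, presumably something like ${record:value('/City') == 'Paris'}; the route function, the lane numbers, and the Lyon record below are assumptions made for illustration only.

```python
def route(record):
    """Return output 1 when the condition matches, otherwise the default output 2."""
    return 1 if record.get("City") == "Paris" else 2


records = [
    {"ID": 1, "Name": "Velfre", "City": "Paris"},
    {"ID": 5, "Name": "Martin", "City": "Lyon"},  # hypothetical non-Paris client
]

# Output 1 feeds clients_from_paris, output 2 feeds clients_from_everywhere_else.
lanes = {1: [], 2: []}
for record in records:
    lanes[route(record)].append(record)
```

Because the second output is a default, a record with a missing or empty City still goes somewhere instead of being dropped.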

MapR Streams Producer

Based on the Stream Selector output, data will be dispatched to two different topics in the /clients stream. Both of the MapR Streams Producers are configured the same way. Only the topic name changes.

Topic
Here we will produce data to the clients_from_paris topic in the /clients stream.

Data Format
We will use the delimited format and configure it to be “Default CSV”.
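To give an idea of what ends up in each message, here is a small Python sketch that serializes one client record as a comma-delimited line. Python's csv module is only an approximation of SDC's “Default CSV” format, and to_csv_line is a hypothetical helper.

```python
import csv
import io


def to_csv_line(values):
    """Serialize one record as a single comma-delimited line."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerow(values)
    return buf.getvalue()


line = to_csv_line([1, "Velfre", "Raphael", "Paris", "20160701"])
# A field that contains the delimiter gets quoted so it can be parsed back.
quoted = to_csv_line([6, "Doe, Jr.", "John", "Paris", "20160703"])
```

Since no header line is written, whatever reads the data later has to refer to fields by position.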

MapR Streams to MapR-FS

Hit Import Pipeline, select Streams_to_MapRFS.json and click on Import.

MapR Streams Consumer

Consumer configuration is almost the same as the Producer, except for the following properties:

Consumer Group
Name of the consumer group that will retrieve data from the clients_from_paris topic in the /clients stream.

MapR Streams Configuration
Here we want to retrieve all data from the topic, including data that was produced before the consumer started.
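MapR Streams follows the Kafka consumer model, so this behavior is presumably obtained by setting the auto.offset.reset property to earliest (an assumption on my side, since the configuration screen is not reproduced here). The Python sketch below only simulates where a consumer group starts reading; starting_position is a hypothetical helper, not a client API.

```python
def starting_position(log, committed_offset, auto_offset_reset):
    """Return the index of the first message a consumer group would read."""
    if committed_offset is not None:
        return committed_offset  # resume where the group left off
    if auto_offset_reset == "earliest":
        return 0                 # replay everything already in the topic
    if auto_offset_reset == "latest":
        return len(log)          # only messages produced from now on
    raise ValueError(f"unsupported auto.offset.reset value: {auto_offset_reset}")


topic = ["client 1", "client 2"]  # messages already produced to clients_from_paris

replayed = topic[starting_position(topic, None, "earliest"):]
skipped = topic[starting_position(topic, None, "latest"):]
```

The reset policy only matters for a group without a committed offset; once the group has committed, it resumes from that position.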

MapR-FS

File Prefix
This is the default, but it can be changed. It is the prefix of the names of the files that the pipeline will create.

Directory Template
Again, this is the default.
The /tmp folder is located at the MapR-FS root.

Note: here we are using the MapR-FS specific component, but we could also use the Local FS component since we can access MapR-FS from a node that is running NFS Gateway. In this case, the directory template would be: /mapr/<cluster_name>/tmp/out/…
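As a rough illustration of how a time-based directory template resolves, the Python sketch below builds the output directory for a given timestamp, assuming the default template produces /tmp/out/YYYY-MM-DD-hh. The output_directory helper and the demo.cluster name are hypothetical.

```python
from datetime import datetime


def output_directory(ts, base="/tmp/out"):
    """Build the hourly output directory for the given timestamp."""
    return f"{base}/{ts.strftime('%Y-%m-%d-%H')}"


ts = datetime(2016, 9, 26, 14, 5)
maprfs_dir = output_directory(ts)
# Same data seen through the NFS Gateway; "demo.cluster" is a made-up cluster name.
nfs_dir = output_directory(ts, base="/mapr/demo.cluster/tmp/out")
```

With an hourly template, every record written during the same hour lands in the same directory, which keeps the number of directories manageable.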

Run the Data Integration Process

Go to the StreamSets home page, select the two pipelines and start them:

Now that both pipelines are running, the two records that we’ve created in the clients table should have been produced in our stream. Let’s confirm it using the streamanalyzer tool:

> mapr streamanalyzer -path /clients -topics clients_from_paris

Output should be:

Total number of messages: 2

We can also open the Mysql_to_Streams pipeline and look at the metrics information:

The same information is displayed in Streams_to_MapRFS:

For now, the output file under maprfs://tmp/out/<timestamp>/ is hidden. This is controlled by the Idle Timeout property in the MapR-FS component; one hour is the default value.

The output file will remain hidden for one hour, or until the pipeline is stopped.

Note: this mechanism is not really necessary on MapR-FS, since it is a fully random read/write file system.

Now let’s add some data into the clients table:

>INSERT INTO clients VALUES (3,'Lee','Camille','Paris','20160702');
>INSERT INTO clients VALUES (4,'Petit','Emma','Paris','20160702');

Again, we can use streamanalyzer:

> mapr streamanalyzer -path /clients -topics clients_from_paris

Output should be:

Total number of messages: 4

And the metrics from the two pipelines should show 4 records:


Query the Data Using Drill

Here’s a quick reminder on how to query data with Drill. You have three tools to query data using Drill:

Here I will use Drill Explorer to query the data just injected into my cluster.

Set up the Storage Plugin

Since the output file generated on MapR-FS by SDC has no extension, we need to configure a default input format in the storage plugin page. Let’s update the dfs storage plugin, which is enabled by default:

The change is: "defaultInputFormat": "csv"
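For orientation, here is a Python sketch that applies this change to a trimmed-down copy of a dfs plugin definition and prints the resulting workspace. The surrounding keys are an assumed excerpt of a typical configuration, and I assume the root workspace is the one to edit; your plugin will contain more workspaces and formats.

```python
import json

# Trimmed, assumed excerpt of a dfs storage plugin definition.
plugin = {
    "type": "file",
    "connection": "maprfs:///",
    "workspaces": {
        "root": {"location": "/", "writable": False, "defaultInputFormat": None}
    },
    "formats": {
        "csv": {"type": "text", "extensions": ["csv"], "delimiter": ","}
    },
}

# Files without an extension in this workspace will now be read as CSV.
plugin["workspaces"]["root"]["defaultInputFormat"] = "csv"

print(json.dumps(plugin["workspaces"]["root"], indent=2))
```

The value has to match one of the names declared under formats, which is why the csv format is kept in the excerpt.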

Query Data Using Drill Explorer

Open Drill Explorer and navigate into MapR-FS to find the output file. The output file should be located at:

Dfs.root > tmp > out > YYYY-MM-DD-hh

Then let’s click on this file:

If you go to the SQL tab, you will see the query that has been executed.

If you want to execute more complex queries, you can do that via this tab:

Thanks to Drill, you are able to query data immediately after it has been written, without having to build any ETL process.
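As an example of a slightly more complex query: Drill exposes the fields of a headerless CSV file through an array named columns. The Python sketch below shows such a query as a string (the directory and file names are left as placeholders) and emulates it on sample data so that the result can be checked without a cluster. The run_equivalent helper and the sample file content are assumptions for illustration.

```python
import csv
import io

# Hypothetical Drill query; <directory> and <file> are placeholders.
DRILL_QUERY = """
SELECT columns[0] AS id, columns[1] AS name, columns[2] AS surname
FROM dfs.root.`/tmp/out/<directory>/<file>`
WHERE columns[4] = '20160702'
"""

# Stand-in for the content of the output file written by the pipeline.
SAMPLE_FILE = (
    "1,Velfre,Raphael,Paris,20160701\n"
    "2,Dupont,Jean,Paris,20160701\n"
    "3,Lee,Camille,Paris,20160702\n"
    "4,Petit,Emma,Paris,20160702\n"
)


def run_equivalent(text):
    """Emulate the query above: filter on the fifth field, project the first three."""
    result = []
    for columns in csv.reader(io.StringIO(text)):
        if columns[4] == "20160702":
            result.append(
                {"id": columns[0], "name": columns[1], "surname": columns[2]}
            )
    return result


rows = run_equivalent(SAMPLE_FILE)
```

Every field comes back as text, so a numeric comparison on the ID would need an explicit cast in the real query.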

Conclusion

Our job is done. In this blog post, you learned how to use StreamSets Data Collector to easily integrate any data from your relational database with MapR Streams and subsequently MapR-FS, and even use Drill to query this data with ANSI SQL.

StreamSets Data Collector is open source. Download it now and start ingesting data today!
