Today’s post is from Raphaël Velfre, a senior data engineer at MapR. Raphaël has spent some time working with StreamSets Data Collector (SDC) and MapR’s Converged Data Platform. In this blog entry, originally published on the MapR Converge blog, Raphaël explains how to use SDC to extract data from MySQL and write it to MapR Streams, and then move data from MapR Streams to MapR-FS via SDC, where it can be queried with Apache Drill.
A very common use case for the MapR Converged Data Platform is collecting and analyzing data from a variety of sources, including traditional relational databases. Until recently, data engineers would build an ETL pipeline that periodically walks the relational database and loads the data into files on the MapR cluster, then perform batch analytics on that data.
This model breaks down when use cases demand immediate access to that same data, in order to make a decision, raise an alert, or make an offer, since these batch pipelines are often scheduled to run hourly or even daily. To get to real-time data processing, one must build a real-time data pipeline that continuously collects the latest data. Building a real-time data pipeline doesn’t mean giving up batch analytics, since you can always write data from the streaming pipeline to files or tables, but you can’t do real-time analytics with a batch pipeline.
To build a real-time data pipeline, you should start with MapR Streams, the publish/subscribe event streaming service of the MapR Converged Data Platform, as it is the most critical component to handle distribution of real-time data between applications.
Next, a tool is needed to extract the data out of the database and publish it into MapR Streams, as well as take data out of MapR Streams and write it to files or database tables. StreamSets Data Collector (SDC) is an open source, easy to use, GUI-based tool that runs directly on the MapR cluster and allows anyone to build robust data pipelines.
In this blog, we’ll walk through an example of building a real-time data pipeline from a MySQL database into MapR Streams, and even show how this data can be written to MapR-FS for batch or interactive analytics.
For this example, I assume you have a MapR 5.1 cluster running and SDC properly installed and configured. Specific instructions for setting up SDC with MapR are provided in the StreamSets documentation.
Architecture of Our Use Case
The source will be a MySQL database that is running on my MapR cluster.
Note: MySQL could also run outside of the cluster; I just wanted to remove complexity in this architecture.
We will stream data from the clients table in that database and publish data to MapR Streams:
Then we will stream data from a stream/topic to MapR-FS.
Set up the Environment
First of all, we need to add the MySQL JDBC driver to StreamSets. Follow the instructions for installing additional drivers into SDC.
Now, we need to make sure that the user running StreamSets exists in the MySQL user table and has sufficient privileges.
Log into MySQL using root from your node and enter the following queries:
>CREATE USER '<User_Running_Streamsets>'@'<Host_Running_Streamsets>' IDENTIFIED BY 'password';
>GRANT ALL PRIVILEGES ON *.* TO '<User_Running_Streamsets>'@'<Host_Running_Streamsets>' WITH GRANT OPTION;
Then create a database named crm and a table named clients:
>CREATE DATABASE crm;
>USE crm;
>CREATE TABLE clients (ID INT, Name VARCHAR(10), Surname VARCHAR(10), City VARCHAR(10), Timestamp VARCHAR(10));
Now let’s add some clients into this table:
>INSERT INTO clients VALUES (1,'Velfre','Raphael','Paris','20160701');
>INSERT INTO clients VALUES (2,'Dupont','Jean','Paris','20160701');
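To double-check the inserts, a quick query should return both rows:

```sql
SELECT * FROM clients;
```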
STREAMS AND TOPICS
We will create a stream named /clients and two topics:
>maprcli stream create -path /clients
>maprcli stream edit -path /clients -produceperm p -consumeperm p -topicperm p
>maprcli stream topic create -path /clients -topic clients_from_paris
>maprcli stream topic create -path /clients -topic clients_from_everywhere_else
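If you want to confirm that the stream and its topics exist before wiring up the pipelines, maprcli can report on both (run these as a user with access to the /clients stream):

```shell
# Show stream metadata, including the permissions we just set
maprcli stream info -path /clients

# List the topics created in the stream
maprcli stream topic list -path /clients
```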
StreamSets Runtime Properties
Runtime properties can be set up in a local file and referenced from a pipeline, which is really useful in a production environment. Here we will set up three properties related to MySQL. Open $SDC_HOME/etc/sdc.properties and add the following, below the existing runtime.conf.location property:
runtime.conf_MYSQL_HOST=jdbc:mysql://<Mysql_Host_IP>:3306/crm
runtime.conf_MYSQL_USER=root
runtime.conf_MYSQL_PWD=password
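Within a pipeline, these properties are referenced through SDC's runtime:conf() expression-language function. For example, the JDBC origin's connection string, username, and password fields would be set as follows (standard SDC EL usage; field names as they appear in the JDBC origin configuration):

```
JDBC Connection String:  ${runtime:conf('MYSQL_HOST')}
Username:                ${runtime:conf('MYSQL_USER')}
Password:                ${runtime:conf('MYSQL_PWD')}
```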
With these properties in place, you will be able to reuse the data pipelines that I developed.
Build StreamSets Pipelines
MySQL to MapR Streams
Log into StreamSets (port 18630), click Import Pipeline, and import Mysql_to_Streams.json.
Now you are able to see the pipeline. Click on any component to see its configuration.
This origin stage is used to query the MySQL database and to retrieve data.
JDBC Connection String
The connection string is required to connect to MySQL. Here we use the runtime property MYSQL_HOST that we set up in sdc.properties.
Here we want our data pipeline to read new rows from our MySQL table as they are written. By checking this property, our pipeline will maintain the last value of the specified offset column to use in the next query.
We want to retrieve all data from the clients table. The WHERE clause is mandatory because we are running a continuous pipeline rather than a one-time batch job (as in classic ETL). The pipeline will run many small batches based on the WHERE clause and the offset column. Here we chose ID as the offset column, since it is unique and increments each time a client is created.
This is the initial value of the offset column.
As mentioned before, ID will be our offset column for this pipeline.
This component runs many small batches; the query interval is the time between two consecutive batches.
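Put together, the SQL Query in the JDBC origin typically looks like the following, where ${OFFSET} is SDC's placeholder for the last saved value of the offset column (a sketch; the imported pipeline may word it slightly differently):

```sql
SELECT * FROM clients WHERE ID > ${OFFSET} ORDER BY ID
```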
The Stream Selector processor is used to route records to multiple outputs depending on one or more conditions. Here I would like to separate the clients that are from Paris from all the others, using the City field of the clients table.
Records that satisfy condition 1 will be sent to the first output. Records that do not pass condition 1 will be streamed into the second (default) output.
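In SDC's expression language, condition 1 would be written along these lines (assuming the field is /City, as in our table):

```
${record:value('/City') == 'Paris'}
```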
MapR Streams Producer
Based on the Stream Selector output, data will be dispatched to two different topics in the /clients stream. Both MapR Streams Producers are configured the same way; only the topic name changes.
Here we will produce data to the topic clients_from_paris in the /clients stream.
We will use the Delimited data format, configured as “Default CSV”:
MapR Streams to MapR-FS
Hit Import Pipeline, select
Streams_to_MapRFS.json and click on Import.
MapR Streams Consumer
Consumer configuration is almost the same as the Producer, except for the following properties:
The consumer group name that will retrieve data from the topic clients_from_paris, and the
MapR Streams Configuration
Here we would like to retrieve all data from the stream, including data that has already been produced.
This is the default, but it can be changed; it corresponds to the prefix of the files that the pipeline creates.
Again, this is the default.
/tmp will be located at MapR-FS root.
Note: here we are using the MapR-FS specific component, but we could also use the Local FS component, since MapR-FS can be accessed from a node running the NFS Gateway. In that case, the directory template would be the corresponding NFS path (for example, /mapr/<cluster_name>/tmp/out).
Run the Data Integration Process
Go to the StreamSets home page, select the two pipelines and start them:
Now that both pipelines are running, the two records that we’ve created in the clients table should have been produced in our stream. Let’s confirm it using the streamanalyzer tool:
> mapr streamanalyzer -path /clients -topics clients_from_paris
Output should be:
Total number of messages: 2
We can also open the Mysql_to_Streams pipeline and look at the metrics information:
The same information is displayed in Streams_to_MapRFS:
For now, the output file at maprfs://tmp/out/<timestamp>/ is hidden. This is controlled by the Idle Timeout property in the MapR-FS component; one hour is the default value.
The output file will remain hidden for one hour, or until the pipeline is stopped.
Note: this hiding is not really necessary on MapR-FS, since it is a fully random read/write file system.
Now let’s add some data into the clients table:
>INSERT INTO clients VALUES (3,'Lee','Camille','Paris','20160702'); >INSERT INTO clients VALUES (4,'Petit','Emma','Paris','20160702');
Again, we can use streamanalyzer:
> mapr streamanalyzer -path /clients -topics clients_from_paris
Output should be:
Total number of messages: 4
And the metrics from the two pipelines should show 4 records:
Query the Data Using Drill
Here’s a quick reminder on how to query data with Drill. You have three tools to query data using Drill: the sqlline shell, the Drill Web UI, and Drill Explorer.
Here I will use Drill Explorer to query the data just injected into my cluster.
Set up the Storage Plugin
Since the output file generated on MapR-FS by SDC has no extension, we need to configure a default input format on the storage plugin page. Let’s update the dfs storage plugin, which is enabled by default:
Setting "defaultInputFormat" to "csv" is the only change.
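Concretely, the relevant fragment of the dfs plugin definition ends up looking roughly like this (a sketch; only defaultInputFormat is changed, everything else keeps its default):

```json
"workspaces": {
  "root": {
    "location": "/",
    "writable": false,
    "defaultInputFormat": "csv"
  }
}
```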
Query Data Using Drill Explorer
Open Drill Explorer and navigate into MapR-FS to find the output file. The output file should be located at:
dfs.root > tmp > out > YYYY-MM-DD-hh
Then let’s click on this file:
If you go into the SQL tab, you will see the query that was executed.
If you want to execute more complex queries, you can do that via this tab:
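For example, since Drill exposes extension-less delimited files as a columns array, a query counting clients per city might look like this (a sketch; replace <timestamp> with your actual directory name):

```sql
SELECT columns[3] AS city, COUNT(*) AS clients
FROM dfs.`/tmp/out/<timestamp>`
GROUP BY columns[3];
```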
Thanks to Drill, you are able to query data immediately after the data has been written, without any ETL process to build.
Our job is done. In this blog post, you learned how to use StreamSets Data Collector to easily integrate any data from your relational database with MapR Streams and subsequently MapR-FS, and even use Drill to query this data with ANSI SQL.