Dataflow Performance Blog

Replicating Relational Databases with StreamSets Data Collector

StreamSets Data Collector has long supported both reading data from and writing data to relational databases via Java Database Connectivity (JDBC). While it was straightforward to configure pipelines to read data from individual tables, ingesting records from an entire database was cumbersome, requiring a pipeline per table. StreamSets Data Collector (SDC) 2.3.0.0 introduces the JDBC Multitable Consumer, a new pipeline origin that can read data from multiple tables through a single database connection. In this blog entry, I'll explain how to use the JDBC Multitable Consumer to implement a typical use case: replicating an entire relational database into Hadoop.

Installing a JDBC Driver

The first task is to install the JDBC driver corresponding to your database. Follow the Additional Drivers documentation carefully; this is quite a delicate process and any errors in configuration will prevent the driver from being loaded.

In the sample below, I use MySQL, but you should be able to ingest data from any relational database, as long as it has a JDBC driver.

Replicating a Database

I used the MySQL retail_db database included in the Cloudera Quickstart VM as my sample data source. It contains 7 tables:

Although this is a simple schema, the sample database contains a significant amount of data – over a quarter of a million rows in total. My goal is to replicate the entire database – every table, every row – into an Apache Hive data warehouse.

I created a new pipeline and dropped in the JDBC Multitable Consumer. The key piece of configuration here is the JDBC Connection String. In my case, this was jdbc:mysql://pat-retaildb.my-rds-instance.rds.amazonaws.com:3306/retail_db, but you'll need to change it to match your database.
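A MySQL JDBC connection string has the general shape jdbc:mysql://host:port/database, optionally followed by ?key=value driver options. As a quick sanity check before pasting a URL into the origin, the minimal Python sketch below splits such a string into its parts; parse_mysql_jdbc_url is a hypothetical helper, not part of SDC, and it only understands the simple single-host form.

```python
from urllib.parse import urlparse, parse_qs

def parse_mysql_jdbc_url(url):
    """Hypothetical helper: split a jdbc:mysql:// URL into host, port, database, options."""
    prefix = "jdbc:"
    if not url.startswith(prefix):
        raise ValueError("not a JDBC URL: " + url)
    # Drop the 'jdbc:' prefix so the remainder parses as an ordinary URL.
    parsed = urlparse(url[len(prefix):])
    if parsed.scheme != "mysql":
        raise ValueError("not a MySQL JDBC URL: " + url)
    return {
        "host": parsed.hostname,
        "port": parsed.port or 3306,  # 3306 is MySQL's default port
        "database": parsed.path.lstrip("/"),
        # Driver options from the query string, e.g. ?useSSL=false
        "options": {k: v[0] for k, v in parse_qs(parsed.query).items()},
    }

print(parse_mysql_jdbc_url(
    "jdbc:mysql://pat-retaildb.my-rds-instance.rds.amazonaws.com:3306/retail_db"))
```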

I used the default values for the remainder of the JDBC tab; see the documentation for an explanation of these items, in particular the batch strategy.
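As I understand the documentation, the batch strategy controls the order in which the origin works through the matched tables: "Switch Tables" moves on to the next table after each batch, while "Process All Available Rows in the Table" drains one table before starting the next. The Python sketch below is a deliberately simplified model of that difference using in-memory lists; it is not SDC's implementation, and the real origin has further settings (result set handling, intervals) that it ignores.

```python
from collections import deque

def switch_tables(tables, batch_size):
    """Simplified 'Switch Tables': take one batch from each table in turn."""
    queues = {name: deque(rows) for name, rows in tables.items()}
    while any(queues.values()):
        for name, queue in queues.items():
            if queue:
                n = min(batch_size, len(queue))
                yield name, [queue.popleft() for _ in range(n)]

def process_all_rows(tables, batch_size):
    """Simplified 'Process All Available Rows': drain one table before the next."""
    for name, rows in tables.items():
        for start in range(0, len(rows), batch_size):
            yield name, rows[start:start + batch_size]

tables = {"orders": [1, 2, 3], "customers": [10, 11]}  # hypothetical rows
print([name for name, _ in switch_tables(tables, 2)])     # ['orders', 'customers', 'orders']
print([name for name, _ in process_all_rows(tables, 2)])  # ['orders', 'orders', 'customers']
```

In this model, switching tables lets every table make progress early, which is why a large table does not starve the others; draining a table first means fewer switches between tables.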

On the Tables tab, I used a single table configuration, setting JDBC schema to retail_db, and left the table name pattern with its default value, %, to match all tables in the database. You can create multiple table configurations, each with its own table name pattern, to configure whichever subset of tables you require. Since each of the tables in the sample database has a suitable primary key, I didn't need to configure any offset columns, but you have the option to do so if necessary. The documentation describes table configurations in some detail, and is worth reading carefully.
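The table name pattern uses SQL LIKE syntax, where % matches any run of characters and _ matches exactly one. To illustrate how several table configurations combine, the Python sketch below translates LIKE patterns into regular expressions and keeps every table matched by at least one pattern. It is an illustration only: select_tables and like_to_regex are hypothetical names, and the sketch ignores escape characters and database-specific case sensitivity.

```python
import re

def like_to_regex(pattern):
    """Translate a SQL LIKE pattern into a compiled regular expression.

    '%' matches any run of characters and '_' matches exactly one; escape
    characters and database-specific case rules are deliberately ignored.
    """
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.DOTALL)

def select_tables(all_tables, patterns):
    """Return the tables matched by any of the table configurations' patterns."""
    regexes = [like_to_regex(p) for p in patterns]
    return [t for t in all_tables if any(r.fullmatch(t) for r in regexes)]

tables = ["orders", "order_items", "customers"]  # hypothetical table list
print(select_tables(tables, ["%"]))       # every table
print(select_tables(tables, ["order%"]))  # orders and order_items
```

One gotcha this makes visible: because _ is itself a wildcard, a pattern such as order_% also matches orders, not just tables whose names contain a literal underscore.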

One MySQL-specific note: the default transaction isolation level for MySQL InnoDB tables is REPEATABLE READ. This means that repeating the same SELECT statement within a transaction returns the same results, even if other transactions have committed changes in the meantime. Since the JDBC Multitable Consumer repeatedly queries MySQL tables, and we want to pick up changes between those queries, I set ‘Transaction isolation’ to ‘Read committed’ in the consumer's Advanced tab.

With the origin configured, I was able to preview data. I checked ‘Show Record/Field Header’ in Preview Configuration so I could see the attributes that the origin sets:

Note in particular the jdbc.tables attribute – every record carries with it the name of its originating table. Note also that, in preview mode, the origin reads only the first table that matches the configuration.

Transforming Data

When we load data from transactional databases into the data warehouse, we often want to filter out personally identifiable information (PII). The customers table has columns for first name, last name, email, password and street address. I don't want those in the data warehouse; for my analyses, customer ID, city, state and ZIP code suffice.

I separated out the customer records in the pipeline using a Stream Selector with the following condition:

A Field Remover then stripped the PII from the customer records.
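The exact Stream Selector condition isn't reproduced here, but given that each record carries a jdbc.tables attribute, it was presumably something along the lines of ${record:attribute('jdbc.tables') == 'customers'}; treat that expression as an assumption. The Python sketch below models the two stages on a plain dictionary record: route stands in for the Stream Selector and remove_pii for the Field Remover. The customers column names are also assumptions, so check them against your own schema.

```python
# Assumed customers column names; check them against your own schema.
PII_FIELDS = {"customer_fname", "customer_lname", "customer_email",
              "customer_password", "customer_street"}

def route(record):
    """Stand-in for the Stream Selector: pick a stream from the jdbc.tables attribute."""
    if record["attributes"].get("jdbc.tables") == "customers":
        return "customers"
    return "default"

def remove_pii(record):
    """Stand-in for the Field Remover: copy the record without the PII fields."""
    kept = {k: v for k, v in record["fields"].items() if k not in PII_FIELDS}
    return {"attributes": dict(record["attributes"]), "fields": kept}

rec = {"attributes": {"jdbc.tables": "customers"},
       "fields": {"customer_id": 1, "customer_fname": "Ann", "customer_lname": "Lee",
                  "customer_email": "ann@example.com", "customer_city": "Springfield",
                  "customer_state": "IL", "customer_zipcode": "62701"}}
if route(rec) == "customers":
    rec = remove_pii(rec)
print(sorted(rec["fields"]))  # ['customer_city', 'customer_id', 'customer_state', 'customer_zipcode']
```

Records from every other table take the default stream untouched, so only the customers table loses columns.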

Writing Data to Hive

The combination of the Hive Metadata Processor and the Hadoop FS and Hive Metastore destinations allows us to write data to Hive without needing to predefine the Hive schema. The Hive Metadata Processor examines the schema of incoming records and reconciles any differences with the corresponding Hive schema, emitting data records for consumption by the Hadoop FS destination and metadata records for the Hive Metastore destination. See Ingesting Drifting Data into Hive and Impala for a detailed tutorial on setting it all up.

Configuration of the three stages mostly involves specifying the Hive JDBC URL and Hadoop FS location, but there is one piece of ‘magic’: I set the Hive Metadata Processor's Table Name to retaildb-${record:attribute('jdbc.tables')}. This tells the processor to use each record's table name attribute, as we saw in the preview above, to build the destination Hive table name. I'm prefixing the Hive table names with retaildb- so I can distinguish the tables from similarly named tables that might come from other upstream databases.
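To make the effect of that expression concrete, the Python sketch below evaluates the equivalent of retaildb-${record:attribute('jdbc.tables')} for a few dictionary records and groups them by the resulting Hive table name. The record layout and function names are hypothetical; the point is only that one expression fans records out to one Hive table per source table.

```python
from collections import defaultdict

def hive_table_name(record, prefix="retaildb-"):
    """Mimic retaildb-${record:attribute('jdbc.tables')}: prefix + source table name."""
    table = record["attributes"].get("jdbc.tables")
    if not table:
        raise KeyError("record has no jdbc.tables attribute")
    return prefix + table

def group_by_hive_table(records, prefix="retaildb-"):
    """Bucket records by the Hive table each one would be written to."""
    groups = defaultdict(list)
    for rec in records:
        groups[hive_table_name(rec, prefix)].append(rec)
    return dict(groups)

records = [
    {"attributes": {"jdbc.tables": "orders"}, "fields": {"order_id": 1}},
    {"attributes": {"jdbc.tables": "customers"}, "fields": {"customer_id": 7}},
    {"attributes": {"jdbc.tables": "orders"}, "fields": {"order_id": 2}},
]
print({name: len(rs) for name, rs in group_by_hive_table(records).items()})
# {'retaildb-orders': 2, 'retaildb-customers': 1}
```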

Since I was building a sample system, I changed the Hadoop FS destination's Idle Timeout from its default ${1 * HOURS} to ${1 * MINUTES} – I was more interested in being able to quickly see records appearing in Hive, rather than maximizing the size of my output files!
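The idle timeout is expressed in seconds, with MINUTES and HOURS acting as multipliers (60 and 3,600). The Python sketch below is a simplified model, not the destination's actual logic: given the times at which records are written, it reports when each output file would be closed for inactivity, which is roughly when the data becomes visible to Hive. The real destination can also close files for other reasons, such as size or record-count limits, which the model ignores.

```python
MINUTES = 60          # seconds, mirroring the EL multiplier
HOURS = 60 * MINUTES  # 3600 seconds

def close_times(write_times, idle_timeout):
    """Return the times (seconds) at which output files close for idleness.

    write_times: sorted timestamps at which records were written. A file is
    closed once idle_timeout seconds pass without a write; the next write
    then starts a new file.
    """
    closes = []
    nexts = list(write_times[1:]) + [None]
    for current, nxt in zip(write_times, nexts):
        if nxt is None or nxt - current >= idle_timeout:
            closes.append(current + idle_timeout)
    return closes

writes = [0, 30, 200]  # hypothetical write times in seconds
print(close_times(writes, 1 * MINUTES))  # [90, 260]
print(close_times(writes, 1 * HOURS))    # [3800]
```

With a one-minute timeout the first file closes after 90 seconds; with the one-hour default, nothing is visible for over an hour, at the benefit of fewer, larger files.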

Refreshing Impala's Metadata Cache

As it stands, this pipeline would read all of the data from MySQL and write it to Hive, but if we were to use Impala to query Hive, we would not see any data. Because we are writing data directly into Hadoop FS and metadata into Hive, Impala's metadata cache is unaware of the changes, so we need to send Impala the INVALIDATE METADATA statement to make it reload the metadata on the next query. We can do this automatically using SDC's Event Framework. I just followed the Impala Metadata Updates for HDS case study, adding an Expression Evaluator and Hive Query Executor to automatically send INVALIDATE METADATA statements to Impala when closing a data file or changing metadata. Note that you'll have to specify the Impala endpoint in the Hive Query Executor's JDBC URL; for my setup, it was jdbc:hive2://node-2.cluster:21050/;auth=noSasl.
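The Python sketch below illustrates the event-to-statement step: for each relevant event it builds one INVALIDATE METADATA statement per affected table. The event dictionaries, with a type and a table key, are a hypothetical shape chosen for illustration (real SDC event records carry different fields), the event type names are assumptions, and the sketch backquotes identifiers because a name such as retaildb-orders contains a hyphen.

```python
def invalidate_statement(database, table):
    """Build Impala's INVALIDATE METADATA statement, backquoting the identifiers."""
    return "INVALIDATE METADATA `{}`.`{}`".format(database, table)

def statements_for_events(events, database="default"):
    """Turn (hypothetically shaped) events into Impala statements.

    Only file-closed and metadata-change events trigger a refresh; each table
    is invalidated at most once per call.
    """
    triggers = {"file-closed", "new-table", "new-columns", "new-partition"}  # assumed names
    statements = []
    for event in events:
        if event["type"] in triggers:
            stmt = invalidate_statement(database, event["table"])
            if stmt not in statements:
                statements.append(stmt)
    return statements

events = [
    {"type": "file-closed", "table": "retaildb-orders"},
    {"type": "file-closed", "table": "retaildb-orders"},
    {"type": "new-table", "table": "retaildb-customers"},
]
for stmt in statements_for_events(events):
    print(stmt)
```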


Once my pipeline was configured, I was able to run it. After a few minutes, the pipeline was quiet:


I was able to check that all the rows had been ingested by running SELECT COUNT(*) FROM tablename for each table in both MySQL and Impala:
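If you'd rather script that check than run the queries by hand, the Python sketch below compares COUNT(*) results across two DB-API connections, mapping each source table to its prefixed Hive table name. To keep it runnable anywhere, two in-memory SQLite databases stand in for MySQL and Impala; with the real systems you would open a connection to each through a DB-API driver instead. The function names and the table mapping are illustrative.

```python
import sqlite3

def count_rows(conn, table):
    """Run SELECT COUNT(*) on one table via any DB-API 2.0 connection."""
    cur = conn.cursor()
    # Identifiers can't be bound as parameters; only pass trusted table names.
    cur.execute("SELECT COUNT(*) FROM `{}`".format(table))
    return cur.fetchone()[0]

def compare_counts(source, target, table_map):
    """Return {source_table: (source_count, target_count)} for every mismatch."""
    mismatches = {}
    for src_table, dst_table in table_map.items():
        s, d = count_rows(source, src_table), count_rows(target, dst_table)
        if s != d:
            mismatches[src_table] = (s, d)
    return mismatches

# Two in-memory SQLite databases stand in for MySQL and Impala.
src = sqlite3.connect(":memory:")
dst = sqlite3.connect(":memory:")
src.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY)")
src.executemany("INSERT INTO orders VALUES (?)", [(i,) for i in range(5)])
dst.execute("CREATE TABLE `retaildb-orders` (order_id INTEGER)")
dst.executemany("INSERT INTO `retaildb-orders` VALUES (?)", [(i,) for i in range(4)])
print(compare_counts(src, dst, {"orders": "retaildb-orders"}))  # {'orders': (5, 4)}
```

An empty result means every mapped table has the same row count on both sides.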

Continuous Replication

A key feature of the JDBC Multitable Consumer origin is that, like almost all SDC origins, it runs continuously. The origin retrieves records from each table in turn according to the configured interval, using a query of the form:

In my sample, this means that any new rows created in MySQL are copied across to Hive. Since I had set the Hadoop FS destination's idle timeout to just one minute, I was able to see data appear in the Hive table quickly.
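The query itself isn't reproduced here, but as a rough sketch, assuming incremental mode with an offset column that only ever increases (such as an auto-increment primary key), each poll behaves like SELECT * FROM table WHERE offset_column > last_offset ORDER BY offset_column, remembering the largest offset seen. The Python sketch below implements that pattern against SQLite; poll_table is a hypothetical name, and placeholder style (? here, %s in many MySQL drivers) and LIMIT syntax vary between databases.

```python
import sqlite3

def poll_table(conn, table, offset_column, last_offset, batch_size=1000):
    """Fetch rows whose offset column exceeds last_offset, in offset order.

    Returns (rows, new_offset). Assumes offset_column is unique and only ever
    increases (e.g. an auto-increment primary key), so only new rows are seen;
    updates to or deletions of existing rows are not picked up.
    """
    sql = "SELECT * FROM `{t}` WHERE `{c}` > ? ORDER BY `{c}` LIMIT ?".format(
        t=table, c=offset_column)
    cur = conn.cursor()
    cur.execute(sql, (last_offset, batch_size))  # '?' is SQLite's placeholder style
    rows = cur.fetchall()
    if not rows:
        return rows, last_offset
    idx = [d[0] for d in cur.description].index(offset_column)
    return rows, rows[-1][idx]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, "NEW"), (2, "NEW")])
rows, offset = poll_table(conn, "orders", "order_id", 0)       # first pass: rows 1 and 2
conn.execute("INSERT INTO orders VALUES (3, 'NEW')")            # a new row arrives
rows, offset = poll_table(conn, "orders", "order_id", offset)  # next pass: only row 3
print(rows, offset)  # [(3, 'NEW')] 3
```

Because the stored offset is the only state, the poller can be stopped and restarted without re-reading rows it has already delivered.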

This short video highlights the key points in configuring the use case, and shows propagation of new rows from MySQL to Hive:

Conclusion

The JDBC Multitable Consumer allows a single StreamSets Data Collector pipeline to continuously ingest data from any number of tables in a relational database. Combined with the Hive Drift Solution, it lets you quickly create powerful data pipelines to implement data warehouse use cases.

Pat Patterson
  • Dima

    For MySQL, subscribing to changes for several tables can be done using the MySQLBinaryLog origin, with less load on the source database.

    • http://blog.superpat.com/ Pat Patterson

      You’re absolutely right – I used incremental mode here for simplicity.

  • Shoaib Akhtar

    A bit off topic, but is there any plan to add a JMSMultiQueue and JMSMultiTopic Consumer any time soon?