Data Collecting for Snowflake

Mike Fuller, a consultant at Red Pill Analytics, has been working on ingesting data into Snowflake's cloud data warehouse using StreamSets for Snowflake. In this guest blog post, Mike explains how he was able to replicate an Oracle database to Snowflake using the new functionality, both for initial load and with change data capture.

StreamSets Data Collector offers incredible flexibility for building data integration pipelines. Developers can create a custom stage (an origin, processor, executor, or destination) completely from scratch, tailor a general-purpose stage to a specific requirement, or use a dedicated stage for a particular technology when one is available. In the past, the most straightforward way to load data into Snowflake Data Warehouse with Data Collector was to use some combination of JDBC and AWS or Azure stages, depending on specific requirements.

As of January 16, 2019, StreamSets for Snowflake includes a Snowflake Data Warehouse destination, an optimized and fully supported stage for loading data into Snowflake. A dedicated Snowflake destination simplifies configuration, which speeds up development and opens the door to getting the most out of your Data Collector and Snowflake integration.

[Image: Snowflake Stage. An Exciting Sight for StreamSets Data Collector Developers]

Snowflake Destination

StreamSets officially announced general availability of the new Snowflake destination through a press release that can be found on the StreamSets website. Further, instructions on how to set up and use the new destination are included in the Data Collector Documentation.

The new Data Collector Snowflake destination includes all of the expected settings required to connect to Snowflake. Configurable items such as region, account, database, schema, warehouse, stage, etc. should all be familiar to Snowflake users.

Looking beyond the basics, a few of the notable items to call out right away include the ability to automatically create Snowflake tables that do not yet exist, the ability to detect and accommodate data drift, and the option to use Snowpipe for loading data asynchronously.
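
To make table auto-create and data drift handling more concrete, here is a minimal Python sketch of the idea. Given the columns a table already has (or None if the table does not exist yet) and an incoming record, it returns the DDL needed before the record can be loaded. The function name ddl_for_record and the TYPE_MAP type mapping are hypothetical; this illustrates the concept only and is not how the StreamSets Snowflake destination is implemented.

```python
from typing import Any, Dict, List, Optional

# Hypothetical mapping from Python value types to Snowflake column types.
# Anything not listed falls back to VARIANT.
TYPE_MAP = {bool: "BOOLEAN", int: "NUMBER", float: "DOUBLE", str: "VARCHAR"}


def ddl_for_record(table: str, existing_columns: Optional[List[str]],
                   record: Dict[str, Any]) -> List[str]:
    """Return the DDL needed before `record` can be loaded into `table`.

    `existing_columns` is None when the table does not exist yet.
    Names are upper-cased, mirroring the "Upper Case Schema & Field Names" option.
    """
    table = table.upper()
    fields = {name.upper(): TYPE_MAP.get(type(value), "VARIANT")
              for name, value in record.items()}
    if existing_columns is None:
        # Table auto-create: build the table from the record's fields.
        cols = ", ".join(f"{name} {sql_type}" for name, sql_type in fields.items())
        return [f"CREATE TABLE {table} ({cols})"]
    known = {c.upper() for c in existing_columns}
    # Data drift: add every field that is not yet a column of the table.
    return [f"ALTER TABLE {table} ADD COLUMN {name} {sql_type}"
            for name, sql_type in fields.items() if name not in known]


if __name__ == "__main__":
    print(ddl_for_record("trades", None, {"id": 1, "symbol": "ABC"}))
    # ['CREATE TABLE TRADES (ID NUMBER, SYMBOL VARCHAR)']
    print(ddl_for_record("trades", ["ID", "SYMBOL"], {"id": 2, "symbol": "XYZ", "price": 9.5}))
    # ['ALTER TABLE TRADES ADD COLUMN PRICE DOUBLE']
```

A record whose fields all exist as columns produces an empty list, so no DDL is issued and loading proceeds as usual.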

Data Replication

A common use case for loading Snowflake is replicating data from an OLTP database so that it can be used for reporting and analytics. Because Oracle is one of the most common databases on the market, replicating an Oracle database to Snowflake is a great way to get our hands dirty with the new Snowflake destination. What follows explains how to use StreamSets Data Collector to replicate data from an Oracle database into Snowflake Data Warehouse.

Prerequisites

A few prerequisites are required to get started:

The Historical Load

There are almost always two parts to data replication: gathering what exists in the database today (historical load) and replicating transactions as records are created, updated, or deleted in the database going forward (change data capture). The historical load can be completed in a variety of ways; for this exercise, the JDBC Multitable Consumer is a good option.

Create a New Pipeline

First, create a pipeline with a JDBC Multitable Consumer origin and a Snowflake destination. We’ll also add a Pipeline Finisher executor so that the pipeline stops after it loads all of the available data. After all stages are added and connected, the pipeline should look like this (don’t sweat the validation issues; they will be cleared up shortly):

[Image: Snowflake Pipeline]

Configure: JDBC Multitable Consumer Origin

On the JDBC tab of the JDBC Multitable Consumer origin, update the Oracle database connection string and change the Max Batch Size (Records) parameter to 50000. Note: make sure the production.maxBatchSize parameter in the $SDC_CONF/sdc.properties file is set to 50000 or greater. Other parameters may be candidates for tuning, but we’ll stick with the basics for now. Also be sure to enter the appropriate database credentials.

[Image: Snowflake JDBC Config]
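
Because Data Collector caps a stage's batch size at production.maxBatchSize, it can be worth checking that setting before running the pipeline. The following is a minimal Python sketch of such a check. It assumes the SDC_CONF environment variable points at the Data Collector configuration directory, and its parse_properties helper is a deliberately simplified, hypothetical parser that only understands key=value lines and # or ! comments (no line continuations, escapes, or ':' separators).

```python
import os
from typing import Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Simplified Java-properties parser: key=value lines only.

    Blank lines and lines starting with # or ! are skipped. Line
    continuations, escapes, and ':' separators are not supported.
    """
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def batch_size_ok(text: str, required: int = 50000) -> bool:
    """True if production.maxBatchSize is present and at least `required`."""
    value = parse_properties(text).get("production.maxBatchSize")
    return value is not None and int(value) >= required


if __name__ == "__main__":
    # Assumption: SDC_CONF is set to the Data Collector configuration directory.
    path = os.path.join(os.environ.get("SDC_CONF", "."), "sdc.properties")
    if os.path.exists(path):
        with open(path, encoding="utf-8") as f:
            print("production.maxBatchSize OK:", batch_size_ok(f.read()))
    else:
        print(f"{path} not found")
```

A commented-out line such as `#production.maxBatchSize=50000` is ignored, so the check only passes when the property is actually in effect in the file.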

On the Tables tab, configure the origin to include the tables of interest. In this case, I have a schema called ‘TRT’ that contains a handful of tables with financial brokerage data. A few audit tables don’t provide any value, so I’ve excluded them from replication by entering the Java regex AUDIT.* as the table exclusion pattern. With the % wildcard as the table name pattern, everything else comes along for the ride.

[Image: Snowflake Tables Config]
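
The include and exclude settings combine roughly as follows. This Python sketch assumes that the table name pattern uses SQL LIKE wildcards (% and _) and that the exclusion regex must match the whole table name, as Java's String.matches does. The table names in the example are hypothetical, and the code illustrates the filtering idea rather than reproducing Data Collector's own matching logic.

```python
import re
from typing import Iterable, List


def like_to_regex(pattern: str) -> str:
    """Translate a SQL LIKE pattern (% = any run, _ = one character) to a regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
    return "".join(parts)


def select_tables(tables: Iterable[str], name_pattern: str, exclusion: str) -> List[str]:
    """Keep tables matching the LIKE pattern and not fully matching the exclusion regex."""
    include = re.compile(like_to_regex(name_pattern))
    exclude = re.compile(exclusion)
    return [t for t in tables if include.fullmatch(t) and not exclude.fullmatch(t)]


if __name__ == "__main__":
    # Hypothetical tables in the TRT schema.
    tables = ["ACCOUNTS", "TRADES", "AUDIT_LOG", "AUDIT_TRAIL"]
    print(select_tables(tables, "%", "AUDIT.*"))  # ['ACCOUNTS', 'TRADES']
```

The same pattern and exclusion pair is what gets reused later for the CDC origin, so both pipelines end up covering the same set of tables.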

Configure: Snowflake Destination

Moving on to the Snowflake configuration: on the Snowflake Connection Info tab, set the region and account and enter the user ID and password. On the Snowflake tab, set the warehouse, database, and schema to the relevant values, then enter ${record:attribute('jdbc.tables')} in the Table field to instruct Data Collector to use the name of the table in the source database. Check the boxes for Upper Case Schema & Field Names, Data Drift Enabled, and Table Auto Create.

[Image: Snowflake Config]

Under Staging, enter the name of the external stage that Data Collector should use as a staging location and provide an AWS access key ID and secret access key. I have also chosen to purge the files after ingestion so that they don’t needlessly fill up the S3 bucket.

[Image: Snowflake Staging Config]
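
The external stage named in the Staging settings has to exist in Snowflake already. As a hedged sketch, the Python helper below renders a CREATE STAGE statement for an S3 location that you could then run in a Snowflake worksheet. The stage name, bucket URL, and credentials in the example are placeholders, and the helper names are hypothetical.

```python
def sql_literal(value: str) -> str:
    """Quote a value as a Snowflake string literal (embedded single quotes doubled)."""
    return "'" + value.replace("'", "''") + "'"


def create_stage_sql(stage: str, s3_url: str, key_id: str, secret_key: str) -> str:
    """Render CREATE STAGE for an S3 location accessed with an AWS key pair.

    `stage` is inserted as-is, so it must already be a valid Snowflake identifier.
    """
    return (
        f"CREATE STAGE IF NOT EXISTS {stage} "
        f"URL = {sql_literal(s3_url)} "
        f"CREDENTIALS = (AWS_KEY_ID = {sql_literal(key_id)} "
        f"AWS_SECRET_KEY = {sql_literal(secret_key)})"
    )


if __name__ == "__main__":
    # Placeholder values; substitute your own stage name, bucket, and keys.
    print(create_stage_sql("SDC_STAGE", "s3://example-bucket/sdc/", "<key-id>", "<secret-key>"))
```

IF NOT EXISTS keeps the statement safe to rerun. The bucket in the URL should be the same one Data Collector writes to with its own AWS key pair.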

Configure: Pipeline Finisher

Set a precondition on the Pipeline Finisher executor so that it runs only after all data has been loaded by entering ${record:eventType() == 'no-more-data'}.

[Image: Snowflake Finisher Config]
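
The precondition works as a filter on the origin's event stream, so the finisher fires only for the no-more-data event. The Python sketch below models that logic. It assumes each event carries its type in an sdc.event.type header attribute, and the other event types in the example (table-finished, schema-finished) are shown for illustration only.

```python
from typing import Dict, Iterable, List

Event = Dict[str, Dict[str, str]]


def event_type(event: Event) -> str:
    """Stand-in for record:eventType(); the header key is an assumption."""
    return event["header"].get("sdc.event.type", "")


def precondition(event: Event) -> bool:
    # Mirrors ${record:eventType() == 'no-more-data'}
    return event_type(event) == "no-more-data"


def events_until_finish(events: Iterable[Event]) -> List[str]:
    """Return the event types seen up to and including the one that stops the pipeline."""
    seen: List[str] = []
    for event in events:
        seen.append(event_type(event))
        if precondition(event):
            break  # the loop ends here, as the Pipeline Finisher would stop the pipeline
    return seen


if __name__ == "__main__":
    stream = [{"header": {"sdc.event.type": t}}
              for t in ["table-finished", "schema-finished", "no-more-data", "table-finished"]]
    print(events_until_finish(stream))  # ['table-finished', 'schema-finished', 'no-more-data']
```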

The pipeline for the historical load of the source Oracle database is now ready to go. Validate it to be sure, then click Run to start processing. Keep in mind that no target tables were created in Snowflake up front.

[Image: Snowflake Run Pipeline]

After all data in the source database has been loaded, the Pipeline Finisher stops the pipeline.

[Image: Snowflake Finished]

Review Snowflake

Now that the pipeline has finished, a review of Snowflake shows that all of the tables have been created and loaded with data.

[Image: Snowflake Loaded]

At this point, it makes sense to run through a series of validation steps. For example, check that all tables were created correctly, confirm that row counts match, compare sample records, and so on. After everything checks out, it’s time to move on to CDC.
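
Row-count comparison is easy to script once the counts have been collected, for example by running SELECT COUNT(*) against each table on both sides. The Python sketch below assumes the counts are already in two dictionaries keyed by table name. Gathering them (with an Oracle driver and the Snowflake connector) is left out, and the table names and counts in the example are hypothetical.

```python
from typing import Dict, List


def compare_row_counts(source: Dict[str, int], target: Dict[str, int]) -> List[str]:
    """List tables whose Snowflake row count is missing or differs from the source.

    Names are compared case-insensitively because the Snowflake tables were
    created with upper-case names.
    """
    target_upper = {name.upper(): count for name, count in target.items()}
    problems: List[str] = []
    for name, count in sorted(source.items()):
        key = name.upper()
        if key not in target_upper:
            problems.append(f"{key}: missing in Snowflake")
        elif target_upper[key] != count:
            problems.append(f"{key}: source={count}, snowflake={target_upper[key]}")
    return problems


if __name__ == "__main__":
    oracle_counts = {"ACCOUNTS": 3, "TRADES": 10, "POSITIONS": 7}
    snowflake_counts = {"ACCOUNTS": 3, "TRADES": 9}
    for problem in compare_row_counts(oracle_counts, snowflake_counts):
        print(problem)
    # POSITIONS: missing in Snowflake
    # TRADES: source=10, snowflake=9
```

An empty result means every source table exists in Snowflake with the same number of rows. Sample-record comparisons are still worthwhile, since matching counts do not guarantee matching values.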

Change Data Capture

Once all of the existing data has been loaded, the next step is to set up Data Collector to replicate changes in the source database as transactions occur. Change data capture (CDC) is typically performed by reading the database change logs, and StreamSets is no different: the Oracle CDC Client origin reads the redo logs through Oracle LogMiner. Several mechanisms can be used to ensure there are no gaps between the historical load and CDC. However, the source Oracle database is in a controlled environment, so none of them are necessary for this exercise.

Create CDC Pipeline

Create a new pipeline with an Oracle CDC Client origin and a Snowflake destination.

[Image: Snowflake CDC Pipeline]

Configure: Oracle CDC Client

The Oracle CDC Client now needs to be configured to connect to the same Oracle database that was loaded with historical data. On the Oracle CDC tab, use the same schema, table name pattern, and exclusion pattern as the historical load: in this case, TRT, %, and AUDIT.*, respectively. Also set the appropriate options for your specific setup (dictionary source, database time zone, etc.).

[Image: Snowflake Oracle CDC Config]

Enter the connection string on the JDBC tab, set Max Batch Size (Records) to 50000, and provide the user ID and password under Credentials.

[Image: Snowflake Oracle JDBC Config]

Configure: Snowflake

The Snowflake destination needs to be configured exactly as it was for the historical load, with a few exceptions:

Enter ${record:attribute('oracle.cdc.table')} in the Table field on the Snowflake tab so that Data Collector picks up the table name automatically, and make sure that Table Auto Create and Data Drift Enabled are checked.

[Image: Snowflake CDC Config]
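
The Table expression differs between the two pipelines because each origin records the source table name in a different record header attribute: jdbc.tables for the JDBC Multitable Consumer and oracle.cdc.table for the Oracle CDC Client. This small Python sketch models how the Table field resolves in each case. The function and dictionary names are hypothetical, and applying the upper-case option to the table name is an assumption.

```python
from typing import Dict

# Header attribute holding the source table name, per origin.
TABLE_ATTRIBUTE: Dict[str, str] = {
    "JDBC Multitable Consumer": "jdbc.tables",
    "Oracle CDC Client": "oracle.cdc.table",
}


def resolve_table(origin: str, header: Dict[str, str], upper_case: bool = True) -> str:
    """Model ${record:attribute('<name>')} in the Table field for a given origin.

    In this sketch a record without the expected attribute raises KeyError.
    """
    name = header[TABLE_ATTRIBUTE[origin]]
    return name.upper() if upper_case else name


if __name__ == "__main__":
    print(resolve_table("JDBC Multitable Consumer", {"jdbc.tables": "accounts"}))  # ACCOUNTS
    print(resolve_table("Oracle CDC Client", {"oracle.cdc.table": "TRADES"}))  # TRADES
```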

Complete the Staging configuration, then move to the Data tab. Check the box to enable CDC and enter the key columns for each table. If there are many tables, bulk edit mode may come in handy. Enabling CDC instructs Data Collector to generate SQL MERGE statements that apply the inserts, updates, and deletes read from the Oracle database logs to the corresponding tables in Snowflake.

[Image: Snowflake CDC Data Config]
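
To illustrate why key columns are required, here is a Python sketch that renders a Snowflake MERGE statement for one table from its key and data columns. It assumes, hypothetically, that a batch of changes has been staged in a table with an OP column holding 'INSERT', 'UPDATE', or 'DELETE'. The SQL that Data Collector actually generates may differ.

```python
from typing import List


def merge_sql(target: str, staged: str, keys: List[str], columns: List[str],
              op_column: str = "OP") -> str:
    """Render a MERGE that applies staged inserts, updates, and deletes.

    `keys` are the key columns entered on the Data tab; `columns` lists every
    column of the target table, keys included.
    """
    on = " AND ".join(f"t.{k} = s.{k}" for k in keys)
    # The conditional DELETE comes first so matched rows flagged for deletion
    # are removed rather than updated.
    clauses = [f"WHEN MATCHED AND s.{op_column} = 'DELETE' THEN DELETE"]
    non_keys = [c for c in columns if c not in keys]
    if non_keys:  # an UPDATE clause needs at least one non-key column to set
        updates = ", ".join(f"{c} = s.{c}" for c in non_keys)
        clauses.append(f"WHEN MATCHED THEN UPDATE SET {updates}")
    col_list = ", ".join(columns)
    values = ", ".join(f"s.{c}" for c in columns)
    clauses.append(f"WHEN NOT MATCHED AND s.{op_column} <> 'DELETE' "
                   f"THEN INSERT ({col_list}) VALUES ({values})")
    return f"MERGE INTO {target} t USING {staged} s ON {on} " + " ".join(clauses)


if __name__ == "__main__":
    # Hypothetical table and staging table names.
    print(merge_sql("TRADES", "TRADES_CHANGES", ["ID"], ["ID", "SYMBOL", "QTY"]))
```

The ON clause is built entirely from the key columns, which is why every replicated table needs them. Without a key, there is no way to tell which Snowflake row an update or delete refers to.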

That’s it. Fire up the CDC pipeline and start making some changes in the source database. Inserting a new record is the obvious first test:

Data Collector shows the record was processed:

[Image: Snowflake CDC Run]

And Snowflake returns our test record:

[Image: Snowflake Test Record]

Now for an update:

Snowflake received the update and reflects the change:

[Image: Snowflake Update]

Finally, a delete: when the row is deleted in the source database, it is also deleted from Snowflake.
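
The insert, update, and delete test can be modeled in a few lines. This Python sketch applies a sequence of change records to a table held in memory as {key: row}. It assumes Data Collector's sdc.operation.type codes 1 = insert, 2 = delete, 3 = update, and the TRADE_ID, SYMBOL, and QTY columns are hypothetical.

```python
from typing import Any, Dict, List, Tuple

# Assumed sdc.operation.type codes.
INSERT, DELETE, UPDATE = 1, 2, 3

Row = Dict[str, Any]


def apply_changes(table: Dict[Any, Row], changes: List[Tuple[int, Row]],
                  key: str) -> Dict[Any, Row]:
    """Return a copy of `table` with the CDC `changes` applied in order."""
    result = {k: dict(v) for k, v in table.items()}
    for op, row in changes:
        pk = row[key]
        if op == DELETE:
            result.pop(pk, None)
        elif op in (INSERT, UPDATE):
            # Merge semantics: insert when the key is new, otherwise overwrite
            # only the columns present in the change record.
            result[pk] = {**result.get(pk, {}), **row}
        else:
            raise ValueError(f"unsupported operation code: {op}")
    return result


if __name__ == "__main__":
    changes = [
        (INSERT, {"TRADE_ID": 1, "SYMBOL": "ABC", "QTY": 100}),
        (UPDATE, {"TRADE_ID": 1, "QTY": 150}),
    ]
    after_update = apply_changes({}, changes, key="TRADE_ID")
    print(after_update)  # {1: {'TRADE_ID': 1, 'SYMBOL': 'ABC', 'QTY': 150}}
    print(apply_changes(after_update, [(DELETE, {"TRADE_ID": 1})], key="TRADE_ID"))  # {}
```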

As the evidence shows, Data Collector is mining the Oracle database logs, processing the transactions, and replicating changes in Snowflake in real time.

DevOps and Snowflake

By partnering with Snowflake, StreamSets brings DevOps data integration to the premier data warehouse built for the cloud. Combining StreamSets Data Collector with Snowflake Data Warehouse has always been a leading choice for Red Pill Analytics, and we’re looking forward to advising our customers that StreamSets can now be integrated with Snowflake in an officially supported capacity.

Need Help?

Red Pill Analytics is a Snowflake Solutions Partner as well as a StreamSets Partner. From proof-of-concept to implementation to training your users, we can help. If you are interested in guidance while working with Snowflake, StreamSets, or any of your data or analytics projects, feel free to reach out to us any time on the Red Pill website or find us on Twitter, Facebook, and LinkedIn.
