
Streaming Analysis Using Spark ML in StreamSets DataOps Platform

Posted in Data Integration, April 9, 2020

Learn how to load a serialized Spark ML model stored in MLeap bundle format on Databricks File System (DBFS), and use it for classification on new, streaming data flowing through the StreamSets DataOps Platform.

In my previous blogs, I illustrated how easily you can extend the capabilities of StreamSets Transformer using Scala and PySpark. If you have not yet perused the blogs on training a Spark ML Random Forest Regressor model, serializing the trained model, and training a Logistic Regression NLP model, I highly recommend doing so before proceeding, because this blog builds upon them.

Ok, let’s get right to it!

Watch It In Action

If you’d like to see everything in action, check out this short demo video. Note that there’s no audio; please continue reading this blog if you’d like to know the technical details. :)

 

Streaming Data: Twitter to Kafka

I’ve designed this StreamSets Data Collector pipeline to ingest and transform tweets, and store them in Kafka. This pipeline is the main source of our streaming data that we will perform sentiment analysis on in the second pipeline.

Pipeline overview:

  • Ingest
    • Query tweets from Twitter using its Search API for #quarantinelife via the HTTP Client origin in polling mode (a sample request URL is sketched just after this list).
  • Transform 
  • Store
    • The transformed tweet records are sent to an Apache Kafka destination.
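For reference, the kind of Search API request the HTTP Client origin polls looks roughly like this (the exact query parameters configured in the pipeline aren't shown here):

https://api.twitter.com/1.1/search/tweets.json?q=%23quarantinelife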

Here’s an example of the original Twitter Search API response as ingested by the HTTP Client origin.

And here’s an example of the transformed tweet written to Kafka.

Classification on Streaming Data: Kafka to Spark ML Model to Databricks

As detailed in my previous blogs, let’s assume that you have trained and serialized a model in MLeap bundle format, and it’s stored on DBFS as shown below.
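If you want to double-check that the bundle is in place from a Databricks notebook, a quick listing along these lines should do (a minimal sketch; the path matches the one used by the processor below):

// List the DBFS directory that holds the serialized MLeap bundle
display(dbutils.fs.ls("dbfs:/dash/ml/"))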

Next up, I’ve designed this StreamSets Transformer pipeline running on a Databricks cluster.

Pipeline overview:

  • Ingest
    • Transformed tweet records are read from Apache Kafka, from the same topic written to by the first pipeline.
  • Transform 
    • The Scala processor loads the Spark ML model (/dbfs/dash/ml/spark_nlp_model.zip) and classifies each tweet: a value of 1 indicates positive sentiment and 0 indicates negative sentiment. (*See the code snippet below.)
  • Store 
    • Each tweet record along with its classification is stored on DBFS in Parquet format for querying and further analysis. The DBFS location in this case is /dash/nlp/.

*Below is the Scala code inserted into the Scala processor >> Scala tab >> Scala Code section.

import spark.implicits._
import scala.collection.mutable.Buffer

import org.apache.spark.ml.feature.VectorAssembler
import ml.combust.bundle._
import ml.combust.mleap.spark.SparkSupport._
import ml.combust.mleap.runtime.MleapSupport._
import org.apache.spark.ml.bundle.SparkBundleContext

// inputs(0) is the DataFrame handed to the processor by the upstream Kafka origin
val df = inputs(0)

if (df.columns.contains("text")) {
  // Load the MLeap bundle to make predictions on new data
  val saveModelZipPath = "/dbfs/dash/ml/spark_nlp_model.zip"
  val bundle = BundleFile("jar:file:" + saveModelZipPath)
  val loadedMLeapBundle = bundle.loadMleapBundle().get.root
  bundle.close()

  // Return the original input feature ("text") along with its respective prediction
  output = loadedMLeapBundle.sparkTransform(df.select("text")).select("text", "prediction")
} else {
  // No "text" column present, so pass the data through unchanged
  output = df
}

It basically takes the input dataframe and, if it contains the column “text” (the tweet), loads the NLP model (“spark_nlp_model.zip”) and classifies each tweet. It then creates a new dataframe with just the tweet and its classification stored in the “prediction” column. (Note that you could also pass along all of the columns present in the input dataframe instead of just “text” and “prediction”.)

Analysis on Databricks

Once the tweets, along with their classification, are stored on the Databricks File System, they’re ready for querying in a Databricks notebook.

Query the tweets and their classification

Here I’ve created a dataframe that reads all the Parquet files written by the second pipeline to the DBFS location /dash/nlp/ and shows what the data looks like.
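A minimal sketch of that notebook cell in Scala, using the same DBFS output location as the second pipeline:

// Read all Parquet files written by the Transformer pipeline
val tweetsDF = spark.read.parquet("/dash/nlp/")

// Peek at the tweets and their predicted sentiment (1 = positive, 0 = negative)
tweetsDF.show(10, truncate = false)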

Create temp table and aggregate data

Here I’ve created a temp table that reads the same data stored in the /dash/nlp/ DBFS location, along with an aggregate query showing the total number of positive vs. negative tweets.
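Roughly, the temp table and the aggregation look like this (a sketch; the view name “tweets” is my own):

// Register the Parquet output as a temp view for SQL access
spark.read.parquet("/dash/nlp/").createOrReplaceTempView("tweets")

// Count positive (1) vs. negative (0) tweets
spark.sql("SELECT prediction, COUNT(*) AS total FROM tweets GROUP BY prediction").show()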

Structured Streaming

In the demo video, I’ve also shown how to create and run a Structured Streaming query in Databricks to auto-update the counts of positive and negative sentiment tweets, without having to manually refresh the source dataframe as new data flows in from the second pipeline.
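A minimal sketch of such a streaming query in Scala, assuming the schema is borrowed from the Parquet files already on DBFS:

// Streaming reads need an explicit schema, so borrow it from the existing static data
val staticDF = spark.read.parquet("/dash/nlp/")

val streamingCounts = spark.readStream
  .schema(staticDF.schema)
  .parquet("/dash/nlp/")
  .groupBy("prediction")
  .count()

// In a Databricks notebook, display() keeps these counts updating as new files arrive
display(streamingCounts)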

Good News!

Based on my model and the data I’ve collected, there appear to be more positive sentiments than negative ones when it comes to the #quarantinelife hashtag. That is something to feel good about! :)

In all honesty and fairness, though, it goes without saying that model accuracy depends on the size and quality of the training and test datasets, as well as on feature engineering and hyperparameter tuning. That isn’t exactly the point of this blog; rather, it’s to showcase how the StreamSets DataOps Platform can be used and extended for a variety of use cases.

Learn more about StreamSets for Databricks and the StreamSets DataOps Platform, which is available on Microsoft Azure Marketplace and AWS Marketplace.
