
The DataOps Blog

Where Change Is Welcome

Apache Spark: Where Nothing Can Be Something

Posted in Engineering | November 19, 2020

“Nothing is more real than nothing” – Samuel Beckett

This post is written by Jeff Evans, Senior Software Engineer, StreamSets.

While working on an issue for a prospect's proof-of-concept pipeline using StreamSets Transformer, a modern Spark ETL engine, I came across a perplexing bug.  A certain column in the data that, by all accounts, should have been present was not.  I finally isolated the problem in our Join processor.  From there, I managed to fix the bug and unblock the proof of concept.  That part of my job is routine.  What I didn't expect was to run headlong into a minor existential crisis.  More on that later.  For now, let's take a short detour to explore the various ways in which data can be empty in Spark.

Consider the following spark-shell session.

// needed to get automatic conversion from Strings
import spark.implicits._
// simulate CSV data that has a header row, but nothing below that
val csvData = "id,name,phone"
// read the CSV data
val data = spark.read.option("header", "true").csv(Seq(csvData).toDS())
// print the data
data.show()
+---+----+-----+
| id|name|phone|
+---+----+-----+

As you can see, the Spark DataFrame API (which is what Transformer primarily uses) can easily represent data that has columns (i.e. a schema) but no rows.  This is perfectly sensible, and commonplace.  Imagine running a SELECT statement on a database table in which the WHERE clause filters out all potential results.  We have zero rows in the result set, but the column definitions are still known to the driver and available to Spark.
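You can reproduce that SELECT-with-impossible-WHERE situation directly in the spark-shell session above (a small sketch reusing the `data` DataFrame from the previous snippet; the filter value is made up): a filter that matches nothing removes every row but leaves the schema intact.

```scala
// A filter that matches no rows: the result set is empty, but the
// column definitions (the schema) survive, just like the SQL example.
val noRows = data.filter($"id" === "no-such-id")
noRows.show()                 // header only, zero data rows
noRows.schema.fieldNames      // Array(id, name, phone)
noRows.count()                // 0
```

The schema travels with the DataFrame, so downstream processors still know what columns to expect even when no data arrives.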

However, there is another sense in which a DataFrame can be empty in Spark, one the Transformer engineering team has been grappling with since we started.  Consider the following:

val data = spark.read.option("header", "true").csv(Seq("").toDS())

data.show()
++
||
++
++

Here, we have data with no columns (or, said another way, an empty schema).  There are many scenarios in Spark where this can happen.  For instance, external systems can sometimes write completely empty CSV files (which is what this example shows).  Other times, users might selectively (and intentionally) drop columns from one branch of their processing, leaving nothing left.

To any particularly twisted readers, I know what you’re wondering, and the answer is yes.  It’s even possible to have rows with no columns.

val csvStr = """
   |foo
   |1
   |2
   |3
   |4
   |5
   |"""

val csvData = spark.read.option("header", "true").csv(csvStr.split("\n").toSeq.toDS())

csvData.drop("foo").show()
++
||
++
||
||
||
||
||
++

Rest assured, if something is possible in Spark, then our customer-facing teams have seen it.

So how do you handle it?  Suppose you're writing a Spark application, and you want to perform a left outer join from "device" records to "manufacturer" records.  Even if no corresponding manufacturer record can be found for a particular device, you still want processing to continue with only the device columns.  But if a schema-less DataFrame sits where you're expecting your manufacturer data to be, the join will fail.

import org.apache.spark.sql.functions._
// some device data
val devicesCsv = """
   | id,name,manufacturer_id
   | 1,"Reticulator",42
   | 2,"Deglutenizer",19
   | """

val devicesData = spark.read.option("header", "true").csv(devicesCsv.split("\n").toSeq.toDS())

// oops, our manufacturer data has nothing
val manufacturersCsv = ""
val manufacturersData = spark.read.option("header", "true").csv(manufacturersCsv.split("\n").toSeq.toDS())

// so the join fails
devicesData.as("d").join(manufacturersData.as("m"), expr("d.manufacturer_id == m.id"), "left_outer")
org.apache.spark.sql.AnalysisException: cannot resolve '`m.id`' given input columns: [d.id, d.name, d.manufacturer_id]; line 1 pos 21;
'Join LeftOuter, (manufacturer_id#75 = 'm.id)

You can work around it by doing something like this:

val safelyJoined = if (manufacturersData.schema.fieldNames.contains("id")) {
   // the manufacturer id column exists on the right side, so we’re good to join
   devicesData.as("d").join(manufacturersData.as("m"), expr("d.manufacturer_id == m.id"), "left_outer")
} else {
   // the id column doesn't exist, so just return the devices without joining
   devicesData
}

safelyJoined.show()
+---+-------------+---------------+
| id|         name|manufacturer_id|
+---+-------------+---------------+
|  1|  Reticulator|             42|
|  2| Deglutenizer|             19|
+---+-------------+---------------+

// still works the way we want if there IS manufacturer data available
val manufacturersCsv = """
   | id,name,phone
   | 42,"Initech","555-1234"
   | """

val manufacturersData = spark.read.option("header", "true").csv(manufacturersCsv.split("\n").toSeq.toDS())

// repeat same step as above for val safelyJoined

safelyJoined.show()
+---+-------------+---------------+----+-------+--------+
| id|         name|manufacturer_id|  id|   name|   phone|
+---+-------------+---------------+----+-------+--------+
|  1|  Reticulator|             42|  42|Initech|555-1234|
|  2| Deglutenizer|             19|null|   null|    null|
+---+-------------+---------------+----+-------+--------+

With that change, you can meet the requirement that a left join should succeed even if the right side has nothing in its schema.  Now, I seriously doubt that this is what Samuel Beckett had in mind when he penned Malone Meurt.  But I still believe he touched on something significant.
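The guard from the workaround above can be factored into a small reusable helper.  To be clear, this is my own sketch, not Transformer's internal implementation; the name `leftJoinIfPossible` and its parameters are invented for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// Left outer join that degrades gracefully: if the right side's schema
// is missing the join column (e.g. it came from a completely empty CSV
// file), just pass the left rows through unchanged.
def leftJoinIfPossible(left: DataFrame, right: DataFrame,
                       joinExpr: String, rightKey: String): DataFrame =
  if (right.schema.fieldNames.contains(rightKey))
    left.as("d").join(right.as("m"), expr(joinExpr), "left_outer")
  else
    left

// e.g.: leftJoinIfPossible(devicesData, manufacturersData,
//                          "d.manufacturer_id == m.id", "id")
```

The `"d"`/`"m"` aliases are baked in here to keep the sketch short; a production version would take them as parameters too.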

Back to Transformer’s Join processor.  Most of the time, our users want joins like this to succeed.  That’s why we built this logic into our Join processor in the first place.  In fact, all of our processors are designed to handle both types of emptiness in a frictionless and unsurprising manner.  But after fixing this particular bug, I realized that we needed to handle joining with emptiness in a more rigorous way.  We had to truly embrace the emptiness of our world and handle it, instead of hiding or hand waving it away.  This meant enumerating all possibilities and writing automated tests to ensure we were doing the right thing in each case.  So that’s exactly what we did. (See details below.)

| Type | Left has columns | Left has rows | Right has columns | Right has rows* | Number of output rows | Output schema |
|------|------------------|---------------|-------------------|-----------------|-----------------------|---------------|
| CROSS | N | N | N | N | 0 | empty |
| CROSS | N | N | Y | N | 0 | right cols |
| CROSS | N | N | Y | Y | 0 | right cols |
| CROSS | Y | N | N | N | 0 | left cols |
| CROSS | Y | N | Y | N | 0 | all cols (dupe id) |
| CROSS | Y | N | Y | Y | 0 | all cols (dupe id) |
| CROSS | Y | Y | N | N | 0 | left cols |
| CROSS | Y | Y | Y | N | 0 | all cols (dupe id) |
| CROSS | Y | Y | Y | Y | (num left * num right) | all cols (dupe id) |
| INNER | N | N | N | N | 0 | empty |
| INNER | N | N | Y | N | 0 | right cols |
| INNER | N | N | Y | Y | 0 | right cols |
| INNER | Y | N | N | N | 0 | left cols |
| INNER | Y | N | Y | N | 0 | all cols (left id) |
| INNER | Y | N | Y | Y | 0 | all cols (left id) |
| INNER | Y | Y | N | N | 0 | left cols |
| INNER | Y | Y | Y | N | 0 | all cols (left id) |
| INNER | Y | Y | Y | Y | (num left) | all cols (left id) |
| FULL OUTER | N | N | N | N | 0 | empty |
| FULL OUTER | N | N | Y | N | 0 | right cols |
| FULL OUTER | N | N | Y | Y | (num left) | right cols |
| FULL OUTER | Y | N | N | N | 0 | left cols |
| FULL OUTER | Y | N | Y | N | 0 | all cols (left id) |
| FULL OUTER | Y | N | Y | Y | (num right) | all cols (left id) |
| FULL OUTER | Y | Y | N | N | (num left) | left cols |
| FULL OUTER | Y | Y | Y | N | (num left) | all cols (left id) |
| FULL OUTER | Y | Y | Y | Y | (num left) | all cols (left id) |
| LEFT ANTI | N | N | N | N | 0 | empty |
| LEFT ANTI | N | N | Y | N | 0 | empty |
| LEFT ANTI | N | N | Y | Y | 0 | empty |
| LEFT ANTI | Y | N | N | N | 0 | left cols |
| LEFT ANTI | Y | N | Y | N | 0 | left cols |
| LEFT ANTI | Y | N | Y | Y | 0 | left cols |
| LEFT ANTI | Y | Y | N | N | (num left) | left cols |
| LEFT ANTI | Y | Y | Y | N | (num left) | left cols |
| LEFT ANTI | Y | Y | Y | Y | 0 | left cols |
| LEFT OUTER | N | N | N | N | 0 | empty |
| LEFT OUTER | N | N | Y | N | 0 | right cols |
| LEFT OUTER | N | N | Y | Y | 0 | right cols |
| LEFT OUTER | Y | N | N | N | 0 | left cols |
| LEFT OUTER | Y | N | Y | N | 0 | all cols (left id) |
| LEFT OUTER | Y | N | Y | Y | 0 | all cols (left id) |
| LEFT OUTER | Y | Y | N | N | (num left) | left cols |
| LEFT OUTER | Y | Y | Y | N | (num left) | all cols (left id) |
| LEFT OUTER | Y | Y | Y | Y | (num left) | all cols (left id) |
| LEFT SEMI | N | N | N | N | 0 | empty |
| LEFT SEMI | N | N | Y | N | 0 | left cols |
| LEFT SEMI | N | N | Y | Y | 0 | left cols |
| LEFT SEMI | Y | N | N | N | 0 | left cols |
| LEFT SEMI | Y | N | Y | N | 0 | left cols |
| LEFT SEMI | Y | N | Y | Y | 0 | left cols |
| LEFT SEMI | Y | Y | N | N | 0 | left cols |
| LEFT SEMI | Y | Y | Y | N | 0 | left cols |
| LEFT SEMI | Y | Y | Y | Y | (num left) | left cols |
| RIGHT ANTI | N | N | N | N | 0 | empty |
| RIGHT ANTI | N | N | Y | N | 0 | right cols |
| RIGHT ANTI | N | N | Y | Y | (num right) | right cols |
| RIGHT ANTI | Y | N | N | N | 0 | empty |
| RIGHT ANTI | Y | N | Y | N | 0 | right cols |
| RIGHT ANTI | Y | N | Y | Y | (num right) | right cols |
| RIGHT ANTI | Y | Y | N | N | 0 | empty |
| RIGHT ANTI | Y | Y | Y | N | 0 | right cols |
| RIGHT ANTI | Y | Y | Y | Y | 0 | right cols |
| RIGHT OUTER | N | N | N | N | 0 | empty |
| RIGHT OUTER | N | N | Y | N | 0 | right cols |
| RIGHT OUTER | N | N | Y | Y | (num right) | right cols |
| RIGHT OUTER | Y | N | N | N | 0 | left cols |
| RIGHT OUTER | Y | N | Y | N | 0 | all cols (right id) |
| RIGHT OUTER | Y | N | Y | Y | (num right) | all cols (right id) |
| RIGHT OUTER | Y | Y | N | N | 0 | left cols |
| RIGHT OUTER | Y | Y | Y | N | 0 | all cols (right id) |
| RIGHT OUTER | Y | Y | Y | Y | (num right) | all cols (right id) |

*assume each right row matches a left row when both are present

This behavior is controlled by a switch, which you can disable if you want to use Spark’s stricter version.

Conclusion

When it comes to selecting the best tools for handling your data challenges, the number and quality of connectors matters.  Of course, StreamSets Transformer offers dozens of connectors across eight different Spark vendors, and we are expanding that list on a regular basis.  However, we believe that the low-level "nuts and bolts" matter just as much.  This improvement is just one small step of many that we've taken to ensure that the core processing logic of Transformer is sound, reliable, and production-ready.

Learn more about StreamSets Transformer to leverage the power of Apache Spark for ETL data pipelines.
