When working with data lakes, it is common to find complex nested data structures. The goal of a Spark application is often to start from the messy, semi-structured data of a data lake and turn it into a clean, flat fact table that is suitable for further processing by analytics applications such as machine learning models. In fact, this kind of data cleaning is where data scientists spend a large share of their time.

Nesting StructTypes in each other

One of the most common patterns we observe in these informal structures is a JSON-like layout in which record structures are nested inside arrays, which are themselves nested inside further arrays and record structures. Nesting one record structure inside another is the easy case, because Spark lets us address the nested fields directly. Consider a schema such as the following:

  // Assumes an active SparkSession in scope as `spark` (as in spark-shell).
  val jsonStr =
    """{ "job": "cashier",
      |"employee": {
      |    "name": "joe",
      |    "address": "100 main st"
      |  }
      |}""".stripMargin

  import spark.implicits._
  val ds = spark.read.json(Seq(jsonStr).toDS)
  ds.printSchema()

  // Dotted paths reach directly into the nested struct.
  ds.select("job", "employee.name", "employee.address").show()

OUTPUT:
root
 |-- employee: struct (nullable = true)
 |    |-- address: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- job: string (nullable = true)

+-------+----+-----------+
|    job|name|    address|
+-------+----+-----------+
|cashier| joe|100 main st|
+-------+----+-----------+
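As an aside, when a struct has many fields, star expansion saves us from listing each one by hand. A minimal sketch, reusing the same ds (and implicits) as above:

  // Star expansion pulls every field of the struct up to the top level.
  ds.select($"job", $"employee.*").show()

This yields the same three columns without naming the struct fields individually.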

Nesting inside an array

This is easy because the cardinality doesn’t change: we simply index into the structure as employee.name. Things get much more complicated when we include collections. For example, let’s add the requirement that we keep track of every employee who has ever held a position, not only the current one. We can no longer flatten the structure with a simple select. Consider the following example:

  val jsonStr =
    """{ "job": "cashier",
      |"employees":[
      |  {
      |    "name": "joe",
      |    "address": "100 main st"
      |  },
      |  {
      |    "name": "mary",
      |    "address": "200 main st"
      |  }
      |]
      |}""".stripMargin

  import org.apache.spark.sql.functions.explode
  import spark.implicits._
  val ds = spark.read.json(Seq(jsonStr).toDS)
  ds.printSchema()

  // explode turns each array element into its own row,
  // repeating the enclosing columns for every element.
  ds.select($"job", explode($"employees") as "employee")
    .select("job", "employee.name", "employee.address")
    .show()

OUTPUT:
root
 |-- employees: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- job: string (nullable = true)

+-------+----+-----------+
|    job|name|    address|
+-------+----+-----------+
|cashier| joe|100 main st|
|cashier|mary|200 main st|
+-------+----+-----------+
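One caveat worth knowing: explode drops rows whose array is null or empty. If such rows must survive the flattening, explode_outer keeps them, emitting a null in place of the element. A minimal sketch, assuming the same ds as above:

  import org.apache.spark.sql.functions.explode_outer

  // Unlike explode, explode_outer emits one row with a null employee
  // when the employees array is null or empty.
  ds.select($"job", explode_outer($"employees") as "employee")
    .select("job", "employee.name", "employee.address")
    .show()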


Nesting arrays inside arrays

So Spark can handle arrays in the document. It pivots the array into rows, one per element, and joins those rows back to the enclosing row, much like an inner join. We might get excited that we have figured it all out and can process any number of arrays in a document, but as soon as we try to nest one array inside another, it becomes obvious that we don’t have the full solution yet. Change the document to the following structure, which has two arrays nested inside each other, employees and addresses:

  val jsonStr =
    """{ "job": "cashier",
      |"employees":[
      |  {
      |    "name": "joe",
      |    "addresses": ["100 main st", "100 broadway"]
      |  },
      |  {
      |    "name": "mary",
      |    "addresses": ["200 main st", "200 broadway"]
      |  }
      |]
      |}""".stripMargin

  // WON'T WORK: two generators in a single select clause
  ds.select($"job", explode($"employees") as "employee", explode($"employee.addresses") as "address")

This produces the following error message: Only one generator allowed per select clause but found 2: explode(employees), explode(employee.addresses). Nested generators are not allowed within a single select. If we instead chain one select (or selectExpr) per nested array, exploding one level at a time, we can work around the problem. The following pattern works for any depth of nesting:

  import org.apache.spark.sql.functions.explode
  import spark.implicits._
  val ds = spark.read.json(Seq(jsonStr).toDS)
  ds.printSchema()

  // One generator per select: explode the outer array first,
  // then explode the inner array in a separate projection.
  ds.select($"job", explode($"employees") as "employee")
    .selectExpr("job", "employee", "explode(employee.addresses) as address")
    .select("job", "employee.name", "address")
    .show()

OUTPUT:
root
 |-- employees: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- addresses: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |-- job: string (nullable = true)

+-------+----+------------+
|    job|name|     address|
+-------+----+------------+
|cashier| joe| 100 main st|
|cashier| joe|100 broadway|
|cashier|mary| 200 main st|
|cashier|mary|200 broadway|
+-------+----+------------+
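The same flattening can also be expressed in Spark SQL, where LATERAL VIEW allows one generator per view and therefore any depth of nesting. A minimal sketch, assuming we register the dataset as a temporary view named jobs (the view name is ours, not part of the example above):

  ds.createOrReplaceTempView("jobs")
  spark.sql("""
    SELECT job, employee.name, address
    FROM jobs
    LATERAL VIEW explode(employees) AS employee
    LATERAL VIEW explode(employee.addresses) AS address
  """).show()

This produces the same four rows as the chained-select version, and some teams find the SQL form easier to read when many nesting levels are involved.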