Thursday, June 22, 2017

Polymorphism with Spark / Scala, Datasets and case classes

We are using Spark 2.x with Scala for a system that has 13 different ETL operations. Seven of them are relatively simple: each is driven by a single domain class, and they differ primarily by that class and by some nuances in how the load is handled.

A simplified version of the load class follows. For the purposes of this example, say there are 7 pizza toppings being loaded; here's Pepperoni:

import org.apache.spark.sql.{Dataset, Row, SparkSession}

object LoadPepperoni {
  def apply(inputFile: Dataset[Row],
            historicalData: Dataset[Pepperoni],
            mergeFun: (Pepperoni, PepperoniRaw) => Pepperoni): Dataset[Pepperoni] = {
    val sparkSession = SparkSession.builder().getOrCreate()
    import sparkSession.implicits._

    // Map the input rows to the "raw" case class
    val rawData: Dataset[PepperoniRaw] = inputFile.rdd.map { row: Row =>
      PepperoniRaw(
        weight = row.getAs[String]("weight"),
        cost = row.getAs[String]("cost")
      )
    }.toDS()

    val validatedData: Dataset[PepperoniRaw] = ??? // validate the data

    val dedupedRawData: Dataset[PepperoniRaw] = ??? // deduplicate the data

    // Map from the "raw" case class to the domain case class
    val dedupedData: Dataset[Pepperoni] = dedupedRawData.rdd.map { datum: PepperoniRaw =>
      Pepperoni(value = ???, key1 = ???, key2 = ???)
    }.toDS()

    val joinedData = dedupedData.joinWith(historicalData,
      historicalData.col("key1") === dedupedData.col("key1") &&
        historicalData.col("key2") === dedupedData.col("key2"),
      "right_outer"
    )

    // joinWith pairs each row as (delta, historical); keep only the records to emit,
    // merging the delta into the historical record (this is where mergeFun comes in)
    joinedData.flatMap { case (delta, hist) =>
      if ( /* some condition */ )
        Some(hist.copy(value = /* some transformation */))
      else
        None
    }
  }
}

In other words, the class performs a series of operations on the data. The operations are mostly the same and always in the same order, but they can vary slightly per topping, as can the mapping from "raw" to "domain" and the merge function.

To do this for 7 toppings (e.g. Mushroom, Cheese, etc.), I would rather not simply copy/paste the class and change all of the names, because the structure and logic are common to all loads. Instead I'd like to define a generic "Load" object with type parameters, like this:

object Load {
  def apply[R,D](inputFile: Dataset[Row],
            historicalData: Dataset[D],
            mergeFun: (D, R) => D): Dataset[D] = {
    val sparkSession = SparkSession.builder().getOrCreate()
    import sparkSession.implicits._

    val rawData: Dataset[R] = inputFile.rdd.map { row: Row =>
...

And for each class-specific operation such as mapping from "raw" to "domain", or merging, have a trait or abstract class that implements the specifics. This would be a typical dependency injection / polymorphism pattern.
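
For illustration, here is a rough sketch of the kind of trait I have in mind; the names (ToppingOps, fromRow, toDomain, merge) are made up for this example and are not from the actual system:

import org.apache.spark.sql.Row

// Hypothetical per-topping operations; one implementation per topping.
trait ToppingOps[R, D] {
  def fromRow(row: Row): R        // input Row -> "raw" case class
  def toDomain(raw: R): D         // "raw" -> domain case class
  def merge(hist: D, delta: R): D // merge a delta into a historical record
}

object PepperoniOps extends ToppingOps[PepperoniRaw, Pepperoni] {
  def fromRow(row: Row): PepperoniRaw =
    PepperoniRaw(weight = row.getAs[String]("weight"), cost = row.getAs[String]("cost"))
  def toDomain(raw: PepperoniRaw): Pepperoni = ???
  def merge(hist: Pepperoni, delta: PepperoniRaw): Pepperoni = ???
}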

But I'm running into a few problems. As of Spark 2.x, implicit encoders are only provided out of the box for primitive types and case classes, and there is no way to generically constrain a type to be a case class. So toDS() and the rest of the encoder-backed implicit functionality are not available when working with generic type parameters.
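
A minimal sketch of where this bites, assuming a generic helper along the lines of the Load object above (toRaw and fromRow are illustrative names, not real code from the system):

import org.apache.spark.sql.{Dataset, Row, SparkSession}

def toRaw[R](inputFile: Dataset[Row], fromRow: Row => R): Dataset[R] = {
  val spark = SparkSession.builder().getOrCreate()
  import spark.implicits._
  // Does not compile: toDS() needs an implicit Encoder[R], and spark.implicits._
  // can only derive encoders for concrete primitives and case classes, not for an
  // unconstrained type parameter R.
  inputFile.rdd.map(fromRow).toDS()
}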

Also, as mentioned in this related question of mine, the case class copy method is not available when using generics either.
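
The same restriction in a hypothetical one-liner (refresh is an illustrative name, and D stands in for any of the domain case classes):

// Does not compile: copy is synthesized per case class, so it is not a member
// of an unconstrained type parameter D.
def refresh[D](d: D): D = d.copy()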

I have looked into other design patterns common in Scala and Haskell, such as type classes and ad-hoc polymorphism, but the obstacle is that Spark Datasets essentially only work with case classes, which can't be defined abstractly.
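
For reference, the ad-hoc polymorphism shape looks roughly like this, reusing the hypothetical ToppingOps trait from the sketch above as a type class; the per-topping behaviour can be resolved implicitly, but the generic body still has no Encoder for R or D:

import org.apache.spark.sql.{Dataset, Row}

// With the per-topping instances (e.g. PepperoniOps) marked implicit, the compiler
// can pick the right behaviour for each topping...
def load[R, D](inputFile: Dataset[Row], historicalData: Dataset[D])
              (implicit ops: ToppingOps[R, D]): Dataset[D] = ???
// ...but inside the body, toDS(), joinWith and copy still require concrete case
// classes (or at least implicit Encoders), which an unconstrained R and D don't have.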

It seems that this would be a common problem in Spark systems, but I'm unable to find a solution. Any help is appreciated.
