Datasets and DataFrames
With direct use of the good old RDD API now discouraged, Spark users are left with two options: typed Datasets and untyped DataFrames (which are actually a specific case of Datasets, namely Dataset[Row]). The API also lets users freely convert one to the other, e.g. using the .as[T] method to turn an untyped DataFrame into a Dataset[T]. This does not change the underlying data, though, and can lead to surprising results if you are not aware of it.
What does .as[T] do?
Let’s start by looking at the method’s documentation in the source code:
Returns a new Dataset where each record has been mapped on to the specified type. The method used to map columns depend on the type of U:
- When U is a class, fields for the class will be mapped to columns of the same name (case sensitivity is determined by spark.sql.caseSensitive).
- When U is a tuple, the columns will be mapped by ordinal (i.e. the first column will be assigned to _1).
- When U is a primitive type (i.e. String, Int, etc), then the first column of the DataFrame will be used.
If the schema of the Dataset does not match the desired U type, you can use select along with alias or as to rearrange or rename as required.
Note that as[] only changes the view of the data that is passed into typed operations, such as map(), and does not eagerly project away any columns that are not present in the specified class.
Since: 1.6.0
The last note is crucial: casting to a Dataset does not change the underlying data. Any columns not present in T (e.g., columns without a corresponding field in the case class) will not be discarded.
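To make this concrete, here is a minimal sketch. It assumes a SparkSession is passed in, and the object name, the sample values, and the extra `year` column are made up for illustration. It casts a four-column DataFrame to a three-field case class and compares the columns right after the cast with the columns after a typed map():

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Illustrative domain class: it declares three fields only.
case class Artist(id: String, name: String, location: String)

object AsKeepsColumns {
  // Returns (columns right after .as[Artist], columns after a typed map).
  def columnsBeforeAndAfterMap(spark: SparkSession): (Seq[String], Seq[String]) = {
    import spark.implicits._

    // Hypothetical input with a fourth column, `year`, that Artist does not declare.
    val raw = Seq(("1", "Some Artist", "Somewhere", 1999))
      .toDF("id", "name", "location", "year")

    // The cast only changes the typed view; `year` stays in the schema.
    val artists: Dataset[Artist] = raw.as[Artist]

    // A typed operation round-trips through Artist objects, so `year` is gone afterwards.
    val viaMap: Dataset[Artist] = artists.map((a: Artist) => a)

    (artists.columns.toSeq, viaMap.columns.toSeq)
  }
}
```

With a local session, the first element should still contain `year`, while the second holds only `id`, `name` and `location`.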
Why bother?
There are a few situations where having extra columns may be surprising and create problems with a job run (or even worse – silently introduce data quality issues):
- running union or unionAll transformations on non-aligned data,
- calling distinct (it will check for hidden columns’ uniqueness as well),
- saving data (will include extra columns).
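The distinct case from the list above can be reproduced with a small sketch. The object name, the sample rows, and the hidden `year` column are assumptions made for the example, and the SparkSession is expected to be supplied by the caller:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Illustrative domain class with three declared fields.
case class Artist(id: String, name: String, location: String)

object HiddenColumnsDistinct {
  // Returns (distinct count with the hidden column, distinct count after trimming).
  def distinctCounts(spark: SparkSession): (Long, Long) = {
    import spark.implicits._

    // Two rows that describe the same Artist but differ in the undeclared `year` column.
    val raw = Seq(
      ("1", "Some Artist", "Somewhere", 1999),
      ("1", "Some Artist", "Somewhere", 2005)
    ).toDF("id", "name", "location", "year")

    // Plain cast: `year` is still part of every row.
    val leaky: Dataset[Artist] = raw.as[Artist]

    // Cast after an explicit select: only the declared columns remain.
    val trimmed: Dataset[Artist] = raw.select("id", "name", "location").as[Artist]

    // distinct compares whole rows, so the hidden `year` keeps the leaky rows apart.
    (leaky.distinct().count(), trimmed.distinct().count())
  }
}
```

The plain cast reports two distinct artists even though both rows are equal as Artist values; the trimmed Dataset reports one.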
A defensive version of .as[T]
The simplest defensive cast (defensive meaning that it adjusts the schema to the provided domain class) adds a .select() call before .as[T]:
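import org.apache.spark.sql.{DataFrame, Dataset}
// Assumes `import spark.implicits._` is in scope to provide the Encoder[Artist].

case class Artist(id: String, name: String, location: String)

def toArtistsDefensive(input: DataFrame): Dataset[Artist] =
  input
    .select("id", "name", "location") // keep only the columns Artist declares
    .as[Artist]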
This implementation is not DRY: every change to the Artist class means finding all the related select calls and updating them. Fortunately, with a bit of reflection, it can be refactored into a generic solution that trims the Dataset down to the columns the target class expects.
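import org.apache.spark.sql.{DataFrame, Dataset, functions => F}
import scala.reflect.runtime.universe._

def toTDefensive[T <: Product: TypeTag](input: DataFrame): Dataset[T] = {
  // Names of the case class fields, found via runtime reflection.
  val caseClassFields = typeOf[T].members
    .collect { case m: MethodSymbol if m.isCaseAccessor => m.name.toString }
    .toSeq

  // members typically lists the accessors in reverse declaration order, hence the reverse.
  val columns = caseClassFields
    .map(F.col _)
    .reverse

  input
    .select(columns: _*)
    .as[T] // assumes `import spark.implicits._` is in scope for the Encoder[T]
}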
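As a possible alternative to the reflection-based version above, the column names can also be read from the product encoder's schema, which lists fields in declaration order and doubles as the encoder for the cast. This is only a sketch under that assumption: `DefensiveCast` and `toTDefensiveViaEncoder` are names invented here, and top-level, non-nested column names are assumed.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.col
import scala.reflect.runtime.universe.TypeTag

// Illustrative domain class with three declared fields.
case class Artist(id: String, name: String, location: String)

object DefensiveCast {
  // Alternative sketch (not the reflection-based version): derive the expected
  // columns from the encoder's schema instead of inspecting case accessors.
  def toTDefensiveViaEncoder[T <: Product : TypeTag](input: DataFrame): Dataset[T] = {
    val encoder = Encoders.product[T]

    // Field names come back in the order the case class declares them.
    val columns = encoder.schema.fieldNames.toSeq.map(name => col(name))

    // Trim to the declared columns first, then cast with the same encoder.
    input.select(columns: _*).as[T](encoder)
  }
}
```

Calling `DefensiveCast.toTDefensiveViaEncoder[Artist](df)` on a DataFrame with an extra column should yield a Dataset whose columns are exactly `id`, `name` and `location`. Because the encoder is passed explicitly, no `spark.implicits._` import is needed inside the helper.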