You are currently viewing Kotlin API for Apache Spark 1.0 Released

Kotlin API for Apache Spark 1.0 Released

The Kotlin API for Apache Spark is now widely available. This is the first stable release of the API that we consider to be feature-complete with respect to the user experience and compatibility with core Spark APIs.

Get on Maven Central

Let’s take a look at the new features this release brings to the API.

Typed select and sort

The Scala API has a typed select method that returns Datasets of Tuples. Sometimes using them can be more idiomatic or convenient than using the map function. Here’s what the syntax for this method looks like:

case class TestData(id: Long, name: String, url: String)
// ds is of type Dataset[TestData]
val result: Dataset[Tuple2[String, Long]] = 
        ds.select($"name".as[String], $"id".as[Long])

Sometimes obtaining just a tuple may be really convenient, but this method has a drawback: you have to select a column by name and explicitly provide the type. This can lead to errors, which might be hard to fix in long pipelines.

We’re trying to address this issue at least partially in our extension to the Scala API. Consider the following Kotlin code:

data class TestData(val id: Long, val name: String, val url: String)
// ds is of type Dataset<TestData>
val result: Dataset<Arity2<String, Long>> = 
        ds.selectTyped(TestData::name, TestData::id)

The result is the same, but the call is entirely type-safe. We do not use any strings and casts, and both the column name and the type are inferred from reflection.

We have also added a similarly reflective syntax to the sort function.

In Scala, this API supports arities up to 5, and we decided to be as consistent with the Scala API as possible. We also think that the usage of tuples with arities above five is an indication that something is going wrong. For example, maybe it would be better to extract a new domain object or, conversely, to work with untyped datasets.

More column functions

The Scala API is very rich in terms of functions that can be called on columns. We cannot make them identical to the Scala API because of the limitations of Kotlin. For example, overriding class members with extensions is forbidden, and the Dataset class is not extensible. But we can at least use infix functions and names in backticks to implement operator-like functions.

Here are the operator-like functions that we currently support:

  • ==
  • !=
  • eq / `===`
  • neq / `=!=`
  • -col(...)
  • !col(...)
  • gt
  • lt
  • geq
  • leq
  • or
  • and / `&&`
  • +
  • -
  • *
  • /
  • %

Luckily, we can see that very few of the functions require backticks, and those that do can be autocompleted without you having to type them.

More KeyValueGroupedDataset wrapper functions

We initially designed the API so that anyone could call any function that requires a Decoder and Encoder simply by using the magic <em>encoder()</em> function, which generates everything automagically. It gave our users some flexibility, and it also allowed us not to implement all the functions that the Dataset API offers. But we would ultimately like to provide our users with the best developer experience possible. This is why we’ve implemented necessary wrappers over KeyValueGroupedDataset, and also why we’ve added support for the following functions:

  • cogroup
  • flatMapGroupsWithState
  • mapGroupsWithState

Support for Scala TupleN classes

There are several APIs in the Spark API that return Datasets of Tuples. Examples of such APIs are the select and joinWith functions. Before this release, users had to manually find an encoder for tuples:

val encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT())
ds
    .select(ds.col("a").`as`<String>, ds.col("b").`as`<Int>)
    .map({ Tuple2(it._1(), it._2() + 1) }, encoder)

And the more we work with tuples, the more encoders we need, which leads to verbosity and requires us to find increasing numbers of names for new encoders.

After this change, code becomes as simple as any usual Kotlin API code:

ds
    .select(ds.col("a").`as`<String>, ds.col("b").`as`<Int>)
    .map { Tuple2(it._1(), it._2() + 1) }

You no longer need to rely on specific encoders or lambdas inside argument lists.

Support for date and time types

Work with dates and times is an important part of many data engineering workflows. For Spark 3.0, we had default encoders registered for Date and Timestamp, but inside data structures we had support only for LocalDate and Instant, which is obviously not enough. We now have full support for LocalDate, Date, Timestamp, and Instant both as top-level entities of dataframes and as fields inside of structures.

We have also added support for Date and Timestamp as fields inside of structures for Spark 2.

Support for maps encoded as tuples

There is a well-known practice of encoding maps as tuples. For example, rather than storing the ID of an entity and the name of the entity in the map, it is fairly common to store them in two columns in a structure like Dataset<Pair<Long, String>> (which is how relational databases usually work).

We are aware of this, and we’ve decided to add support for working with such datasets in the same way you work with maps. We have added the functions takeKeys and takeValues to Dataset<Tuple2<T1, T2>>, Dataset<Pair<T1, T2>>, and Dataset<Arity2<T1, T2>>.

Conclusion

We want to say a huge “thank you” to Jolan Rensen, who helped us tremendously by offering feedback, assisting with the implementation of features, and fixing bugs in this release. He worked with our project while writing his thesis, and we’re happy that we can help him with his brilliant work. If you want to read more about Jolan, please visit his site.

If you want to read more about the details of the new release, please check out the changelog.

As usual, the latest release is available at Maven Central. And we would love to get your feedback, which you can leave in: