Auto Added by WPeMatico

Kotlin API for Apache Spark 1.0 Released

The Kotlin API for Apache Spark is now widely available. This is the first stable release of the API that we consider to be feature-complete with respect to the user experience and compatibility with core Spark APIs.

Get on Maven Central

Let’s take a look at the new features this release brings to the API.

Typed select and sort

The Scala API has a typed

select

method that returns

Datasets

of

Tuples

. Sometimes using them can be more idiomatic or convenient than using the

map

function. Here’s what the syntax for this method looks like:

case class TestData(id: Long, name: String, url: String)
// ds is of type Dataset[TestData]
val result: Dataset[Tuple2[String, Long]] = 
        ds.select($"name".as[String], $"id".as[Long])

Sometimes obtaining just a tuple may be really convenient, but this method has a drawback: you have to select a column by name and explicitly provide the type. This can lead to errors, which might be hard to fix in long pipelines.

We’re trying to address this issue at least partially in our extension to the Scala API. Consider the following Kotlin code:

data class TestData(val id: Long, val name: String, val url: String)
// ds is of type Dataset<TestData>
val result: Dataset<Arity2<String, Long>> = 
        ds.selectTyped(TestData::name, TestData::id)

The result is the same, but the call is entirely type-safe. We do not use any strings and casts, and both the column name and the type are inferred from reflection.

We have also added a similarly reflective syntax to the

sort

function.

In Scala, this API supports arities up to 5, and we decided to be as consistent with the Scala API as possible. We also think that the usage of tuples with arities above five is an indication that something is going wrong. For example, maybe it would be better to extract a new domain object or, conversely, to work with untyped datasets.

More column functions

The Scala API is very rich in terms of functions that can be called on columns. We cannot make them identical to the Scala API because of the limitations of Kotlin. For example, overriding class members with extensions is forbidden, and the

Dataset

class is not extensible. But we can at least use infix functions and names in backticks to implement operator-like functions.

Here are the operator-like functions that we currently support:

  • ==
  • !=
  • eq / `===`
  • neq / `=!=`
  • -col(...)
  • !col(...)
  • gt
  • lt
  • geq
  • leq
  • or
  • and /

    `&&`

  • +
  • -
  • *
  • /
  • %

Luckily, we can see that very few of the functions require backticks, and those that do can be autocompleted without you having to type them.

More

KeyValueGroupedDataset

wrapper functions

We initially designed the API so that anyone could call any function that requires a

Decoder

and

Encoder

simply by using the magic

&lt;em&gt;encoder()&lt;/em&gt;

function, which generates everything automagically. It gave our users some flexibility, and it also allowed us not to implement all the functions that the

Dataset

API offers. But we would ultimately like to provide our users with the best developer experience possible. This is why we’ve implemented necessary wrappers over

KeyValueGroupedDataset

, and also why we’ve added support for the following functions:

  • cogroup
  • flatMapGroupsWithState
  • mapGroupsWithState

Support for Scala

TupleN

classes

There are several APIs in the Spark API that return

Datasets

of

Tuples

. Examples of such APIs are the

select

and

joinWith

functions. Before this release, users had to manually find an encoder for tuples:

val encoder = Encoders.tuple(Encoders.STRING(), Encoders.INT())
ds
    .select(ds.col("a").`as`<String>, ds.col("b").`as`<Int>)
    .map({ Tuple2(it._1(), it._2() + 1) }, encoder)

And the more we work with tuples, the more encoders we need, which leads to verbosity and requires us to find increasing numbers of names for new encoders.

After this change, code becomes as simple as any usual Kotlin API code:

ds
    .select(ds.col("a").`as`<String>, ds.col("b").`as`<Int>)
    .map { Tuple2(it._1(), it._2() + 1) }

You no longer need to rely on specific encoders or lambdas inside argument lists.

Support for date and time types

Work with dates and times is an important part of many data engineering workflows. For Spark 3.0, we had default encoders registered for

Date

and

Timestamp

, but inside data structures we had support only for

LocalDate

and

Instant

, which is obviously not enough. We now have full support for

LocalDate

,

Date

,

Timestamp

, and

Instant

both as top-level entities of dataframes and as fields inside of structures.

We have also added support for

Date

and

Timestamp

as fields inside of structures for Spark 2.

Support for maps encoded as tuples

There is a well-known practice of encoding maps as tuples. For example, rather than storing the ID of an entity and the name of the entity in the map, it is fairly common to store them in two columns in a structure like

Dataset&lt;Pair&lt;Long, String&gt;&gt;

(which is how relational databases usually work).

We are aware of this, and we’ve decided to add support for working with such datasets in the same way you work with maps. We have added the functions

takeKeys

and

takeValues

to

Dataset&lt;Tuple2&lt;T1, T2&gt;&gt;

,

Dataset&lt;Pair&lt;T1, T2&gt;&gt;

, and

Dataset&lt;Arity2&lt;T1, T2&gt;&gt;

.

Conclusion

We want to say a huge “thank you” to Jolan Rensen, who helped us tremendously by offering feedback, assisting with the implementation of features, and fixing bugs in this release. He worked with our project while writing his thesis, and we’re happy that we can help him with his brilliant work. If you want to read more about Jolan, please visit his site.

If you want to read more about the details of the new release, please check out the changelog.

As usual, the latest release is available at Maven Central. And we would love to get your feedback, which you can leave in:

Continue ReadingKotlin API for Apache Spark 1.0 Released

Kotlin for Apache Spark: One Step Closer to Your Production Cluster

We have released Preview 2 of Kotlin for Apache Spark. First of all, we’d like to thank the community for providing us with all their feedback and even some pull requests! Now let’s take a look at what we have implemented since Preview 1?

Scala 2.11 and Spark 2.4.1+ support

The main change in this preview is the introduction of Spark 2.4 and Scala 2.11 support. This means that you can now run jobs written in Kotlin in your production environment.

The syntax remains the same as the Apache Spark 3.0 compatible version, but the installation procedure differs a bit. You can now use sbt to add the correct dependency, for example:


libraryDependencies += "org.jetbrains.kotlinx.spark" %% "kotlin-spark-api-2.4" % "1.0.0-preview2"

Of course, for other build systems, such as Maven, we would use the standard:


&lt;dependency&gt;
  &lt;groupId&gt;org.jetbrains.kotlinx.spark&lt;/groupId&gt;
  &lt;artifactId&gt;kotlin-spark-api-2.4_2.11&lt;/artifactId&gt;
  &lt;version&gt;1.0.0-preview2&lt;/version&gt;
&lt;/dependency&gt;

And for Gradle we would use:


implementation 'org.jetbrains.kotlinx.spark:kotlin-spark-api-2.4_2.11:1.0.0-preview2'

Spark 3 only supports Scala 2.12 for now, so there is no need to define it.

You can read more about dependencies here.

Support for the custom

SparkSessionBuilder

Imagine you have a company-wide SparkSession.Builder that is set up to work with your YARN/Mesos/etc. cluster. You don’t want to create it, and the “withSpark” function doesn’t give you enough flexibility. Before Preview 2, you had only one option – you had to rewrite the whole builder from Scala to Kotlin. You now have another one, as the “withSpark” function accepts the SparkSession.Builder as an argument:


val builder = // obtain your builder here
withSpark(builder, logLevel = DEBUG) {
  // your spark is initialized with correct config here
}

Broadcast variable support

Sometimes we need to share data between executor nodes. Occasionally, we need to provide executor nodes with dictionary-like data without pushing it to each executor node individually. Apache Spark allows you to do this using broadcasting variable. You were previously able to do this using the “encoder” function. Still, we are aiming to provide an API experience that is as consistent as possible with that of the Scala API. So we’ve added explicit support for the broadcasting of variables. For example:


// Broadclasted data should implement Serializable
data class SomeClass(val a: IntArray, val b: Int) : Serializable
// some another code snipped
val receivedBroadcast = broadcast.value // broadcasted value is available

We’d like to thank Jolan Rensen for the contribution.

Primitive arrays support for Spark 3

If you use arrays of primitives in your data pipeline, you may be glad to know that they now work in Kotlin for Apache Spark. Arrays of data (or Java bean) classes were supported in Preview 1, but we’ve only just added support for primitive arrays in Preview 2. Please note that this support will work correctly only if you’re using particular Kotlin primitive array types, such as

IntArray

,

BooleanArray

, and

LongArray

. In other cases, we propose using lists or alternative collection types.

Future work

Now that Spark 2.4 is supported, our targets for Preview 3 will be mainly to extend the Column API and add more operations on columns. Once Apache Spark with Scala 2.13 support is available, we will implement support for Scala 2.13, as well.

The awesome support introduced in Preview 2 makes now a great time to give it a try! The official repository and the quick start guide are excellent places to start.

Continue ReadingKotlin for Apache Spark: One Step Closer to Your Production Cluster

End of content

No more pages to load