Kotlin Coroutines 1.5: GlobalScope Marked as Delicate, Refined Channels API, and More

Co-author: Svetlana Isakova

Kotlin Coroutines 1.5.0 is out! Here’s what the new version brings.

  • The refined Channel API. Along with a new naming scheme for the library functions, the non-suspending functions trySend and tryReceive were introduced as better alternatives to offer and poll.

In this blog post, you will also find recommendations for migrating to the new version.


GlobalScope marked as a delicate API

The GlobalScope object is now marked with the @DelicateCoroutinesApi annotation. From now on, any use of GlobalScope requires an explicit opt-in with @OptIn(DelicateCoroutinesApi::class).

While the use of GlobalScope isn’t recommended for most cases, the official documentation still introduces the concepts via this delicate API.

A global CoroutineScope is not bound to any job. Global scope is used to launch top-level coroutines that operate during the whole application lifetime and are not canceled prematurely. Active coroutines launched in GlobalScope do not keep the process alive. They are like daemon threads.

This is a delicate API and it is easy to accidentally create resource or memory leaks when GlobalScope is used. A coroutine launched in GlobalScope is not subject to the principle of structured concurrency, so if it hangs or gets delayed due to a problem (e.g. due to a slow network), it will keep working and consuming resources. For example, consider the following code:

fun loadConfiguration() {
    GlobalScope.launch {
        val config = fetchConfigFromServer() // network request
        updateConfiguration(config)
    }
}

A call to loadConfiguration creates a coroutine in GlobalScope that works in the background without any provision to cancel it or to wait for its completion. If the network is slow, it keeps waiting in the background, consuming resources. Repeated calls to loadConfiguration will consume more and more resources.

Possible replacements

In many cases, the use of GlobalScope should be avoided and the containing operation should be marked with suspend, for example:

suspend fun loadConfiguration() {
    val config = fetchConfigFromServer() // network request
    updateConfiguration(config)
}

In cases when GlobalScope.launch is used to launch multiple concurrent operations, the corresponding operations should be grouped with coroutineScope instead:

// concurrently load configuration and data
suspend fun loadConfigurationAndData() {
    coroutineScope {
        launch { loadConfiguration() }
        launch { loadData() }
    }
}

In top-level code, when launching a concurrent operation from a non-suspending context, an appropriately confined instance of CoroutineScope should be used instead of GlobalScope.
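As a minimal sketch of what an appropriately confined scope might look like, the component below owns a CoroutineScope and cancels it when it is closed. The ConfigurationLoader class, its close function, and the delay standing in for the network request are hypothetical names chosen for illustration; they do not come from the library.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical component that owns its scope instead of using GlobalScope.
class ConfigurationLoader {
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Launches the work in the component's own scope and returns its Job.
    fun loadConfiguration(): Job = scope.launch {
        delay(10_000) // stands in for a slow network request
    }

    // Cancels every coroutine that was launched in this scope.
    fun close() = scope.cancel()
}

fun main() {
    val loader = ConfigurationLoader()
    val job = loader.loadConfiguration()
    loader.close()
    println(job.isCancelled) // true
}
```

Because the lifetime of the scope is tied to the component, a slow request no longer outlives the code that asked for it.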

Legitimate use cases

There are limited circumstances under which GlobalScope can be legitimately and safely used, such as top-level background processes that must stay active for the whole duration of an application’s lifetime. Because of that, any use of GlobalScope requires an explicit opt-in with @OptIn(DelicateCoroutinesApi::class), like this:

// A global coroutine to log statistics every second, 
// must be always active
@OptIn(DelicateCoroutinesApi::class)
val globalScopeReporter = GlobalScope.launch {
    while (true) {
        delay(1000)
        logStatistics()
    }
}

We recommend reviewing all your usages of GlobalScope carefully and annotating only those that fall under the “legitimate use cases” category. Any other usages are likely to be a source of bugs in your code, so replace them as described above.

Extensions for JUnit 5

We have added a CoroutinesTimeout annotation that allows running tests in a separate thread, failing them after the provided time limit and interrupting the thread. Previously, CoroutinesTimeout was available for JUnit 4. In this release, we’re adding the integration for JUnit 5.

To use the new annotation, add the following dependency to your project:

dependencies {
  …
  testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug:$coroutinesVersion")
}

Here’s a simple example of how to use the new CoroutinesTimeout in your tests:

import kotlinx.coroutines.debug.junit5.CoroutinesTimeout
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test

@CoroutinesTimeout(100)
class CoroutinesTimeoutSimpleTest {

     @CoroutinesTimeout(300)
     @Test
     fun firstTest() {
         runBlocking {
             delay(200)  // succeeds
         }
     }

     @Test
     fun secondTest() {
         runBlocking {
             delay(200)  // fails
         }
     }
 }

In this example, the timeout is defined at the class level and separately for firstTest. firstTest does not time out, because the annotation on the function overrides the class-level one. secondTest uses the class-level annotation, so it times out.

The annotation is declared in the following way:

package kotlinx.coroutines.debug.junit5

public annotation class CoroutinesTimeout(
    val testTimeoutMs: Long,
    val cancelOnTimeout: Boolean = false
)

The first parameter, testTimeoutMs, specifies the timeout duration in milliseconds. The second parameter, cancelOnTimeout, determines if all the running coroutines should be canceled at the moment of the timeout. If it’s set to true, all the coroutines will be automatically canceled.
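As a small illustration of the two parameters, the sketch below sets both by name on a single test. The class and test names are hypothetical, and the delay value is arbitrary.

```kotlin
import kotlinx.coroutines.debug.junit5.CoroutinesTimeout
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test

class CancelOnTimeoutTest {

    // The test fails if it runs longer than 500 ms; because cancelOnTimeout
    // is true, coroutines still running at that moment are canceled.
    @CoroutinesTimeout(testTimeoutMs = 500, cancelOnTimeout = true)
    @Test
    fun finishesInTime() {
        runBlocking {
            delay(100) // completes well within the limit
        }
    }
}
```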

Whenever you use the CoroutinesTimeout annotation, it automatically enables the coroutines debugger and dumps all coroutines at the moment of the timeout. The dump contains the coroutine creation stack traces. If there is a need to disable the creation stack traces in order to speed tests up, consider directly using CoroutinesTimeoutExtension (https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-debug/src/junit/junit5/CoroutinesTimeoutExtension.kt), which allows this configuration.

Many thanks to Abhijit Sarkar, who created a useful PoC for CoroutinesTimeout for JUnit 5. The idea was developed into the new CoroutinesTimeout annotation that we added in the 1.5 release.

Channel API refinement

Channels are important communication primitives that allow you to pass data between different coroutines and callbacks. In this release, we reworked the Channel API a bit, replacing the confusing offer and poll functions with better alternatives. Along the way, we developed a new consistent naming scheme for suspending and non-suspending methods.

The new naming scheme

We tried to work towards a consistent naming scheme to use further in other libraries or Coroutines API. We wanted to make sure that the name of the function would convey the information about its behavior. As a result, we came up with the following:

  • The regular suspending methods are left as-is, e.g., send, receive.

  • Their non-suspending counterparts with error encapsulation are consistently prefixed with “try”: trySend and tryReceive instead of the old offer and poll.

  • New error-encapsulating suspending methods will have the suffix “Catching”.

Let’s dive into the details about these new methods.

Try functions: non-suspending counterparts to send and receive

One coroutine can send some information to a channel, while the other one can receive this information from it. Both send and receive functions are suspending. send suspends its coroutine if the channel is full and can’t take a new element, while receive suspends its coroutine if the channel has no elements to return:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        // Suspends until the element can be sent
        println("Sending...")
        channel.send("Element")
    }
    // Suspends until the element can be received
    println("Receiving...")
    println(channel.receive())
}

These functions have non-suspending counterparts for use in synchronous code: offer and poll, which are now deprecated in favor of trySend and tryReceive. Let’s discuss the reasons for this change.

offer and poll are supposed to do the same thing as send and receive, but without suspension. It sounds easy, and everything works fine when the element can be sent or received. But what happens in case of an error? send and receive would suspend until they can do their job. offer and poll simply returned false and null, respectively, if the element couldn’t be added because the channel was full, or no element could be retrieved because the channel was empty. They both threw an exception on an attempt to work with a closed channel, and this last detail turned out to be confusing.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        println("Sending...")
        // Doesn't suspend
        // Returns 'false' if the channel is full
        // Or throws an exception if it's closed
        channel.offer("Element")
    }
    println("Receiving...")
    // Doesn't suspend
    // Returns 'null' if the channel is empty
    println(channel.poll())
//  channel.close()
}

In this example, poll is called before any element is added and therefore returns null immediately. Note that it’s not supposed to be used in this way: you should instead continue polling elements regularly, but we call it directly for simplicity of this explanation. The invocation of offer is also unsuccessful since our channel is a rendezvous channel and has zero buffer capacity. As a result, offer returns false, and poll returns null, simply because they were called in the wrong order.

In the example above, try uncommenting the channel.close() statement to make sure that the exception is thrown. In this case, poll returns null, as before. But then offer tries to add an element to an already closed channel, fails, and throws an exception. We received many complaints that such behavior is error-prone. It’s easy to forget to catch this exception, and while you’d rather ignore it or handle it differently, it crashes your program.

The new trySend and tryReceive fix this issue and return a more detailed result. Each returns a ChannelResult instance, which is one of three things: a successful result, a failure, or an indication that the channel was closed.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        println("Sending...")
        // Doesn't suspend
        // Returns 'Failed' if the channel is full
        // Or 'Channel was closed' result if it's closed
        val result = channel.trySend("Element")
        println(result)

        // We can verify the result
        if (result.isClosed) {
            println("Sending failed. The channel is closed.")
        }
    }
    println("Receiving...")
    println(channel.tryReceive())
//  channel.close()
}

This example works in the same way as the previous one, with the only difference that tryReceive and trySend return a more detailed result. You can see Value(Failed) in the output instead of false and null. Uncomment the line closing the channel again and ensure that trySend now returns a Closed result that captures the exception.

Thanks to inline value classes, using ChannelResult doesn’t create additional wrappers underneath, and if the successful value is returned, it’s returned as is, without any overhead.
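To make the three outcomes concrete, here is a minimal sketch that maps a ChannelResult to a label. The describeTrySend helper and the buffer capacity of 1 are assumptions made for the illustration. Note that a closed result also counts as a failure, so isClosed is checked before falling through to the "full" case.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: tries to send without suspending and names the outcome.
fun describeTrySend(channel: Channel<String>, element: String): String {
    val result = channel.trySend(element)
    return when {
        result.isSuccess -> "sent"
        result.isClosed -> "closed" // checked before the generic failure case
        else -> "full"
    }
}

fun main() {
    val channel = Channel<String>(capacity = 1)
    println(describeTrySend(channel, "first"))  // sent: the buffer has room
    println(describeTrySend(channel, "second")) // full: the buffer is occupied
    println(channel.tryReceive().getOrNull())   // first
    channel.close()
    println(describeTrySend(channel, "third"))  // closed
}
```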

Catching functions: suspending functions that encapsulate errors

Starting from this release, the error-encapsulating suspending methods will have the suffix “Catching”. For instance, the new receiveCatching function handles the exception in the case of a closed channel. Consider this simple example:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    channel.close()
    println(channel.receiveCatching())
}

The channel is closed before we try retrieving a value. However, the program completes successfully, indicating that the channel was closed. If you replace receiveCatching with the ordinary receive function, it will throw ClosedReceiveChannelException:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    channel.close()
    println(channel.receive())
}

For now, we only provide receiveCatching and onReceiveCatching (instead of the previously internal receiveOrClosed), but we have plans to add more functions.
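One way receiveCatching might be used is to drain a channel until the producer closes it, without a try/catch around receive. The sketch below assumes a hypothetical drain helper and a producer that sends three numbers.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects elements until the channel is closed.
suspend fun drain(channel: Channel<Int>): List<Int> {
    val received = mutableListOf<Int>()
    while (true) {
        // Suspends until an element arrives or the channel is closed
        val result = channel.receiveCatching()
        if (result.isClosed) break
        received += result.getOrThrow()
    }
    return received
}

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 1..3) channel.send(i)
        channel.close()
    }
    println(drain(channel)) // [1, 2, 3]
}
```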

Migrating your code to new functions

You can replace all the usages of the offer and poll functions in your project automatically with new calls. Since offer returned Boolean, its equivalent replacement is channel.trySend("Element").isSuccess.

Likewise, the poll function returns a nullable element, so its replacement becomes channel.tryReceive().getOrNull().

If the result of the invocation wasn’t used, you can replace them directly with new calls.
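The two simple replacements can be tried out in a short sketch. The sendNow and receiveNow helpers are hypothetical names used only to show the old calls next to their replacements, and the buffer capacity of 1 is an assumption.

```kotlin
import kotlinx.coroutines.channels.Channel

// Was: channel.offer(element)
fun sendNow(channel: Channel<String>, element: String): Boolean =
    channel.trySend(element).isSuccess

// Was: channel.poll()
fun receiveNow(channel: Channel<String>): String? =
    channel.tryReceive().getOrNull()

fun main() {
    val channel = Channel<String>(capacity = 1)
    println(sendNow(channel, "Element")) // true: the buffer had room
    println(receiveNow(channel))         // Element
    println(receiveNow(channel))         // null: the channel is empty
}
```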

The behavior towards handling exceptions is now different, so you will need to make the necessary updates manually. If your code relies on ‘offer’ and ‘poll’ methods throwing exceptions on a closed channel, you’ll need to use the following replacements.

The equivalent replacement for channel.offer("Element") should throw an exception if the channel was closed, even if it was closed normally:

channel
  .trySend("Element")
  .onClosed { throw it ?: ClosedSendChannelException("Channel was closed") }
  .isSuccess

The equivalent replacement for channel.poll() throws an exception if the channel was closed with an error and returns null if it was closed normally:

channel.tryReceive()
  .onClosed { if (it != null) throw it }
  .getOrNull()

Such changes reflect the old behavior of the offer and poll functions.

We assume that in most cases, your code didn’t rely on these subtleties of behavior on a closed channel, but rather that it was a source of bugs. That’s why the automatic replacements provided by the IDE simplify the semantics. If this is not true for you, please review and update your usages manually and consider rewriting them completely to handle the cases of closed channels differently, without throwing exceptions.

Reactive integrations on the road to stability

Version 1.5 of Kotlin Coroutines promotes most of the functions responsible for integrations with reactive frameworks to stable API.

In the JVM ecosystem, there are a few frameworks that deal with asynchronous stream processing conforming to the Reactive Streams standard. For instance, Project Reactor and RxJava are the two popular Java frameworks in this area.

While Kotlin Flows are different and the types are not compatible with the ones specified by the standard, they are conceptually still streams. It is possible to convert Flow to the reactive (specification and TCK compliant) Publisher and vice versa. Such converters are provided by kotlinx.coroutines out-of-the-box and can be found in the corresponding reactive modules.

For instance, if you need interoperability with the Project Reactor types, you should add the following dependencies to your project:

dependencies {          
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

Then you will be able to use Flow<T>.asPublisher() if you want to use the Reactive Streams types, or Flow<T>.asFlux() if you need to use Project Reactor types directly.

// Acquire a Flow instance
val flow: Flow<Event> = flow { … }

// Convert Flow to Publisher
val publisher = flow.asPublisher()

// Convert Flow to Reactor's Flux
val flux = flow.asFlux()

// Convert back to Flow
val anotherFlow = flux.asFlow()

This is a very condensed view of the subject at hand. If you are interested in learning more, consider reading Roman Elizarov’s article about Reactive Streams and Kotlin Flows.

The integrations with reactive libraries are still working towards full stabilization of the API. Technically, the goal is to get rid of the @ExperimentalCoroutinesApi annotation and to implement the leftovers for various topics.

Improved integration with Reactive Streams

Compatibility with Reactive Streams specifications is important in order to ensure interoperability between 3rd-party frameworks and Kotlin Coroutines. It helps to adopt Kotlin Coroutines in legacy projects without the need to rewrite all of the code.

There is a long list of functions that we managed to promote to stable status this time. It is now possible to convert a type from any Reactive Streams implementation to Flow and back. For instance, the new code can be written with Coroutines, but integrated with the old reactive codebase via the opposite converters:

fun legacyFunThatHaveToReturnObservable(): Observable<Int> {
  return flow<Int> {
    // Use the power of flow!
  }
  // various flow operations
  .asObservable()
}

Also, numerous improvements were made to ReactorContext, which wraps Reactor’s Context into CoroutineContext for seamless integration between Project Reactor and Kotlin Coroutines. With this integration, it is possible to propagate the information about Reactor’s Context through coroutines.

The context is implicitly propagated through the subscribers’ context by all Reactive integrations, such as Mono, Flux, Publisher.asFlow, Flow.asPublisher and Flow.asFlux. Here’s a simple example of propagating the subscriber’s Context to ReactorContext:

import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.asFlux

fun main() {
    val flow = flow<Int> {
       println("Reactor context in Flow: " +
          currentCoroutineContext()[ReactorContext]?.context)
    }

    // No context
    // prints "Reactor context in Flow: null"
    flow.asFlux().subscribe() 

    // Add subscriber's context
    // prints "Reactor context in Flow: Context1{answer=42}"
    flow.asFlux()
        .contextWrite { ctx -> ctx.put("answer", 42) }
        .subscribe() 
}

In the example above, we construct a Flow instance which is then converted to Reactor’s Flux instance, with no context. The effect of calling the subscribe() method without an argument is to request the publisher to send all data. As a result, the program prints the phrase “Reactor context in Flow: null”.

The next call chain also converts the Flow to Flux, but then adds a key-value pair, answer=42, to the Reactor’s context for this chain. The call to subscribe() triggers the chain. In this case, since the context is populated, the program prints “Reactor context in Flow: Context1{answer=42}”.

New convenience functions

When working with reactive types like Mono in the Coroutines context, there are a few convenience functions that allow retrieval without blocking the thread. In this release, we deprecated awaitSingleOr* functions on arbitrary Publishers, and specialized some await* functions for Mono and Maybe.

Mono produces at most one value, so the last element is the same as the first. In this case, the semantics of dropping the remaining elements isn’t useful either. Therefore, Mono.awaitFirst() and Mono.awaitLast() are deprecated in favour of Mono.awaitSingle().
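A minimal sketch of awaiting a Mono without blocking the thread is shown below. The loadAnswer helper is a hypothetical name. The awaitSingleOrNull call is not mentioned in the text above; it is assumed here to be the Mono-specific variant from kotlinx-coroutines-reactor that returns null for an empty Mono.

```kotlin
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical helper: suspends until the Mono produces its single value.
suspend fun loadAnswer(source: Mono<Int>): Int = source.awaitSingle()

fun main() = runBlocking {
    println(loadAnswer(Mono.just(42)))             // 42
    // An empty Mono has no value to await, so the nullable variant is used
    println(Mono.empty<Int>().awaitSingleOrNull()) // null
}
```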

Start using kotlinx.coroutines 1.5.0!

The new release features an impressive list of changes. The new naming scheme developed while refining the Channels API is a notable achievement by the team. Meanwhile, there’s a great focus on making the Coroutines API as simple and intuitive as possible.

To start using the new version of Kotlin Coroutines, just update the content of your build.gradle.kts file. First, make sure that you have the latest version of the Kotlin Gradle plugin:

plugins {
   kotlin("jvm") version "1.5.0"
}

And then update the versions of the dependencies, including the libraries with specific integrations for the Reactive Streams.

val coroutinesVersion = "1.5.0"

dependencies { 
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:$coroutinesVersion")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$coroutinesVersion")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$coroutinesVersion")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx3:$coroutinesVersion")
  ...
}

Watch and read more

If you run into any trouble