Kotlin Coroutines 1.5: GlobalScope Marked as Delicate, Refined Channels API, and More

Co-author: Svetlana Isakova

Kotlin Coroutines 1.5.0 is out! Here’s what the new version brings.

  • The refined Channel API. Along with a new naming scheme for the library functions, the non-suspending functions trySend and tryReceive were introduced as better alternatives to offer and poll.

In this blog post, you will also find recommendations for migrating to the new version.

Start Using Coroutines 1.5.0

GlobalScope marked as a delicate API

The GlobalScope class is now marked with the @DelicateCoroutinesApi annotation. From now on, any use of GlobalScope requires an explicit opt-in with @OptIn(DelicateCoroutinesApi::class).

While the use of GlobalScope isn’t recommended for most cases, the official documentation still introduces the concepts via this delicate API.

A global CoroutineScope is not bound to any job. Global scope is used to launch top-level coroutines that operate during the whole application lifetime and are not canceled prematurely. Active coroutines launched in GlobalScope do not keep the process alive. They are like daemon threads.

This is a delicate API and it is easy to accidentally create resource or memory leaks when GlobalScope is used. A coroutine launched in GlobalScope is not subject to the principle of structured concurrency, so if it hangs or gets delayed due to a problem (e.g. due to a slow network), it will keep working and consuming resources. For example, consider the following code:

fun loadConfiguration() {
    GlobalScope.launch {
        val config = fetchConfigFromServer() // network request
        updateConfiguration(config)
    }
}

A call to loadConfiguration creates a coroutine in GlobalScope that works in the background without any provision to cancel it or to wait for its completion. If the network is slow, it keeps waiting in the background, consuming resources. Repeated calls to loadConfiguration will consume more and more resources.

Possible replacements

In many cases, the use of GlobalScope should be avoided and the containing operation should be marked with suspend, for example:

suspend fun loadConfiguration() {
    val config = fetchConfigFromServer() // network request
    updateConfiguration(config)
}

In cases when GlobalScope.launch is used to launch multiple concurrent operations, the corresponding operations should be grouped with coroutineScope instead:

// concurrently load configuration and data
suspend fun loadConfigurationAndData() {
    coroutineScope {
        launch { loadConfiguration() }
        launch { loadData() }
    }
}

In top-level code, when launching a concurrent operation from a non-suspending context, an appropriately confined instance of CoroutineScope should be used instead of GlobalScope.

Legitimate use cases

There are limited circumstances under which GlobalScope can be legitimately and safely used, such as top-level background processes that must stay active for the whole duration of an application’s lifetime. Because of that, any use of GlobalScope requires an explicit opt-in with @OptIn(DelicateCoroutinesApi::class), like this:

// A global coroutine to log statistics every second, 
// must be always active
@OptIn(DelicateCoroutinesApi::class)
val globalScopeReporter = GlobalScope.launch {
    while (true) {
        delay(1000)
        logStatistics()
    }
}

We recommend reviewing all your usages of GlobalScope carefully and annotating only those that fall under the “legitimate use-cases” category. For any other usages, they could likely be a source of bugs in your code – replace such GlobalScope usages as described above.

Extensions for JUnit 5

We have added a CoroutinesTimeout annotation that allows running tests in a separate thread, failing them after the provided time limit and interrupting the thread. Previously, CoroutinesTimeout was available for JUnit 4. In this release, we’re adding the integration for JUnit 5.

To use the new annotation, add the following dependency to your project:

dependencies {
  …
  testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug:$coroutinesVersion")
}

Here’s a simple example of how to use the new CoroutinesTimeout in your tests:

import kotlinx.coroutines.debug.junit5.CoroutinesTimeout
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.Test

@CoroutinesTimeout(100)
class CoroutinesTimeoutSimpleTest {

     @CoroutinesTimeout(300)
     @Test
     fun firstTest() {
         runBlocking {
             delay(200)  // succeeds
         }
     }

     @Test
     fun secondTest() {
         runBlocking {
             delay(200)  // fails
         }
     }
 }

In the example, the coroutines timeout is defined at a class level and specifically for the firstTest. The annotated test does not timeout, as the annotation on the function overrides the class-level one. The secondTest uses the class-level annotation, so it times out.

The annotation is declared in the following way:

package kotlinx.coroutines.debug.junit5

public annotation class CoroutinesTimeout(
    val testTimeoutMs: Long,
    val cancelOnTimeout: Boolean = false
)

The first parameter, testTimeoutMs, specifies the timeout duration in milliseconds. The second parameter, cancelOnTimeout, determines if all the running coroutines should be canceled at the moment of the timeout. If it’s set to true, all the coroutines will be automatically canceled.

Whenever you use the CoroutinesTimeout annotation, it automatically enables the coroutines debugger and dumps all coroutines at the moment of the timeout. The dump contains the coroutine creation stack traces. If there is a need to disable the creation stack traces in order to speed tests up, consider directly using CoroutinesTimeoutExtension , which allows this configuration.

Many thanks to Abhijit Sarkar, who created a useful PoC for CoroutinesTimeout for JUnit 5. The idea was developed into the new CoroutinesTimeout annotation that we added in the 1.5 release.

Channel API refinement

Channels are important communication primitives that allow you to pass data between different coroutines and callbacks. In this release, we reworked the Channel API a bit, replacing the offer and poll functions causing confusion with better alternatives. Along the way, we developed a new consistent naming scheme for suspending and non-suspending methods.

The new naming scheme

We tried to work towards a consistent naming scheme to use further in other libraries or Coroutines API. We wanted to make sure that the name of the function would convey the information about its behavior. As a result, we came up with the following:

  • The regular suspending methods are left as-is, e.g., send, receive.
  • Their non-suspending counterparts with error encapsulation are consistently prefixed with “try”: trySend and tryReceive instead of the old offer and poll.
  • New error-encapsulating suspending methods will have the suffix “Catching”.

Let’s dive into the details about these new methods.

Try functions: non-suspending counterparts to send and receive

One coroutine can send some information to a channel, while the other one can receive this information from it. Both send and receive functions are suspending. send suspends its coroutine if the channel is full and can’t take a new element, while receive suspends its coroutine if the channel has no elements to return:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        // Suspends until the element can be sent
        println("Sending...")
        channel.send("Element")
     }
     // Suspends until the element can be received
     println("Receiving...")
     println(channel.receive())
}

These functions have non-suspending counterparts for use in synchronous code: offer and poll, which are now deprecated in favor of trySend and tryReceive. Let’s discuss the reasons for this change.

offer and poll are supposed to do the same thing as send and receive, but without suspension. It sounds easy, and everything works fine when the element can be sent or received. But in case of an error, what happens? send and receive would suspend until they can do their job. offer and poll simply returned false and null, respectively, if the element couldn’t be added because the channel is full, or no element can be retrieved because the channel is empty. They both threw an exception in an attempt to work with a closed channel, and this last detail turned out to be confusing.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        println("Sending...")
        // Doesn't suspend
        // Returns 'false' if the channel is full
        // Or throws an exception if it's closed
        channel.offer("Element")
    }
    println("Receiving...")
    // Doesn't suspend
    // Returns 'null' if the channel is empty
    println(channel.poll())
//  channel.close()
}

In this example, poll is called before any element is added and therefore returns null immediately. Note that it’s not supposed to be used in this way: you should instead continue polling elements regularly, but we call it directly for simplicity of this explanation. The invocation of offer is also unsuccessful since our channel is a rendezvous channel and has zero buffer capacity. As a result, offer returns false, and poll returns null, simply because they were called in the wrong order.

In the example above, try uncommenting the channel.close() statement to make sure that the exception is thrown. In this case, poll returns false, as before. But then offer tries to add an element to an already closed channel, fails, and throws an exception. We received many complaints that such behavior is error-prone. It’s easy to forget to catch this exception, and while you’d rather ignore it or handle it differently, it crashes your program.

The new trySend and tryReceive fix this issue and return a more detailed result. Each returns the ChannelResult instance, which is one of the three things: a successful result, a failure, or an indication that the channel was closed.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        println("Sending...")
        // Doesn't suspend
        // Returns 'Failed' if the channel is full
        // Or 'Channel was closed' result if it's closed
        val result = channel.trySend("Element")
        println(result)

        //We can verify the result
        if(result.isClosed){
            println("Sending failed. The channel is closed.")
        }
    }
    println("Receiving...")
    println(channel.tryReceive())
//  channel.close()
}

This example works in the same way as the previous one, with the only difference that tryReceive and trySend return a more detailed result. You can see Value(Failed) in the output instead of false and null. Uncomment the line closing the channel again and ensure that trySend now returns Closed result capturing an exception.

Thanks to inline value classes, using ChannelResult doesn’t create additional wrappers underneath, and if the successful value is returned, it’s returned as is, without any overhead.

Catching functions: suspending functions that encapsulate errors

Starting from this release, the error-encapsulating suspending methods will have the suffix “Catching”. For instance, the new receiveCatching function handles the exception in the case of a closed channel. Consider this simple example:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    channel.close()
    println(channel.receiveCatching())
}

The channel is closed before we try retrieving a value. However, the program completes successfully, indicating that the channel was closed. If you replace receiveCatching with the ordinary receive function, it will throw ClosedReceiveChannelException:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    channel.close()
    println(channel.receive())
}

For now, we only provide receiveCatching and onReceiveCatching (instead of the previously internal receiveOrClosed), but we have plans to add more functions.

Migrating your code to new functions

You can replace all the usages of the offer and poll functions in your project automatically with new calls. Since offer returned Boolean, its equivalent replacement is channel.trySend("Element").isSuccess.

Likewise, the poll function returns a nullable element, so its replacement becomes channel.tryReceive().getOrNull().

If the result of the invocation wasn’t used, you can replace them directly with new calls.

The behavior towards handling exceptions is now different, so you will need to make the necessary updates manually. If your code relies on ‘offer’ and ‘poll’ methods throwing exceptions on a closed channel, you’ll need to use the following replacements.

The equivalent replacement for channel.offer("Element") should throw an exception if the channel was closed, even if it was closed normally:

channel
  .trySend("Element")
  .onClosed { throw it ?: ClosedSendChannelException("Channel was closed") }
  .isSuccess

The equivalent replacement for channel.poll() throws an exception if the channel was closed with an error and returns null if it was closed normally:

channel.tryReceive()
  .onClosed { if (it != null) throw it }
  .getOrNull()

Such changes reflect the old behavior of offer and poll functions.

We assume that in most cases, your code didn’t rely on these subtleties of behavior on a closed channel, but rather that it was a source of bugs. That’s why the automatic replacements provided by IDE simplify the semantics. If this is not true for you, please review and update your usages manually and consider rewriting them completely to handle the cases of closed channels differently, without throwing exceptions.

Reactive integrations on the road to stability

Version 1.5 of Kotlin Coroutines promotes most of the functions responsible for integrations with reactive frameworks to stable API.

In the JVM ecosystem, there are a few frameworks that deal with asynchronous stream processing conforming to the Reactive Streams standard. For instance, Project Reactor and RxJava are the two popular Java frameworks in this area.

While Kotlin Flows are different and the types are not compatible with the ones specified by the standard, they are conceptually still streams. It is possible to convert Flow to the reactive (specification and TCK compliant) Publisher and vice versa. Such converters are provided by kotlinx.coroutines out-of-the-box and can be found in the corresponding reactive modules.

For instance, if you need interoperability with the Project Reactor types, you should add the following dependencies to your project:

dependencies {          
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

Then you will be able to use Flow<T>.asPublisher() if you want to use the Reactive Streams types, or Flow<T>.asFlux() if you need to use Project Reactor types directly.

// acquire a Flow instance
val flow: Flow<event> = flow { … }

// Convert Flow to Publisher
val publisher = flow.asPublisher()

// Convert Flow to Reactor's Flux
val flux = flow.asFlux()

//Convert back to Flow 
val anotherFlow = flux.asFlow()

This is a very condensed view of the subject at hand. If you are interested in learning more, consider reading Roman Elizarov’s article about Reactive Streams and Kotlin Flows.

While the integrations with reactive libraries are working towards the stabilization of the API, technically, the goal is to get rid of the @ExperimentalCoroutinesApi and implement the leftovers for various topics.

Improved integration with Reactive Streams

Compatibility with Reactive Streams specifications is important in order to ensure interoperability between 3rd-party frameworks and Kotlin Coroutines. It helps to adopt Kotlin Coroutines in legacy projects without the need to rewrite all of the code.

There is a long list of functions that we managed to promote to stable status this time. It is now possible to convert a type from any Reactive Streams implementation to Flow and back. For instance, the new code can be written with Coroutines, but integrated with the old reactive codebase via the opposite converters:

fun legacyFunThatHaveToReturnObservable(): Observable<int> {
  return flow<int> {
    // Use the power of flow!
  }
  // various flow operations
  .asObservable()
}

Also, numerous improvements were made to ReactorContext, which wraps Reactor’s Context into CoroutineContext for seamless integration between Project Reactor and Kotlin Coroutines. With this integration, it is possible to propagate the information about Reactor’s Context through coroutines.

The context is implicitly propagated through the subscribers’ context by all Reactive integrations, such as Mono, Flux, Publisher.asFlow, Flow.asPublisher and Flow.asFlux. Here’s a simple example of propagating the subscriber’s Context to ReactorContext:

import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.asFlux

fun main() {
    val flow = flow<int> {
       println("Reactor context in Flow: " +
          currentCoroutineContext()[ReactorContext]?.context)
    }

    // No context
    // prints "Reactor context in Flow: null"
    flow.asFlux().subscribe() 

    // Add subscriber's context
    // prints "Reactor context in Flow: Context1{answer=42}"
    flow.asFlux()
        .contextWrite { ctx -> ctx.put("answer", 42) }
        .subscribe() 
}

In the example above, we construct a Flow instance which is then converted to Reactor’s Flux instance, with no context. The effect of calling the subscribe() method without an argument is to request the publisher to send all data. As a result, the program prints the phrase “Reactor context in Flow: null”.

The next call chain also converts the Flow to Flux, but then adds a key-value pair, answer=42, to the Reactor’s context for this chain. The call to subscribe() triggers the chain. In this case, since the context is populated, the program prints “Reactor context in Flow: Context1{answer=42}

New convenience functions

When working with reactive types like Mono in the Coroutines context, there are a few convenience functions that allow retrieval without blocking the thread. In this release, we deprecated awaitSingleOr* functions on arbitrary Publishers, and specialized some await* functions for Mono and Maybe.

Mono produces at most one value, so the last element is the same as the first. In this case, the semantics of dropping the remaining elements isn’t useful as well. Therefore, Mono.awaitFirst() and Mono.awaitLast() are deprecated in favour of Mono.awaitSingle().

Start using kotlinx.coroutines 1.5.0!

The new release features an impressive list of changes. The new naming scheme developed while refining the Channels API is a notable achievement by the team. Meanwhile, there’s a great focus on making the Coroutines API as simple and intuitive as possible.

To start using the new version of Kotlin Coroutines, just update the content of your build.gradle.kts file. First, make sure that you have the latest version of the Kotlin Gradle plugin:

plugins {
   kotlin("jvm") version "1.5.0"
}

And then update the versions of the dependencies, including the libraries with specific integrations for the Reactive Streams.

val coroutinesVersion = "1.5.0-RC"

dependencies { 
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:$coroutinesVersion")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$coroutinesVersion")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$coroutinesVersion")
  implementation("org.jetbrains.kotlinx:kotlinx-coroutines-rx3:$coroutinesVersion")
  ...
}

Watch and read more

If you run into any trouble