Kotlin: How Kotlin coroutines are better than RxKotlin?

Question or issue of Kotlin Programming:

Why would I want to use Kotlin’s coroutines?

It seems that the RxKotlin library is much more versatile.
Kotlin’s coroutines look significantly less powerful and more cumbersome to use in comparison.

I base my opinion on coroutines on this design talk by Andrey Breslav (JetBrains)

Slideshow from the talk is accessible here.

EDIT (thanks to @hotkey):

Better source on the current state of coroutines here.

How to solve this issue?

Solution no. 1:

Disclaimer: Parts of this answer are irrelevant now that Coroutines have the Flow API, which is very similar to the Rx one. If you want an up-to-date answer, jump to the last edit.

There are two parts in Rx: the Observable pattern, and a solid set of operators to manipulate, transform and combine observables. The Observable pattern, by itself, doesn’t do a lot. Same with coroutines; they are just another paradigm for dealing with asynchrony. You can compare the pros and cons of callbacks, Observable and coroutines for solving a given problem, but you can’t compare a paradigm with a fully featured library. It’s like comparing a language with a framework.

How are Kotlin coroutines better than RxKotlin? I haven’t used coroutines yet, but they look similar to async/await in C#. You just write sequential code, and everything is as easy as writing synchronous code, except that it executes asynchronously. It’s easier to grasp.

Why would I want to use Kotlin coroutines? I will answer for myself. Most of the time I will stick to Rx, because I favor event-driven architecture. But should the situation arise where I am writing sequential code and need to call an asynchronous method in the middle, I will happily leverage coroutines to keep it that way and avoid wrapping everything in an Observable.

Edit: Now that I am using coroutines it’s time for an update.

RxKotlin is just syntactic sugar for using RxJava in Kotlin, so I will speak about RxJava and not RxKotlin in the following. Coroutines are a lower-level and more general concept than RxJava, and they serve other use-cases. That said, there is one use-case where you can compare RxJava and coroutines (channels): passing data around asynchronously. Coroutines have a clear advantage over RxJava here:

Coroutines are better at dealing with resources
  • In RxJava you can assign computations to schedulers, but subscribeOn() and observeOn() are confusing. Every coroutine is given a thread context and returns to its parent context. For a channel, each side (producer, consumer) executes in its own context. Coroutines are more intuitive about thread or thread-pool assignment.
  • Coroutines give more control over when those computations occur. You can, for example, hand over control (yield), prioritize (select), parallelize (multiple producers/actors on a channel) or lock a resource (Mutex) for a given computation. It may not matter on a server (where RxJava came first), but in resource-limited environments this level of control may be required.
  • Due to its reactive nature, backpressure doesn’t fit well in RxJava. On the other hand, send() on a channel is a suspending function that suspends when the channel capacity is reached. It’s out-of-the-box backpressure given by nature. You could also offer() to the channel (trySend() in current kotlinx.coroutines releases, where offer() is deprecated), in which case the call never suspends but reports failure if the channel is full, effectively reproducing onBackpressureDrop() from RxJava. Or you could just write your own custom backpressure logic, which won’t be difficult with coroutines, especially compared to doing the same with RxJava.
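To make the backpressure point concrete, here is a minimal sketch assuming kotlinx.coroutines is on the classpath. The function names are made up for illustration, and trySend() is used as the current replacement for offer():

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: pushes `count` items through a bounded channel.
// send() suspends the producer whenever the buffer is full, so nothing is lost.
fun sendWithBackpressure(count: Int, capacity: Int): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity)
    launch {
        for (i in 1..count) channel.send(i) // suspends while the buffer is full
        channel.close()
    }
    val received = mutableListOf<Int>()
    for (item in channel) received += item
    received
}

// Hypothetical helper: non-suspending variant. Items that do not fit in the
// buffer are simply dropped, similar in spirit to onBackpressureDrop().
fun trySendAll(count: Int, capacity: Int): Int {
    val channel = Channel<Int>(capacity)
    var accepted = 0
    for (i in 1..count) {
        if (channel.trySend(i).isSuccess) accepted++
    }
    channel.close()
    return accepted
}
```

With a capacity of 2 and no consumer, trySendAll(5, 2) accepts only the first two items, while sendWithBackpressure(5, 2) delivers all five because the producer waits for the consumer.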

There is another use-case where coroutines shine, and this will answer your second question, “Why would I want to use Kotlin coroutines?”. Coroutines are the perfect replacement for background threads or AsyncTask (Android). It’s as easy as launch { someBlockingFunction() }. Of course you could achieve this with RxJava too, using Schedulers and Completable perhaps. But you would make little or no use of the Observer pattern and the operators that are the signature of RxJava, a hint that this work is out of scope for RxJava. RxJava’s complexity (a useless tax here) will make your code more verbose and less clean than the coroutine version.

Readability matters. In this regard, the RxJava and coroutine approaches differ a lot. Coroutines are simpler than RxJava. If you are not at ease with map(), flatMap() and functional reactive programming in general, coroutine manipulations are easier, involving basic instructions: for, if, try/catch … But I personally find coroutine code harder to understand for non-trivial tasks. In particular, it involves more nesting and indentation, whereas operator chaining in RxJava keeps everything in line. Functional-style programming makes processing more explicit. On top of that, RxJava can solve complex transformations with a few standard operators from its rich (OK, way too rich) operator set. RxJava shines when you have complex data flows requiring a lot of combinations and transformations.
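As a rough sketch of the background-work case (assuming kotlinx.coroutines; someBlockingFunction() here is a stand-in I made up, and withContext(Dispatchers.IO) is one common way to keep the blocking call off the caller’s thread):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical blocking call, standing in for disk or network I/O.
fun someBlockingFunction(): Int {
    Thread.sleep(50)
    return 42
}

// Suspends the caller instead of blocking it; the blocking work itself
// runs on a thread from the IO dispatcher.
suspend fun loadInBackground(): Int = withContext(Dispatchers.IO) {
    someBlockingFunction()
}

// Bridge for non-suspending callers, for demonstration only.
fun loadBlocking(): Int = runBlocking { loadInBackground() }
```

No Completable, subscription or operator chain is involved; the result comes back as a plain return value.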

I hope those considerations will help you choose the right tool given your needs.

Edit: Coroutines now have Flow, an API very, very similar to Rx. One could compare the pros and cons of each, but the truth is the differences are minor.

Coroutines, at their core, are a concurrency design pattern with add-on libraries, one of those being a stream API similar to Rx. Obviously, since Coroutines have a far broader scope than Rx, there are a lot of things that Coroutines can do that Rx can’t, and I can’t list them all. But usually, if I use Coroutines in one of my projects, it boils down to one reason:

Coroutines are better at removing callbacks from your code

I avoid using callbacks, which harm readability too much. Coroutines make asynchronous code simple and easy to write. By leveraging the suspend keyword, your code looks like synchronous code.

I have seen Rx used in projects mostly for the same purpose of replacing callbacks, but if you don’t plan to modify your architecture to commit to the reactive pattern, Rx will be a burden. Consider this interface:

interface Foo {
   fun bar(callback: Callback)
}

The Coroutine equivalent is more explicit, with a return type and the keyword suspend indicating it’s an asynchronous operation.

interface Foo {
   suspend fun bar(): Result
}

But there is a problem with the Rx equivalent:

interface Foo {
   fun bar(): Single<Result>
}

When you call bar() in the callback or Coroutine version, you trigger the computation; with the Rx version, you get a representation of a computation that you can trigger at will. You need to call bar() and then subscribe to the Single. Usually not a big deal, but it’s a little confusing for beginners and can lead to subtle problems.

One example of such problems: suppose the callback bar function is implemented as such:

fun bar(callback: Callback) {
   setCallback(callback)
   refreshData()
}

If you don’t port it properly, you will end up with a Single that can be triggered only once, because refreshData() is called in the bar() function and not at subscription time. A beginner mistake, granted, but the thing is that Rx is way more than a callback replacement, and a lot of developers struggle to grasp Rx.

If your objective is to transform an asynchronous task from callbacks to a nicer paradigm, Coroutines are a perfect fit, whereas Rx adds some complexity.
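For illustration, one common way to do that port is suspendCancellableCoroutine from kotlinx.coroutines. This is only a sketch: the Callback shape, CallbackFoo and barSuspending() are hypothetical names, since the answer does not define them.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical callback type; the original answer leaves Callback undefined.
interface Callback {
    fun onSuccess(result: String)
    fun onError(error: Throwable)
}

// Hypothetical callback-based API that answers immediately.
class CallbackFoo {
    fun bar(callback: Callback) {
        callback.onSuccess("fresh data")
    }
}

// Suspending wrapper: the work is triggered on each call, exactly like the
// callback version, and the result comes back as a return value.
suspend fun CallbackFoo.barSuspending(): String =
    suspendCancellableCoroutine { continuation ->
        bar(object : Callback {
            override fun onSuccess(result: String) = continuation.resume(result)
            override fun onError(error: Throwable) = continuation.resumeWithException(error)
        })
    }

// Bridge for non-suspending callers, for demonstration only.
fun barBlocking(): String = runBlocking { CallbackFoo().barSuspending() }
```

Because the call itself starts the work, there is no separate subscription step to get wrong.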

Solution no. 2:

Kotlin coroutines are different from Rx. It is hard to compare them apples-to-apples, because Kotlin coroutines are a thin language feature (with just a couple of basic concepts and a few basic functions to manipulate them), while Rx is a pretty heavy library with quite a large variety of ready-to-use operators. Both are designed to address the problem of asynchronous programming, but their approaches to the solution are very different:

  • Rx comes with a particular functional style of programming that can be implemented in virtually any programming language without support from the language itself. It works well when the problem at hand easily decomposes into a sequence of standard operators and not so well otherwise.

  • Kotlin coroutines provide a language feature that lets library writers implement various asynchronous programming styles, including, but not limited to, functional reactive style (Rx). With Kotlin coroutines you can also write your asynchronous code in imperative style, in promise/futures-based style, in actor-style, etc.

It is more appropriate to compare Rx with some specific libraries that are implemented based on Kotlin coroutines.

Take the kotlinx.coroutines library as one example. This library provides a set of primitives like async/await and channels that are typically baked into other programming languages. It also has support for light-weight future-less actors. You can read more in the Guide to kotlinx.coroutines by example.

Channels provided by kotlinx.coroutines can replace or augment Rx in certain use-cases. There is a separate Guide to reactive streams with coroutines that goes deeper into similarities and differences with Rx.

Solution no. 3:

I know RxJava very well and I’ve recently switched to Kotlin Coroutines and Flow.

RxKotlin is basically the same as RxJava, it just adds some syntactic sugar to make it more comfortable / idiomatic writing RxJava code in Kotlin.

A “fair” comparison between RxJava and Kotlin Coroutines should include Flow in the mix and I’m gonna try to explain why here. This is gonna be a bit long but I’ll try to keep it as simple as I can with examples.

With RxJava you have different objects (since version 2):

// 0-n events without backpressure management
fun observeEventsA(): Observable

// 0-n events with explicit backpressure management
fun observeEventsB(): Flowable

// exactly 1 event
fun encrypt(original: String): Single<String>

// 0-1 events
fun cached(key: String): Maybe<MyData>

// just completes with no specific results
fun syncPending(): Completable

In Kotlin coroutines + Flow you do not need as many entities, because if you do not have a stream of events you can just use simple coroutines (suspending functions):

// 0-n events, the backpressure is automatically taken care of
fun observeEvents(): Flow

// exactly 1 event
suspend fun encrypt(original: String): String

// 0-1 events
suspend fun cached(key: String): MyData?

// just completes with no specific results
suspend fun syncPending()

Bonus: Kotlin Flow / Coroutines support null values (support removed with RxJava 2)

What about the operators?

With RxJava you have so many operators (map, filter, flatMap, switchMap, …), and for most of them there’s a version for each entity type (Single.map(), Observable.map(), …).

Kotlin Coroutines + Flow do not need that many operators. Let’s see why, with some examples of the most common operators.

map()

RxJava:

fun getPerson(id: String): Single<Person>
fun observePersons(): Observable<Person>

fun getPersonName(id: String): Single<String> {
  return getPerson(id)
     .map { it.firstName }
}

fun observePersonsNames(): Observable<String> {
  return observePersons()
     .map { it.firstName }
}

Kotlin coroutines + Flow

suspend fun getPerson(id: String): Person
fun observePersons(): Flow<Person>

suspend fun getPersonName(id: String): String {
  return getPerson(id).firstName
}

fun observePersonsNames(): Flow<String> {
  return observePersons()
     .map { it.firstName }
}

You do not need an operator for the “single” case and it is fairly similar for the Flow case.

flatMap()

Say you need, for each person, to grab its insurance from a database (or remote service).

RxJava

fun fetchInsurance(insuranceId: String): Single<Insurance>

fun getPersonInsurance(id: String): Single<Insurance> {
  return getPerson(id)
    .flatMap { person ->
      fetchInsurance(person.insuranceId)
    }
}

fun observePersonsInsurances(): Observable<Insurance> {
  return observePersons()
    .flatMap { person ->
      fetchInsurance(person.insuranceId) // this is a Single
          .toObservable() // flatMap expects an Observable
    }
}

Let’s see with Kotlin Coroutines + Flow

suspend fun fetchInsurance(insuranceId: String): Insurance

suspend fun getPersonInsurance(id: String): Insurance {
  val person = getPerson(id)
  return fetchInsurance(person.insuranceId)
}

fun observePersonsInsurances(): Flow<Insurance> {
  return observePersons()
    .map { person ->
      fetchInsurance(person.insuranceId)
    }
}

Like before, with the simple coroutine case we do not need operators; we just write the code like we would if it wasn’t async, just using suspending functions.

And with Flow, that is NOT a typo: there’s no need for a flatMap operator, we can just use map. The reason is that the map lambda is a suspending function! We can execute suspending code in it!!!

We don’t need another operator just for that.

For more complex stuff you can use the Flow transform() operator.

Every Flow operator accepts a suspending function!

So if you need to filter() but your filter needs to perform a network call, you can!

fun observePersonsWithValidInsurance(): Flow<Person> {
  return observePersons()
    .filter { person ->
        val insurance = fetchInsurance(person.insuranceId)
        insurance.isValid()
    }
}
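Here is a small self-contained sketch of that idea, assuming kotlinx.coroutines. The data classes, the fake fetchInsurance() and the sample persons are placeholders I made up so that the snippet can run on its own:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Placeholder model types, not part of the original answer.
data class Insurance(val id: String, val valid: Boolean)
data class Person(val firstName: String, val insuranceId: String)

// Stand-in for a network call: suspends briefly, then returns a result.
suspend fun fetchInsurance(insuranceId: String): Insurance {
    delay(10)
    return Insurance(insuranceId, valid = insuranceId.startsWith("ok"))
}

fun observePersons(): Flow<Person> = flowOf(
    Person("Ada", "ok-1"),
    Person("Bob", "expired-2"),
    Person("Cy", "ok-3")
)

// The filter lambda is a suspending lambda, so it may call fetchInsurance().
fun observeValidNames(): Flow<String> =
    observePersons()
        .filter { person -> fetchInsurance(person.insuranceId).valid }
        .map { it.firstName }

// Collects the flow for demonstration purposes.
fun validNames(): List<String> = runBlocking { observeValidNames().toList() }
```

With the sample data above, only the persons whose insurance id starts with "ok" pass the filter.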

delay(), startWith(), concatWith(), …

In RxJava you have many operators for applying delay or adding items before and after:

  • delay()
  • delaySubscription()
  • startWith(T)
  • startWith(Observable)
  • concatWith(…)

With Kotlin Flow you can simply write:

grabMyFlow()
  .onStart {
    // delay by 3 seconds before starting
    delay(3000L)
    // just emitting an item first
    emit("First item!")
    emit(cachedItem()) // call another suspending function and emit the result
  }
  .onEach { value ->
    // insert a delay of 1 second after a value only on some condition
    if (value.length > 5) {
      delay(1000L)
    }
  }
  .onCompletion {
    val endingSequence: Flow<String> = grabEndingSequence()
    emitAll(endingSequence)
  }
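A trimmed-down, runnable variant of the same chain, assuming kotlinx.coroutines. The source flow and the ending sequence are hard-coded placeholders, and the delays are left out to keep it short:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Placeholder source flow standing in for grabMyFlow().
fun decorated(): Flow<String> =
    flowOf("a", "b")
        // plays the role of startWith(): emitted before the upstream values
        .onStart { emit("first") }
        // plays the role of concatWith(): emitted after normal completion only
        .onCompletion { cause ->
            if (cause == null) emitAll(flowOf("end-1", "end-2"))
        }

// Collects the flow for demonstration purposes.
fun decoratedItems(): List<String> = runBlocking { decorated().toList() }
```

The cause check matters because emitting from onCompletion is only allowed when the flow completed without an error.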

error handling

RxJava has a lot of operators to handle errors:

  • onErrorResumeWith()
  • onErrorReturn()
  • onErrorComplete()

With Flow you don’t need much more than the catch() operator:

  grabMyFlow()
    .catch { error ->
       // emit something from the flow
       emit("We got an error: ${error.message}")
       // then if we can recover from this error emit it
       if (error is RecoverableError) {
          // error.recover() here is supposed to return a Flow<> to recover
          emitAll(error.recover())
       } else {
          // re-throw the error if we can't recover (aka = don't catch it)
          throw error
       }
    }

And with suspending functions you can just use try {} catch() {}.
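Both styles can be sketched side by side, assuming kotlinx.coroutines. The failing flow and the failing suspending function are invented for the example:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical flow that fails after its first value.
fun risky(): Flow<String> = flow {
    emit("one")
    throw IllegalStateException("boom")
}

// catch() sees the upstream failure and may emit replacement values.
fun recovered(): Flow<String> =
    risky().catch { error -> emit("fallback after: ${error.message}") }

// Hypothetical suspending call that always fails.
suspend fun failingCall(): String = throw IllegalStateException("no network")

// Plain try/catch is enough for a single suspending call.
suspend fun loadOrDefault(): String =
    try {
        failingCall()
    } catch (e: IllegalStateException) {
        "default"
    }

// Helpers that run the two variants, for demonstration only.
fun recoveredItems(): List<String> = runBlocking { recovered().toList() }
fun loadedValue(): String = runBlocking { loadOrDefault() }
```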

easy to write Flow operators

Because coroutines power Flow under the hood, it is way easier to write operators. If you have ever looked at the source of an RxJava operator, you have seen how hard it is and how many things you need to learn.

Writing Kotlin Flow operators is easier; you can get an idea just by looking at the source code of the operators that are already part of Flow here. The reason is that coroutines make it easier to write async code, and operators just feel more natural to use.

As a bonus, Flow operators are all Kotlin extension functions, which means either you or libraries can easily add operators and they will not feel weird to use (unlike, for example, observable.lift() or observable.compose()).
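To give a feel for it, here is a minimal sketch of a custom operator, assuming kotlinx.coroutines. everyNth() is a name I invented; it is not part of the Flow API:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: forwards only every n-th value of the upstream flow.
fun <T> Flow<T>.everyNth(n: Int): Flow<T> {
    require(n > 0) { "n must be positive" }
    return flow {
        var index = 0
        // collect the upstream and re-emit selectively, using ordinary control flow
        collect { value ->
            index++
            if (index % n == 0) emit(value)
        }
    }
}

// Runs the operator over 1..6 for demonstration purposes.
fun everySecondOfSix(): List<Int> = runBlocking {
    (1..6).asFlow().everyNth(2).toList()
}
```

The whole operator is a loop with a counter, and it chains like any built-in operator because it is an extension function.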

Upstream thread doesn’t leak downstream

What does this even mean?

Let’s take this RxJava example:

urlsToCall()
  .switchMap { url ->
    if (url.scheme == "local") {
       val data = grabFromMemory(url.path)
       Observable.just(data)
    } else {
       performNetworkCall(url)
        .subscribeOn(Schedulers.io())
        .toObservable()
    }
  }
  .subscribe {
    // in which thread is this call executed?
  }

So where is the callback in subscribe executed?

The answer is:


depends…

If it comes from the network, it’s on an IO thread; if it comes from the other branch, it is undefined and depends on which thread is used to send the url.

This is the concept of “upstream thread leaking downstream”.

With Flow and Coroutines this is not the case, unless you explicitly require this behavior (using Dispatchers.Unconfined).

suspend fun myFunction() {
  // execute this coroutine body in the main thread
  withContext(Dispatchers.Main) {
    urlsToCall()
      .conflate() // to roughly achieve the effect of switchMap
      .transform { url ->
        if (url.scheme == "local") {
          val data = grabFromMemory(url.path)
          emit(data)
        } else {
          // only the network call runs on the IO dispatcher
          val data = withContext(Dispatchers.IO) {
            performNetworkCall(url)
          }
          // emit outside withContext, back in the collector's context
          emit(data)
        }
      }
      .collect {
        // this will always execute in the main thread
        // because this is where we collect,
        // inside withContext(Dispatchers.Main)
      }
  }
}

Coroutine code runs in the context in which it was launched. Only the part with the network call will run on the IO thread, while everything else we see here will run on the main thread.

Well, actually, we don’t know where the code inside grabFromMemory() will run. If it’s a suspending function, we only know that it will be called from the main thread; inside that suspending function another Dispatcher could be used, but when it returns with the result val data, we will be back on the main thread.

This means that, looking at a piece of code, it’s easier to tell which thread it will run on: if you see an explicit Dispatcher, it’s that dispatcher; if you don’t, it’s whatever dispatcher the suspending call you are looking at is being called from.
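The same property can be observed on a plain JVM with flowOn(). This is a sketch under assumptions: Dispatchers.Main is not available outside a UI framework, so a single-thread executor named "upstream-thread" (a made-up name) plays the part of the upstream dispatcher, and the thread calling runBlocking plays the part of the main thread.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns the name of the thread that produced the value, and whether the
// downstream code ran on the thread that called this function.
fun threadsUsed(): Pair<String, Boolean> {
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "upstream-thread") }
    val upstream = executor.asCoroutineDispatcher()
    val callerThread = Thread.currentThread()
    try {
        return runBlocking {
            var producerThread = ""
            val downstreamOnCaller = flow {
                producerThread = Thread.currentThread().name
                emit(1)
            }
                .flowOn(upstream) // affects only the code above this line
                .map { Thread.currentThread() === callerThread }
                .toList()
                .single()
            producerThread to downstreamOnCaller
        }
    } finally {
        upstream.close() // shuts down the executor
    }
}
```

The producer block runs on "upstream-thread", while everything after flowOn() stays on the collecting side, so the upstream thread does not leak downstream.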

Structured Concurrency

This is not a concept invented by Kotlin, but it is something they embraced more than any other language I know of.

If what I explain here is not enough for you, read this article or watch this video.

So what is it?

With RxJava you subscribe to observables, and they give you a Disposable object.

You need to take care of disposing of it when it’s not needed anymore. So what you usually do is keep a reference to it (or put it in a CompositeDisposable) to later call dispose() on it when it’s not needed anymore. If you don’t, the linter will give you a warning.

RxJava is somewhat nicer than a traditional thread. When you create a new thread and execute something on it, it’s “fire and forget”; you do not even get a way to cancel it: Thread.stop() is deprecated and harmful, and recent implementations actually do nothing. Thread.interrupt() makes your thread fail, etc. Any exceptions get lost. You get the picture.

With Kotlin coroutines and Flow they reverse the “Disposable” concept. You CANNOT create a coroutine without a CoroutineContext.

This context defines the scope of your coroutine. Every child coroutine spawned inside that one will share the same scope.

If you subscribe to a flow, you have to be inside a coroutine or provide a scope too.

You can still keep a reference to the coroutines you start (Job) and cancel them. This will cancel every child of that coroutine automatically.

If you are an Android developer, they give you these scopes automatically. Example: viewModelScope. You can launch coroutines inside a ViewModel with that scope, knowing they will automatically be cancelled when the ViewModel is cleared.
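A small sketch of that cancellation behaviour, assuming kotlinx.coroutines. The function name and the 50 ms wait are arbitrary choices for the demo:

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Starts a parent coroutine with `childCount` children that would never finish
// on their own, cancels the parent, and reports whether each child was cancelled.
fun cancelParentWithChildren(childCount: Int): List<Boolean> = runBlocking {
    val parent = launch {
        repeat(childCount) {
            launch { delay(Long.MAX_VALUE) } // child of `parent`
        }
        delay(Long.MAX_VALUE)
    }
    delay(50) // give the parent time to start its children
    val children = parent.children.toList()
    parent.cancelAndJoin() // cancels the parent and, through it, every child
    children.map { it.isCancelled }
}
```

Only the parent Job is referenced and cancelled; the children need no bookkeeping of their own.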

viewModelScope.launch {
  // my coroutine here
}

Some scopes will terminate if any child fails; other scopes will let each child live its own lifecycle without stopping the other children if one fails (SupervisorJob).
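The two behaviours can be compared with coroutineScope and supervisorScope from kotlinx.coroutines. This is a sketch; the function names and messages are invented for the example:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Regular scope: one failing child cancels the scope and its other work,
// and the failure is rethrown to the caller of coroutineScope.
suspend fun runRegular(): String =
    try {
        coroutineScope {
            launch { throw IllegalStateException("child failed") }
            delay(1_000) // cancelled as soon as the sibling fails
            "not reached"
        }
    } catch (e: IllegalStateException) {
        "scope failed: ${e.message}"
    }

// Supervisor scope: the failure stays with the failing child,
// so the sibling keeps running to completion.
suspend fun runSupervised(): String = supervisorScope {
    val failing = async<String> { throw IllegalStateException("child failed") }
    val healthy = async {
        delay(50)
        "sibling finished"
    }
    val first = try {
        failing.await()
    } catch (e: IllegalStateException) {
        "caught: ${e.message}"
    }
    "$first, ${healthy.await()}"
}

// Helpers that run the two variants, for demonstration only.
fun regularResult(): String = runBlocking { runRegular() }
fun supervisedResult(): String = runBlocking { runSupervised() }
```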

Why is this a good thing?

Let me try to explain it like Roman Elizarov did.

Some old programming languages had this concept of goto, which basically lets you jump from one line of code to another at will.

Very powerful, but if abused you could end up with code that is very hard to understand, debug and reason about.

So newer programming languages eventually removed it completely.

When you use if or while or when, it is way easier to reason about the code: it doesn’t matter what happens inside those blocks, you’ll eventually come out of them. It’s a “context”; you don’t have weird jumps in and out.

Launching a thread or subscribing to an RxJava observable is similar to the goto: you are executing code which then keeps going until it is stopped “elsewhere”.

With coroutines, because you are required to provide a context/scope, you know that everything launched inside that scope will be complete when the scope completes, no matter whether you have a single coroutine or ten thousand.

You can still “goto” with coroutines by using GlobalScope, which you shouldn’t, for the same reason you shouldn’t use goto in languages that provide it.
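As a sketch of that guarantee (assuming kotlinx.coroutines; runAll() is a made-up name), coroutineScope does not return until every coroutine launched inside it has finished:

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches `count` coroutines inside one scope and returns how many of them
// had finished by the time the scope completed.
fun runAll(count: Int): Int = runBlocking {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(count) {
            launch {
                delay(10)
                finished.incrementAndGet()
            }
        }
    } // control reaches this point only after every child above has completed
    finished.get()
}
```

Calling runAll(10_000) therefore reports all ten thousand coroutines as finished, without tracking any of them individually.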

Any Drawback?

Flow is still in development, and some features available in RxJava right now are still not available in Kotlin Coroutines Flow.

The big missing piece, right now, is the share() operator and its friends (publish(), replay(), etc.).

They are actually in an advanced state of development and expected to be released soon (shortly after the already released Kotlin 1.4.0); you can see the API design here:

Solution no. 4:

The talk/doc you linked does not talk about channels. Channels are what fill the gap between your current understanding of coroutines and event-driven programming.

With coroutines and channels you can do event-driven programming as you are probably used to doing with Rx, but you can do it with synchronous-looking code and without as many “custom” operators.

If you want to understand this better, I suggest looking outside of Kotlin, where those concepts are more mature and refined (not experimental). Look at core.async from Clojure, Rich Hickey’s videos, posts and related discussions.
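A minimal sketch of what that looks like, assuming kotlinx.coroutines. The event names and the handleEvents() function are invented for the example:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Consumes a stream of events with an ordinary for loop and local state,
// instead of a chain of operators.
fun handleEvents(): List<String> = runBlocking {
    val events = Channel<String>()
    launch {
        // hypothetical event source
        listOf("click", "scroll", "click").forEach { events.send(it) }
        events.close()
    }
    val log = mutableListOf<String>()
    var clicks = 0
    for (event in events) { // suspends until the next event arrives
        if (event == "click") {
            clicks++
            log += "click #$clicks"
        } else {
            log += "ignored $event"
        }
    }
    log
}
```

The counting that would need something like scan() in Rx is just a local variable here.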

  • http://discuss.purelyfunctional.tv/t/core-async-channels-vs-rx-observables/519/2
  • https://github.com/matthiasn/talk-transcripts/blob/master/Hickey_Rich/CoreAsync.md

Solution no. 5:

Coroutines are designed to provide a lightweight asynchronous programming framework. Lightweight in terms of resources needed to start the async job. Coroutines don’t enforce using an external API and are more natural for the users (programmers). In contrast, RxJava + RxKotlin has an additional data processing package that is not really needed in Kotlin which has a really rich API in the standard library for Sequences and Collections processing.

If you’d like to see more about the practical use of coroutines on Android I can recommend my article:
https://www.netguru.com/codestories/android-coroutines-%EF%B8%8Fin-2020

Hope this helps!