Core.async is not a substitute for Rx.<p>As a disclaimer I'm the author of an Rx-inspired library for Scala [1] and that also works for Scala.js in the browser. Shameless plug aside, Scala also has Future/Promise in its standard library and now due to macros support it got scala/async [2], a library that gives you the "await" keyword in Scala, so in Scala you also get this kind of M:N multithreading that looks like synchronous code. This in addition to Akka and other possibilities.<p>In other words I've worked with both approaches and Core.async is not comparable with Rx. I do understand the author's woes, as sometimes the Rx model is misapplied, plus it's hard to understand for the unfamiliar.<p>One of my colleagues was complaining once that "<i>but I don't know what the debounce operator does and it's hard for me to read that</i>". And I told him: <i>yeah dude, but try implementing the logic in debounce by yourself and see how readable that is</i>.<p>And that's exactly why Rx is problematic. On one hand because stream processing is fundamentally hard, no matter what model you choose. And on the other hand Rx comes with a lot of useful operators that do a lot for you, but then you have to learn about them.<p>For the naysayers, I'll just leave this piece of code with a challenge for implementing it with core.async and compare in terms of readability and note this is a copy paste from actual production code ...<p><pre><code> commands
.groupBy(w => (w.assetID, w.commandID))
.mergeMap { gr =>
gr.timeout(30.seconds, Observable.empty)
.throttleLast(1.second)
.distinctUntilChanged
.echoRepeated(5.seconds)
.whileBusyBuffer(DropOld(30))
}
</code></pre>
What it does is to split the signals for each asset and command, for each of these it's supposed to sample the signal by 1 second, but in case the same value is repeated over and over again or in case the channel goes silent, then the last value will end up signaled every 5 seconds (reducing the traffic to our OpenTSDB). Finally for each key we close the stream after 30 seconds of inactivity. And then for each such key it does buffering of at most 30 elements and in case the consumers are too slow, then these buffers start dropping older elements on overflow. And then we merge everything back.<p>I would also show you how we are modeling state machines with the "scan" operator, state machines that are evolved from signals coming from multiple sources, but the sample would be too long. In any case "scan" allows you to use pure functions and data-structures, so you can test your business logic without interactions to third party services, mocks, stubs or whatever.<p>I have to deal with such code all the time. And I've seen such code implemented in a classic fashion as well. You basically end up with Maps storing stuff and with ifs and whiles and with manual timers in an unholy dance of mutation so hard to understand and debug that it would make grown men cry.<p>But then such solutions are not silver bullets. Rx, CSP, futures, actors are not silver bullets to be applied everywhere, with all of them having a sweet spot for which they excel. I'm actually using Rx, actors, futures, scala/sync in the same project and it's great.<p>Also, one last note: Rx is not FRP ;-)<p>[1] <a href="https://github.com/alexandru/monifu/" rel="nofollow">https://github.com/alexandru/monifu/</a><p>[2] <a href="https://github.com/scala/async" rel="nofollow">https://github.com/scala/async</a>