Replies: 4 comments 14 replies
-
I think it's worth pointing out that you can argue the kotlin model is also broken: if you nest a couple of Btw, this translation of your initial example to pure CE could help other people reading: object Main1 extends IOApp.Simple {
def run: IO[Unit] = {
val task =
IO.deferred[Unit]
.bracket { await =>
val task1 =
await.get.uncancelable
.onCancel(IO.println("ERROR: Cancelled task1!"))
val task2 = IO.sleep(30.seconds) >> await.complete(()).void
(task1, task2).parTupled.void
} { await =>
IO.println("cancelling everything") >> await.complete(()).void
}
task.timeout(3.seconds)
}
} |
Beta Was this translation helpful? Give feedback.
-
As for your semantics, I'd do object Main1 extends IOApp.Simple {
def run: IO[Unit] = {
val task =
alexBracket(
IO.deferred[Unit]
)(await => IO.println("cancelling everything") >> await.complete(()).void) {
await =>
val task1 =
await.get.uncancelable
.onCancel(IO.println("ERROR: Cancelled task1!"))
val task2 = IO.sleep(30.seconds) >> await.complete(())
(task2, task1).parTupled.void
}
task.timeout(3.seconds)
}
def alexBracket[A, B](acquire: IO[A])(release: A => IO[Unit])(
f: A => IO[B]
): IO[B] =
IO.uncancelable { poll =>
acquire.flatMap { a =>
f(a).start.flatMap { fiber =>
poll(fiber.joinWithNever).guarantee(release(a))
}
}
}
} [info] running Main1
cancelling everything
[error] java.util.concurrent.TimeoutException: 3 seconds
[error] at timeout @ Main1$.run(Playground.scala:41)
[error] at timeout @ Main1$.run(Playground.scala:41)
[error] at timeout @ Main1$.run(Playground.scala:41)
[error] at main$ @ Main1$.main(Playground.scala:25) |
Beta Was this translation helpful? Give feedback.
-
@SystemFw Here is another variation, and for this one I really think Cats-Effect could do a better job by cancelling children fibers when the main fiber gets cancelled: #!/usr/bin/env -S scala-cli shebang -q
//> using scala "2.13.9"
//> using lib "org.typelevel::cats-effect::3.3.12"
//> using lib "org.apache.commons:commons-text:1.9"
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.CountDownLatch
import scala.concurrent.duration._
object Main extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
val task = IO {
new CountDownLatch(1)
}.bracket { isActive =>
// This is now cancelable
val task1 =
IO.interruptible {
try {
isActive.await()
} catch {
case e: java.lang.InterruptedException =>
println("Cancelled task1 (1)")
throw e
}
}.onCancel {
IO(println("Cancelled task1 (2)"))
}
val task2 = IO.sleep(30.seconds) *> IO {
isActive.countDown()
}
// Using fiber.start instead of parMap
for {
fiber1 <- task1.start
_ <- task2
_ <- fiber1.joinWithNever
} yield {
ExitCode.Success
}
} { isActive =>
IO {
println("Cancelling everything...")
isActive.countDown()
}
}
// Doesn't work ;-)
task.timeout(3.seconds)
}
} This variation that uses ///usr/bin/env jbang "$0" "$@" ; exit $?
//JAVA 17+
//KOTLIN 1.7.20
//DEPS org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import java.util.concurrent.CountDownLatch
import kotlin.time.Duration.Companion.seconds
suspend fun run() =
coroutineScope {
val isActive = CountDownLatch(1)
try {
// This one is now cancelable (due to `runInterruptible`)
val task1 = launch {
runInterruptible(Dispatchers.IO) {
try {
isActive.await()
} catch (e: InterruptedException) {
println("Cancelling task1...")
throw e
}
}
}
val task2 = launch {
delay(30.seconds)
isActive.countDown()
}
// Note: the ordering of these joins doesn't matter
task1.join()
task2.join()
println("Done!")
} finally {
println("Stopping everything...")
isActive.countDown()
}
}
fun main() = runBlocking {
withTimeout(3.seconds) {
run()
}
} The difference in behavior is that Kotlin sends the cancelation signal to the forked task, whereas Cats-Effect does not. This is what they call "structured concurrency", meaning that forked tasks have to complete before their scope (the main scope or thread) completes as well, and also, when the main scope gets cancelled (or finishes in error), this cancels all children as well. |
Beta Was this translation helpful? Give feedback.
-
Lots of good stuff in here but just replying quickly… The reason that Cats Effect doesn't kill child fibers when the parents die is simply because structured concurrency is less general than unstructured. What you're asking for is structured concurrency, just with a more complex structure than the way that we normally think of it (i.e. more complex than something like Of course, we could resolve this in the same way ZIO does by having two different primitive A lot of this comes back to the fact that |
Beta Was this translation helpful? Give feedback.
-
Hi all,
Sorry for reviving a dead discussion 🙂 but the feeling that the current behavior is wrong never went away.
I was prepared to submit a bug report, until I realized that Cats-Effect works as intended. For instance, this doesn't work as expected, the script will terminate after 30 seconds, instead of 3:
This program doesn't work by design because
bracket
'srelease
cannot be executed untiluse
is either complete or cancelled. However, I'd like to give Kotlin's Coroutines as a sample that does work as expected:Looking at these 2 pieces of code, the Kotlin sample is more clear, it does the right thing (cancels after exactly 3 seconds, and releases any resources, no leaks), and I can understand it without needing special education. For the Cats-Effect sample, if someone asked — what is the equivalent of
try-finally
or oftry-with-resources
, I'd say thatbracket
is, but that isn't true, as exemplified by the above program.This example is a minified version of a bigger example that happens in real-life. Here's the Kotlin code that, again, does the right thing:
Converting this sample to Cats-Effect is surprisingly difficult, with the naive translation being wrong, for the reasons stated above:
Now that I think of it, I can probably do
awaitStdout.start
and thenfiber.join
. Which may work, but that's only because (AFAIK) child fibers don't get cancelled if the parent fiber was cancelled.So what's the right translation of that Kotlin sample? Anything obvious that I'm not seeing?
Beta Was this translation helpful? Give feedback.
All reactions