diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 1a8930437e..7c8cb8948d 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -895,6 +895,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa } Await.ready(complex, timeout.duration) must be('completed) } + + "should capture first exception with dataflow" in { + import Future.flow + val f1 = flow { 40 / 0 } + intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout)) + } + } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index d7b4f17922..b67feaf11a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -307,7 +307,11 @@ object Future { def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { val p = Promise[A] dispatchTask({ () ⇒ - (reify(body) foreachFull (p success, p failure): Future[Any]) onFailure { + try { + (reify(body) foreachFull (p success, p failure): Future[Any]) onFailure { + case NonFatal(e) ⇒ p tryComplete Left(e) + } + } catch { case NonFatal(e) ⇒ p tryComplete Left(e) } }, true)