Merge pull request #356 from jboner/wip-ticket-1869

Capture early exception within Future.flow, fixes #1869
This commit is contained in:
viktorklang 2012-02-27 23:59:55 -08:00
commit b329bf3718
2 changed files with 12 additions and 1 deletions

View file

@ -895,6 +895,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
Await.ready(complex, timeout.duration) must be('completed)
}
"should capture first exception with dataflow" in {
import Future.flow
val f1 = flow { 40 / 0 }
intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout))
}
}
}

View file

@ -307,7 +307,11 @@ object Future {
def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
val p = Promise[A]
dispatchTask({ ()
(reify(body) foreachFull (p success, p failure): Future[Any]) onFailure {
try {
(reify(body) foreachFull (p success, p failure): Future[Any]) onFailure {
case NonFatal(e) p tryComplete Left(e)
}
} catch {
case NonFatal(e) p tryComplete Left(e)
}
}, true)