diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 7ad12913be..2bdce31f6f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -520,7 +520,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com try { func(this) } catch { - case e: Exception => EventHandler notify EventHandler.Error(e, this) + case e => EventHandler notify EventHandler.Error(e, this) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 9eb46d5c30..d12ad7463f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -35,7 +35,7 @@ final case class FutureInvocation(future: CompletableFuture[Any], function: () = def run = future complete (try { Right(function.apply) } catch { - case e: Exception => + case e => EventHandler.error(e, this, e.getMessage) Left(e) }) @@ -203,6 +203,11 @@ trait MessageDispatcher { * Returns the size of the mailbox for the specified actor */ def mailboxSize(actorRef: ActorRef): Int + + /** + * Returns the size of the Future queue + */ + def futureQueueSize: Int = futures.size } /** diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index 3a9a3b258e..dfd94b40b5 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -300,4 +300,28 @@ class FutureSpec extends JUnitSuite { assert(latch.tryAwait(5, TimeUnit.SECONDS)) actor.stop } + + @Test def shouldHandleThrowables { + class ThrowableTest(m: String) extends Throwable(m) + + val f1 = Future { throw new ThrowableTest("test") } + f1.await + intercept[ThrowableTest] { f1.resultOrException } + + val latch = new StandardLatch + val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" } + f2 foreach (_ => throw new ThrowableTest("dispatcher foreach")) + f2 receive { case _ => throw new ThrowableTest("dispatcher receive") } + val f3 = f2 map (s => s.toUpperCase) + latch.open + f2.await + assert(f2.resultOrException === Some("success")) + f2 foreach (_ => throw new ThrowableTest("current thread foreach")) + f2 receive { case _ => throw new ThrowableTest("current thread receive") } + f3.await + assert(f3.resultOrException === Some("SUCCESS")) + + // make sure all futures are completed in dispatcher + assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0) + } }