Prevent throwables thrown in futures from disrupting the rest of the system. Fixes #710
This commit is contained in:
parent
29e7328bdb
commit
32be3162bf
3 changed files with 31 additions and 2 deletions
|
|
@ -520,7 +520,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
|
||||||
try {
|
try {
|
||||||
func(this)
|
func(this)
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception => EventHandler notify EventHandler.Error(e, this)
|
case e => EventHandler notify EventHandler.Error(e, this)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ final case class FutureInvocation(future: CompletableFuture[Any], function: () =
|
||||||
def run = future complete (try {
|
def run = future complete (try {
|
||||||
Right(function.apply)
|
Right(function.apply)
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception =>
|
case e =>
|
||||||
EventHandler.error(e, this, e.getMessage)
|
EventHandler.error(e, this, e.getMessage)
|
||||||
Left(e)
|
Left(e)
|
||||||
})
|
})
|
||||||
|
|
@ -203,6 +203,11 @@ trait MessageDispatcher {
|
||||||
* Returns the size of the mailbox for the specified actor
|
* Returns the size of the mailbox for the specified actor
|
||||||
*/
|
*/
|
||||||
def mailboxSize(actorRef: ActorRef): Int
|
def mailboxSize(actorRef: ActorRef): Int
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the size of the Future queue
|
||||||
|
*/
|
||||||
|
def futureQueueSize: Int = futures.size
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -300,4 +300,28 @@ class FutureSpec extends JUnitSuite {
|
||||||
assert(latch.tryAwait(5, TimeUnit.SECONDS))
|
assert(latch.tryAwait(5, TimeUnit.SECONDS))
|
||||||
actor.stop
|
actor.stop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test def shouldHandleThrowables {
|
||||||
|
class ThrowableTest(m: String) extends Throwable(m)
|
||||||
|
|
||||||
|
val f1 = Future { throw new ThrowableTest("test") }
|
||||||
|
f1.await
|
||||||
|
intercept[ThrowableTest] { f1.resultOrException }
|
||||||
|
|
||||||
|
val latch = new StandardLatch
|
||||||
|
val f2 = Future { latch.tryAwait(5, TimeUnit.SECONDS); "success" }
|
||||||
|
f2 foreach (_ => throw new ThrowableTest("dispatcher foreach"))
|
||||||
|
f2 receive { case _ => throw new ThrowableTest("dispatcher receive") }
|
||||||
|
val f3 = f2 map (s => s.toUpperCase)
|
||||||
|
latch.open
|
||||||
|
f2.await
|
||||||
|
assert(f2.resultOrException === Some("success"))
|
||||||
|
f2 foreach (_ => throw new ThrowableTest("current thread foreach"))
|
||||||
|
f2 receive { case _ => throw new ThrowableTest("current thread receive") }
|
||||||
|
f3.await
|
||||||
|
assert(f3.resultOrException === Some("SUCCESS"))
|
||||||
|
|
||||||
|
// make sure all futures are completed in dispatcher
|
||||||
|
assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue