Tweaking the interrupt restore it and breaking out of throughput
This commit is contained in:
parent
11cbebbee4
commit
f3c019df8c
4 changed files with 16 additions and 11 deletions
|
|
@ -217,12 +217,6 @@ abstract class ActorModelSpec extends JUnitSuite {
|
||||||
|
|
||||||
protected def newInterceptedDispatcher: MessageDispatcherInterceptor
|
protected def newInterceptedDispatcher: MessageDispatcherInterceptor
|
||||||
|
|
||||||
@After
|
|
||||||
def after {
|
|
||||||
//remove the interrupted status since we are messing with interrupted exceptions.
|
|
||||||
Thread.interrupted()
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def dispatcherShouldDynamicallyHandleItsOwnLifeCycle {
|
def dispatcherShouldDynamicallyHandleItsOwnLifeCycle {
|
||||||
implicit val dispatcher = newInterceptedDispatcher
|
implicit val dispatcher = newInterceptedDispatcher
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ package akka.testkit
|
||||||
|
|
||||||
import akka.actor.dispatch.ActorModelSpec
|
import akka.actor.dispatch.ActorModelSpec
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
|
import org.junit.{After, Test}
|
||||||
|
|
||||||
class CallingThreadDispatcherModelSpec extends ActorModelSpec {
|
class CallingThreadDispatcherModelSpec extends ActorModelSpec {
|
||||||
import ActorModelSpec._
|
import ActorModelSpec._
|
||||||
|
|
@ -42,6 +43,13 @@ class CallingThreadDispatcherModelSpec extends ActorModelSpec {
|
||||||
//Can't handle this...
|
//Can't handle this...
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@After
|
||||||
|
def after {
|
||||||
|
//remove the interrupted status since we are messing with interrupted exceptions.
|
||||||
|
Thread.interrupted()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// vim: set ts=2 sw=2 et:
|
// vim: set ts=2 sw=2 et:
|
||||||
|
|
|
||||||
|
|
@ -700,7 +700,7 @@ class LocalActorRef private[akka](private[this] val actorFactory: () ⇒ Actor,
|
||||||
} catch {
|
} catch {
|
||||||
case e: InterruptedException ⇒
|
case e: InterruptedException ⇒
|
||||||
handleExceptionInDispatch(e, messageHandle.message)
|
handleExceptionInDispatch(e, messageHandle.message)
|
||||||
Thread.currentThread().interrupt() //Restore interrupt
|
throw e
|
||||||
case e ⇒
|
case e ⇒
|
||||||
handleExceptionInDispatch(e, messageHandle.message)
|
handleExceptionInDispatch(e, messageHandle.message)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -193,10 +193,13 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
|
||||||
def dispatcher: Dispatcher
|
def dispatcher: Dispatcher
|
||||||
|
|
||||||
final def run = {
|
final def run = {
|
||||||
try { processMailbox()} finally {dispatcherLock.unlock()}
|
try { processMailbox() } catch {
|
||||||
|
case ie: InterruptedException => Thread.currentThread().interrupt() //Restore interrupt
|
||||||
if (!self.isEmpty)
|
} finally {
|
||||||
dispatcher.reRegisterForExecution(this)
|
dispatcherLock.unlock()
|
||||||
|
if (!self.isEmpty)
|
||||||
|
dispatcher.reRegisterForExecution(this)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue