ticket 972

This commit is contained in:
Peter Veentjer 2011-07-15 08:12:15 +03:00
parent 0e933d2442
commit f93624e7e0
50 changed files with 269 additions and 140 deletions

View file

@ -14,10 +14,13 @@ import org.scalatest.matchers.MustMatchers
class Ticket669Spec extends WordSpec with MustMatchers with BeforeAndAfterAll {
import Ticket669Spec._
override def beforeAll = Thread.interrupted() //remove interrupted status.
override def afterAll = Actor.registry.local.shutdownAll
"A supervised actor with lifecycle PERMANENT" should {
"be able to reply on failure during preRestart" in {
val latch = new CountDownLatch(1)
val sender = Actor.actorOf(new Sender(latch)).start()

View file

@ -4,33 +4,49 @@
package akka.actor.dispatch
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.scalatest.Assertions._
import akka.testkit.Testing
import akka.dispatch._
import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.{ Duration, Switch }
import org.multiverse.api.latches.StandardLatch
import akka.actor.{ ActorKilledException, PoisonPill, ActorRef, Actor }
import akka.util.Switch
import akka.actor.{ActorKilledException, PoisonPill, ActorRef, Actor}
import java.rmi.RemoteException
import org.junit.{After, Test}
object ActorModelSpec {
sealed trait ActorModelMessage
case class Reply_?(expect: Any) extends ActorModelMessage
case class Reply(expect: Any) extends ActorModelMessage
case class Forward(to: ActorRef, msg: Any) extends ActorModelMessage
case class CountDown(latch: CountDownLatch) extends ActorModelMessage
case class Increment(counter: AtomicLong) extends ActorModelMessage
case class Await(latch: CountDownLatch) extends ActorModelMessage
case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage
case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage
case class Wait(time: Long) extends ActorModelMessage
case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage
case object Interrupt extends ActorModelMessage
case object Restart extends ActorModelMessage
case class ThrowException(e: Throwable) extends ActorModelMessage
val Ping = "Ping"
val Pong = "Pong"
@ -52,17 +68,19 @@ object ActorModelSpec {
}
def receive = {
case Await(latch) ack; latch.await(); busy.switchOff()
case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; self.reply(msg); busy.switchOff()
case Reply_?(msg) ack; self.reply_?(msg); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()
case Await(latch) ack; latch.await(); busy.switchOff()
case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; self.reply(msg); busy.switchOff()
case Reply_?(msg) ack; self.reply_?(msg); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()
case CountDownNStop(l) ack; l.countDown(); self.stop(); busy.switchOff()
case Restart ack; busy.switchOff(); throw new Exception("Restart requested")
case Restart ack; busy.switchOff(); throw new Exception("Restart requested")
case Interrupt => ack; busy.switchOff(); throw new InterruptedException("Ping!")
case ThrowException(e: Throwable) => ack; busy.switchOff(); throw e
}
}
@ -183,7 +201,9 @@ object ActorModelSpec {
if (condition) return true
Thread.sleep(intervalMs)
} catch { case e: InterruptedException }
} catch {
case e: InterruptedException
}
}
false
}
@ -192,10 +212,17 @@ object ActorModelSpec {
}
abstract class ActorModelSpec extends JUnitSuite {
import ActorModelSpec._
protected def newInterceptedDispatcher: MessageDispatcherInterceptor
@After
def after {
//remove the interrupted status since we are messing with interrupted exceptions.
Thread.interrupted()
}
@Test
def dispatcherShouldDynamicallyHandleItsOwnLifeCycle {
implicit val dispatcher = newInterceptedDispatcher
@ -215,13 +242,17 @@ abstract class ActorModelSpec extends JUnitSuite {
msgsProcessed = 0,
restarts = 0)
val futures = for (i 1 to 10) yield Future { i }
val futures = for (i 1 to 10) yield Future {
i
}
await(dispatcher.stops.get == 2)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 2, stops = 2)
val a2 = newTestActor
a2.start
val futures2 = for (i 1 to 10) yield Future { i }
val futures2 = for (i 1 to 10) yield Future {
i
}
await(dispatcher.starts.get == 3)(withinMs = dispatcher.timeoutMs * 5)
assertDispatcher(dispatcher)(starts = 3, stops = 2)
@ -259,7 +290,13 @@ abstract class ActorModelSpec extends JUnitSuite {
val counter = new CountDownLatch(200)
a.start()
for (i 1 to 10) { spawn { for (i 1 to 20) { a ! WaitAck(1, counter) } } }
for (i 1 to 10) {
spawn {
for (i 1 to 20) {
a ! WaitAck(1, counter)
}
}
}
assertCountDown(counter, Testing.testTime(3000), "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
@ -267,7 +304,15 @@ abstract class ActorModelSpec extends JUnitSuite {
}
def spawn(f: Unit) {
val thread = new Thread { override def run { try { f } catch { case e e.printStackTrace } } }
val thread = new Thread {
override def run {
try {
f
} catch {
case e e.printStackTrace
}
}
}
thread.start()
}
@ -329,8 +374,9 @@ abstract class ActorModelSpec extends JUnitSuite {
def flood(num: Int) {
val cachedMessage = CountDownNStop(new CountDownLatch(num))
(1 to num) foreach { _
newTestActor.start() ! cachedMessage
(1 to num) foreach {
_
newTestActor.start() ! cachedMessage
}
assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns")
}
@ -356,6 +402,52 @@ abstract class ActorModelSpec extends JUnitSuite {
assert(each.exception.get.isInstanceOf[ActorKilledException])
a.stop()
}
@Test
def dispatcherShouldContinueToProcessMessagesWhenAThreadGetsInterrupted {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? Interrupt
val f4 = a ? Reply("foo2")
val f5 = a ? Interrupt
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[InterruptedException] {
f3.get
}).getMessage === "Ping!")
assert(f4.get === "foo2")
assert((intercept[InterruptedException] {
f5.get
}).getMessage === "Ping!")
assert(f6.get === "bar2")
}
@Test
def dispatcherShouldContinueToProcessMessagesWhenExceptionIsThrown {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.start()
val f1 = a ? Reply("foo")
val f2 = a ? Reply("bar")
val f3 = a ? new ThrowException(new IndexOutOfBoundsException("IndexOutOfBoundsException"))
val f4 = a ? Reply("foo2")
val f5 = a ? new ThrowException(new RemoteException("RemoteException"))
val f6 = a ? Reply("bar2")
assert(f1.get === "foo")
assert(f2.get === "bar")
assert((intercept[IndexOutOfBoundsException] {
f3.get
}).getMessage === "IndexOutOfBoundsException")
assert(f4.get === "foo2")
assert((intercept[RemoteException] {
f5.get
}).getMessage === "RemoteException")
assert(f6.get === "bar2")
}
}
class DispatcherModelTest extends ActorModelSpec {

View file

@ -9,15 +9,15 @@ import akka.dispatch._
import akka.config._
import akka.config.Supervision._
import akka.util._
import akka.serialization.{ Format, Serializer, Serialization }
import akka.serialization.{Serializer, Serialization}
import ReflectiveAccess._
import ClusterModule._
import DeploymentConfig.{ TransactionLog TransactionLogConfig, _ }
import DeploymentConfig.{TransactionLog TransactionLogConfig, _}
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Map JMap }
import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit}
import java.util.{Map JMap}
import scala.reflect.BeanProperty
import scala.collection.immutable.Stack
@ -30,10 +30,15 @@ private[akka] object ActorRefInternals {
* LifeCycles for ActorRefs.
*/
private[akka] sealed trait StatusType
object UNSTARTED extends StatusType
object RUNNING extends StatusType
object BEING_RESTARTED extends StatusType
object SHUTDOWN extends StatusType
}
/**
@ -68,7 +73,8 @@ private[akka] object ActorRefInternals {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable { scalaRef: ScalaActorRef
trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Comparable[ActorRef] with Serializable {
scalaRef: ScalaActorRef
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile
protected[akka] var _uuid = newUuid
@ -105,6 +111,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
def setReceiveTimeout(timeout: Long) {
this.receiveTimeout = Some(timeout)
}
def getReceiveTimeout: Option[Long] = receiveTimeout
/**
@ -121,6 +128,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* </pre>
*/
def setFaultHandler(handler: FaultHandlingStrategy)
def getFaultHandler: FaultHandlingStrategy
/**
@ -139,6 +147,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* </pre>
*/
def setLifeCycle(lifeCycle: LifeCycle)
def getLifeCycle: LifeCycle
/**
@ -153,7 +162,10 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
def setDispatcher(dispatcher: MessageDispatcher) { this.dispatcher = dispatcher }
def setDispatcher(dispatcher: MessageDispatcher) {
this.dispatcher = dispatcher
}
def getDispatcher: MessageDispatcher = dispatcher
/**
@ -177,6 +189,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
* Returns the uuid for the actor.
*/
def getUuid = _uuid
def uuid = _uuid
/**
@ -366,9 +379,13 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
*/
def sendException(ex: Throwable) {}
def isUsableOnlyOnce = false
def isUsable = true
def isReplyable = true
def canSendException = false
/**
@ -382,9 +399,9 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any]
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any]
protected[akka] def actorInstance: AtomicReference[Actor]
@ -393,6 +410,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
protected[akka] def supervisor_=(sup: Option[ActorRef])
protected[akka] def mailbox: AnyRef
protected[akka] def mailbox_=(value: AnyRef): AnyRef
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable)
@ -416,7 +434,7 @@ trait ActorRef extends ActorRefShared with ForwardableChannel with java.lang.Com
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class LocalActorRef private[akka] (private[this] val actorFactory: () Actor, val address: String)
class LocalActorRef private[akka](private[this] val actorFactory: () Actor, val address: String)
extends ActorRef with ScalaActorRef {
protected[akka] val guard = new ReentrantGuard
@ -442,7 +460,9 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
@volatile
private[akka] var _dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
protected[akka] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) }
protected[akka] val actorInstance = guard.withGuard {
new AtomicReference[Actor](newActor)
}
def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException(
@ -480,16 +500,16 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
// used only for deserialization
private[akka] def this(
__uuid: Uuid,
__address: String,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: LifeCycle,
__supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () Actor) = {
__uuid: Uuid,
__address: String,
__timeout: Long,
__receiveTimeout: Option[Long],
__lifeCycle: LifeCycle,
__supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () Actor) = {
this(__factory, __address)
this (__factory, __address)
_uuid = __uuid
timeout = __timeout
@ -627,7 +647,9 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
*/
def mailbox: AnyRef = _mailbox
protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value }
protected[akka] def mailbox_=(value: AnyRef): AnyRef = {
_mailbox = value; value
}
/**
* Returns the supervisor, if there is one.
@ -651,12 +673,12 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
dispatcher dispatchMessage new MessageInvocation(this, message, channel)
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
val future = channel match {
case f: ActorPromise f
case _ new ActorPromise(timeout)
case _ new ActorPromise(timeout)
}
dispatcher dispatchMessage new MessageInvocation(this, message, future)
future
@ -677,7 +699,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
currentMessage = null // reset current message after successful invocation
} catch {
case e: InterruptedException
currentMessage = null // received message while actor is shutting down, ignore
handleExceptionInDispatch(e, messageHandle.message)
Thread.currentThread().interrupt() //Restore interrupt
case e
handleExceptionInDispatch(e, messageHandle.message)
}
@ -716,13 +739,16 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) {
//Immortal
false
} else if (withinTimeRange.isEmpty) { // restrict number of restarts
} else if (withinTimeRange.isEmpty) {
// restrict number of restarts
val retries = maxNrOfRetriesCount + 1
maxNrOfRetriesCount = retries //Increment number of retries
retries > maxNrOfRetries.get
} else { // cannot restart more than N within M timerange
} else {
// cannot restart more than N within M timerange
val retries = maxNrOfRetriesCount + 1
val windowStart = restartTimeWindowStartNanos
@ -816,7 +842,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
actorRef.lifeCycle match {
// either permanent or none where default is permanent
case Temporary shutDownTemporaryActor(actorRef)
case _ actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
case _ actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
}
}
}
@ -826,7 +852,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
// ========= PRIVATE FUNCTIONS =========
private[this] def newActor: Actor = {
import Actor.{ actorRefInCreation refStack }
import Actor.{actorRefInCreation refStack}
val stackBefore = refStack.get
refStack.set(stackBefore.push(this))
try {
@ -837,7 +863,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
refStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) //pop null marker plus self
}
} match {
case null throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
case null throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
case valid valid
}
@ -861,26 +887,28 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
else {
lifeCycle match {
case Temporary shutDownTemporaryActor(this)
case _ dispatcher.resume(this) //Resume processing for this actor
case _ dispatcher.resume(this) //Resume processing for this actor
}
}
}
private def notifySupervisorWithMessage(notification: LifeCycleMessage) {
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
_supervisor.foreach { sup
if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors
//Scoped stop all linked actors, to avoid leaking the 'i' val
{
val i = _linkedActors.values.iterator
while (i.hasNext) {
i.next.stop()
i.remove
_supervisor.foreach {
sup
if (sup.isShutdown) {
// if supervisor is shut down, game over for all linked actors
//Scoped stop all linked actors, to avoid leaking the 'i' val
{
val i = _linkedActors.values.iterator
while (i.hasNext) {
i.next.stop()
i.remove
}
}
}
//Stop the actor itself
stop
} else sup ! notification // else notify supervisor
//Stop the actor itself
stop
} else sup ! notification // else notify supervisor
}
}
@ -921,7 +949,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
protected[akka] def checkReceiveTimeout() {
cancelReceiveTimeout()
if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) { //Only reschedule if desired and there are currently no more messages to be processed
if (receiveTimeout.isDefined && dispatcher.mailboxIsEmpty(this)) {
//Only reschedule if desired and there are currently no more messages to be processed
_futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS))
}
}
@ -949,11 +978,11 @@ object RemoteActorSystemMessage {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] case class RemoteActorRef private[akka] (
val remoteAddress: InetSocketAddress,
val address: String,
_timeout: Long,
loader: Option[ClassLoader])
private[akka] case class RemoteActorRef private[akka](
val remoteAddress: InetSocketAddress,
val address: String,
_timeout: Long,
loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
ClusterModule.ensureEnabled()
@ -965,22 +994,22 @@ private[akka] case class RemoteActorRef private[akka] (
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
val chSender = channel match {
case ref: ActorRef Some(ref)
case _ None
case _ None
}
Actor.remote.send[Any](message, chSender, None, remoteAddress, timeout, true, this, loader)
}
def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
message: Any,
timeout: Long,
channel: UntypedChannel): Future[Any] = {
val chSender = channel match {
case ref: ActorRef Some(ref)
case _ None
case _ None
}
val chFuture = channel match {
case f: Promise[Any] Some(f)
case _ None
case _ None
}
val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout, false, this, loader)
if (future.isDefined) ActorPromise(future.get)
@ -1011,34 +1040,49 @@ private[akka] case class RemoteActorRef private[akka] (
def dispatcher_=(md: MessageDispatcher) {
unsupported
}
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef) {
unsupported
}
def unlink(actorRef: ActorRef) {
unsupported
}
def startLink(actorRef: ActorRef): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported
def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
unsupported
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def invoke(messageHandle: MessageInvocation) {
unsupported
}
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
unsupported
}
protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}
@ -1070,7 +1114,8 @@ trait ActorRefShared {
* There are implicit conversions in ../actor/Implicits.scala
* from ActorRef -> ScalaActorRef and back
*/
trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorRef
trait ScalaActorRef extends ActorRefShared with ForwardableChannel {
ref: ActorRef
/**
* Address for actor, must be a unique one.
@ -1114,7 +1159,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR
if (msg eq null) None
else msg.channel match {
case ref: ActorRef Some(ref)
case _ None
case _ None
}
}
@ -1128,7 +1173,7 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR
if (msg eq null) None
else msg.channel match {
case f: ActorPromise Some(f)
case _ None
case _ None
}
}

View file

@ -160,8 +160,6 @@ class Dispatcher(
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox)
private[akka] def doneProcessingMailbox(mbox: MessageQueue with ExecutableMailbox): Unit = ()
protected override def cleanUpMailboxFor(actorRef: ActorRef) {
val m = getMailbox(actorRef)
if (!m.isEmpty) {
@ -195,19 +193,10 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒
def dispatcher: Dispatcher
final def run = {
try {
processMailbox()
} catch {
case ie: InterruptedException
}
finally {
dispatcherLock.unlock()
}
try { processMailbox()} finally {dispatcherLock.unlock()}
if (!self.isEmpty)
dispatcher.reRegisterForExecution(this)
dispatcher.doneProcessingMailbox(this)
}
/**

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,2 +1,2 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replicas = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,3 +1,3 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,3 +1,3 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,3 +1,3 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "direct"

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"]
akka.actor.deployment.service-hello.clustered.replicas = 2

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"]
akka.actor.deployment.service-hello.clustered.replicas = 2

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 3

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 3

View file

@ -1,4 +1,4 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.replicas = 3

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1

View file

@ -1,5 +1,5 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "DEBUG"
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1