diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 8c0d5d1711..70ead0027d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -12,12 +12,19 @@ import akka.util.duration._ import akka.config.Config.config import akka.config.Supervision._ +object LoggingReceiveSpec { + class TestLogActor extends Actor { + def receive = { case _ ⇒ } + } +} + class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAndAfterAll with MustMatchers with TestKit { + import LoggingReceiveSpec._ val level = EventHandler.level @@ -52,7 +59,8 @@ class LoggingReceiveSpec ignoreMsg { case EventHandler.Debug(_, s: String) ⇒ !s.startsWith("received") && s != "started" && s != "stopping" && s != "restarting" && - s != "restarted" && !s.startsWith("now supervising") && !s.startsWith("stopped supervising") + s != "restarted" && !s.startsWith("now supervising") && !s.startsWith("stopped supervising") && + !s.startsWith("now monitoring") && !s.startsWith("stopped monitoring") case EventHandler.Debug(_, _) ⇒ true case EventHandler.Error(_: UnhandledMessageException, _, _) ⇒ false case _: EventHandler.Error ⇒ true @@ -131,36 +139,42 @@ class LoggingReceiveSpec "log LifeCycle changes if requested" in { within(2 seconds) { - val supervisor = TestActorRef(Props(new Actor { - def receive = { case _ ⇒ } - }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000))) + val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 5, 5000))) + val f = Actor.getClass.getDeclaredField("debugLifecycle") f.setAccessible(true) f.setBoolean(Actor, true) - val actor = TestActorRef(new Actor { - def receive = { - case _ ⇒ - } - }) + + val actor = TestActorRef[TestLogActor](Props[TestLogActor].withSupervisor(supervisor)) val actor1 = actor.underlyingActor - expectMsg(EventHandler.Debug(actor1, "started")) - supervisor link actor + expectMsgPF() { case EventHandler.Debug(ref, msg: String) ⇒ ref == supervisor.underlyingActor && msg.startsWith("now supervising") } + + expectMsg(EventHandler.Debug(actor1, "started")) + + supervisor link actor + expectMsgPF(hint = "now monitoring") { + case EventHandler.Debug(ref, msg: String) ⇒ + ref == supervisor.underlyingActor && msg.startsWith("now monitoring") + } + + supervisor unlink actor + expectMsgPF(hint = "stopped monitoring") { + case EventHandler.Debug(ref, msg: String) ⇒ + ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring") + } + actor ! Kill expectMsg(EventHandler.Debug(actor1, "restarting")) awaitCond(msgAvailable) val actor2 = actor.underlyingActor - expectMsgPF() { + expectMsgPF(hint = "restarted") { case EventHandler.Debug(ref, "restarted") if ref eq actor2 ⇒ true } - supervisor unlink actor - expectMsgPF() { - case EventHandler.Debug(ref, msg: String) ⇒ - ref == supervisor.underlyingActor && msg.startsWith("stopped supervising") - } + actor.stop() expectMsg(EventHandler.Debug(actor2, "stopping")) supervisor.stop() diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 9a3d15e52f..f786b54163 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -224,7 +224,7 @@ class RestartStrategySpec extends JUnitSuite with BeforeAndAfterAll { val countDownLatch = new CountDownLatch(2) val boss = actorOf(Props(new Actor { - def receive = { case m: MaximumNumberOfRestartsWithinTimeRangeReached ⇒ maxNoOfRestartsLatch.open } + def receive = { case t: Terminated ⇒ maxNoOfRestartsLatch.open } }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), None, Some(1000)))) val slave = actorOf(Props(new Actor { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index b271392857..71c57a8ad2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -51,7 +51,7 @@ class SupervisorHierarchySpec extends JUnitSuite { val countDownMax = new CountDownLatch(1) val boss = actorOf(Props(new Actor { protected def receive = { - case MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) ⇒ countDownMax.countDown() + case Terminated(_, _) ⇒ countDownMax.countDown() } }).withFaultHandler(OneForOnePermanentStrategy(List(classOf[Throwable]), 1, 5000))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 7b6bbbcf60..d45b13e70b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -60,8 +60,8 @@ object SupervisorSpec { val temp = actorOf(Props[PingPongActor].withSupervisor(self)) override def receive = { - case Die ⇒ (temp.?(Die, TimeoutMillis)).get - case _: MaximumNumberOfRestartsWithinTimeRangeReached ⇒ + case Die ⇒ (temp.?(Die, TimeoutMillis)).get + case _: Terminated ⇒ } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 2db25c8e54..1b680ba02c 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -53,28 +53,22 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true) def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true) } -case class Failed(actor: ActorRef, cause: Throwable, recoverable: Boolean, timesRestarted: Int, restartTimeWindowStartMs: Long) extends AutoReceivedMessage with PossiblyHarmful +case class Failed(@BeanProperty actor: ActorRef, + @BeanProperty cause: Throwable, + @BeanProperty recoverable: Boolean, + @BeanProperty timesRestarted: Int, + @BeanProperty restartTimeWindowStartMs: Long) extends AutoReceivedMessage with PossiblyHarmful + +case class ChildTerminated(child: ActorRef, cause: Throwable) extends AutoReceivedMessage with PossiblyHarmful case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful -case class Link(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful - -case class Unlink(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful - -case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful - case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful case object Kill extends AutoReceivedMessage with PossiblyHarmful case object ReceiveTimeout extends PossiblyHarmful -case class MaximumNumberOfRestartsWithinTimeRangeReached( - @BeanProperty victim: ActorRef, - @BeanProperty maxNrOfRetries: Option[Int], - @BeanProperty withinTimeRange: Option[Int], - @BeanProperty lastExceptionCausingRestart: Throwable) //FIXME should be removed and replaced with Terminated - case class Terminated(@BeanProperty actor: ActorRef, @BeanProperty cause: Throwable) // Exceptions for Actors @@ -609,9 +603,7 @@ trait Actor { case HotSwap(code, discardOld) ⇒ become(code(self), discardOld) case RevertHotSwap ⇒ unbecome() case f: Failed ⇒ context.handleFailure(f) - case Link(child) ⇒ self.link(child) - case Unlink(child) ⇒ self.unlink(child) - case UnlinkAndStop(child) ⇒ self.unlink(child); child.stop() + case ct: ChildTerminated ⇒ context.handleChildTerminated(ct) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ val ch = channel diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index a26cfc162a..4a0cb6819a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -4,15 +4,23 @@ package akka.actor -import akka.event.EventHandler -import akka.config.Supervision._ +import akka.config.Supervision.{ + AllForOnePermanentStrategy, + AllForOneTemporaryStrategy, + FaultHandlingStrategy, + OneForOnePermanentStrategy, + OneForOneTemporaryStrategy, + Temporary, + Permanent +} import akka.dispatch._ import akka.util._ import java.util.{ Collection ⇒ JCollection } -import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit } import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable.Stack +import akka.event.{ DumbMonitoring, EventHandler } +import java.util.concurrent.{ ConcurrentLinkedQueue, ScheduledFuture, ConcurrentHashMap, TimeUnit } /** * The actor context - the view of the actor cell from the actor. @@ -45,7 +53,9 @@ private[akka] trait ActorContext { def dispatcher: MessageDispatcher - def handleFailure(fail: Failed) + def handleFailure(fail: Failed): Unit + + def handleChildTerminated(childtermination: ChildTerminated): Unit } private[akka] object ActorCell { @@ -68,16 +78,13 @@ private[akka] class ActorCell( @volatile var futureTimeout: Option[ScheduledFuture[AnyRef]] = None - @volatile //Should be a final field - var _supervisor: Option[ActorRef] = None - @volatile //FIXME doesn't need to be volatile var maxNrOfRetriesCount: Int = 0 @volatile //FIXME doesn't need to be volatile var restartTimeWindowStartNanos: Long = 0L - lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] + val _linkedActors = new ConcurrentLinkedQueue[ActorRef] @volatile //FIXME doesn't need to be volatile var hotswap: Stack[PartialFunction[Any, Unit]] = _hotswap // TODO: currently settable from outside for compatibility @@ -104,9 +111,17 @@ private[akka] class ActorCell( @volatile var mailbox: Mailbox = _ - def start() { - if (props.supervisor.isDefined) props.supervisor.get.link(self) + def start(): Unit = { mailbox = dispatcher.createMailbox(this) + + if (props.supervisor.isDefined) { + props.supervisor.get match { + case l: LocalActorRef ⇒ + l.underlying.dispatcher.systemDispatch(SystemEnvelope(l.underlying, akka.dispatch.Supervise(self), NullChannel)) + case other ⇒ throw new UnsupportedOperationException("Supervision failure: " + other + " cannot be a supervisor, only LocalActorRefs can") + } + } + Actor.registry.register(self) dispatcher.attach(this) } @@ -136,40 +151,22 @@ private[akka] class ActorCell( if (isRunning) dispatcher.systemDispatch(SystemEnvelope(this, Terminate, NullChannel)) - def link(actorRef: ActorRef): ActorRef = { - guard.withGuard { - val actorRefSupervisor = actorRef.supervisor - val hasSupervisorAlready = actorRefSupervisor.isDefined - if (hasSupervisorAlready && actorRefSupervisor.get.uuid == self.uuid) return actorRef // we already supervise this guy - else if (hasSupervisorAlready) throw new IllegalActorStateException( - "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - else { - _linkedActors.put(actorRef.uuid, actorRef) - actorRef.supervisor = Some(self) - } - } - if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now supervising " + actorRef) - actorRef + def link(subject: ActorRef): ActorRef = { + dispatcher.systemDispatch(SystemEnvelope(this, Link(subject), NullChannel)) + subject } - def unlink(actorRef: ActorRef): ActorRef = { - guard.withGuard { - if (_linkedActors.remove(actorRef.uuid) eq null) - throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink") - actorRef.supervisor = None - if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "stopped supervising " + actorRef) - } - actorRef + def unlink(subject: ActorRef): ActorRef = { + dispatcher.systemDispatch(SystemEnvelope(this, Unlink(subject), NullChannel)) + subject } - def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors.values) + def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors) - def supervisor: Option[ActorRef] = _supervisor + //TODO FIXME remove this method + def supervisor: Option[ActorRef] = props.supervisor - def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup - - def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = - dispatcher dispatch Envelope(this, message, channel) + def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher dispatch Envelope(this, message, channel) def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, @@ -216,10 +213,11 @@ private[akka] class ActorCell( } } catch { case e ⇒ - e.printStackTrace(System.err) envelope.channel.sendException(e) - if (supervisor.isDefined) supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos) - else throw e + if (supervisor.isDefined) + DumbMonitoring.signal(Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos), supervisor.get) + else + throw e } def suspend(): Unit = dispatcher suspend this @@ -239,20 +237,18 @@ private[akka] class ActorCell( if (a ne null) a.postStop() { //Stop supervised actors - val i = _linkedActors.values.iterator - while (i.hasNext) { - i.next.stop() - i.remove() + var a = _linkedActors.poll() + while (a ne null) { + a.stop() + a = _linkedActors.poll() } } } finally { - try { - if (supervisor.isDefined) - supervisor.get ! Failed(self, new ActorKilledException("Stopped"), false, maxNrOfRetriesCount, restartTimeWindowStartNanos) //Death(self, new ActorKilledException("Stopped"), false) - } catch { - case e: ActorInitializationException ⇒ - // TODO: remove when ! cannot throw anymore - } + val cause = new ActorKilledException("Stopped") //FIXME make this an object, can be reused everywhere + + if (supervisor.isDefined) supervisor ! ChildTerminated(self, cause) + DumbMonitoring.signal(Terminated(self, cause)) + currentMessage = null clearActorContext() } @@ -262,11 +258,14 @@ private[akka] class ActorCell( try { if (!mailbox.isClosed) { envelope.message match { - case Create ⇒ create(recreation = false) - case Recreate ⇒ create(recreation = true) - case Suspend ⇒ suspend() - case Resume ⇒ resume() - case Terminate ⇒ terminate() + case Create ⇒ create(recreation = false) + case Recreate ⇒ create(recreation = true) + case Link(subject) ⇒ akka.event.DumbMonitoring.link(self, subject); if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now monitoring " + subject) + case Unlink(subject) ⇒ akka.event.DumbMonitoring.unlink(self, subject); if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "stopped monitoring " + subject) + case Suspend ⇒ suspend() + case Resume ⇒ resume() + case Terminate ⇒ terminate() + case Supervise(child) ⇒ if (!_linkedActors.contains(child)) { _linkedActors.offer(child); if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now supervising " + child) } } } } catch { @@ -299,7 +298,10 @@ private[akka] class ActorCell( channel.sendException(e) - if (supervisor.isDefined) supervisor.get ! Failed(self, e, true, maxNrOfRetriesCount, restartTimeWindowStartNanos) else dispatcher.resume(this) + if (supervisor.isDefined) + DumbMonitoring.signal(Failed(self, e, true, maxNrOfRetriesCount, restartTimeWindowStartNanos), supervisor.get) + else + dispatcher.resume(this) if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected } finally { @@ -333,10 +335,27 @@ private[akka] class ActorCell( case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒ fail.actor.stop() - self ! MaximumNumberOfRestartsWithinTimeRangeReached(fail.actor, None, None, fail.cause) //FIXME this should be removed, you should link to an actor to get Terminated messages case _ ⇒ - if (_supervisor.isDefined) throw fail.cause else fail.actor.stop() //Escalate problem if not handled here + if (supervisor.isDefined) throw fail.cause else fail.actor.stop() //Escalate problem if not handled here + } + } + + def handleChildTerminated(ct: ChildTerminated): Unit = { + props.faultHandler match { + case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass)) ⇒ + //STOP ALL AND ESCALATE + + case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass)) ⇒ + //STOP ALL? + + case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass)) ⇒ + //ESCALATE? + + case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(ct.cause.getClass)) ⇒ + _linkedActors.remove(ct.child) + + case _ ⇒ throw ct.cause //Escalate problem if not handled here } } @@ -379,9 +398,6 @@ private[akka] class ActorCell( success } } else { - // tooManyRestarts - if (supervisor.isDefined) - supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(self, maxNrOfRetries, withinTimeRange, reason) stop() true // done } @@ -429,21 +445,16 @@ private[akka] class ActorCell( protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { props.faultHandler.lifeCycle match { case Temporary ⇒ - val i = _linkedActors.values.iterator - while (i.hasNext) { - val actorRef = i.next() - - i.remove() - - actorRef.stop() - - //FIXME if last temporary actor is gone, then unlink me from supervisor <-- should this exist? - if (!i.hasNext && supervisor.isDefined) - supervisor.get ! UnlinkAndStop(self) + { //Stop supervised actors + var a = _linkedActors.poll() + while (a ne null) { + a.stop() + a = _linkedActors.poll() + } } case Permanent ⇒ - val i = _linkedActors.values.iterator + val i = _linkedActors.iterator while (i.hasNext) i.next().restart(reason, maxNrOfRetries, withinTimeRange) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 2915c66f20..161fdf5cc9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -161,8 +161,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha timeout: Timeout, channel: UntypedChannel): Future[Any] - protected[akka] def supervisor_=(sup: Option[ActorRef]) - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) override def hashCode: Int = HashCode.hash(HashCode.SEED, address) @@ -249,7 +247,7 @@ class LocalActorRef private[akka] ( * To be invoked from within the actor itself. * Returns the ref that was passed into it */ - def link(actorRef: ActorRef): ActorRef = actorCell.link(actorRef) + def link(subject: ActorRef): ActorRef = actorCell.link(subject) /** * Unlink the actor. @@ -257,7 +255,7 @@ class LocalActorRef private[akka] ( * To be invoked from within the actor itself. * Returns the ref that was passed into it */ - def unlink(actorRef: ActorRef): ActorRef = actorCell.unlink(actorRef) + def unlink(subject: ActorRef): ActorRef = actorCell.unlink(subject) /** * Returns the supervisor, if there is one. @@ -283,9 +281,6 @@ class LocalActorRef private[akka] ( protected[akka] override def timeout: Long = props.timeout.duration.toMillis // TODO: remove this if possible - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = - actorCell.supervisor = sup - protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = actorCell.postMessageToMailbox(message, channel) @@ -391,10 +386,6 @@ private[akka] case class RemoteActorRef private[akka] ( unsupported } - protected[akka] def supervisor_=(sup: Option[ActorRef]) { - unsupported - } - private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") } @@ -487,10 +478,6 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { def supervisor: Option[ActorRef] = unsupported - protected[akka] def supervisor_=(sup: Option[ActorRef]) { - unsupported - } - def suspend(): Unit = unsupported def resume(): Unit = unsupported diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index 57e69625fd..5559cbc43d 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -106,7 +106,7 @@ case class SupervisorFactory(val config: SupervisorConfig) { * * @author Jonas Bonér */ -sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) ⇒ Unit) { +sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (ActorRef, Terminated) ⇒ Unit) { import Supervisor._ private val _childActors = new CopyOnWriteArrayList[ActorRef] @@ -164,7 +164,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act * * @author Jonas Bonér */ -final class SupervisorActor private[akka] (maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) ⇒ Unit) extends Actor { +final class SupervisorActor private[akka] (maxRestartsHandler: (ActorRef, Terminated) ⇒ Unit) extends Actor { override def postStop() { val i = linkedActors.iterator @@ -176,7 +176,7 @@ final class SupervisorActor private[akka] (maxRestartsHandler: (ActorRef, Maximu } def receive = { - case max @ MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) ⇒ maxRestartsHandler(self, max) + case termination: Terminated ⇒ maxRestartsHandler(self, termination) case unknown ⇒ throw new SupervisorException( "SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]") } diff --git a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala index 1494c1b0d9..a81b888d84 100644 --- a/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala +++ b/akka-actor/src/main/scala/akka/config/SupervisionConfig.scala @@ -5,7 +5,7 @@ package akka.config import akka.dispatch.MessageDispatcher -import akka.actor.{ MaximumNumberOfRestartsWithinTimeRangeReached, ActorRef } +import akka.actor.{ Terminated, ActorRef } import akka.japi.{ Procedure2 } case class RemoteAddress(val hostname: String, val port: Int) @@ -22,10 +22,10 @@ object Supervision { sealed abstract class LifeCycle extends ConfigElement sealed abstract class FaultHandlingStrategy(val trapExit: List[Class[_ <: Throwable]], val lifeCycle: LifeCycle) extends ConfigElement - case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: (ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached) ⇒ Unit = { (aRef, max) ⇒ () }) extends Server { + case class SupervisorConfig(restartStrategy: FaultHandlingStrategy, worker: List[Server], maxRestartsHandler: (ActorRef, Terminated) ⇒ Unit = { (aRef, max) ⇒ () }) extends Server { //Java API def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server]) = this(restartStrategy, worker.toList) - def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server], restartHandler: Procedure2[ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached]) = this(restartStrategy, worker.toList, { (aRef, max) ⇒ restartHandler.apply(aRef, max) }) + def this(restartStrategy: FaultHandlingStrategy, worker: Array[Server], restartHandler: Procedure2[ActorRef, Terminated]) = this(restartStrategy, worker.toList, { (aRef, max) ⇒ restartHandler.apply(aRef, max) }) } class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, val registerAsRemoteService: Boolean = false) extends Server { diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index fc610426a3..abdc5dd791 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -24,12 +24,15 @@ final case class Envelope(val receiver: ActorCell, val message: Any, val channel } } -sealed trait SystemMessage +sealed trait SystemMessage extends PossiblyHarmful case object Create extends SystemMessage case object Recreate extends SystemMessage case object Suspend extends SystemMessage case object Resume extends SystemMessage case object Terminate extends SystemMessage +case class Supervise(child: ActorRef) extends SystemMessage +case class Link(subject: ActorRef) extends SystemMessage +case class Unlink(subject: ActorRef) extends SystemMessage final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMessage, val channel: UntypedChannel) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") diff --git a/akka-actor/src/main/scala/akka/event/DeathWatch.scala b/akka-actor/src/main/scala/akka/event/DeathWatch.scala index 41d73f197b..5e3ea08226 100644 --- a/akka-actor/src/main/scala/akka/event/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/event/DeathWatch.scala @@ -2,99 +2,53 @@ * Copyright (C) 2009-2011 Typesafe Inc. */ -/*package akka.event +package akka.event -import akka.actor.{ Death, ActorRef } import akka.config.Supervision.{ FaultHandlingStrategy } import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantReadWriteLock +import akka.actor._ +import akka.dispatch.SystemEnvelope trait DeathWatch { - def signal(death: Death): Unit + def signal(fail: Failed, supervisor: ActorRef): Unit + def signal(terminated: Terminated): Unit } -object Supervision { - case class ActiveEntry(monitoring: Vector[ActorRef] = Vector(), supervising: Vector[ActorRef] = Vector(), strategy: FaultHandlingStrategy) - case class PassiveEntry(monitors: Vector[ActorRef] = Vector(), supervisor: Option[ActorRef] = None) +trait Monitoring { + + def link(monitor: ActorRef, monitored: ActorRef): Unit + + def unlink(monitor: ActorRef, monitored: ActorRef): Unit } -trait Supervision { self: DeathWatch => +object DumbMonitoring extends DeathWatch with Monitoring { - import Supervision._ + val monitoring = new akka.util.Index[ActorRef, ActorRef] //Key == monitored, Values == monitors - val guard = new ReentrantReadWriteLock - val read = guard.readLock() - val write = guard.writeLock() + def signal(fail: Failed, supervisor: ActorRef): Unit = + supervisor match { + case l: LocalActorRef ⇒ l ! fail //FIXME, should Failed be a system message ? => l.underlying.dispatcher.systemDispatch(SystemEnvelope(l.underlying, fail, NullChannel)) + case other ⇒ throw new IllegalStateException("Supervision only works for local actors currently") + } - val activeEntries = new ConcurrentHashMap[ActorRef, ActiveEntry](1024) - val passiveEntries = new ConcurrentHashMap[ActorRef, PassiveEntry](1024) + def signal(terminated: Terminated): Unit = { + val monitors = monitoring.remove(terminated.actor) + if (monitors.isDefined) + monitors.get.foreach(_ ! terminated) + } - def registerMonitorable(monitor: ActorRef, monitorsSupervisor: Option[ActorRef], faultHandlingStrategy: FaultHandlingStrategy) { - read.lock() - try { - activeEntries.putIfAbsent(monitor, ActiveEntry(strategy = faultHandlingStrategy)) - passiveEntries.putIfAbsent(monitor, PassiveEntry(supervisor = monitorsSupervisor)) - } finally { - read.unlock() + def link(monitor: ActorRef, monitored: ActorRef): Unit = { + if (monitored.isShutdown) monitor ! Terminated(monitored, new ActorKilledException("Already terminated when linking")) + else { // FIXME race between shutting down + monitoring.put(monitored, monitor) } } - def deregisterMonitorable(monitor: ActorRef) { - read.lock() - try { - activeEntries.remove(monitor) - passiveEntries.remove(monitor) - } finally { - read.unlock() - } - } - - def startMonitoring(monitor: ActorRef, monitored: ActorRef): ActorRef = { - def addActiveEntry(): ActorRef = - activeEntries.get(monitor) match { - case null => null//He's stopped or not started, which is unlikely - case entry => - val updated = entry.copy(monitoring = entry.monitoring :+ monitored) - if (activeEntries.replace(monitor, entry, updated)) - monitored - else - addActiveEntry() - } - - def addPassiveEntry(): ActorRef = - activeEntries.get(monitored) match { - case null => null//The thing we're trying to monitor isn't registered, abort - case _ => - passiveEntries.get(monitored) match { - case null => - passiveEntries.putIfAbsent(monitored, PassiveEntry(monitors = Vector(monitor))) match { - case null => monitored//All good - case _ => addPassiveEntry() - } - - case existing => - val updated = existing.copy(monitors = existing.monitors :+ monitor) - if (passiveEntries.replace(monitored, existing, updated)) - monitored - else - addPassiveEntry() - } - } - - read.lock() - try { - addActiveEntry() - addPassiveEntry() - } finally { - read.unlock() - } - } - - def stopMonitoring(monitor: ActorRef, monitored: ActorRef, strategy: FaultHandlingStrategy, supervise: Boolean): ActorRef = { - monitored + def unlink(monitor: ActorRef, monitored: ActorRef): Unit = { + monitoring.remove(monitored, monitor) } } -*/ /* * Scenarios that can occur: diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 69a9164543..0044baf69f 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -100,8 +100,6 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ // for testing... case Stat ⇒ tryReply(Stats(_delegates length)) - case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _) ⇒ - _delegates = _delegates filterNot { _.uuid == victim.uuid } case Terminated(victim, _) ⇒ _delegates = _delegates filterNot { _.uuid == victim.uuid } case msg ⇒ diff --git a/akka-actor/src/main/scala/akka/util/Index.scala b/akka-actor/src/main/scala/akka/util/Index.scala index 4d47a04b2b..7d08431230 100644 --- a/akka-actor/src/main/scala/akka/util/Index.scala +++ b/akka-actor/src/main/scala/akka/util/Index.scala @@ -112,6 +112,17 @@ class Index[K <: AnyRef, V <: AnyRef: Manifest] { } else false //Remove failed } + def remove(key: K): Option[Iterable[V]] = { + val set = container get key + + if (set ne null) { + set.synchronized { + container.remove(key, set) + Some(scala.collection.JavaConverters.collectionAsScalaIterableConverter(set).asScala) + } + } else None //Remove failed + } + /** * @return true if the underlying containers is empty, may report false negatives when the last remove is underway */ diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 966f98debb..01c5f721d9 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -36,24 +36,7 @@ class TestActorRef[T <: Actor](props: Props, address: String) extends LocalActor override def toString = "TestActor[" + address + ":" + uuid + "]" - override def equals(other: Any) = - other.isInstanceOf[TestActorRef[_]] && - other.asInstanceOf[TestActorRef[_]].uuid == uuid - - /** - * Override to check whether the new supervisor is running on the CallingThreadDispatcher, - * as it should be. This can of course be tricked by linking before setting the dispatcher before starting the - * supervisor, but then you just asked for trouble. - */ - override def supervisor_=(a: Option[ActorRef]) { - a match { //TODO This should probably be removed since the Supervisor could be a remote actor for all we know - case Some(l: LocalActorRef) if !l.underlying.dispatcher.isInstanceOf[CallingThreadDispatcher] ⇒ - EventHandler.warning(this, "supervisor " + l + " does not use CallingThreadDispatcher") - case _ ⇒ - } - super.supervisor_=(a) - } - + override def equals(other: Any) = other.isInstanceOf[TestActorRef[_]] && other.asInstanceOf[TestActorRef[_]].uuid == uuid } object TestActorRef { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 8485f41768..fb2efbb899 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -42,11 +42,11 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor with FSM[ case Ev(SetIgnore(ign)) ⇒ stay using ign case Ev(StateTimeout) ⇒ stop - case Event(x: AnyRef, ign) ⇒ - val ignore = ign map (z ⇒ if (z isDefinedAt x) z(x) else false) getOrElse false - if (!ignore) { + case Event(x: AnyRef, data) ⇒ + val observe = data map (ignoreFunc ⇒ if (ignoreFunc isDefinedAt x) !ignoreFunc(x) else true) getOrElse true + if (observe) queue.offerLast(RealMessage(x, channel)) - } + stay } initialize @@ -269,10 +269,10 @@ trait TestKitLight { * * @return the received object as transformed by the partial function */ - def expectMsgPF[T](max: Duration = Duration.MinusInf)(f: PartialFunction[Any, T]): T = { + def expectMsgPF[T](max: Duration = Duration.MinusInf, hint: String = "")(f: PartialFunction[Any, T]): T = { val _max = if (max eq Duration.MinusInf) remaining else max.dilated val o = receiveOne(_max) - assert(o ne null, "timeout during expectMsg") + assert(o ne null, "timeout during expectMsg: " + hint) assert(f.isDefinedAt(o), "does not match: " + o) f(o) } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 0050d6dca3..ad826bce09 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -224,25 +224,6 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac a.underlying.dispatcher.getClass must be(classOf[CallingThreadDispatcher]) } - "warn about scheduled supervisor" in { - val boss = Actor.actorOf(new Actor { def receive = { case _ ⇒ } }) - val ref = TestActorRef[WorkerActor] - - val filter = EventFilter.custom(_ ⇒ true) - EventHandler.notify(TestEvent.Mute(filter)) - val log = TestActorRef[Logger] - EventHandler.addListener(log) - val eventHandlerLevel = EventHandler.level - EventHandler.level = EventHandler.WarningLevel - boss link ref - val la = log.underlyingActor - la.count must be(1) - la.msg must (include("supervisor") and include("CallingThreadDispatcher")) - EventHandler.level = eventHandlerLevel - EventHandler.removeListener(log) - EventHandler.notify(TestEvent.UnMute(filter)) - } - "proxy apply for the underlying actor" in { val ref = TestActorRef[WorkerActor] intercept[IllegalActorStateException] { ref("work") }