diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 3f955088fa..df40fdddc0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -36,7 +36,7 @@ class SupervisorHierarchySpec extends JUnitSuite { val workerOne, workerTwo, workerThree = actorOf(Props(new CountDownActor(countDown)).withSupervisor(manager)) - workerOne ! Death(workerOne, new FireWorkerException("Fire the worker!")) + workerOne ! Death(workerOne, new FireWorkerException("Fire the worker!"), true) // manager + all workers should be restarted by only killing a worker // manager doesn't trap exits, so boss will restart manager diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index cadc06d290..4dac79256a 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -60,7 +60,7 @@ case object RevertHotSwap extends AutoReceivedMessage with LifeCycleMessage case class Restart(reason: Throwable) extends AutoReceivedMessage with LifeCycleMessage -case class Death(dead: ActorRef, killer: Throwable) extends AutoReceivedMessage with LifeCycleMessage +case class Death(deceased: ActorRef, cause: Throwable, recoverable: Boolean) extends AutoReceivedMessage with LifeCycleMessage case class Link(child: ActorRef) extends AutoReceivedMessage with LifeCycleMessage @@ -717,7 +717,7 @@ trait Actor { msg match { case HotSwap(code, discardOld) ⇒ become(code(self), discardOld) case RevertHotSwap ⇒ unbecome() - case Death(dead, reason) ⇒ self.handleTrapExit(dead, reason) + case d: Death ⇒ self.handleDeath(d) case Link(child) ⇒ self.link(child) case Unlink(child) ⇒ self.unlink(child) case UnlinkAndStop(child) ⇒ self.unlink(child); child.stop() diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 1b95755e8e..7408c5f79a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -493,7 +493,7 @@ abstract class SelfActorRef extends ActorRef with ForwardableChannel { self: Loc final def getDispatcher(): MessageDispatcher = dispatcher /** INTERNAL API ONLY **/ - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) + protected[akka] def handleDeath(death: Death) } /** @@ -763,11 +763,24 @@ class LocalActorRef private[akka] (private[this] val props: Props, val address: actorInstance.get().apply(messageHandle.message) currentMessage = null // reset current message after successful invocation } catch { - case e: InterruptedException ⇒ - handleExceptionInDispatch(e, messageHandle.message) - throw e case e ⇒ - handleExceptionInDispatch(e, messageHandle.message) + { + EventHandler.error(e, this, e.getMessage) + + //Prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + + channel.sendException(e) + + if (supervisor.isDefined) notifySupervisorWithMessage(Death(this, e, true)) + else { + lifeCycle match { + case Temporary ⇒ shutDownTemporaryActor(this, e) + case _ ⇒ dispatcher.resume(this) //Resume processing for this actor + } + } + } + if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected } finally { checkReceiveTimeout // Reschedule receive timeout } @@ -785,16 +798,16 @@ class LocalActorRef private[akka] (private[this] val props: Props, val address: } } - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { + protected[akka] def handleDeath(death: Death) { props.faultHandler match { - case AllForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) ⇒ - restartLinkedActors(reason, maxRetries, within) + case AllForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + restartLinkedActors(death.cause, maxRetries, within) - case OneForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) ⇒ - dead.restart(reason, maxRetries, within) + case OneForOneStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ + death.deceased.restart(death.cause, maxRetries, within) case _ ⇒ - if (_supervisor.isDefined) throw reason else dead.stop() //Escalate problem if not handled here + if (_supervisor.isDefined) throw death.cause else death.deceased.stop() //Escalate problem if not handled here } } @@ -943,30 +956,13 @@ class LocalActorRef private[akka] (private[this] val props: Props, val address: private def shutDownTemporaryActor(temporaryActor: ActorRef, reason: Throwable) { temporaryActor.stop() _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor - // when this comes down through the handleTrapExit path, we get here when the temp actor is restarted + // when this comes down through the handleDeath path, we get here when the temp actor is restarted notifySupervisorWithMessage(MaximumNumberOfRestartsWithinTimeRangeReached(temporaryActor, Some(0), None, reason)) // if last temporary actor is gone, then unlink me from supervisor if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this)) true } - private def handleExceptionInDispatch(reason: Throwable, message: Any) { - EventHandler.error(reason, this, reason.getMessage) - - //Prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - - channel.sendException(reason) - - if (supervisor.isDefined) notifySupervisorWithMessage(Death(this, reason)) - else { - lifeCycle match { - case Temporary ⇒ shutDownTemporaryActor(this, reason) - case _ ⇒ dispatcher.resume(this) //Resume processing for this actor - } - } - } - private def notifySupervisorWithMessage(notification: LifeCycleMessage) { val sup = _supervisor if (sup.isDefined) sup.get ! notification diff --git a/akka-actor/src/main/scala/akka/event/DeathWatch.scala b/akka-actor/src/main/scala/akka/event/DeathWatch.scala new file mode 100644 index 0000000000..7ba9392ff6 --- /dev/null +++ b/akka-actor/src/main/scala/akka/event/DeathWatch.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.event + +import akka.actor.{ Death, LocalActorRef, ActorRef } + +trait DeathWatch { + def signal(death: Death): Unit +} + +class StupidInVMDeathWatchImpl extends DeathWatch { + def signal(death: Death) { + death match { + case c @ Death(victim: LocalActorRef, _, _) if victim.supervisor.isDefined ⇒ + victim.supervisor.get ! c + + case other ⇒ EventHandler.debug(this, "No supervisor or not a local actor reference: " + other) + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index b407ec120f..8b8cc31c0d 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -101,7 +101,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ self tryReply Stats(_delegates length) case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _) ⇒ _delegates = _delegates filterNot { _.uuid == victim.uuid } - case Death(victim, _) ⇒ + case Death(victim, _, _) ⇒ _delegates = _delegates filterNot { _.uuid == victim.uuid } case msg ⇒ resizeIfAppropriate() diff --git a/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala b/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala index 09cf65d88c..8dd29245de 100644 --- a/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/cluster/RemoteFailureDetector.scala @@ -33,7 +33,7 @@ object RemoteFailureDetector { private case class Unregister(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) extends RemoteFailureDetectorChannelEvent - private[akka] val channel = actorOf(Props(new Channel).copy(dispatcher = new PinnedDispatcher(), localOnly = true)) + private[akka] val channel = new LocalActorRef(Props[Channel].copy(dispatcher = new PinnedDispatcher()), newUuid.toString, systemService = true) def register(listener: RemoteFailureListener, connectionAddress: InetSocketAddress) = channel ! Register(listener, connectionAddress) diff --git a/akka-remote/src/main/scala/akka/cluster/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/cluster/netty/NettyRemoteSupport.scala index 451c43f17f..614f1c60dc 100644 --- a/akka-remote/src/main/scala/akka/cluster/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/cluster/netty/NettyRemoteSupport.scala @@ -769,10 +769,8 @@ trait NettyRemoteServerModule extends RemoteServerModule { } private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) { - if (_isRunning.isOn) { + if (_isRunning.isOn) registry.put(id, actorRef) //TODO change to putIfAbsent - if (!actorRef.isRunning) actorRef.start() - } } /** @@ -1078,7 +1076,7 @@ class RemoteServerHandler( "Looking up a remotely available actor for address [%s] on node [%s]" .format(address, Config.nodename)) - val actorRef = Actor.createActor(address, () ⇒ createSessionActor(actorInfo, channel)) + val actorRef = Actor.createActor(address, () ⇒ createSessionActor(actorInfo, channel), false) if (actorRef eq null) throw new IllegalActorStateException("Could not find a remote actor with address [" + address + "] or uuid [" + uuid + "]") @@ -1098,9 +1096,8 @@ class RemoteServerHandler( case null ⇒ null case factory ⇒ val actorRef = factory() - actorRef.uuid = parseUuid(uuid) //FIXME is this sensible? sessionActors.get(channel).put(address, actorRef) - actorRef.start() //Start it where's it's created + actorRef //Start it where's it's created } case sessionActor ⇒ sessionActor } diff --git a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala index 9d0ff3766d..b6067833ba 100644 --- a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala +++ b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala @@ -21,12 +21,12 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll describe("Serializable actor") { it("should be able to serialize and de-serialize a stateful actor with a given serializer") { - val actor1 = actorOf(Props[MyJavaSerializableActor].withLocalOnly(true)).asInstanceOf[LocalActorRef] + val actor1 = new LocalActorRef(Props[MyJavaSerializableActor], newUuid.toString, systemService = true) (actor1 ? "hello").get should equal("world 1") (actor1 ? "hello").get should equal("world 2") val bytes = toBinary(actor1) - val actor2 = fromBinary(bytes).start().asInstanceOf[LocalActorRef] + val actor2 = fromBinary(bytes).asInstanceOf[LocalActorRef] (actor2 ? "hello").get should equal("world 3") actor2.receiveTimeout should equal(Some(1000)) @@ -36,7 +36,7 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") { - val actor1 = actorOf(Props[MyStatelessActorWithMessagesInMailbox].withLocalOnly(true)).asInstanceOf[LocalActorRef] + val actor1 = new LocalActorRef(Props[MyStatelessActorWithMessagesInMailbox], newUuid.toString, systemService = true) for (i ← 1 to 10) actor1 ! "hello" actor1.getDispatcher.mailboxSize(actor1) should be > (0) @@ -54,7 +54,7 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll it("should be able to serialize and deserialize a PersonActorWithMessagesInMailbox") { val p1 = Person("debasish ghosh", 25, SerializeSpec.Address("120", "Monroe Street", "Santa Clara", "95050")) - val actor1 = actorOf(Props[PersonActorWithMessagesInMailbox].withLocalOnly(true)).asInstanceOf[LocalActorRef] + val actor1 = new LocalActorRef(Props[PersonActorWithMessagesInMailbox], newUuid.toString, systemService = true) (actor1 ! p1) (actor1 ! p1) (actor1 ! p1) @@ -98,7 +98,7 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll describe("serialize actor that accepts protobuf message") { it("should serialize") { - val actor1 = actorOf(Props[MyActorWithProtobufMessagesInMailbox].withLocalOnly(true)).asInstanceOf[LocalActorRef] + val actor1 = new LocalActorRef(Props[MyActorWithProtobufMessagesInMailbox], newUuid.toString, systemService = true) val msg = MyMessage(123, "debasish ghosh", true) val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build for (i ← 1 to 10) actor1 ! b