From ddb226adcf77c0c838cbc418613293b09fc4af87 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 3 Mar 2011 19:15:34 +0100 Subject: [PATCH] Removing shutdownLinkedActors, making linkedActors and getLinkedActors public, fixing checkinit problem in AkkaException --- .../src/main/scala/akka/AkkaException.scala | 9 ++- .../src/main/scala/akka/actor/ActorRef.scala | 67 ++++++++++--------- .../main/scala/akka/actor/Supervisor.scala | 12 +++- project/build/AkkaProject.scala | 2 +- 4 files changed, 49 insertions(+), 41 deletions(-) diff --git a/akka-actor/src/main/scala/akka/AkkaException.scala b/akka-actor/src/main/scala/akka/AkkaException.scala index 73072b2894..fbeae4b105 100644 --- a/akka-actor/src/main/scala/akka/AkkaException.scala +++ b/akka-actor/src/main/scala/akka/AkkaException.scala @@ -23,14 +23,13 @@ import java.net.{InetAddress, UnknownHostException} import AkkaException._ val exceptionName = getClass.getName - val uuid = "%s_%s".format(hostname, newUuid) + lazy val uuid = "%s_%s".format(hostname, newUuid) - override val toString = "%s\n\t[%s]\n\t%s\n\t%s".format(exceptionName, uuid, message, stackTrace) + override lazy val toString = "%s\n\t[%s]\n\t%s\n\t%s".format(exceptionName, uuid, message, stackTrace) - val stackTrace = { + lazy val stackTrace = { val sw = new StringWriter - val pw = new PrintWriter(sw) - printStackTrace(pw) + printStackTrace(new PrintWriter(sw)) sw.toString } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 7b2487b9a2..8febe461ff 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -481,6 +481,19 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ def getSupervisor(): ActorRef = supervisor getOrElse null + /** + * Returns an unmodifiable Java Map containing the linked actors, + * please note that the backing map is thread-safe but not immutable + */ + def linkedActors: JMap[Uuid, ActorRef] + + /** + * Java API + * Returns an unmodifiable Java Map containing the linked actors, + * please note that the backing map is thread-safe but not immutable + */ + def getLinkedActors(): JMap[Uuid, ActorRef] = linkedActors + protected[akka] def invoke(messageHandle: MessageInvocation): Unit protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit @@ -508,8 +521,6 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] - protected[akka] def linkedActors: JMap[Uuid, ActorRef] - override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) override def equals(that: Any): Boolean = { @@ -535,7 +546,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal } /** - * Local (serializable) ActorRef that is used when referencing the Actor on its "home" node. + * Local (serializable) ActorRef that is used when referencing the Actor on its "home" node. * * @author Jonas Bonér */ @@ -679,7 +690,7 @@ class LocalActorRef private[akka] ( def link(actorRef: ActorRef) = guard.withGuard { if (actorRef.supervisor.isDefined) throw new IllegalActorStateException( "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - linkedActors.put(actorRef.uuid, actorRef) + _linkedActors.put(actorRef.uuid, actorRef) actorRef.supervisor = Some(this) } @@ -689,9 +700,9 @@ class LocalActorRef private[akka] ( * To be invoked from within the actor itself. */ def unlink(actorRef: ActorRef) = guard.withGuard { - if (!linkedActors.containsKey(actorRef.uuid)) throw new IllegalActorStateException( - "Actor [" + actorRef + "] is not a linked actor, can't unlink") - linkedActors.remove(actorRef.uuid) + if(_linkedActors.remove(actorRef.uuid) eq null) + throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink") + actorRef.supervisor = None } @@ -758,17 +769,6 @@ class LocalActorRef private[akka] ( protected[akka] def mailbox_=(value: AnyRef): AnyRef = { _mailbox = value; value } - /** - * Shuts down and removes all linked actors. - */ - def shutdownLinkedActors() { - val i = linkedActors.values.iterator - while(i.hasNext) { - i.next.stop - i.remove - } - } - /** * Returns the supervisor, if there is one. */ @@ -946,8 +946,9 @@ class LocalActorRef private[akka] ( } protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { - import scala.collection.JavaConversions._ - linkedActors.values foreach { actorRef => + val i = _linkedActors.values.iterator + while(i.hasNext) { + val actorRef = i.next actorRef.lifeCycle match { // either permanent or none where default is permanent case Temporary => shutDownTemporaryActor(actorRef) @@ -965,7 +966,7 @@ class LocalActorRef private[akka] ( } else None } - protected[akka] def linkedActors: JMap[Uuid, ActorRef] = _linkedActors + def linkedActors: JMap[Uuid, ActorRef] = java.util.Collections.unmodifiableMap(_linkedActors) // ========= PRIVATE FUNCTIONS ========= @@ -977,11 +978,11 @@ class LocalActorRef private[akka] ( private def shutDownTemporaryActor(temporaryActor: ActorRef) { temporaryActor.stop - linkedActors.remove(temporaryActor.uuid) // remove the temporary actor + _linkedActors.remove(temporaryActor.uuid) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor - if (linkedActors.isEmpty) { + if (_linkedActors.isEmpty) notifySupervisorWithMessage(UnlinkAndStop(this)) - } + true } @@ -1006,7 +1007,15 @@ class LocalActorRef private[akka] ( // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client _supervisor.foreach { sup => if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors - shutdownLinkedActors + //Scoped stop all linked actors, to avoid leaking the 'i' val + { + val i = _linkedActors.values.iterator + while(i.hasNext) { + i.next.stop + i.remove + } + } + //Stop the actor itself stop } else sup ! notification // else notify supervisor } @@ -1121,13 +1130,12 @@ private[akka] case class RemoteActorRef private[akka] ( def spawnLink(clazz: Class[_ <: Actor]): ActorRef = unsupported def spawnLinkRemote(clazz: Class[_ <: Actor], hostname: String, port: Int, timeout: Long): ActorRef = unsupported def supervisor: Option[ActorRef] = unsupported - def shutdownLinkedActors: Unit = unsupported + def linkedActors: JMap[Uuid, ActorRef] = unsupported protected[akka] def mailbox: AnyRef = unsupported protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported - protected[akka] def linkedActors: JMap[Uuid, ActorRef] = unsupported protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported protected[akka] def actorInstance: AtomicReference[Actor] = unsupported @@ -1149,11 +1157,6 @@ trait ActorRefShared { * Returns the uuid for the actor. */ def uuid: Uuid - - /** - * Shuts down and removes all linked actors. - */ - def shutdownLinkedActors(): Unit } /** diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index d9e77dcbcb..bb08bcdf80 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -157,10 +157,16 @@ sealed class Supervisor(handler: FaultHandlingStrategy) { * @author Jonas Bonér */ final class SupervisorActor private[akka] (handler: FaultHandlingStrategy) extends Actor { - import self._ - faultHandler = handler + self.faultHandler = handler - override def postStop(): Unit = shutdownLinkedActors + override def postStop(): Unit = { + val i = self.linkedActors.values.iterator + while(i.hasNext) { + val ref = i.next + ref.stop + self.unlink(ref) + } + } def receive = { // FIXME add a way to respond to MaximumNumberOfRestartsWithinTimeRangeReached in declaratively configured Supervisor diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index c6208b5d4f..444ca67785 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -19,7 +19,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val scalaCompileSettings = Seq("-deprecation", "-Xmigration", - "-Xcheckinit", + //"-Xcheckinit", //Never use this for anything but debugging "-optimise", "-Xwarninit", "-encoding", "utf8")