From 396ae432356773e9a752794d1ec1c791fd7fba54 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Wed, 14 Apr 2010 09:56:01 +0200 Subject: [PATCH] implemented link/unlink for active objects --- .../src/main/scala/actor/ActiveObject.scala | 84 +++++++++++++++---- akka-core/src/main/scala/actor/Actor.scala | 18 ++-- .../akka/spring/foo/Bar.java | 6 ++ 3 files changed, 85 insertions(+), 23 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index e3bd9ef943..cb67ae0244 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -4,6 +4,7 @@ package se.scalablesolutions.akka.actor +import _root_.se.scalablesolutions.akka.config.FaultHandlingStrategy import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} @@ -199,18 +200,71 @@ object ActiveObject { proxy.asInstanceOf[T] } -// Jan Kronquist: started work on issue 121 -// def actorFor(obj: AnyRef): Option[Actor] = { -// ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj)) -// } -// -// def link(supervisor: AnyRef, activeObject: AnyRef) = { -// actorFor(supervisor).get !! Link(actorFor(activeObject).get) -// } -// -// def unlink(supervisor: AnyRef, activeObject: AnyRef) = { -// actorFor(supervisor).get !! Unlink(actorFor(activeObject).get) -// } + /** + * Get the underlying dispatcher actor for the given active object. + */ + def actorFor(obj: AnyRef): Option[Actor] = { + ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj)) + } + + /** + * Links an other active object to this active object. + * @param supervisor the supervisor active object + * @param supervised the active object to link + */ + def link(supervisor: AnyRef, supervised: AnyRef) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + supervisorActor !! Link(supervisedActor) + } + + /** + * Links an other active object to this active object and sets the fault handling for the supervisor. + * @param supervisor the supervisor active object + * @param supervised the active object to link + * @param handler fault handling strategy + * @param trapExceptions array of exceptions that should be handled by the supervisor + */ + def link(supervisor: AnyRef, supervised: AnyRef, handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object")) + supervisorActor.trapExit = trapExceptions.toList + supervisorActor.faultHandler = Some(handler) + supervisorActor !! Link(supervisedActor) + } + + /** + * Unlink the supervised active object from the supervisor. + * @param supervisor the supervisor active object + * @param supervised the active object to unlink + */ + def unlink(supervisor: AnyRef, supervised: AnyRef) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't unlink when the supervisor is not an active object")) + val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't unlink when the supervised is not an active object")) + supervisorActor !! Unlink(supervisedActor) + } + + /** + * Sets the trap exit for the given supervisor active object. + * @param supervisor the supervisor active object + * @param trapExceptions array of exceptions that should be handled by the supervisor + */ + def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object")) + supervisorActor.trapExit = trapExceptions.toList + this + } + + /** + * Sets the fault handling strategy for the given supervisor active object. + * @param supervisor the supervisor active object + * @param handler fault handling strategy + */ + def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = { + val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object")) + supervisorActor.faultHandler = Some(handler) + this + } private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = { val factory = SupervisorFactory(SupervisorConfig(restartStrategy, components)) @@ -366,7 +420,7 @@ private[akka] sealed class ActiveObjectAspect { } // Jan Kronquist: started work on issue 121 -// private[akka] case class Link(val actor: Actor) +private[akka] case class Link(val actor: Actor) object Dispatcher { val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]() @@ -434,8 +488,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op if (isOneWay) joinPoint.proceed else reply(joinPoint.proceed) // Jan Kronquist: started work on issue 121 -// case Link(target) => -// link(target) + case Link(target) => link(target) + case Unlink(target) => unlink(target) case unexpected => throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]") } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 4bc3a9dc31..8c38769579 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -52,6 +52,7 @@ case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMe case class Restart(reason: Throwable) extends LifeCycleMessage case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage case class Unlink(child: Actor) extends LifeCycleMessage +case class UnlinkAndStop(child: Actor) extends LifeCycleMessage case object Kill extends LifeCycleMessage class ActorKilledException private[akka](message: String) extends RuntimeException(message) @@ -318,7 +319,7 @@ trait Actor extends TransactionManagement with Logging { * trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError]) * */ - protected var trapExit: List[Class[_ <: Throwable]] = Nil + protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil /** * User overridable callback/setting. @@ -331,7 +332,7 @@ trait Actor extends TransactionManagement with Logging { * faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) * */ - protected var faultHandler: Option[FaultHandlingStrategy] = None + protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None /** * User overridable callback/setting. @@ -965,11 +966,12 @@ trait Actor extends TransactionManagement with Logging { private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive) private val lifeCycles: PartialFunction[Any, Unit] = { - case HotSwap(code) => _hotswap = code - case Restart(reason) => restart(reason) - case Exit(dead, reason) => handleTrapExit(dead, reason) - case Unlink(child) => unlink(child); child.stop - case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") + case HotSwap(code) => _hotswap = code + case Restart(reason) => restart(reason) + case Exit(dead, reason) => handleTrapExit(dead, reason) + case Unlink(child) => unlink(child) + case UnlinkAndStop(child) => unlink(child); child.stop + case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") } private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = { @@ -1002,7 +1004,7 @@ trait Actor extends TransactionManagement with Logging { // if last temporary actor is gone, then unlink me from supervisor if (getLinkedActors.isEmpty) { Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)\n\tshutting down and unlinking supervisor actor as well [%s].", actor.id) - _supervisor.foreach(_ ! Unlink(this)) + _supervisor.foreach(_ ! UnlinkAndStop(this)) } } } diff --git a/akka-spring/akka-spring-test-java/src/main/java/se/scalablesolutions/akka/spring/foo/Bar.java b/akka-spring/akka-spring-test-java/src/main/java/se/scalablesolutions/akka/spring/foo/Bar.java index 7e21aaea8f..24e02a3fd6 100644 --- a/akka-spring/akka-spring-test-java/src/main/java/se/scalablesolutions/akka/spring/foo/Bar.java +++ b/akka-spring/akka-spring-test-java/src/main/java/se/scalablesolutions/akka/spring/foo/Bar.java @@ -1,10 +1,16 @@ package se.scalablesolutions.akka.spring.foo; +import java.io.IOException; + public class Bar implements IBar { @Override public String getBar() { return "bar"; } + + public void throwsIOException() throws IOException { + throw new IOException("some IO went wrong"); + } }