Merge branch 'link-active-objects'
This commit is contained in:
commit
4cedc47ce0
3 changed files with 85 additions and 23 deletions
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import _root_.se.scalablesolutions.akka.config.FaultHandlingStrategy
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
|
||||
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
|
||||
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future}
|
||||
|
|
@ -199,18 +200,71 @@ object ActiveObject {
|
|||
proxy.asInstanceOf[T]
|
||||
}
|
||||
|
||||
// Jan Kronquist: started work on issue 121
|
||||
// def actorFor(obj: AnyRef): Option[Actor] = {
|
||||
// ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj))
|
||||
// }
|
||||
//
|
||||
// def link(supervisor: AnyRef, activeObject: AnyRef) = {
|
||||
// actorFor(supervisor).get !! Link(actorFor(activeObject).get)
|
||||
// }
|
||||
//
|
||||
// def unlink(supervisor: AnyRef, activeObject: AnyRef) = {
|
||||
// actorFor(supervisor).get !! Unlink(actorFor(activeObject).get)
|
||||
// }
|
||||
/**
|
||||
* Get the underlying dispatcher actor for the given active object.
|
||||
*/
|
||||
def actorFor(obj: AnyRef): Option[Actor] = {
|
||||
ActorRegistry.actorsFor(classOf[Dispatcher]).find(a=>a.target == Some(obj))
|
||||
}
|
||||
|
||||
/**
|
||||
* Links an other active object to this active object.
|
||||
* @param supervisor the supervisor active object
|
||||
* @param supervised the active object to link
|
||||
*/
|
||||
def link(supervisor: AnyRef, supervised: AnyRef) = {
|
||||
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object"))
|
||||
val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object"))
|
||||
supervisorActor !! Link(supervisedActor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Links an other active object to this active object and sets the fault handling for the supervisor.
|
||||
* @param supervisor the supervisor active object
|
||||
* @param supervised the active object to link
|
||||
* @param handler fault handling strategy
|
||||
* @param trapExceptions array of exceptions that should be handled by the supervisor
|
||||
*/
|
||||
def link(supervisor: AnyRef, supervised: AnyRef, handler: FaultHandlingStrategy, trapExceptions: Array[Class[_ <: Throwable]]) = {
|
||||
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't link when the supervisor is not an active object"))
|
||||
val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't link when the supervised is not an active object"))
|
||||
supervisorActor.trapExit = trapExceptions.toList
|
||||
supervisorActor.faultHandler = Some(handler)
|
||||
supervisorActor !! Link(supervisedActor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Unlink the supervised active object from the supervisor.
|
||||
* @param supervisor the supervisor active object
|
||||
* @param supervised the active object to unlink
|
||||
*/
|
||||
def unlink(supervisor: AnyRef, supervised: AnyRef) = {
|
||||
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't unlink when the supervisor is not an active object"))
|
||||
val supervisedActor = actorFor(supervised).getOrElse(throw new IllegalStateException("Can't unlink when the supervised is not an active object"))
|
||||
supervisorActor !! Unlink(supervisedActor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the trap exit for the given supervisor active object.
|
||||
* @param supervisor the supervisor active object
|
||||
* @param trapExceptions array of exceptions that should be handled by the supervisor
|
||||
*/
|
||||
def trapExit(supervisor: AnyRef, trapExceptions: Array[Class[_ <: Throwable]]) = {
|
||||
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set trap exceptions when the supervisor is not an active object"))
|
||||
supervisorActor.trapExit = trapExceptions.toList
|
||||
this
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the fault handling strategy for the given supervisor active object.
|
||||
* @param supervisor the supervisor active object
|
||||
* @param handler fault handling strategy
|
||||
*/
|
||||
def faultHandler(supervisor: AnyRef, handler: FaultHandlingStrategy) = {
|
||||
val supervisorActor = actorFor(supervisor).getOrElse(throw new IllegalStateException("Can't set fault handler when the supervisor is not an active object"))
|
||||
supervisorActor.faultHandler = Some(handler)
|
||||
this
|
||||
}
|
||||
|
||||
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = {
|
||||
val factory = SupervisorFactory(SupervisorConfig(restartStrategy, components))
|
||||
|
|
@ -366,7 +420,7 @@ private[akka] sealed class ActiveObjectAspect {
|
|||
}
|
||||
|
||||
// Jan Kronquist: started work on issue 121
|
||||
// private[akka] case class Link(val actor: Actor)
|
||||
private[akka] case class Link(val actor: Actor)
|
||||
|
||||
object Dispatcher {
|
||||
val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
|
||||
|
|
@ -434,8 +488,8 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
|
|||
if (isOneWay) joinPoint.proceed
|
||||
else reply(joinPoint.proceed)
|
||||
// Jan Kronquist: started work on issue 121
|
||||
// case Link(target) =>
|
||||
// link(target)
|
||||
case Link(target) => link(target)
|
||||
case Unlink(target) => unlink(target)
|
||||
case unexpected =>
|
||||
throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,6 +52,7 @@ case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifeCycleMe
|
|||
case class Restart(reason: Throwable) extends LifeCycleMessage
|
||||
case class Exit(dead: Actor, killer: Throwable) extends LifeCycleMessage
|
||||
case class Unlink(child: Actor) extends LifeCycleMessage
|
||||
case class UnlinkAndStop(child: Actor) extends LifeCycleMessage
|
||||
case object Kill extends LifeCycleMessage
|
||||
|
||||
class ActorKilledException private[akka](message: String) extends RuntimeException(message)
|
||||
|
|
@ -316,7 +317,7 @@ trait Actor extends TransactionManagement with Logging {
|
|||
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
|
||||
* </pre>
|
||||
*/
|
||||
protected var trapExit: List[Class[_ <: Throwable]] = Nil
|
||||
protected[akka] var trapExit: List[Class[_ <: Throwable]] = Nil
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
@ -329,7 +330,7 @@ trait Actor extends TransactionManagement with Logging {
|
|||
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
|
||||
* </pre>
|
||||
*/
|
||||
protected var faultHandler: Option[FaultHandlingStrategy] = None
|
||||
protected[akka] var faultHandler: Option[FaultHandlingStrategy] = None
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
@ -961,11 +962,12 @@ trait Actor extends TransactionManagement with Logging {
|
|||
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
|
||||
|
||||
private val lifeCycles: PartialFunction[Any, Unit] = {
|
||||
case HotSwap(code) => _hotswap = code
|
||||
case Restart(reason) => restart(reason)
|
||||
case Exit(dead, reason) => handleTrapExit(dead, reason)
|
||||
case Unlink(child) => unlink(child); child.stop
|
||||
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
|
||||
case HotSwap(code) => _hotswap = code
|
||||
case Restart(reason) => restart(reason)
|
||||
case Exit(dead, reason) => handleTrapExit(dead, reason)
|
||||
case Unlink(child) => unlink(child)
|
||||
case UnlinkAndStop(child) => unlink(child); child.stop
|
||||
case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message")
|
||||
}
|
||||
|
||||
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
|
||||
|
|
@ -998,7 +1000,7 @@ trait Actor extends TransactionManagement with Logging {
|
|||
// if last temporary actor is gone, then unlink me from supervisor
|
||||
if (getLinkedActors.isEmpty) {
|
||||
Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)\n\tshutting down and unlinking supervisor actor as well [%s].", actor.id)
|
||||
_supervisor.foreach(_ ! Unlink(this))
|
||||
_supervisor.foreach(_ ! UnlinkAndStop(this))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,16 @@
|
|||
package se.scalablesolutions.akka.spring.foo;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class Bar implements IBar {
|
||||
|
||||
@Override
|
||||
public String getBar() {
|
||||
return "bar";
|
||||
}
|
||||
|
||||
public void throwsIOException() throws IOException {
|
||||
throw new IOException("some IO went wrong");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue